@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108108PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
109109PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
110110PG_FUNCTION_INFO_V1 (mtm_make_table_local );
111+ PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
111112
112113static Snapshot MtmGetSnapshot (Snapshot snapshot );
113114static void MtmInitialize (void );
@@ -140,7 +141,7 @@ HTAB* MtmXid2State;
140141static HTAB * MtmGid2State ;
141142static HTAB * MtmLocalTables ;
142143
143- static bool MtmIsRecoverySession ;
144+ static bool MtmIsRecoverySession ;
144145
145146static MtmCurrentTrans MtmTx ;
146147
@@ -199,6 +200,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
199200 ProcessUtilityContext context , ParamListInfo params ,
200201 DestReceiver * dest , char * completionTag );
201202
203+ static StringInfo MtmGUCBuffer ;
204+ static bool MtmGUCBufferAllocated = false;
205+
202206/*
203207 * -------------------------------------------
204208 * Synchronize access to MTM structures.
@@ -2153,6 +2157,31 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
21532157 return false;
21542158}
21552159
2160+ Datum mtm_dump_lock_graph (PG_FUNCTION_ARGS )
2161+ {
2162+ StringInfo s = makeStringInfo ();
2163+ int i ;
2164+ for (i = 0 ; i < MtmNodes ; i ++ )
2165+ {
2166+ size_t size ;
2167+ char * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2168+ if (!data ) continue ;
2169+ GlobalTransactionId * gtid = (GlobalTransactionId * )data ;
2170+ GlobalTransactionId * last = (GlobalTransactionId * )(data + size );
2171+ appendStringInfo (s , "node-%d lock graph: " , i + 1 );
2172+ while (gtid != last ) {
2173+ GlobalTransactionId * src = gtid ++ ;
2174+ appendStringInfo (s , "%d:%d -> " , src -> node , src -> xid );
2175+ while (gtid -> node != 0 ) {
2176+ GlobalTransactionId * dst = gtid ++ ;
2177+ appendStringInfo (s , "%d:%d, " , dst -> node , dst -> xid );
2178+ }
2179+ gtid += 1 ;
2180+ }
2181+ appendStringInfo (s , "\n" );
2182+ }
2183+ return CStringGetTextDatum (s -> data );
2184+ }
21562185
21572186/*
21582187 * -------------------------------------------
@@ -2241,6 +2270,12 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22412270 {
22422271 if (conns [i ])
22432272 {
2273+ if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ], MtmGUCBuffer -> data , & utility_errmsg ) && !ignoreError )
2274+ {
2275+ errorMsg = "Failed to set GUC variables at node %d" ;
2276+ failedNode = i ;
2277+ break ;
2278+ }
22442279 if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
22452280 {
22462281 errorMsg = "Failed to start transaction at node %d" ;
@@ -2252,7 +2287,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22522287 if (i + 1 == MtmNodeId )
22532288 errorMsg = utility_errmsg ;
22542289 else
2290+ {
2291+ elog (ERROR , utility_errmsg );
22552292 errorMsg = "Failed to run command at node %d" ;
2293+ }
22562294
22572295 failedNode = i ;
22582296 break ;
@@ -2383,7 +2421,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
23832421 ProcessUtilityContext context , ParamListInfo params ,
23842422 DestReceiver * dest , char * completionTag )
23852423{
2386- bool skipCommand ;
2424+ bool skipCommand = false ;
23872425 MTM_TRACE ("%d: Process utility statement %s\n" , MyProcPid , queryString );
23882426 switch (nodeTag (parsetree ))
23892427 {
@@ -2414,7 +2452,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24142452 case T_FetchStmt :
24152453 case T_DoStmt :
24162454 case T_CreateTableSpaceStmt :
2417- case T_DropTableSpaceStmt :
24182455 case T_AlterTableSpaceOptionsStmt :
24192456 case T_TruncateStmt :
24202457 case T_CommentStmt : /* XXX: we could replicate these */ ;
@@ -2423,9 +2460,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24232460 case T_ExecuteStmt :
24242461 case T_DeallocateStmt :
24252462 case T_GrantStmt : /* XXX: we could replicate some of these these */ ;
2426- case T_GrantRoleStmt :
2427- case T_AlterDatabaseStmt :
2428- case T_AlterDatabaseSetStmt :
2463+ // case T_GrantRoleStmt:
2464+ // case T_AlterDatabaseStmt:
2465+ // case T_AlterDatabaseSetStmt:
24292466 case T_NotifyStmt :
24302467 case T_ListenStmt :
24312468 case T_UnlistenStmt :
@@ -2434,22 +2471,46 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24342471 case T_VacuumStmt :
24352472 case T_ExplainStmt :
24362473 case T_AlterSystemStmt :
2437- case T_VariableSetStmt :
24382474 case T_VariableShowStmt :
24392475 case T_DiscardStmt :
2440- case T_CreateEventTrigStmt :
2441- case T_AlterEventTrigStmt :
2442- case T_CreateRoleStmt :
2443- case T_AlterRoleStmt :
2444- case T_AlterRoleSetStmt :
2445- case T_DropRoleStmt :
2476+ // case T_CreateEventTrigStmt:
2477+ // case T_AlterEventTrigStmt:
2478+ // case T_CreateRoleStmt:
2479+ // case T_AlterRoleStmt:
2480+ // case T_AlterRoleSetStmt:
2481+ // case T_DropRoleStmt:
24462482 case T_ReassignOwnedStmt :
24472483 case T_LockStmt :
2448- case T_ConstraintsSetStmt :
2484+ // case T_ConstraintsSetStmt:
24492485 case T_CheckPointStmt :
24502486 case T_ReindexStmt :
24512487 skipCommand = true;
24522488 break ;
2489+ case T_VariableSetStmt :
2490+ {
2491+ //VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
2492+
2493+ if (!MtmGUCBufferAllocated )
2494+ {
2495+ MemoryContext oldcontext ;
2496+
2497+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
2498+ MtmGUCBuffer = makeStringInfo ();
2499+ MemoryContextSwitchTo (oldcontext );
2500+ MtmGUCBufferAllocated = true;
2501+ }
2502+
2503+ //appendStringInfoString(MtmGUCBuffer, "SET ");
2504+ //appendStringInfoString(MtmGUCBuffer, stmt->name);
2505+ //appendStringInfoString(MtmGUCBuffer, " TO ");
2506+ //appendStringInfoString(MtmGUCBuffer, ExtractSetVariableArgs(stmt));
2507+ //appendStringInfoString(MtmGUCBuffer, "; ");
2508+
2509+ appendStringInfoString (MtmGUCBuffer , queryString );
2510+
2511+ skipCommand = true;
2512+ }
2513+ break ;
24532514 case T_CreateStmt :
24542515 {
24552516 /* Do not replicate temp tables */
@@ -2469,9 +2530,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24692530 "CREATE INDEX CONCURRENTLY" );
24702531
24712532 relid = RelnameGetRelid (stmt -> relation -> relname );
2472- rel = heap_open (relid , ShareLock );
2473- skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2474- heap_close (rel , NoLock );
2533+
2534+ if (OidIsValid (relid ))
2535+ {
2536+ rel = heap_open (relid , ShareLock );
2537+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2538+ heap_close (rel , NoLock );
2539+ }
2540+ }
2541+ break ;
2542+ case T_DropStmt :
2543+ {
2544+ DropStmt * stmt = (DropStmt * ) parsetree ;
2545+
2546+ if (stmt -> removeType == OBJECT_TABLE )
2547+ {
2548+ RangeVar * rv = makeRangeVarFromNameList (
2549+ (List * ) lfirst (list_head (stmt -> objects )));
2550+ Oid relid = RelnameGetRelid (rv -> relname );
2551+
2552+ if (OidIsValid (relid ))
2553+ {
2554+ Relation rel = heap_open (relid , ShareLock );
2555+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2556+ heap_close (rel , ShareLock );
2557+ }
2558+ }
24752559 }
24762560 break ;
24772561 default :
0 commit comments