@@ -140,7 +140,7 @@ HTAB* MtmXid2State;
140140static HTAB * MtmGid2State ;
141141static HTAB * MtmLocalTables ;
142142
143- static bool MtmIsRecoverySession ;
143+ static bool MtmIsRecoverySession ;
144144
145145static MtmCurrentTrans MtmTx ;
146146
@@ -199,6 +199,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
199199 ProcessUtilityContext context , ParamListInfo params ,
200200 DestReceiver * dest , char * completionTag );
201201
202+ static StringInfo MtmGUCBuffer ;
203+ static bool MtmGUCBufferAllocated = false;
204+
202205/*
203206 * -------------------------------------------
204207 * Synchronize access to MTM structures.
@@ -1934,7 +1937,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19341937 elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
19351938 }
19361939 MtmUnlock ();
1937- on_proc_exit (MtmOnProcExit , 0 );
1940+ on_shmem_exit (MtmOnProcExit , 0 );
19381941}
19391942
19401943static void
@@ -2239,6 +2242,12 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22392242 {
22402243 if (conns [i ])
22412244 {
2245+ if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ], MtmGUCBuffer -> data , & utility_errmsg ) && !ignoreError )
2246+ {
2247+ errorMsg = "Failed to set GUC variables at node %d" ;
2248+ failedNode = i ;
2249+ break ;
2250+ }
22422251 if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
22432252 {
22442253 errorMsg = "Failed to start transaction at node %d" ;
@@ -2250,7 +2259,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22502259 if (i + 1 == MtmNodeId )
22512260 errorMsg = utility_errmsg ;
22522261 else
2262+ {
2263+ elog (ERROR , utility_errmsg );
22532264 errorMsg = "Failed to run command at node %d" ;
2265+ }
22542266
22552267 failedNode = i ;
22562268 break ;
@@ -2381,7 +2393,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
23812393 ProcessUtilityContext context , ParamListInfo params ,
23822394 DestReceiver * dest , char * completionTag )
23832395{
2384- bool skipCommand ;
2396+ bool skipCommand = false ;
23852397 MTM_TRACE ("%d: Process utility statement %s\n" , MyProcPid , queryString );
23862398 switch (nodeTag (parsetree ))
23872399 {
@@ -2412,7 +2424,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24122424 case T_FetchStmt :
24132425 case T_DoStmt :
24142426 case T_CreateTableSpaceStmt :
2415- case T_DropTableSpaceStmt :
24162427 case T_AlterTableSpaceOptionsStmt :
24172428 case T_TruncateStmt :
24182429 case T_CommentStmt : /* XXX: we could replicate these */ ;
@@ -2421,9 +2432,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24212432 case T_ExecuteStmt :
24222433 case T_DeallocateStmt :
24232434 case T_GrantStmt : /* XXX: we could replicate some of these these */ ;
2424- case T_GrantRoleStmt :
2425- case T_AlterDatabaseStmt :
2426- case T_AlterDatabaseSetStmt :
2435+ // case T_GrantRoleStmt:
2436+ // case T_AlterDatabaseStmt:
2437+ // case T_AlterDatabaseSetStmt:
24272438 case T_NotifyStmt :
24282439 case T_ListenStmt :
24292440 case T_UnlistenStmt :
@@ -2432,22 +2443,46 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24322443 case T_VacuumStmt :
24332444 case T_ExplainStmt :
24342445 case T_AlterSystemStmt :
2435- case T_VariableSetStmt :
24362446 case T_VariableShowStmt :
24372447 case T_DiscardStmt :
2438- case T_CreateEventTrigStmt :
2439- case T_AlterEventTrigStmt :
2440- case T_CreateRoleStmt :
2441- case T_AlterRoleStmt :
2442- case T_AlterRoleSetStmt :
2443- case T_DropRoleStmt :
2448+ // case T_CreateEventTrigStmt:
2449+ // case T_AlterEventTrigStmt:
2450+ // case T_CreateRoleStmt:
2451+ // case T_AlterRoleStmt:
2452+ // case T_AlterRoleSetStmt:
2453+ // case T_DropRoleStmt:
24442454 case T_ReassignOwnedStmt :
24452455 case T_LockStmt :
2446- case T_ConstraintsSetStmt :
2456+ // case T_ConstraintsSetStmt:
24472457 case T_CheckPointStmt :
24482458 case T_ReindexStmt :
24492459 skipCommand = true;
24502460 break ;
2461+ case T_VariableSetStmt :
2462+ {
2463+ //VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
2464+
2465+ if (!MtmGUCBufferAllocated )
2466+ {
2467+ MemoryContext oldcontext ;
2468+
2469+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
2470+ MtmGUCBuffer = makeStringInfo ();
2471+ MemoryContextSwitchTo (oldcontext );
2472+ MtmGUCBufferAllocated = true;
2473+ }
2474+
2475+ //appendStringInfoString(MtmGUCBuffer, "SET ");
2476+ //appendStringInfoString(MtmGUCBuffer, stmt->name);
2477+ //appendStringInfoString(MtmGUCBuffer, " TO ");
2478+ //appendStringInfoString(MtmGUCBuffer, ExtractSetVariableArgs(stmt));
2479+ //appendStringInfoString(MtmGUCBuffer, "; ");
2480+
2481+ appendStringInfoString (MtmGUCBuffer , queryString );
2482+
2483+ skipCommand = true;
2484+ }
2485+ break ;
24512486 case T_CreateStmt :
24522487 {
24532488 /* Do not replicate temp tables */
@@ -2467,9 +2502,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24672502 "CREATE INDEX CONCURRENTLY" );
24682503
24692504 relid = RelnameGetRelid (stmt -> relation -> relname );
2470- rel = heap_open (relid , ShareLock );
2471- skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2472- heap_close (rel , NoLock );
2505+
2506+ if (OidIsValid (relid ))
2507+ {
2508+ rel = heap_open (relid , ShareLock );
2509+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2510+ heap_close (rel , NoLock );
2511+ }
2512+ }
2513+ break ;
2514+ case T_DropStmt :
2515+ {
2516+ DropStmt * stmt = (DropStmt * ) parsetree ;
2517+
2518+ if (stmt -> removeType == OBJECT_TABLE )
2519+ {
2520+ RangeVar * rv = makeRangeVarFromNameList (
2521+ (List * ) lfirst (list_head (stmt -> objects )));
2522+ Oid relid = RelnameGetRelid (rv -> relname );
2523+
2524+ if (OidIsValid (relid ))
2525+ {
2526+ Relation rel = heap_open (relid , ShareLock );
2527+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2528+ heap_close (rel , ShareLock );
2529+ }
2530+ }
24732531 }
24742532 break ;
24752533 default :
0 commit comments