@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865865
866866void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
867867{
868- MtmLock (LW_EXCLUSIVE );
869- MtmSyncClock (globalSnapshot );
870- MtmUnlock ();
871-
868+ if (globalSnapshot != INVALID_CSN ) {
869+ MtmLock (LW_EXCLUSIVE );
870+ MtmSyncClock (globalSnapshot );
871+ MtmUnlock ();
872+ } else {
873+ globalSnapshot = MtmTx .snapshot ;
874+ }
872875 if (!TransactionIdIsValid (gtid -> xid )) {
873876 /* In case of recovery InvalidTransactionId is passed */
874877 Assert (Mtm -> status == MTM_RECOVERY );
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
18771880 }
18781881 }
18791882}
1883+ static void
1884+ MtmOnProcExit (int code , Datum arg )
1885+ {
1886+ if (MtmReplicationNodeId >= 0 ) {
1887+ elog (WARNING , "WAL-sender to %d is terminated" , MtmReplicationNodeId );
1888+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1889+ }
1890+ }
18801891
18811892static void
18821893MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19231934 elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
19241935 }
19251936 MtmUnlock ();
1937+ on_proc_exit (MtmOnProcExit , 0 );
19261938}
19271939
19281940static void
19291941MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
19301942{
1931- elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1932- MtmOnNodeDisconnect (MtmReplicationNodeId );
1943+ if (MtmReplicationNodeId >= 0 ) {
1944+ elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1945+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1946+ MtmReplicationNodeId = -1 ; /* defuse on_proc_exit hook */
1947+ }
19331948}
19341949
19351950static bool
@@ -2159,14 +2174,34 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
21592174
21602175 * errmsg = palloc0 (errlen );
21612176
2162- /* Strip "ERROR:\t " from beginning and "\n" from end of error string */
2177+ /* Strip "ERROR: " from beginning and "\n" from end of error string */
21632178 strncpy (* errmsg , errstr + 8 , errlen - 1 - 8 );
21642179 }
21652180
21662181 PQclear (result );
21672182 return ret ;
21682183}
21692184
2185+ static void
2186+ MtmNoticeReceiver (void * i , const PGresult * res )
2187+ {
2188+ char * notice = PQresultErrorMessage (res );
2189+ char * stripped_notice ;
2190+ int len = strlen (notice );
2191+
2192+ /* Skip notices from other nodes */
2193+ if ( (* (int * )i ) != MtmNodeId - 1 )
2194+ return ;
2195+
2196+ stripped_notice = palloc0 (len );
2197+
2198+ /* Strip "NOTICE: " from beginning and "\n" from end of error string */
2199+ strncpy (stripped_notice , notice + 9 , len - 1 - 9 );
2200+
2201+ elog (NOTICE , stripped_notice );
2202+ pfree (stripped_notice );
2203+ }
2204+
21702205static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError )
21712206{
21722207 int i = 0 ;
@@ -2195,6 +2230,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
21952230 elog (ERROR , "Failed to establish connection '%s' to node %d" , Mtm -> nodes [i ].con .connStr , failedNode );
21962231 }
21972232 }
2233+ PQsetNoticeReceiver (conns [i ], MtmNoticeReceiver , & i );
21982234 }
21992235 }
22002236 Assert (i == MtmNodes );
@@ -2211,9 +2247,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22112247 }
22122248 if (!MtmRunUtilityStmt (conns [i ], sql , & utility_errmsg ) && !ignoreError )
22132249 {
2214- // errorMsg = "Failed to run command at node %d";
2215- // XXX: add check for our node
2216- errorMsg = utility_errmsg ;
2250+ if (i + 1 == MtmNodeId )
2251+ errorMsg = utility_errmsg ;
2252+ else
2253+ errorMsg = "Failed to run command at node %d" ;
22172254
22182255 failedNode = i ;
22192256 break ;
@@ -2418,6 +2455,23 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24182455 skipCommand = stmt -> relation -> relpersistence == RELPERSISTENCE_TEMP ;
24192456 }
24202457 break ;
2458+ case T_IndexStmt :
2459+ {
2460+ Oid relid ;
2461+ Relation rel ;
2462+ IndexStmt * stmt = (IndexStmt * ) parsetree ;
2463+ bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL );
2464+
2465+ if (stmt -> concurrent )
2466+ PreventTransactionChain (isTopLevel ,
2467+ "CREATE INDEX CONCURRENTLY" );
2468+
2469+ relid = RelnameGetRelid (stmt -> relation -> relname );
2470+ rel = heap_open (relid , ShareLock );
2471+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2472+ heap_close (rel , NoLock );
2473+ }
2474+ break ;
24212475 default :
24222476 skipCommand = false;
24232477 break ;
0 commit comments