@@ -147,6 +147,8 @@ static bool MtmIsRecoverySession;
147147
148148static MtmCurrentTrans MtmTx ;
149149
150+ static dlist_head MtmLsnMapping = DLIST_STATIC_INIT (MtmLsnMapping );
151+
150152static TransactionManager MtmTM = {
151153 PgTransactionIdGetStatus ,
152154 PgTransactionIdSetTreeStatus ,
@@ -1033,6 +1035,7 @@ void MtmHandleApplyError(void)
10331035 kill (PostmasterPid , SIGQUIT );
10341036 break ;
10351037 }
1038+ FreeErrorData (edata );
10361039}
10371040
10381041
@@ -1507,6 +1510,7 @@ static void MtmInitialize()
15071510 Mtm -> nodes [i ].transDelay = 0 ;
15081511 Mtm -> nodes [i ].lastStatusChangeTime = time (NULL );
15091512 Mtm -> nodes [i ].con = MtmConnections [i ];
1513+ Mtm -> nodes [i ].flushPos = 0 ;
15101514 }
15111515 PGSemaphoreCreate (& Mtm -> votingSemaphore );
15121516 PGSemaphoreReset (& Mtm -> votingSemaphore );
@@ -2084,6 +2088,45 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20842088 on_shmem_exit (MtmOnProcExit , 0 );
20852089}
20862090
2091+ XLogRecPtr MtmGetFlushPosition (int nodeId )
2092+ {
2093+ return Mtm -> nodes [nodeId - 1 ].flushPos ;
2094+ }
2095+
2096+ void MtmUpdateLsnMapping (int node_id , XLogRecPtr end_lsn )
2097+ {
2098+ dlist_mutable_iter iter ;
2099+ MtmFlushPosition * flushpos ;
2100+ XLogRecPtr local_flush = GetFlushRecPtr ();
2101+ MemoryContext old_context = MemoryContextSwitchTo (TopMemoryContext );
2102+
2103+ /* Track commit lsn */
2104+ flushpos = (MtmFlushPosition * ) palloc (sizeof (MtmFlushPosition ));
2105+ flushpos -> node_id = node_id ;
2106+ flushpos -> local_end = XactLastCommitEnd ;
2107+ flushpos -> remote_end = end_lsn ;
2108+ dlist_push_tail (& MtmLsnMapping , & flushpos -> node );
2109+
2110+ MtmLock (LW_EXCLUSIVE );
2111+ dlist_foreach_modify (iter , & MtmLsnMapping )
2112+ {
2113+ flushpos = dlist_container (MtmFlushPosition , node , iter .cur );
2114+ if (flushpos -> local_end <= local_flush )
2115+ {
2116+ if (Mtm -> nodes [node_id - 1 ].flushPos < local_flush ) {
2117+ Mtm -> nodes [node_id - 1 ].flushPos = local_flush ;
2118+ }
2119+ dlist_delete (iter .cur );
2120+ pfree (flushpos );
2121+ } else {
2122+ break ;
2123+ }
2124+ }
2125+ MtmUnlock ();
2126+ MemoryContextSwitchTo (old_context );
2127+ }
2128+
2129+
20872130static void
20882131MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
20892132{
0 commit comments