@@ -103,6 +103,7 @@ PG_MODULE_MAGIC;
103103PG_FUNCTION_INFO_V1 (mtm_start_replication );
104104PG_FUNCTION_INFO_V1 (mtm_stop_replication );
105105PG_FUNCTION_INFO_V1 (mtm_drop_node );
106+ PG_FUNCTION_INFO_V1 (mtm_poll_node );
106107PG_FUNCTION_INFO_V1 (mtm_recover_node );
107108PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
108109PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
@@ -181,6 +182,7 @@ int MtmKeepaliveTimeout;
181182int MtmReconnectAttempts ;
182183int MtmNodeDisableDelay ;
183184bool MtmUseRaftable ;
185+ bool MtmUseDtm ;
184186MtmConnectionInfo * MtmConnections ;
185187
186188static char * MtmConnStrs ;
@@ -339,7 +341,7 @@ TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
339341}
340342
341343bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot )
342- {
344+ {
343345#if TRACE_SLEEP_TIME
344346 static timestamp_t firstReportTime ;
345347 static timestamp_t prevReportTime ;
@@ -349,6 +351,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
349351 timestamp_t delay = MIN_WAIT_TIMEOUT ;
350352 Assert (xid != InvalidTransactionId );
351353
354+ if (!MtmUseDtm ) {
355+ return PgXidInMVCCSnapshot (xid , snapshot );
356+ }
357+
352358 MtmLock (LW_SHARED );
353359
354360#if TRACE_SLEEP_TIME
@@ -512,13 +518,19 @@ MtmAdjustOldestXid(TransactionId xid)
512518 hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
513519 }
514520 }
515- }
516- if (prev != NULL ) {
517- Mtm -> transListHead = prev ;
518- Mtm -> oldestXid = xid = prev -> xid ;
519- } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
520- xid = Mtm -> oldestXid ;
521- }
521+ }
522+ if (MtmUseDtm ) {
523+ if (prev != NULL ) {
524+ Mtm -> transListHead = prev ;
525+ Mtm -> oldestXid = xid = prev -> xid ;
526+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
527+ xid = Mtm -> oldestXid ;
528+ }
529+ } else {
530+ if (prev != NULL ) {
531+ Mtm -> transListHead = prev ;
532+ }
533+ }
522534 MtmUnlock ();
523535 }
524536 return xid ;
@@ -753,6 +765,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
753765
754766}
755767
768+ static time_t maxWakeupTime ;
769+
756770static void
757771MtmPostPrepareTransaction (MtmCurrentTrans * x )
758772{
@@ -768,18 +782,23 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
768782 tm -> state = ts ;
769783 ts -> votingCompleted = true;
770784 if (Mtm -> status != MTM_RECOVERY ) {
771- MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
785+ MtmSendNotificationMessage (ts , MtmUseDtm ? MSG_READY : MSG_PREPARED ); /* send notification to coordinator */
772786 } else {
773787 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
774788 }
775789 MtmUnlock ();
776790 MtmResetTransaction (x );
777791 } else {
792+ time_t wakeupTime ;
778793 /* wait votes from all nodes */
779794 while (!ts -> votingCompleted ) {
780795 MtmUnlock ();
781796 WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
782797 ResetLatch (& MyProc -> procLatch );
798+ wakeupTime = MtmGetCurrentTime () - ts -> wakeupTime ;
799+ if (wakeupTime > maxWakeupTime ) {
800+ maxWakeupTime = wakeupTime ;
801+ }
783802 MtmLock (LW_SHARED );
784803 }
785804 x -> status = ts -> status ;
@@ -972,6 +991,7 @@ void MtmWakeUpBackend(MtmTransState* ts)
972991{
973992 MTM_LOG3 ("Wakeup backed procno=%d, pid=%d" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
974993 ts -> votingCompleted = true;
994+ ts -> wakeupTime = MtmGetCurrentTime ();
975995 SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
976996}
977997
@@ -1651,6 +1671,19 @@ _PG_init(void)
16511671 NULL
16521672 );
16531673
1674+ DefineCustomBoolVariable (
1675+ "multimaster.use_dtm" ,
1676+ "Use distributed transaction manager" ,
1677+ NULL ,
1678+ & MtmUseDtm ,
1679+ true,
1680+ PGC_BACKEND ,
1681+ 0 ,
1682+ NULL ,
1683+ NULL ,
1684+ NULL
1685+ );
1686+
16541687 DefineCustomIntVariable (
16551688 "multimaster.workers" ,
16561689 "Number of multimaster executor workers per node" ,
@@ -2069,6 +2102,27 @@ mtm_drop_node(PG_FUNCTION_ARGS)
20692102 PG_RETURN_VOID ();
20702103}
20712104
2105+ Datum
2106+ mtm_poll_node (PG_FUNCTION_ARGS )
2107+ {
2108+ int nodeId = PG_GETARG_INT32 (0 );
2109+ bool nowait = PG_GETARG_BOOL (1 );
2110+ bool online = true;
2111+ while (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
2112+ if (nowait ) {
2113+ online = false;
2114+ break ;
2115+ } else {
2116+ MtmSleep (STATUS_POLL_DELAY );
2117+ }
2118+ }
2119+ if (!nowait ) {
2120+ /* Just wait some time until logical repication channels will be reestablished */
2121+ MtmSleep (MtmNodeDisableDelay );
2122+ }
2123+ PG_RETURN_BOOL (online );
2124+ }
2125+
20722126Datum
20732127mtm_recover_node (PG_FUNCTION_ARGS )
20742128{
0 commit comments