@@ -225,6 +225,7 @@ char const* const MtmNodeStatusMnem[] =
225225 "Recovery" ,
226226 "Recovered" ,
227227 "InMinor" ,
228+ "OutOfClique" ,
228229 "OutOfService"
229230};
230231
@@ -373,6 +374,7 @@ void MtmLock(LWLockMode mode)
373374 if (mode == LW_EXCLUSIVE ) {
374375 Assert (MtmLockCount == 0 );
375376 Mtm -> lastLockHolder = MyProcPid ;
377+ Assert (MyProcPid );
376378 MtmLockCount = 1 ;
377379 }
378380 }
@@ -1155,7 +1157,6 @@ bool MtmWatchdog(timestamp_t now)
11551157 MTM_LOG1 ("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)" ,
11561158 i + 1 , (int )USEC_TO_MSEC (now - Mtm -> nodes [i ].lastHeartbeat ));
11571159 MtmOnNodeDisconnect (i + 1 );
1158- MtmDisableNode (i + 1 );
11591160 allAlive = false;
11601161 }
11611162 }
@@ -1176,6 +1177,7 @@ void MtmPrecommitTransaction(char const* gid)
11761177 MTM_ELOG (WARNING , "MtmPrecommitTransaction: transaction '%s' is not found" , gid );
11771178 } else {
11781179 MtmTransState * ts = tm -> state ;
1180+ // Assert(ts != NULL);
11791181 if (ts == NULL ) {
11801182 MTM_ELOG (WARNING , "MtmPrecommitTransaction: transaction '%s' is not yet prepared, status %s" , gid , MtmTxnStatusMnem [tm -> status ]);
11811183 MtmUnlock ();
@@ -1501,6 +1503,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15011503 if (!(ts -> status == TRANSACTION_STATUS_UNKNOWN
15021504 || (ts -> status == TRANSACTION_STATUS_IN_PROGRESS && Mtm -> status == MTM_RECOVERY )))
15031505 {
1506+ MtmUnlock ();
15041507 MTM_ELOG (ERROR , "Attempt to commit %s transaction %s (%llu)" ,
15051508 MtmTxnStatusMnem [ts -> status ], ts -> gid , (long64 )ts -> xid );
15061509 }
@@ -2026,15 +2029,19 @@ static int64 MtmGetSlotLag(int nodeId)
20262029 */
20272030bool MtmIsRecoveredNode (int nodeId )
20282031{
2029- if (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
2030- if (!MtmIsRecoverySession ) {
2031- MTM_ELOG (ERROR , "Node %d is marked as disabled but is not in recovery mode" , nodeId );
2032- }
2033- return true;
2034- } else {
2035- MtmIsRecoverySession = false; /* recovery is completed */
2036- return false;
2037- }
2032+ if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ))
2033+ Assert (!MtmIsRecoverySession );
2034+
2035+ return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 ) && MtmIsRecoverySession ;
2036+ // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2037+ // if (!MtmIsRecoverySession) {
2038+ // MTM_ELOG(WARNING, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2039+ // }
2040+ // return true;
2041+ // } else {
2042+ // MtmIsRecoverySession = false; /* recovery is completed */
2043+ // return false;
2044+ // }
20382045}
20392046
20402047/*
@@ -2060,7 +2067,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20602067 */
20612068 MTM_LOG1 ("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d" ,
20622069 nodeId , slotLSN , walLSN , Mtm -> nActiveTransactions );
2063- BIT_SET (Mtm -> originLockNodeMask , nodeId - 1 );
2070+ BIT_SET (Mtm -> originLockNodeMask , nodeId - 1 ); // XXXX: log that
20642071 } else {
20652072 MTM_LOG2 ("Continue recovery of node %d, slot position %llx, WAL position %llx,"
20662073 " WAL sender position %llx, lockers %llx, active transactions %d" , nodeId , slotLSN ,
@@ -2082,6 +2089,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20822089 if (MtmIsRecoveredNode (nodeId ) && Mtm -> nActiveTransactions == 0 ) {
20832090 MtmStateProcessNeighborEvent (nodeId , MTM_NEIGHBOR_RECOVERY_CAUGHTUP );
20842091 caughtUp = true;
2092+ MtmIsRecoverySession = false;
20852093 }
20862094 MtmUnlock ();
20872095 return caughtUp ;
@@ -2099,6 +2107,7 @@ MtmLockCluster(void)
20992107 }
21002108 MtmLock (LW_EXCLUSIVE );
21012109 if (BIT_CHECK (Mtm -> originLockNodeMask , MtmNodeId - 1 )) {
2110+ MtmUnlock ();
21022111 elog (ERROR , "There is already pending exclusive lock" );
21032112 }
21042113 BIT_SET (Mtm -> originLockNodeMask , MtmNodeId - 1 );
@@ -2351,6 +2360,7 @@ static void MtmInitialize()
23512360 Mtm -> nLiveNodes = 0 ; //MtmNodes;
23522361 Mtm -> nAllNodes = MtmNodes ;
23532362 Mtm -> disabledNodeMask = 7 ; //XXXX
2363+ Mtm -> clique = 7 ; // XXXX
23542364 Mtm -> stalledNodeMask = 0 ;
23552365 Mtm -> stoppedNodeMask = 0 ;
23562366 Mtm -> deadNodeMask = 0 ;
@@ -2383,7 +2393,7 @@ static void MtmInitialize()
23832393 for (i = 0 ; i < MtmNodes ; i ++ ) {
23842394 Mtm -> nodes [i ].oldestSnapshot = 0 ;
23852395 Mtm -> nodes [i ].disabledNodeMask = 0 ;
2386- Mtm -> nodes [i ].connectivityMask = 7 ;
2396+ Mtm -> nodes [i ].connectivityMask = 7 ; // XXXX
23872397 Mtm -> nodes [i ].lockGraphUsed = 0 ;
23882398 Mtm -> nodes [i ].lockGraphAllocated = 0 ;
23892399 Mtm -> nodes [i ].lockGraphData = NULL ;
@@ -3308,17 +3318,56 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33083318 Mtm -> preparedTransactionsLoaded = true;
33093319 }
33103320
3311- while (BIT_CHECK (Mtm -> disabledNodeMask , MtmNodeId - 1 ) ||
3312- BIT_CHECK (SELF_CONNECTIVITY_MASK , nodeId - 1 ))
3321+ // while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3322+ // BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3323+ // !BIT_CHECK(Mtm->clique, nodeId - 1) ||
3324+ // !BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3325+ // {
3326+ // if (*shutdown)
3327+ // {
3328+ // MtmUnlock();
3329+ // return REPLMODE_EXIT;
3330+ // }
3331+
3332+ // if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3333+ // (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3334+ // {
3335+ // /* Lock on us */
3336+ // Mtm->recoverySlot = nodeId;
3337+ // MtmPollStatusOfPreparedTransactions();
3338+ // MtmUnlock();
3339+ // return REPLMODE_RECOVERY;
3340+ // }
3341+
3342+ // MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3343+ // nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3344+
3345+ // MtmUnlock();
3346+ // /* delay opening of other slots until recovery is completed */
3347+ // MtmSleep(STATUS_POLL_DELAY);
3348+ // MtmLock(LW_EXCLUSIVE);
3349+ // }
3350+
3351+ // MtmUnlock();
3352+
3353+ // return REPLMODE_RECOVERED;
3354+
3355+ /* Wait until the node is connected and both the receiver and the sender are in the clique */
3356+ while (BIT_CHECK (SELF_CONNECTIVITY_MASK , nodeId - 1 ) ||
3357+ !BIT_CHECK (Mtm -> clique , nodeId - 1 ) ||
3358+ !BIT_CHECK (Mtm -> clique , MtmNodeId - 1 ) )
33133359 {
3360+ MtmUnlock ();
33143361 if (* shutdown )
3315- {
3316- MtmUnlock ();
33173362 return REPLMODE_EXIT ;
3318- }
3363+ MtmSleep (STATUS_POLL_DELAY );
3364+ MtmLock (LW_EXCLUSIVE );
3365+ }
33193366
3320- if ((Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId ) &&
3321- (!BIT_CHECK (SELF_CONNECTIVITY_MASK , nodeId - 1 )))
3367+ if (BIT_CHECK (Mtm -> disabledNodeMask , MtmNodeId - 1 ))
3368+ {
3369+ /* Ok, then start recovery by luckiest walreceiver */
3370+ if (Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId )
33223371 {
33233372 /* Lock on us */
33243373 Mtm -> recoverySlot = nodeId ;
@@ -3327,87 +3376,19 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33273376 return REPLMODE_RECOVERY ;
33283377 }
33293378
3330- MTM_LOG1 ("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx" ,
3331- nodeId , Mtm -> recoverySlot , Mtm -> donorNodeId , SELF_CONNECTIVITY_MASK , Mtm -> disabledNodeMask );
3332-
3333- MtmUnlock ();
3334- /* delay opening of other slots until recovery is completed */
3335- MtmSleep (STATUS_POLL_DELAY );
3336- MtmLock (LW_EXCLUSIVE );
3379+ /* And force the less lucky walreceivers to wait until recovery is completed */
3380+ while (BIT_CHECK (Mtm -> disabledNodeMask , MtmNodeId - 1 ))
3381+ {
3382+ MtmUnlock ();
3383+ if (* shutdown )
3384+ return REPLMODE_EXIT ;
3385+ MtmSleep (STATUS_POLL_DELAY );
3386+ MtmLock (LW_EXCLUSIVE );
3387+ }
33373388 }
33383389
33393390 MtmUnlock ();
3340-
33413391 return REPLMODE_RECOVERED ;
3342-
3343-
3344-
3345-
3346-
3347- // while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3348- // || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3349- // // while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3350- // {
3351- // if (*shutdown)
3352- // {
3353- // MtmUnlock();
3354- // return REPLMODE_EXIT;
3355- // }
3356- // // /* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
3357- // // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
3358- // // mode = REPLMODE_CREATE_NEW;
3359- // // }
3360- // // MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
3361-
3362- // if (Mtm->status == MTM_RECOVERY) {
3363- // mode = REPLMODE_RECOVERED;
3364- // /* Choose node for recovery if
3365- // * 1. It is not chosen yet or the same node was chosen before
3366- // * 2. It is donor node or there is no donor node
3367- // * 3. Connections with all other live nodes were established
3368- // */
3369- // if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3370- // && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3371- // && (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
3372- // {
3373- // /* Choose for recovery first available slot or slot of donor node (if any) */
3374- // if (Mtm->nAllNodes >= 3) {
3375- // MTM_ELOG(WARNING, "Process %d starts recovery from node %d restartLSNs={%llx, %llx, %llx}",
3376- // MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3377- // } else {
3378- // MTM_ELOG(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3379- // }
3380- // Mtm->recoverySlot = nodeId;
3381- // // Mtm->nReceivers = 0;
3382- // // Mtm->nSenders = 0;
3383- // // Mtm->recoveryCount += 1;
3384- // // Mtm->pglogicalReceiverMask = 0;
3385- // // Mtm->pglogicalSenderMask = 0;
3386- // MtmPollStatusOfPreparedTransactions();
3387- // MtmUnlock();
3388- // return REPLMODE_RECOVERY;
3389- // }
3390- // }
3391- // MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3392- // nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3393- // MtmUnlock();
3394- // /* delay opening of other slots until recovery is completed */
3395- // MtmSleep(STATUS_POLL_DELAY);
3396- // MtmLock(LW_EXCLUSIVE);
3397- // }
3398- // if (Mtm->status == MTM_RECOVERED) {
3399- // mode = REPLMODE_RECOVERED;
3400- // }
3401- // // if (mode == REPLMODE_RECOVERED) {
3402- // // MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
3403- // // } else if (mode == REPLMODE_CREATE_NEW) {
3404- // // MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
3405- // // } else {
3406- // // MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
3407- // // }
3408- // BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestablish connection with this node */
3409- // MtmUnlock();
3410- // return mode;
34113392}
34123393
34133394static bool MtmIsBroadcast ()
0 commit comments