2323#define MTM_ERRMSG (fmt ,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
2424
2525#if DEBUG_LEVEL == 0
26- #define MTM_LOG1 (fmt , ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27- #define MTM_LOG2 (fmt , ...)
28- #define MTM_LOG3(fmt, ...)
29- #define MTM_LOG4 (fmt , ...)
26+ #define MTM_LOG1 (fmt , ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27+ #define MTM_LOG2 (fmt , ...)
28+ #define MTM_LOG3 (fmt , ...)
29+ #define MTM_LOG4 (fmt , ...)
3030#elif DEBUG_LEVEL == 1
31- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33- #define MTM_LOG3 (fmt , ...)
34- #define MTM_LOG4(fmt, ...)
31+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33+ #define MTM_LOG3 (fmt , ...)
34+ #define MTM_LOG4 (fmt , ...)
3535#elif DEBUG_LEVEL == 2
36- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38- #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39- #define MTM_LOG4 (fmt , ...)
36+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38+ #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39+ #define MTM_LOG4 (fmt , ...)
4040#elif DEBUG_LEVEL >= 3
41- #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42- #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43- #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44- #define MTM_LOG4 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41+ #define MTM_LOG1 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42+ #define MTM_LOG2 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43+ #define MTM_LOG3 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44+ #define MTM_LOG4 (fmt , ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4545#endif
4646
4747#if MTM_TRACE == 0
@@ -98,7 +98,7 @@ typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9898#define SELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
9999
100100typedef enum
101- {
101+ {
102102 PGLOGICAL_COMMIT ,
103103 PGLOGICAL_PREPARE ,
104104 PGLOGICAL_COMMIT_PREPARED ,
@@ -107,7 +107,7 @@ typedef enum
107107} PGLOGICAL_EVENT ;
108108
109109/* Identifier of global transaction */
110- typedef struct
110+ typedef struct
111111{
112112 int node ; /* Zero based index of node initiating transaction */
113113 TransactionId xid ; /* Transaction ID at this node */
@@ -116,7 +116,7 @@ typedef struct
116116#define EQUAL_GTID (x ,y ) ((x).node == (y).node && (x).xid == (y).xid)
117117
118118typedef enum
119- {
119+ {
120120 MSG_INVALID ,
121121 MSG_HANDSHAKE ,
122122 MSG_PREPARED ,
@@ -153,12 +153,12 @@ typedef enum
153153typedef struct
154154{
155155 MtmMessageCode code ; /* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
156- int node ; /* Sender node ID */
156+ int node ; /* Sender node ID */
157157 bool lockReq ;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
158158 bool locked ; /* Whether sender node is locked */
159159 TransactionId dxid ; /* Transaction ID at destination node */
160- TransactionId sxid ; /* Transaction ID at sender node */
161- XidStatus status ; /* Transaction status */
160+ TransactionId sxid ; /* Transaction ID at sender node */
161+ XidStatus status ; /* Transaction status */
162162 csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
163163 csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
164164 nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
@@ -185,13 +185,13 @@ typedef struct MtmMessageQueue
185185 struct MtmMessageQueue * next ;
186186} MtmMessageQueue ;
187187
188- typedef struct
188+ typedef struct
189189{
190190 MtmArbiterMessage hdr ;
191191 char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
192192} MtmHandshakeMessage ;
193193
194- typedef struct
194+ typedef struct
195195{
196196 int used ;
197197 int size ;
@@ -227,7 +227,7 @@ typedef struct
227227 int senderPid ;
228228 int receiverPid ;
229229 lsn_t flushPos ;
230- csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
230+ csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
231231 lsn_t restartLSN ;
232232 RepOriginId originId ;
233233 int timeline ;
@@ -246,12 +246,12 @@ typedef struct MtmL2List
246246typedef struct MtmTransState
247247{
248248 TransactionId xid ;
249- XidStatus status ;
249+ XidStatus status ;
250250 pgid_t gid ; /* Global transaction ID (used for 2PC) */
251251 GlobalTransactionId gtid ; /* Transaction id at coordinator */
252252 csn_t csn ; /* commit serial number */
253253 csn_t snapshot ; /* transaction snapshot, or INVALID_CSN for local transactions */
254- int procno ; /* pgprocno of transaction coordinator waiting for responses from replicas,
254+ int procno ; /* pgprocno of transaction coordinator waiting for responses from replicas,
255255 used to notify coordinator by arbiter */
256256 int nSubxids ; /* Number of subtransanctions */
257257 struct MtmTransState * next ; /* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -293,7 +293,7 @@ typedef struct
293293 nodemask_t pglogicalSenderMask ; /* bitmask of started pglogic senders */
294294 nodemask_t currentLockNodeMask ; /* Mask of nodes IDs which are locking the cluster */
295295 nodemask_t inducedLockNodeMask ; /* Mask of node IDs which requested cluster-wide lock */
296- nodemask_t originLockNodeMask ; /* Mask of node IDs which WAL-senders are locking the cluster.
296+ nodemask_t originLockNodeMask ; /* Mask of node IDs which WAL-senders are locking the cluster.
297297 * MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
298298 nodemask_t reconnectMask ; /* Mask of nodes connection to which has to be reestablished by sender */
299299 int lastLockHolder ; /* PID of process last obtaining the node lock */
@@ -319,13 +319,13 @@ typedef struct
319319 MtmTransState * * transListTail ; /* Tail of L1 list of all finished transactions, used to append new elements.
320320 This list is expected to be in CSN ascending order, by strict order may be violated */
321321 MtmL2List activeTransList ; /* List of active transactions */
322- ulong64 transCount ; /* Counter of transactions performed by this node */
322+ ulong64 transCount ; /* Counter of transactions performed by this node */
323323 ulong64 gcCount ; /* Number of global transactions performed since last GC */
324324 MtmMessageQueue * sendQueue ; /* Messages to be sent by arbiter sender */
325325 MtmMessageQueue * freeQueue ; /* Free messages */
326326 lsn_t recoveredLSN ; /* LSN at the moment of recovery completion */
327327 BgwPool pool ; /* Pool of background workers for applying logical replication patches */
328- MtmNodeInfo nodes [1 ]; /* [Mtm->nAllNodes]: per-node data */
328+ MtmNodeInfo nodes [1 ]; /* [Mtm->nAllNodes]: per-node data */
329329} MtmState ;
330330
331331typedef struct MtmFlushPosition
@@ -342,7 +342,7 @@ typedef struct MtmSeqPosition
342342 Oid seqid ;
343343 int64 next ;
344344} MtmSeqPosition ;
345-
345+
346346#define MtmIsCoordinator (ts ) (ts->gtid.node == MtmNodeId)
347347
348348extern char const * const MtmNodeStatusMnem [];
@@ -394,14 +394,15 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
394394extern void MtmLock (LWLockMode mode );
395395extern void MtmUnlock (void );
396396extern void MtmLockNode (int nodeId , LWLockMode mode );
397+ extern bool MtmTryLockNode (int nodeId , LWLockMode mode );
397398extern void MtmUnlockNode (int nodeId );
398399extern void MtmStopNode (int nodeId , bool dropSlot );
399400extern void MtmReconnectNode (int nodeId );
400401extern void MtmRecoverNode (int nodeId );
401402extern void MtmOnNodeDisconnect (int nodeId );
402403extern void MtmOnNodeConnect (int nodeId );
403404extern void MtmWakeUpBackend (MtmTransState * ts );
404- extern void MtmSleep (timestamp_t interval );
405+ extern void MtmSleep (timestamp_t interval );
405406extern void MtmAbortTransaction (MtmTransState * ts );
406407extern void MtmSetCurrentTransactionGID (char const * gid );
407408extern csn_t MtmGetTransactionCSN (TransactionId xid );
0 commit comments