@@ -287,8 +287,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
287287 {
288288 xl_xact_parsed_prepare parsed ;
289289
290- /* check that output plugin is capable of twophase decoding */
291- if (!ctx -> options .enable_twophase )
290+ /*
291+ * Check that output plugin is capable of twophase decoding.
292+ * We also don't offer to do 2PC if snap is not yet consistent
293+ * as of reading PREPARE.
294+ */
295+ if (!ctx -> options .enable_twophase ||
296+ SnapBuildCurrentState (builder ) < SNAPBUILD_CONSISTENT )
292297 {
293298 ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
294299 break ;
@@ -584,13 +589,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
584589 TimestampTz commit_time = parsed -> xact_time ;
585590 RepOriginId origin_id = XLogRecGetOrigin (buf -> record );
586591 int i ;
592+ bool reorderbuffer_has_xid ;
587593
588594 if (parsed -> xinfo & XACT_XINFO_HAS_ORIGIN )
589595 {
590596 origin_lsn = parsed -> origin_lsn ;
591597 commit_time = parsed -> origin_timestamp ;
592598 }
593599
600+ /*
601+ * If this is COMMIT PREPARED and ReorderBuffer doesn't have this xid,
602+ * either the plugin refused to do 2PC on this xact or we didn't have
603+ * consistent snapshot yet during PREPARE processing. Anyway, in this case
604+ * we don't do 2PC and replay xact fully now. We must check this early
605+ * since invalidation addition below might add the record to the RB.
606+ */
607+ reorderbuffer_has_xid = ReorderBufferHasXid (ctx -> reorder , xid );
608+
594609 /*
595610 * Process invalidation messages, even if we're not interested in the
596611 * transaction's contents, since the various caches need to always be
@@ -663,10 +678,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
663678 * For output plugins that do not support PREPARE-time decoding of
664679 * two-phase transactions, we never even see the PREPARE and all two-phase
665680 * transactions simply fall through to the second branch.
681+ *
682+ * We rely on existence of xid in reorderbuffer to determine was 2PC done
683+ * or not. This is correct because we always see PREPARE before COMMIT
684+ * PREPARED if the latter was after consistent point.
685+ *
666686 */
667687 if (TransactionIdIsValid (parsed -> twophase_xid ) &&
668- ReorderBufferTxnIsPrepared (ctx -> reorder ,
669- parsed -> twophase_xid , parsed -> twophase_gid ))
688+ !reorderbuffer_has_xid )
670689 {
671690 Assert (xid == parsed -> twophase_xid );
672691 /* we are processing COMMIT PREPARED */
@@ -765,7 +784,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
765784 !SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) &&
766785 parsed -> dbId == ctx -> slot -> data .database &&
767786 !FilterByOrigin (ctx , origin_id ) &&
768- ReorderBufferTxnIsPrepared (ctx -> reorder , xid , parsed -> twophase_gid ))
787+ ! ReorderBufferHasXid (ctx -> reorder , xid ))
769788 {
770789 ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
771790 commit_time , origin_id , origin_lsn ,
0 commit comments