3434#include "access/xlogutils.h"
3535#include "access/xlogreader.h"
3636#include "access/xlogrecord.h"
37+ #include "access/twophase.h"
3738
3839#include "catalog/pg_control.h"
3940
@@ -73,6 +74,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
7374 xl_xact_parsed_commit * parsed , TransactionId xid );
7475static void DecodeAbort (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
7576 xl_xact_parsed_abort * parsed , TransactionId xid );
77+ static void DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
78+ xl_xact_parsed_prepare * parsed );
7679
7780/* common function to decode tuples */
7881static void DecodeXLogTuple (char * data , Size len , ReorderBufferTupleBuf * tup );
@@ -281,16 +284,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
281284 break ;
282285 }
283286 case XLOG_XACT_PREPARE :
287+ {
288+ xl_xact_parsed_prepare parsed ;
284289
285- /*
286- * Currently decoding ignores PREPARE TRANSACTION and will just
287- * decode the transaction when the COMMIT PREPARED is sent or
288- * throw away the transaction's contents when a ROLLBACK PREPARED
289- * is received. In the future we could add code to expose prepared
290- * transactions in the changestream allowing for a kind of
291- * distributed 2PC.
292- */
293- ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
290+ /* check that output plugin is capable of twophase decoding */
291+ if (!ctx -> options .enable_twophase )
292+ {
293+ ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
294+ break ;
295+ }
296+
297+ /* ok, parse it */
298+ ParsePrepareRecord (XLogRecGetInfo (buf -> record ),
299+ XLogRecGetData (buf -> record ), & parsed );
300+
301+ /* does output plugin want this particular transaction? */
302+ if (ctx -> callbacks .filter_prepare_cb &&
303+ ReorderBufferPrepareNeedSkip (reorder , parsed .twophase_xid ,
304+ parsed .twophase_gid ))
305+ {
306+ ReorderBufferProcessXid (reorder , parsed .twophase_xid ,
307+ buf -> origptr );
308+ break ;
309+ }
310+
311+ DecodePrepare (ctx , buf , & parsed );
312+ break ;
313+ }
294314 break ;
295315 default :
296316 elog (ERROR , "unexpected RM_XACT_ID record type: %u" , info );
@@ -633,9 +653,90 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
633653 buf -> origptr , buf -> endptr );
634654 }
635655
656+ /*
657+ * Decide if we're processing COMMIT PREPARED, or a regular COMMIT.
658+ * Regular commit simply triggers a replay of transaction changes from the
659+ * reorder buffer. For COMMIT PREPARED that however already happened at
660+ * PREPARE time, and so we only need to notify the subscriber that the GID
661+ * finally committed.
662+ *
663+ * For output plugins that do not support PREPARE-time decoding of
664+ * two-phase transactions, we never even see the PREPARE and all two-phase
665+ * transactions simply fall through to the second branch.
666+ */
667+ if (TransactionIdIsValid (parsed -> twophase_xid ) &&
668+ ReorderBufferTxnIsPrepared (ctx -> reorder ,
669+ parsed -> twophase_xid , parsed -> twophase_gid ))
670+ {
671+ Assert (xid == parsed -> twophase_xid );
672+ /* we are processing COMMIT PREPARED */
673+ ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
674+ commit_time , origin_id , origin_lsn ,
675+ parsed -> twophase_gid , true);
676+ }
677+ else
678+ {
679+ /* replay actions of all transaction + subtransactions in order */
680+ ReorderBufferCommit (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
681+ commit_time , origin_id , origin_lsn );
682+ }
683+ }
684+
685+ /*
686+ * Decode PREPARE record. Similar logic as in COMMIT
687+ */
688+ static void
689+ DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
690+ xl_xact_parsed_prepare * parsed )
691+ {
692+ XLogRecPtr origin_lsn = parsed -> origin_lsn ;
693+ TimestampTz commit_time = parsed -> origin_timestamp ;
694+ XLogRecPtr origin_id = XLogRecGetOrigin (buf -> record );
695+ int i ;
696+ TransactionId xid = parsed -> twophase_xid ;
697+
698+ /*
699+ * Process invalidation messages, even if we're not interested in the
700+ * transaction's contents, since the various caches need to always be
701+ * consistent.
702+ */
703+ if (parsed -> nmsgs > 0 )
704+ {
705+ if (!ctx -> fast_forward )
706+ ReorderBufferAddInvalidations (ctx -> reorder , xid , buf -> origptr ,
707+ parsed -> nmsgs , parsed -> msgs );
708+ ReorderBufferXidSetCatalogChanges (ctx -> reorder , xid , buf -> origptr );
709+ }
710+
711+ /*
712+ * Tell the reorderbuffer about the surviving subtransactions. We need to
713+ * do this because the main transaction itself has not committed since we
714+ * are in the prepare phase right now. So we need to be sure the snapshot
715+ * is setup correctly for the main transaction in case all changes
716+ * happened in subtransanctions
717+ */
718+ for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
719+ {
720+ ReorderBufferCommitChild (ctx -> reorder , xid , parsed -> subxacts [i ],
721+ buf -> origptr , buf -> endptr );
722+ }
723+
724+ if (SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) ||
725+ (parsed -> dbId != InvalidOid && parsed -> dbId != ctx -> slot -> data .database ) ||
726+ ctx -> fast_forward || FilterByOrigin (ctx , origin_id ))
727+ {
728+ for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
729+ {
730+ ReorderBufferForget (ctx -> reorder , parsed -> subxacts [i ], buf -> origptr );
731+ }
732+ ReorderBufferForget (ctx -> reorder , xid , buf -> origptr );
733+
734+ return ;
735+ }
736+
636737 /* replay actions of all transaction + subtransactions in order */
637- ReorderBufferCommit (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
638- commit_time , origin_id , origin_lsn );
738+ ReorderBufferPrepare (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
739+ commit_time , origin_id , origin_lsn , parsed -> twophase_gid );
639740}
640741
641742/*
@@ -647,6 +748,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
647748 xl_xact_parsed_abort * parsed , TransactionId xid )
648749{
649750 int i ;
751+ XLogRecPtr origin_lsn = InvalidXLogRecPtr ;
752+ TimestampTz commit_time = 0 ;
753+ XLogRecPtr origin_id = XLogRecGetOrigin (buf -> record );
754+
755+ if (parsed -> xinfo & XACT_XINFO_HAS_ORIGIN )
756+ {
757+ origin_lsn = parsed -> origin_lsn ;
758+ commit_time = parsed -> origin_timestamp ;
759+ }
760+
761+ /*
762+ * If it's ROLLBACK PREPARED then handle it via callbacks.
763+ */
764+ if (TransactionIdIsValid (xid ) &&
765+ !SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) &&
766+ parsed -> dbId == ctx -> slot -> data .database &&
767+ !FilterByOrigin (ctx , origin_id ) &&
768+ ReorderBufferTxnIsPrepared (ctx -> reorder , xid , parsed -> twophase_gid ))
769+ {
770+ ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
771+ commit_time , origin_id , origin_lsn ,
772+ parsed -> twophase_gid , false);
773+ return ;
774+ }
650775
651776 for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
652777 {
0 commit comments