1616
1717#include "access/global_snapshot.h"
1818#include "access/htup_details.h"
19- #include "catalog/pg_user_mapping.h"
20- #include "access/xact.h"
2119#include "access/transam.h"
20+ #include "access/twophase.h"
21+ #include "access/xact.h"
2222#include "access/xlog.h" /* GetSystemIdentifier() */
23+ #include "catalog/pg_user_mapping.h"
2324#include "libpq-int.h"
2425#include "mb/pg_wchar.h"
2526#include "miscadmin.h"
@@ -81,7 +82,7 @@ static HTAB *ConnectionHash = NULL;
8182 */
8283typedef struct FdwTransactionState
8384{
84- char * gid ;
85+ char gid [ GIDSIZE ] ;
8586 int nparticipants ;
8687 GlobalCSN global_csn ;
8788 bool two_phase_commit ;
@@ -839,7 +840,84 @@ pgfdw_xact_callback(XactEvent event, void *arg)
839840 if (!xact_got_connection )
840841 return ;
841842
842- /* Handle possible two-phase commit */
843+ /*
844+ * Hack for shardman loader: it allows to do 2PC on user-issued
845+ * prepare. In this case we won't be able to commit xacts because we we
846+ * don't record participants info anywhere; this must be done by loader or
847+ * human behind it.
848+ */
849+ if (event == XACT_EVENT_PRE_PREPARE &&
850+ UseGlobalSnapshots &&
851+ strncmp ("pgfdw:" , GetPrepareGid (), strlen ("pgfdw:" )) == 0 &&
852+ strstr (GetPrepareGid (), "shmnloader" ) != 0 )
853+ {
854+ /*
855+ * Remember gid. We will PREPARE on other nodes and finish global
856+ * snaps on XACT_EVENT_POST_PREPARE.
857+ */
858+ strncpy (fdwTransState -> gid , GetPrepareGid (), GIDSIZE );
859+ /*
860+ * xact_depth and fdwTransState will be cleaned up on
861+ * XACT_EVENT_POST_PREPARE.
862+ */
863+ elog (WARNING , "pre prepare gid %s" , fdwTransState -> gid );
864+ return ;
865+ }
866+ if (event == XACT_EVENT_PREPARE && fdwTransState -> gid [0 ] != '\0' )
867+ return ; /* prevent cleanup */
868+ if (event == XACT_EVENT_POST_PREPARE )
869+ {
870+ GlobalCSN max_csn = InProgressGlobalCSN ;
871+ GlobalCSN my_csn = InProgressGlobalCSN ;
872+ bool res ;
873+ char * sql ;
874+ elog (WARNING , "fdw post prepare" );
875+
876+ if (fdwTransState -> gid [0 ] == '\0' )
877+ {
878+ /*
879+ * Nothing to do here; since this cb is not present in vanilla,
880+ * exit to avoid harming state machine
881+ */
882+ return ;
883+ }
884+ sql = psprintf ("PREPARE TRANSACTION '%s'" , fdwTransState -> gid );
885+ res = BroadcastCmd (sql );
886+ if (!res )
887+ goto error ;
888+
889+ /* Broadcast pg_global_snapshot_prepare() */
890+ my_csn = GlobalSnapshotPrepareTwophase (fdwTransState -> gid );
891+
892+ sql = psprintf ("SELECT pg_global_snapshot_prepare('%s')" ,
893+ fdwTransState -> gid );
894+ res = BroadcastStmt (sql , PGRES_TUPLES_OK , MaxCsnCB , & max_csn );
895+ if (!res )
896+ goto error ;
897+
898+ /* select maximal global csn */
899+ if (my_csn > max_csn )
900+ max_csn = my_csn ;
901+
902+ /* Broadcast pg_global_snapshot_assign() */
903+ GlobalSnapshotAssignCsnTwoPhase (fdwTransState -> gid , max_csn );
904+ sql = psprintf ("SELECT pg_global_snapshot_assign('%s'," UINT64_FORMAT ")" ,
905+ fdwTransState -> gid , max_csn );
906+ res = BroadcastFunc (sql );
907+
908+ error :
909+ elog (WARNING , "post prepare gid %s, res %d" , fdwTransState -> gid , res );
910+ if (!res )
911+ {
912+ sql = psprintf ("ABORT PREPARED '%s'" , fdwTransState -> gid );
913+ BroadcastCmd (sql );
914+ elog (ERROR , "failed to PREPARE transaction on remote node, ABORT PREPARED this xact" );
915+ }
916+ }
917+
918+ /*
919+ * Handle possible two-phase commit.
920+ */
843921 if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT )
844922 {
845923 bool include_local_tx = false;
@@ -862,29 +940,31 @@ pgfdw_xact_callback(XactEvent event, void *arg)
862940 bool res ;
863941 char * sql ;
864942
865- fdwTransState -> gid = psprintf ("pgfdw:%lld:%llu:%d:%u:%d:%d" ,
866- (long long ) GetCurrentTimestamp (),
867- (long long ) GetSystemIdentifier (),
868- MyProcPid ,
869- GetCurrentTransactionIdIfAny (),
870- ++ two_phase_xact_count ,
871- fdwTransState -> nparticipants );
943+ snprintf (fdwTransState -> gid ,
944+ GIDSIZE ,
945+ "pgfdw:%lld:%llu:%d:%u:%d:%d" ,
946+ (long long ) GetCurrentTimestamp (),
947+ (long long ) GetSystemIdentifier (),
948+ MyProcPid ,
949+ GetCurrentTransactionIdIfAny (),
950+ ++ two_phase_xact_count ,
951+ fdwTransState -> nparticipants );
872952
873953 /* Broadcast PREPARE */
874954 sql = psprintf ("PREPARE TRANSACTION '%s'" , fdwTransState -> gid );
875955 res = BroadcastCmd (sql );
876956 if (!res )
877- goto error ;
957+ goto error_user2pc ;
878958
879959 /* Broadcast pg_global_snapshot_prepare() */
880960 if (include_local_tx )
881961 my_csn = GlobalSnapshotPrepareCurrent ();
882962
883963 sql = psprintf ("SELECT pg_global_snapshot_prepare('%s')" ,
884- fdwTransState -> gid );
964+ fdwTransState -> gid );
885965 res = BroadcastStmt (sql , PGRES_TUPLES_OK , MaxCsnCB , & max_csn );
886966 if (!res )
887- goto error ;
967+ goto error_user2pc ;
888968
889969 /* select maximal global csn */
890970 if (include_local_tx && my_csn > max_csn )
@@ -894,10 +974,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
894974 if (include_local_tx )
895975 GlobalSnapshotAssignCsnCurrent (max_csn );
896976 sql = psprintf ("SELECT pg_global_snapshot_assign('%s'," UINT64_FORMAT ")" ,
897- fdwTransState -> gid , max_csn );
977+ fdwTransState -> gid , max_csn );
898978 res = BroadcastFunc (sql );
899979
900- error :
980+ error_user2pc :
901981 if (!res )
902982 {
903983 sql = psprintf ("ABORT PREPARED '%s'" , fdwTransState -> gid );
@@ -959,6 +1039,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9591039 break ;
9601040 case XACT_EVENT_PRE_PREPARE :
9611041
1042+ if (fdwTransState -> gid [0 ] != '\0' )
1043+ /* See comments above */
1044+ break ;
1045+
9621046 /*
9631047 * We disallow remote transactions that modified anything,
9641048 * since it's not very reasonable to hold them open until
@@ -980,6 +1064,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9801064 elog (ERROR , "missed cleaning up connection during pre-commit" );
9811065 break ;
9821066 case XACT_EVENT_PREPARE :
1067+ if (fdwTransState -> gid [0 ] != '\0' )
1068+ break ;
1069+
9831070 /* Pre-commit should have closed the open transaction */
9841071 elog (ERROR , "missed cleaning up connection during pre-commit" );
9851072 break ;
@@ -1046,6 +1133,14 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10461133 /* Disarm changing_xact_state if it all worked. */
10471134 entry -> changing_xact_state = abort_cleanup_failure ;
10481135 break ;
1136+ case XACT_EVENT_POST_PREPARE :
1137+ /*
1138+ * New event can break our state machine, so let's list
1139+ * them here explicitely and force compiler warning in
1140+ * case of unhandled event.
1141+ */
1142+ break ;
1143+
10491144 }
10501145 }
10511146
0 commit comments