1212 */
1313#include "postgres.h"
1414
15+ #include "access/genam.h"
16+ #include "access/heapam.h"
1517#include "catalog/pg_publication.h"
1618
19+ #include "nodes/makefuncs.h"
20+
1721#include "replication/logical.h"
1822#include "replication/logicalproto.h"
1923#include "replication/origin.h"
2024#include "replication/pgoutput.h"
2125
26+ #include "utils/guc.h"
2227#include "utils/inval.h"
2328#include "utils/int8.h"
2429#include "utils/memutils.h"
2732
2833PG_MODULE_MAGIC ;
2934
35+ extern void _PG_init (void );
36+
3037extern void _PG_output_plugin_init (OutputPluginCallbacks * cb );
3138
3239static void pgoutput_startup (LogicalDecodingContext * ctx ,
@@ -62,6 +69,7 @@ static bool publications_valid;
6269static List * LoadPublications (List * pubnames );
6370static void publication_invalidation_cb (Datum arg , int cacheid ,
6471 uint32 hashvalue );
72+ static char * append_shardman_node_id (const char * gid );
6573
6674/* Entry in the map used to remember which relation schemas we sent. */
6775typedef struct RelationSyncEntry
@@ -75,12 +83,29 @@ typedef struct RelationSyncEntry
7583/* Map used to remember which relation schemas we sent. */
7684static HTAB * RelationSyncCache = NULL ;
7785
86+ /* GUC just for tests */
87+ static bool use_twophase ;
88+
7889static void init_rel_sync_cache (MemoryContext decoding_context );
7990static RelationSyncEntry * get_rel_sync_entry (PGOutputData * data , Oid relid );
8091static void rel_sync_cache_relation_cb (Datum arg , Oid relid );
8192static void rel_sync_cache_publication_cb (Datum arg , int cacheid ,
8293 uint32 hashvalue );
8394
95+ void
96+ _PG_init (void )
97+ {
98+ DefineCustomBoolVariable (
99+ "pgoutput.use_twophase" ,
100+ "Toggle 2PC" ,
101+ NULL ,
102+ & use_twophase ,
103+ false,
104+ PGC_SUSET ,
105+ 0 ,
106+ NULL , NULL , NULL );
107+ }
108+
84109/*
85110 * Specify output plugin callbacks
86111 */
@@ -337,10 +362,17 @@ static void
337362pgoutput_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
338363 XLogRecPtr prepare_lsn )
339364{
365+ char * gid = txn -> gid ;
366+
340367 OutputPluginUpdateProgress (ctx );
341368
342369 OutputPluginPrepareWrite (ctx , true);
343- logicalrep_write_prepare (ctx -> out , txn , prepare_lsn );
370+ /* Append :sysid to gid to avoid collision */
371+ if (strstr (gid , "pgfdw:" ) != NULL )
372+ gid = psprintf ("%s:%lx" , txn -> gid , GetSystemIdentifier ());
373+ logicalrep_write_prepare (ctx -> out , txn , prepare_lsn , gid );
374+ if (strstr (gid , "pgfdw:" ) != NULL )
375+ pfree (gid );
344376 OutputPluginWrite (ctx , true);
345377}
346378
@@ -351,23 +383,38 @@ static void
351383pgoutput_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
352384 XLogRecPtr prepare_lsn )
353385{
386+ char * gid = txn -> gid ;
387+
354388 OutputPluginUpdateProgress (ctx );
355389
356390 OutputPluginPrepareWrite (ctx , true);
357- logicalrep_write_prepare (ctx -> out , txn , prepare_lsn );
391+ /* Append :sysid to gid to avoid collision */
392+ if (strstr (gid , "pgfdw:" ) != NULL )
393+ gid = psprintf ("%s:%lx" , txn -> gid , GetSystemIdentifier ());
394+ logicalrep_write_prepare (ctx -> out , txn , prepare_lsn , gid );
395+ if (strstr (gid , "pgfdw:" ) != NULL )
396+ pfree (gid );
358397 OutputPluginWrite (ctx , true);
359398}
399+
360400/*
361401 * PREPARE callback
362402 */
363403static void
364404pgoutput_abort_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
365405 XLogRecPtr prepare_lsn )
366406{
407+ char * gid = txn -> gid ;
408+
367409 OutputPluginUpdateProgress (ctx );
368410
369411 OutputPluginPrepareWrite (ctx , true);
370- logicalrep_write_prepare (ctx -> out , txn , prepare_lsn );
412+ /* Append :sysid to gid to avoid collision */
413+ if (strstr (gid , "pgfdw:" ) != NULL )
414+ gid = psprintf ("%s:%lx" , txn -> gid , GetSystemIdentifier ());
415+ logicalrep_write_prepare (ctx -> out , txn , prepare_lsn , gid );
416+ if (strstr (gid , "pgfdw:" ) != NULL )
417+ pfree (gid );
371418 OutputPluginWrite (ctx , true);
372419}
373420
@@ -502,13 +549,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
502549/*
503550 * Filter out unnecessary two-phase transactions.
504551 *
505- * Currently, we forward all two-phase transactions
552+ * Make 2PC on shardman's xacts.
506553 */
507554static bool
508555pgoutput_filter_prepare (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
509- TransactionId xid , const char * gid )
556+ TransactionId xid , const char * gid )
510557{
511- return false;
558+ if (strstr (gid , "pgfdw:" ) != NULL ) /* shardman */
559+ {
560+ return false;
561+ }
562+ return !use_twophase ;
512563}
513564
514565/*
0 commit comments