4444#include "storage/lmgr.h"
4545
4646#include "utils/builtins.h"
47+ #include "utils/guc.h"
4748#include "utils/lsyscache.h"
4849#include "utils/memutils.h"
4950#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
6061static void
6162parse_subscription_options (List * options , bool * connect , bool * enabled_given ,
6263 bool * enabled , bool * create_slot , char * * slot_name ,
63- bool * copy_data )
64+ bool * copy_data , char * * synchronous_commit )
6465{
6566 ListCell * lc ;
6667 bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
8081 * slot_name = NULL ;
8182 if (copy_data )
8283 * copy_data = true;
84+ if (synchronous_commit )
85+ * synchronous_commit = NULL ;
8386
8487 /* Parse options */
8588 foreach (lc , options )
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
165168 copy_data_given = true;
166169 * copy_data = !defGetBoolean (defel );
167170 }
171+ else if (strcmp (defel -> defname , "synchronous_commit" ) == 0 &&
172+ synchronous_commit )
173+ {
174+ if (* synchronous_commit )
175+ ereport (ERROR ,
176+ (errcode (ERRCODE_SYNTAX_ERROR ),
177+ errmsg ("conflicting or redundant options" )));
178+
179+ * synchronous_commit = defGetString (defel );
180+
181+ /* Test if the given value is valid for synchronous_commit GUC. */
182+ (void ) set_config_option ("synchronous_commit" , * synchronous_commit ,
183+ PGC_BACKEND , PGC_S_TEST , GUC_ACTION_SET ,
184+ false, 0 , false);
185+ }
168186 else
169187 elog (ERROR , "unrecognized option: %s" , defel -> defname );
170188 }
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
269287 bool enabled_given ;
270288 bool enabled ;
271289 bool copy_data ;
290+ char * synchronous_commit ;
272291 char * conninfo ;
273292 char * slotname ;
274293 char originname [NAMEDATALEN ];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
280299 * Connection and publication should not be specified here.
281300 */
282301 parse_subscription_options (stmt -> options , & connect , & enabled_given ,
283- & enabled , & create_slot , & slotname , & copy_data );
302+ & enabled , & create_slot , & slotname , & copy_data ,
303+ & synchronous_commit );
284304
285305 /*
286306 * Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
311331
312332 if (slotname == NULL )
313333 slotname = stmt -> subname ;
334+ /* The default for synchronous_commit of subscriptions is off. */
335+ if (synchronous_commit == NULL )
336+ synchronous_commit = "off" ;
314337
315338 conninfo = stmt -> conninfo ;
316339 publications = stmt -> publication ;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
334357 CStringGetTextDatum (conninfo );
335358 values [Anum_pg_subscription_subslotname - 1 ] =
336359 DirectFunctionCall1 (namein , CStringGetDatum (slotname ));
360+ values [Anum_pg_subscription_subsynccommit - 1 ] =
361+ CStringGetTextDatum (synchronous_commit );
337362 values [Anum_pg_subscription_subpublications - 1 ] =
338363 publicationListToArray (publications );
339364
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
582607 case ALTER_SUBSCRIPTION_OPTIONS :
583608 {
584609 char * slot_name ;
610+ char * synchronous_commit ;
585611
586612 parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
587- NULL , & slot_name , NULL );
613+ NULL , & slot_name , NULL ,
614+ & synchronous_commit );
588615
589- values [Anum_pg_subscription_subslotname - 1 ] =
590- DirectFunctionCall1 (namein , CStringGetDatum (slot_name ));
591- replaces [Anum_pg_subscription_subslotname - 1 ] = true;
616+ if (slot_name )
617+ {
618+ values [Anum_pg_subscription_subslotname - 1 ] =
619+ DirectFunctionCall1 (namein , CStringGetDatum (slot_name ));
620+ replaces [Anum_pg_subscription_subslotname - 1 ] = true;
621+ }
622+ if (synchronous_commit )
623+ {
624+ values [Anum_pg_subscription_subsynccommit - 1 ] =
625+ CStringGetTextDatum (synchronous_commit );
626+ replaces [Anum_pg_subscription_subsynccommit - 1 ] = true;
627+ }
592628
593629 update_tuple = true;
594630 break ;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
601637
602638 parse_subscription_options (stmt -> options , NULL ,
603639 & enabled_given , & enabled , NULL ,
604- NULL , NULL );
640+ NULL , NULL , NULL );
605641 Assert (enabled_given );
606642
607643 values [Anum_pg_subscription_subenabled - 1 ] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
626662 Subscription * sub = GetSubscription (subid , false);
627663
628664 parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
629- NULL , NULL , & copy_data );
665+ NULL , NULL , & copy_data , NULL );
630666
631667 values [Anum_pg_subscription_subpublications - 1 ] =
632668 publicationListToArray (stmt -> publication );
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
652688 Subscription * sub = GetSubscription (subid , false);
653689
654690 parse_subscription_options (stmt -> options , NULL , NULL , NULL ,
655- NULL , NULL , & copy_data );
691+ NULL , NULL , & copy_data , NULL );
656692
657693 AlterSubscription_refresh (sub , copy_data );
658694
0 commit comments