6464#include "catalog/indexing.h"
6565#include "catalog/namespace.h"
6666#include "catalog/pg_constraint_fn.h"
67+ #include "catalog/pg_proc.h"
6768#include "pglogical_output/hooks.h"
6869#include "parser/analyze.h"
6970#include "parser/parse_relation.h"
@@ -255,8 +256,6 @@ bool MtmUseDtm;
255256bool MtmPreserveCommitOrder ;
256257bool MtmVolksWagenMode ; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257258
258- TransactionId MtmUtilityProcessedInXid ;
259-
260259static char * MtmConnStrs ;
261260static char * MtmRemoteFunctionsList ;
262261static char * MtmClusterName ;
@@ -275,6 +274,7 @@ static bool MtmClusterLocked;
275274static bool MtmInsideTransaction ;
276275static bool MtmReferee ;
277276static bool MtmMonotonicSequences ;
277+ static void const * MtmDDLStatement ;
278278
279279static ExecutorStart_hook_type PreviousExecutorStartHook ;
280280static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -923,6 +923,7 @@ MtmResetTransaction()
923923 x -> csn = INVALID_CSN ;
924924 x -> status = TRANSACTION_STATUS_UNKNOWN ;
925925 x -> gid [0 ] = '\0' ;
926+ MtmDDLStatement = NULL ;
926927}
927928
928929#if 0
@@ -986,6 +987,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
986987 MtmCheckClusterLock ();
987988 }
988989 MtmInsideTransaction = true;
990+ MtmDDLStatement = NULL ;
989991 Mtm -> nRunningTransactions += 1 ;
990992
991993 x -> snapshot = MtmAssignCSN ();
@@ -3447,7 +3449,7 @@ _PG_init(void)
34473449 & MtmRemoteFunctionsList ,
34483450 "lo_create,lo_unlink" ,
34493451 PGC_USERSET , /* context */
3450- 0 , /* flags */
3452+ GUC_LIST_INPUT | GUC_LIST_QUOTE , /* flags */
34513453 NULL , /* GucStringCheckHook check_hook */
34523454 MtmSetRemoteFunction , /* GucStringAssignHook assign_hook */
34533455 NULL /* GucShowHook show_hook */
@@ -4961,14 +4963,17 @@ static void MtmGucDiscard()
49614963 dlist_init (& MtmGucList );
49624964
49634965 hash_destroy (MtmGucHash );
4964- MtmGucInit () ;
4966+ MtmGucHash = NULL ;
49654967}
49664968
49674969static inline void MtmGucUpdate (const char * key , char * value )
49684970{
49694971 MtmGucEntry * hentry ;
49704972 bool found ;
49714973
4974+ if (!MtmGucHash )
4975+ MtmGucInit ();
4976+
49724977 hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_ENTER , & found );
49734978 if (found )
49744979 {
@@ -4984,6 +4989,9 @@ static inline void MtmGucRemove(const char *key)
49844989 MtmGucEntry * hentry ;
49854990 bool found ;
49864991
4992+ if (!MtmGucHash )
4993+ MtmGucInit ();
4994+
49874995 hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_FIND , & found );
49884996 if (found )
49894997 {
@@ -5042,23 +5050,19 @@ char* MtmGucSerialize(void)
50425050
50435051 serialized_gucs = makeStringInfo ();
50445052
5045- /*
5046- * Crutch for scheduler. It sets search_path through SetConfigOption()
5047- * so our callback do not react on that.
5048- */
5049- search_path = GetConfigOption ("search_path" , false, true);
5050- appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
5051-
50525053 dlist_foreach (iter , & MtmGucList )
50535054 {
50545055 MtmGucEntry * cur_entry = dlist_container (MtmGucEntry , list_node , iter .cur );
50555056
5057+ if (strcmp (cur_entry -> key , "search_path" ) == 0 )
5058+ continue ;
5059+
50565060 appendStringInfoString (serialized_gucs , "SET " );
50575061 appendStringInfoString (serialized_gucs , cur_entry -> key );
50585062 appendStringInfoString (serialized_gucs , " TO " );
50595063
50605064 /* quite a crutch */
5061- if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' || strchr ( cur_entry -> value , ',' ) != NULL )
5065+ if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' )
50625066 {
50635067 appendStringInfoString (serialized_gucs , "'" );
50645068 appendStringInfoString (serialized_gucs , cur_entry -> value );
@@ -5071,6 +5075,13 @@ char* MtmGucSerialize(void)
50715075 appendStringInfoString (serialized_gucs , "; " );
50725076 }
50735077
5078+ /*
5079+ * Crutch for scheduler. It sets search_path through SetConfigOption()
5080+ * so our callback do not react on that.
5081+ */
5082+ search_path = GetConfigOption ("search_path" , false, true);
5083+ appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
5084+
50745085 return serialized_gucs -> data ;
50755086}
50765087
@@ -5363,6 +5374,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53635374 return ;
53645375 }
53655376 }
5377+ else if (stmt -> removeType == OBJECT_FUNCTION && MtmTx .isReplicated )
5378+ {
5379+ /* Make it possible to drop functions which were not replicated */
5380+ stmt -> missing_ok = true;
5381+ }
53665382 }
53675383 break ;
53685384
@@ -5395,16 +5411,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53955411 break ;
53965412 }
53975413
5398- if (!skipCommand && !MtmTx .isReplicated && ( context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId ()) )
5414+ if (!skipCommand && !MtmTx .isReplicated && ! MtmDDLStatement )
53995415 {
5400- MtmUtilityProcessedInXid = GetCurrentTransactionId ();
5401- if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal ) {
5402- MtmProcessDDLCommand (queryString , true);
5403- } else {
5404- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5405- }
5416+ MTM_LOG3 ("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d" , queryString , MtmTx .isReplicated , MtmIsLogicalReceiver );
5417+ MtmProcessDDLCommand (queryString , true);
54065418 executed = true;
5419+ MtmDDLStatement = queryString ;
54075420 }
5421+ else MTM_LOG3 ("Skip utility statement '%s': skip=%d, insideDDL=%d" , queryString , skipCommand , MtmDDLStatement != NULL );
54085422
54095423 if (PreviousProcessUtilityHook != NULL )
54105424 {
@@ -5423,16 +5437,17 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54235437#endif
54245438 if (MyXactAccessedTempRel )
54255439 {
5426- MTM_LOG1 ("Xact accessed temp table, stopping replication" );
5440+ MTM_LOG1 ("Xact accessed temp table, stopping replication of statement '%s'" , queryString );
54275441 MtmTx .isDistributed = false; /* Skip */
54285442 MtmTx .snapshot = INVALID_CSN ;
54295443 }
54305444
54315445 if (executed )
54325446 {
54335447 MtmFinishDDLCommand ();
5448+ MtmDDLStatement = NULL ;
54345449 }
5435- if (nodeTag (parsetree ) == T_CreateStmt )
5450+ if (IsA (parsetree , CreateStmt ) )
54365451 {
54375452 CreateStmt * create = (CreateStmt * )parsetree ;
54385453 Oid relid = RangeVarGetRelid (create -> relation , NoLock , true);
@@ -5449,15 +5464,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54495464 }
54505465 }
54515466 }
5452- if (context == PROCESS_UTILITY_TOPLEVEL ) {
5453- MtmUtilityProcessedInXid = InvalidTransactionId ;
5454- }
54555467}
54565468
54575469static void
54585470MtmExecutorStart (QueryDesc * queryDesc , int eflags )
54595471{
5460- if (!MtmTx .isReplicated && ActivePortal )
5472+ if (!MtmTx .isReplicated && ! MtmDDLStatement )
54615473 {
54625474 ListCell * tlist ;
54635475
@@ -5471,11 +5483,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
54715483 TargetEntry * tle = (TargetEntry * ) lfirst (tlist );
54725484 if (tle -> expr && IsA (tle -> expr , FuncExpr ))
54735485 {
5474- if (hash_search (MtmRemoteFunctions , & ((FuncExpr * )tle -> expr )-> funcid , HASH_FIND , NULL ))
5486+ Oid func_oid = ((FuncExpr * )tle -> expr )-> funcid ;
5487+ if (!hash_search (MtmRemoteFunctions , & func_oid , HASH_FIND , NULL ))
54755488 {
5476- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5477- break ;
5489+ Form_pg_proc funcform ;
5490+ bool is_sec_def ;
5491+ HeapTuple func_tuple = SearchSysCache1 (PROCOID , ObjectIdGetDatum (func_oid ));
5492+ if (!HeapTupleIsValid (func_tuple ))
5493+ elog (ERROR , "cache lookup failed for function %u" , func_oid );
5494+ funcform = (Form_pg_proc ) GETSTRUCT (func_tuple );
5495+ is_sec_def = funcform -> prosecdef ;
5496+ ReleaseSysCache (func_tuple );
5497+ elog (LOG , "Function %s security defined=%d" , tle -> resname , is_sec_def );
5498+ if (!is_sec_def )
5499+ {
5500+ continue ;
5501+ }
54785502 }
5503+ /*
5504+ * Execute security defined functions or functions marked as remote at replicated nodes.
5505+ * Them are executed as DDL statements.
5506+ * All data modifications done inside this function are not replicated.
5507+ * As a result generated content can vary at different nodes.
5508+ */
5509+ MtmProcessDDLCommand (queryDesc -> sourceText , true);
5510+ MtmDDLStatement = queryDesc ;
5511+ break ;
54795512 }
54805513 }
54815514 }
@@ -5524,6 +5557,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
55245557 {
55255558 standard_ExecutorFinish (queryDesc );
55265559 }
5560+
5561+ if (MtmDDLStatement == queryDesc )
5562+ {
5563+ MtmFinishDDLCommand ();
5564+ MtmDDLStatement = NULL ;
5565+ }
55275566}
55285567
55295568static void MtmSeqNextvalHook (Oid seqid , int64 next )
0 commit comments