3434#include "utils/syscache.h"
3535#include "utils/typcache.h"
3636
37+ #include "access/xact.h"
38+ #include "miscadmin.h"
39+ #include "executor/executor.h"
40+ #include "nodes/nodes.h"
41+ #include "postmaster/autovacuum.h"
42+ #include "replication/walsender.h"
43+ #include "storage/latch.h"
44+ #include "storage/proc.h"
45+ #include "storage/ipc.h"
46+ #include "pgstat.h"
47+ #include "tcop/utility.h"
48+
3749PG_MODULE_MAGIC ;
3850
3951/* These must be available to pg_dlsym() */
@@ -85,11 +97,211 @@ static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
8597 ReorderBufferTXN * txn ,
8698 XLogRecPtr abort_lsn );
8799
100+ static void test_decoding_xact_callback (XactEvent event , void * arg );
101+
102+ static void test_decoding_process_utility (PlannedStmt * pstmt ,
103+ const char * queryString , ProcessUtilityContext context ,
104+ ParamListInfo params , DestReceiver * dest , char * completionTag );
105+
106+ static bool test_decoding_twophase_commit ();
107+
108+ static void test_decoding_executor_finish (QueryDesc * queryDesc );
109+
110+ static ProcessUtility_hook_type PreviousProcessUtilityHook ;
111+
112+ static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
113+
114+ static bool CurrentTxContainsDML ;
115+ static bool CurrentTxContainsDDL ;
116+ static bool CurrentTxNonpreparable ;
88117
89118void
90119_PG_init (void )
91120{
92- /* other plugins can perform things here */
121+ PreviousExecutorFinishHook = ExecutorFinish_hook ;
122+ ExecutorFinish_hook = test_decoding_executor_finish ;
123+
124+ PreviousProcessUtilityHook = ProcessUtility_hook ;
125+ ProcessUtility_hook = test_decoding_process_utility ;
126+
127+ if (!IsUnderPostmaster )
128+ RegisterXactCallback (test_decoding_xact_callback , NULL );
129+ }
130+
131+
132+ /* ability to hook into sigle-statement transaction */
133+ static void
134+ test_decoding_xact_callback (XactEvent event , void * arg )
135+ {
136+ switch (event )
137+ {
138+ case XACT_EVENT_START :
139+ case XACT_EVENT_ABORT :
140+ CurrentTxContainsDML = false;
141+ CurrentTxContainsDDL = false;
142+ CurrentTxNonpreparable = false;
143+ break ;
144+ case XACT_EVENT_COMMIT_COMMAND :
145+ if (!IsTransactionBlock ())
146+ test_decoding_twophase_commit ();
147+ break ;
148+ default :
149+ break ;
150+ }
151+ }
152+
153+ /* find out whether transaction had wrote any data or not */
154+ static void
155+ test_decoding_executor_finish (QueryDesc * queryDesc )
156+ {
157+ CmdType operation = queryDesc -> operation ;
158+ EState * estate = queryDesc -> estate ;
159+ if (estate -> es_processed != 0 &&
160+ (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE ))
161+ {
162+ int i ;
163+ for (i = 0 ; i < estate -> es_num_result_relations ; i ++ )
164+ {
165+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
166+ if (RelationNeedsWAL (rel )) {
167+ CurrentTxContainsDML = true;
168+ break ;
169+ }
170+ }
171+ }
172+
173+ if (PreviousExecutorFinishHook != NULL )
174+ PreviousExecutorFinishHook (queryDesc );
175+ else
176+ standard_ExecutorFinish (queryDesc );
177+ }
178+
179+
180+ /*
181+ * Several things here:
182+ * 1) hook into commit of transaction block
183+ * 2) write logical message for DDL (default path)
184+ * 3) prevent 2pc hook for tx that can not be prepared and
185+ * send them as logical nontransactional message.
186+ */
187+ static void
188+ test_decoding_process_utility (PlannedStmt * pstmt ,
189+ const char * queryString , ProcessUtilityContext context ,
190+ ParamListInfo params , DestReceiver * dest , char * completionTag )
191+ {
192+ Node * parsetree = pstmt -> utilityStmt ;
193+ switch (nodeTag (parsetree ))
194+ {
195+ case T_TransactionStmt :
196+ {
197+ TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
198+ switch (stmt -> kind )
199+ {
200+ case TRANS_STMT_BEGIN :
201+ case TRANS_STMT_START :
202+ break ;
203+ case TRANS_STMT_COMMIT :
204+ if (test_decoding_twophase_commit ())
205+ return ; /* do not proceed */
206+ break ;
207+ case TRANS_STMT_PREPARE :
208+ case TRANS_STMT_COMMIT_PREPARED :
209+ case TRANS_STMT_ROLLBACK_PREPARED :
210+ break ;
211+ default :
212+ break ;
213+ }
214+ }
215+ case T_ReindexStmt :
216+ {
217+ ReindexStmt * stmt = (ReindexStmt * ) parsetree ;
218+ switch (stmt -> kind )
219+ {
220+ case REINDEX_OBJECT_SCHEMA :
221+ case REINDEX_OBJECT_SYSTEM :
222+ case REINDEX_OBJECT_DATABASE :
223+ LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
224+ CurrentTxNonpreparable = true;
225+ default :
226+ break ;
227+ }
228+ }
229+ break ;
230+ case T_IndexStmt :
231+ {
232+ IndexStmt * indexStmt = (IndexStmt * ) parsetree ;
233+ if (indexStmt -> concurrent )
234+ {
235+ LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
236+ CurrentTxNonpreparable = true;
237+ }
238+ }
239+ break ;
240+ default :
241+ LogLogicalMessage ("D" , queryString , strlen (queryString ) + 1 , true);
242+ CurrentTxContainsDDL = true;
243+ break ;
244+ }
245+
246+ if (PreviousProcessUtilityHook != NULL )
247+ {
248+ PreviousProcessUtilityHook (pstmt , queryString , context ,
249+ params , dest , completionTag );
250+ }
251+ else
252+ {
253+ standard_ProcessUtility (pstmt , queryString , context ,
254+ params , dest , completionTag );
255+ }
256+ }
257+
258+ /*
259+ * Change commit to prepare and wait on latch.
260+ * WalSender will unlock us after decoding and we can proceed.
261+ */
262+ static bool
263+ test_decoding_twophase_commit ()
264+ {
265+ int result = 0 ;
266+ char gid [20 ];
267+
268+ if (IsAutoVacuumLauncherProcess () ||
269+ !IsNormalProcessingMode () ||
270+ am_walsender ||
271+ IsBackgroundWorker ||
272+ IsAutoVacuumWorkerProcess () ||
273+ IsAbortedTransactionBlockState () ||
274+ !(CurrentTxContainsDML || CurrentTxContainsDDL ) ||
275+ CurrentTxNonpreparable )
276+ return false;
277+
278+ snprintf (gid , sizeof (gid ), "test_decoding:%d" , MyProc -> pgprocno );
279+
280+ if (!IsTransactionBlock ())
281+ {
282+ BeginTransactionBlock ();
283+ CommitTransactionCommand ();
284+ StartTransactionCommand ();
285+ }
286+ if (!PrepareTransactionBlock (gid ))
287+ {
288+ fprintf (stderr , "Can't prepare transaction '%s'\n" , gid );
289+ }
290+ CommitTransactionCommand ();
291+
292+ result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_POSTMASTER_DEATH , 0 ,
293+ WAIT_EVENT_REPLICATION_SLOT_SYNC );
294+
295+ if (result & WL_POSTMASTER_DEATH )
296+ proc_exit (1 );
297+
298+ if (result & WL_LATCH_SET )
299+ ResetLatch (& MyProc -> procLatch );
300+
301+
302+ StartTransactionCommand ();
303+ FinishPreparedTransaction (gid , true);
304+ return true;
93305}
94306
95307/* specify output plugin callbacks */
@@ -297,74 +509,11 @@ static bool
297509pg_filter_prepare (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
298510 char * gid )
299511{
300- TestDecodingData * data = ctx -> output_plugin_private ;
301-
302- /* treat all transaction as one-phase */
303- if (! data -> twophase_decoding )
512+ /* decode only tx that are prepared by our hook */
513+ if ( strncmp ( gid , "test_decoding:" , 14 ) == 0 )
514+ return false;
515+ else
304516 return true;
305-
306- /*
307- * Two-phase transactions that accessed catalog require special
308- * treatment.
309- *
310- * Right now we don't have a safe way to decode catalog changes made in
311- * prepared transaction that was already aborted by the time of
312- * decoding.
313- *
314- * That kind of problem arises only when we are trying to
315- * retrospectively decode aborted transactions with catalog changes -
316- * including if a transaction aborts while we're decoding it. If one
317- * wants to code distributed commit based on prepare decoding then
318- * commits/aborts will happend strictly after decoding will be
319- * completed, so it is possible to skip any checks/locks here.
320- *
321- * We'll also get stuck trying to acquire locks on catalog relations
322- * we need for decoding if the prepared xact holds a strong lock on
323- * one of them and we also need to decode row changes.
324- */
325- if (txn -> has_catalog_changes )
326- {
327- LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
328-
329- if (TransactionIdIsInProgress (txn -> xid ))
330- {
331- /*
332- * For the sake of simplicity, by default we just
333- * ignore in-progess prepared transactions with catalog
334- * changes in this extension. If they abort during
335- * decoding then tuples we need to decode them may be
336- * overwritten while we're still decoding, causing
337- * wrong catalog lookups.
338- *
339- * It is possible to move that LWLockRelease() to
340- * pg_decode_prepare_txn() and allow decoding of
341- * running prepared tx, but such lock will prevent any
342- * 2pc transaction commit during decoding time. That
343- * can be a long time in case of lots of
344- * changes/inserts in that tx or if the downstream is
345- * slow/unresonsive.
346- *
347- * (Continuing to decode without the lock is unsafe, XXX)
348- */
349- LWLockRelease (TwoPhaseStateLock );
350- return !data -> twophase_decode_with_catalog_changes ;
351- }
352- else if (TransactionIdDidAbort (txn -> xid ))
353- {
354- /*
355- * Here we know that it is already aborted and there is
356- * not much sense in doing something with this
357- * transaction. Consequently ABORT PREPARED will be
358- * suppressed.
359- */
360- LWLockRelease (TwoPhaseStateLock );
361- return true;
362- }
363-
364- LWLockRelease (TwoPhaseStateLock );
365- }
366-
367- return false;
368517}
369518
370519
@@ -374,9 +523,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
374523 XLogRecPtr prepare_lsn )
375524{
376525 TestDecodingData * data = ctx -> output_plugin_private ;
526+ int backend_procno ;
377527
378- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
379- return ;
528+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
529+ // return;
380530
381531 OutputPluginPrepareWrite (ctx , true);
382532
@@ -391,6 +541,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
391541 timestamptz_to_str (txn -> commit_time ));
392542
393543 OutputPluginWrite (ctx , true);
544+
545+ /* Unlock backend */
546+ sscanf (txn -> gid , "test_decoding:%d" , & backend_procno );
547+ SetLatch (& ProcGlobal -> allProcs [backend_procno ].procLatch );
394548}
395549
396550/* COMMIT PREPARED callback */
@@ -400,8 +554,8 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
400554{
401555 TestDecodingData * data = ctx -> output_plugin_private ;
402556
403- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
404- return ;
557+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
558+ // return;
405559
406560 OutputPluginPrepareWrite (ctx , true);
407561
@@ -425,8 +579,8 @@ pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
425579{
426580 TestDecodingData * data = ctx -> output_plugin_private ;
427581
428- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
429- return ;
582+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
583+ // return;
430584
431585 OutputPluginPrepareWrite (ctx , true);
432586
0 commit comments