6868 * CommitTransaction() which will then do the actual transaction commit.
6969 *
7070 * After commit we are called another time (AtCommit_Notify()). Here we
71- * make the actual updates to the effective listen state (listenChannels).
72- *
73- * Finally, after we are out of the transaction altogether, we check if
74- * we need to signal listening backends. In SignalBackends() we scan the
75- * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
76- * to every listening backend (we don't know which backend is listening on
77- * which channel so we must signal them all). We can exclude backends that
78- * are already up to date, though, and we can also exclude backends that
79- * are in other databases (unless they are way behind and should be kicked
80- * to make them advance their pointers). We don't bother with a
81- * self-signal either, but just process the queue directly.
71+ * make any actual updates to the effective listen state (listenChannels).
72+ * Then we signal any backends that may be interested in our messages
73+ * (including our own backend, if listening). This is done by
74+ * SignalBackends(), which scans the list of listening backends and sends a
75+ * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
76+ * know which backend is listening on which channel so we must signal them
77+ * all). We can exclude backends that are already up to date, though, and
78+ * we can also exclude backends that are in other databases (unless they
79+ * are way behind and should be kicked to make them advance their
80+ * pointers).
81+ *
82+ * Finally, after we are out of the transaction altogether and about to go
83+ * idle, we scan the queue for messages that need to be sent to our
84+ * frontend (which might be notifies from other backends, or self-notifies
85+ * from our own). This step is not part of the CommitTransaction sequence
86+ * for two important reasons. First, we could get errors while sending
87+ * data to our frontend, and it's really bad for errors to happen in
88+ * post-commit cleanup. Second, in cases where a procedure issues commits
89+ * within a single frontend command, we don't want to send notifies to our
90+ * frontend until the command is done; but notifies to other backends
91+ * should go out immediately after each commit.
8292 *
8393 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
8494 * sets the process's latch, which triggers the event to be processed
@@ -426,11 +436,8 @@ static bool unlistenExitRegistered = false;
426436/* True if we're currently registered as a listener in asyncQueueControl */
427437static bool amRegisteredListener = false;
428438
429- /* has this backend sent notifications in the current transaction? */
430- static bool backendHasSentNotifications = false;
431-
432439/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
433- static bool backendTryAdvanceTail = false;
440+ static bool tryAdvanceTail = false;
434441
435442/* GUC parameter */
436443bool Trace_notify = false;
@@ -459,7 +466,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
459466 char * page_buffer ,
460467 Snapshot snapshot );
461468static void asyncQueueAdvanceTail (void );
462- static void ProcessIncomingNotify (void );
469+ static void ProcessIncomingNotify (bool flush );
463470static bool AsyncExistsPendingNotify (Notification * n );
464471static void AddEventToPendingNotifies (Notification * n );
465472static uint32 notification_hash (const void * key , Size keysize );
@@ -950,8 +957,6 @@ PreCommit_Notify(void)
950957 AccessExclusiveLock );
951958
952959 /* Now push the notifications into the queue */
953- backendHasSentNotifications = true;
954-
955960 nextNotify = list_head (pendingNotifies -> events );
956961 while (nextNotify != NULL )
957962 {
@@ -976,6 +981,8 @@ PreCommit_Notify(void)
976981 nextNotify = asyncQueueAddEntries (nextNotify );
977982 LWLockRelease (NotifyQueueLock );
978983 }
984+
985+ /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
979986 }
980987}
981988
@@ -985,6 +992,11 @@ PreCommit_Notify(void)
985992 * This is called at transaction commit, after committing to clog.
986993 *
987994 * Update listenChannels and clear transaction-local state.
995+ *
996+ * If we issued any notifications in the transaction, send signals to
997+ * listening backends (possibly including ourselves) to process them.
998+ * Also, if we filled enough queue pages with new notifies, try to
999+ * advance the queue tail pointer.
9881000 */
9891001void
9901002AtCommit_Notify (void )
@@ -1027,6 +1039,29 @@ AtCommit_Notify(void)
10271039 if (amRegisteredListener && listenChannels == NIL )
10281040 asyncQueueUnregister ();
10291041
1042+ /*
1043+ * Send signals to listening backends. We need do this only if there are
1044+ * pending notifies, which were previously added to the shared queue by
1045+ * PreCommit_Notify().
1046+ */
1047+ if (pendingNotifies != NULL )
1048+ SignalBackends ();
1049+
1050+ /*
1051+ * If it's time to try to advance the global tail pointer, do that.
1052+ *
1053+ * (It might seem odd to do this in the sender, when more than likely the
1054+ * listeners won't yet have read the messages we just sent. However,
1055+ * there's less contention if only the sender does it, and there is little
1056+ * need for urgency in advancing the global tail. So this typically will
1057+ * be clearing out messages that were sent some time ago.)
1058+ */
1059+ if (tryAdvanceTail )
1060+ {
1061+ tryAdvanceTail = false;
1062+ asyncQueueAdvanceTail ();
1063+ }
1064+
10301065 /* And clean up */
10311066 ClearPendingActionsAndNotifies ();
10321067}
@@ -1199,85 +1234,6 @@ Exec_UnlistenAllCommit(void)
11991234 listenChannels = NIL ;
12001235}
12011236
1202- /*
1203- * ProcessCompletedNotifies --- send out signals and self-notifies
1204- *
1205- * This is called from postgres.c just before going idle at the completion
1206- * of a transaction. If we issued any notifications in the just-completed
1207- * transaction, send signals to other backends to process them, and also
1208- * process the queue ourselves to send messages to our own frontend.
1209- * Also, if we filled enough queue pages with new notifies, try to advance
1210- * the queue tail pointer.
1211- *
1212- * The reason that this is not done in AtCommit_Notify is that there is
1213- * a nonzero chance of errors here (for example, encoding conversion errors
1214- * while trying to format messages to our frontend). An error during
1215- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
1216- * to ensure that a transaction's self-notifies are delivered to the frontend
1217- * before it gets the terminating ReadyForQuery message.
1218- *
1219- * Note that we send signals and process the queue even if the transaction
1220- * eventually aborted. This is because we need to clean out whatever got
1221- * added to the queue.
1222- *
1223- * NOTE: we are outside of any transaction here.
1224- */
1225- void
1226- ProcessCompletedNotifies (void )
1227- {
1228- MemoryContext caller_context ;
1229-
1230- /* Nothing to do if we didn't send any notifications */
1231- if (!backendHasSentNotifications )
1232- return ;
1233-
1234- /*
1235- * We reset the flag immediately; otherwise, if any sort of error occurs
1236- * below, we'd be locked up in an infinite loop, because control will come
1237- * right back here after error cleanup.
1238- */
1239- backendHasSentNotifications = false;
1240-
1241- /*
1242- * We must preserve the caller's memory context (probably MessageContext)
1243- * across the transaction we do here.
1244- */
1245- caller_context = CurrentMemoryContext ;
1246-
1247- if (Trace_notify )
1248- elog (DEBUG1 , "ProcessCompletedNotifies" );
1249-
1250- /*
1251- * We must run asyncQueueReadAllNotifications inside a transaction, else
1252- * bad things happen if it gets an error.
1253- */
1254- StartTransactionCommand ();
1255-
1256- /* Send signals to other backends */
1257- SignalBackends ();
1258-
1259- if (listenChannels != NIL )
1260- {
1261- /* Read the queue ourselves, and send relevant stuff to the frontend */
1262- asyncQueueReadAllNotifications ();
1263- }
1264-
1265- /*
1266- * If it's time to try to advance the global tail pointer, do that.
1267- */
1268- if (backendTryAdvanceTail )
1269- {
1270- backendTryAdvanceTail = false;
1271- asyncQueueAdvanceTail ();
1272- }
1273-
1274- CommitTransactionCommand ();
1275-
1276- MemoryContextSwitchTo (caller_context );
1277-
1278- /* We don't need pq_flush() here since postgres.c will do one shortly */
1279- }
1280-
12811237/*
12821238 * Test whether we are actively listening on the given channel name.
12831239 *
@@ -1543,7 +1499,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
15431499 * pointer (we don't want to actually do that right here).
15441500 */
15451501 if (QUEUE_POS_PAGE (queue_head ) % QUEUE_CLEANUP_DELAY == 0 )
1546- backendTryAdvanceTail = true;
1502+ tryAdvanceTail = true;
15471503
15481504 /* And exit the loop */
15491505 break ;
@@ -1658,8 +1614,6 @@ asyncQueueFillWarning(void)
16581614/*
16591615 * Send signals to listening backends.
16601616 *
1661- * We never signal our own process; that should be handled by our caller.
1662- *
16631617 * Normally we signal only backends in our own database, since only those
16641618 * backends could be interested in notifies we send. However, if there's
16651619 * notify traffic in our database but no traffic in another database that
@@ -1668,6 +1622,9 @@ asyncQueueFillWarning(void)
16681622 * advance their queue position pointers, allowing the global tail to advance.
16691623 *
16701624 * Since we know the BackendId and the Pid the signaling is quite cheap.
1625+ *
1626+ * This is called during CommitTransaction(), so it's important for it
1627+ * to have very low probability of failure.
16711628 */
16721629static void
16731630SignalBackends (void )
@@ -1682,8 +1639,7 @@ SignalBackends(void)
16821639 * list of target PIDs.
16831640 *
16841641 * XXX in principle these pallocs could fail, which would be bad. Maybe
1685- * preallocate the arrays? But in practice this is only run in trivial
1686- * transactions, so there should surely be space available.
1642+ * preallocate the arrays? They're not that large, though.
16871643 */
16881644 pids = (int32 * ) palloc (MaxBackends * sizeof (int32 ));
16891645 ids = (BackendId * ) palloc (MaxBackends * sizeof (BackendId ));
@@ -1696,8 +1652,6 @@ SignalBackends(void)
16961652 QueuePosition pos ;
16971653
16981654 Assert (pid != InvalidPid );
1699- if (pid == MyProcPid )
1700- continue ; /* never signal self */
17011655 pos = QUEUE_BACKEND_POS (i );
17021656 if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
17031657 {
@@ -1730,6 +1684,16 @@ SignalBackends(void)
17301684 {
17311685 int32 pid = pids [i ];
17321686
1687+ /*
1688+ * If we are signaling our own process, no need to involve the kernel;
1689+ * just set the flag directly.
1690+ */
1691+ if (pid == MyProcPid )
1692+ {
1693+ notifyInterruptPending = true;
1694+ continue ;
1695+ }
1696+
17331697 /*
17341698 * Note: assuming things aren't broken, a signal failure here could
17351699 * only occur if the target backend exited since we released
@@ -1910,15 +1874,20 @@ HandleNotifyInterrupt(void)
19101874 * via the process's latch, and this routine will get called.
19111875 * If we are truly idle (ie, *not* inside a transaction block),
19121876 * process the incoming notifies.
1877+ *
1878+ * If "flush" is true, force any frontend messages out immediately.
1879+ * This can be false when being called at the end of a frontend command,
1880+ * since we'll flush after sending ReadyForQuery.
19131881 */
19141882void
1915- ProcessNotifyInterrupt (void )
1883+ ProcessNotifyInterrupt (bool flush )
19161884{
19171885 if (IsTransactionOrTransactionBlock ())
19181886 return ; /* not really idle */
19191887
1888+ /* Loop in case another signal arrives while sending messages */
19201889 while (notifyInterruptPending )
1921- ProcessIncomingNotify ();
1890+ ProcessIncomingNotify (flush );
19221891}
19231892
19241893
@@ -2180,6 +2149,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
21802149/*
21812150 * Advance the shared queue tail variable to the minimum of all the
21822151 * per-backend tail pointers. Truncate pg_notify space if possible.
2152+ *
2153+ * This is (usually) called during CommitTransaction(), so it's important for
2154+ * it to have very low probability of failure.
21832155 */
21842156static void
21852157asyncQueueAdvanceTail (void )
@@ -2253,17 +2225,16 @@ asyncQueueAdvanceTail(void)
22532225/*
22542226 * ProcessIncomingNotify
22552227 *
2256- * Deal with arriving NOTIFYs from other backends as soon as it's safe to
2257- * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
2258- * signal handler, but isn't anymore .
2228+ * Scan the queue for arriving notifications and report them to the front
2229+ * end. The notifications might be from other sessions, or our own;
2230+ * there's no need to distinguish here .
22592231 *
2260- * Scan the queue for arriving notifications and report them to my front
2261- * end.
2232+ * If "flush" is true, force any frontend messages out immediately.
22622233 *
22632234 * NOTE: since we are outside any transaction, we must create our own.
22642235 */
22652236static void
2266- ProcessIncomingNotify (void )
2237+ ProcessIncomingNotify (bool flush )
22672238{
22682239 /* We *must* reset the flag */
22692240 notifyInterruptPending = false;
@@ -2288,9 +2259,11 @@ ProcessIncomingNotify(void)
22882259 CommitTransactionCommand ();
22892260
22902261 /*
2291- * Must flush the notify messages to ensure frontend gets them promptly.
2262+ * If this isn't an end-of-command case, we must flush the notify messages
2263+ * to ensure frontend gets them promptly.
22922264 */
2293- pq_flush ();
2265+ if (flush )
2266+ pq_flush ();
22942267
22952268 set_ps_display ("idle" );
22962269
@@ -2315,9 +2288,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
23152288 pq_endmessage (& buf );
23162289
23172290 /*
2318- * NOTE: we do not do pq_flush() here. For a self-notify, it will
2319- * happen at the end of the transaction, and for incoming notifies
2320- * ProcessIncomingNotify will do it after finding all the notifies .
2291+ * NOTE: we do not do pq_flush() here. Some level of caller will
2292+ * handle it later, allowing this message to be combined into a packet
2293+ * with other ones .
23212294 */
23222295 }
23232296 else
0 commit comments