@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
244244static void WalSndKeepaliveIfNecessary (void );
245245static void WalSndCheckTimeOut (void );
246246static long WalSndComputeSleeptime (TimestampTz now );
247+ static void WalSndWait (uint32 socket_events , long timeout , uint32 wait_event );
247248static void WalSndPrepareWrite (LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write );
248249static void WalSndWriteData (LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write );
249250static void WalSndUpdateProgress (LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid );
@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12871288 /* If we have pending write here, go to slow path */
12881289 for (;;)
12891290 {
1290- int wakeEvents ;
12911291 long sleeptime ;
12921292
12931293 /* Check for input from the client */
@@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
13041304
13051305 sleeptime = WalSndComputeSleeptime (GetCurrentTimestamp ());
13061306
1307- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1308- WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT ;
1309-
13101307 /* Sleep until something happens or we time out */
1311- (void ) WaitLatchOrSocket (MyLatch , wakeEvents ,
1312- MyProcPort -> sock , sleeptime ,
1313- WAIT_EVENT_WAL_SENDER_WRITE_DATA );
1308+ WalSndWait (WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE , sleeptime ,
1309+ WAIT_EVENT_WAL_SENDER_WRITE_DATA );
13141310
13151311 /* Clear any already-pending wakeups */
13161312 ResetLatch (MyLatch );
@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
14801476 */
14811477 sleeptime = WalSndComputeSleeptime (GetCurrentTimestamp ());
14821478
1483- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1484- WL_SOCKET_READABLE | WL_TIMEOUT ;
1479+ wakeEvents = WL_SOCKET_READABLE ;
14851480
14861481 if (pq_is_send_pending ())
14871482 wakeEvents |= WL_SOCKET_WRITEABLE ;
14881483
1489- (void ) WaitLatchOrSocket (MyLatch , wakeEvents ,
1490- MyProcPort -> sock , sleeptime ,
1491- WAIT_EVENT_WAL_SENDER_WAIT_WAL );
1484+ WalSndWait (wakeEvents , sleeptime , WAIT_EVENT_WAL_SENDER_WAIT_WAL );
14921485 }
14931486
14941487 /* reactivate latch so WalSndLoop knows to continue */
@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
23482341 long sleeptime ;
23492342 int wakeEvents ;
23502343
2351- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT ;
2352-
23532344 if (!streamingDoneReceiving )
2354- wakeEvents |= WL_SOCKET_READABLE ;
2345+ wakeEvents = WL_SOCKET_READABLE ;
2346+ else
2347+ wakeEvents = 0 ;
23552348
23562349 /*
23572350 * Use fresh timestamp, not last_processing, to reduce the chance
@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
23632356 wakeEvents |= WL_SOCKET_WRITEABLE ;
23642357
23652358 /* Sleep until something happens or we time out */
2366- (void ) WaitLatchOrSocket (MyLatch , wakeEvents ,
2367- MyProcPort -> sock , sleeptime ,
2368- WAIT_EVENT_WAL_SENDER_MAIN );
2359+ WalSndWait (wakeEvents , sleeptime , WAIT_EVENT_WAL_SENDER_MAIN );
23692360 }
23702361 }
23712362}
@@ -3121,6 +3112,22 @@ WalSndWakeup(void)
31213112 }
31223113}
31233114
3115+ /*
3116+ * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3117+ * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3118+ * on postmaster death.
3119+ */
3120+ static void
3121+ WalSndWait (uint32 socket_events , long timeout , uint32 wait_event )
3122+ {
3123+ WaitEvent event ;
3124+
3125+ ModifyWaitEvent (FeBeWaitSet , FeBeWaitSetSocketPos , socket_events , NULL );
3126+ if (WaitEventSetWait (FeBeWaitSet , timeout , & event , 1 , wait_event ) == 1 &&
3127+ (event .events & WL_POSTMASTER_DEATH ))
3128+ proc_exit (1 );
3129+ }
3130+
31243131/*
31253132 * Signal all walsenders to move to stopping state.
31263133 *
0 commit comments