@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
9999int max_replication_slots = 0 ; /* the maximum number of replication
100100 * slots */
101101
102+ static ReplicationSlot * SearchNamedReplicationSlot (const char * name );
103+ static int ReplicationSlotAcquireInternal (ReplicationSlot * slot ,
104+ const char * name , SlotAcquireBehavior behavior );
102105static void ReplicationSlotDropAcquired (void );
103106static void ReplicationSlotDropPtr (ReplicationSlot * slot );
104107
@@ -322,102 +325,142 @@ ReplicationSlotCreate(const char *name, bool db_specific,
322325}
323326
324327/*
325- * Find a previously created slot and mark it as used by this backend .
328+ * Search for the named replication slot .
326329 *
327- * The return value is only useful if behavior is SAB_Inquire, in which
328- * it's zero if we successfully acquired the slot, or the PID of the
329- * owning process otherwise. If behavior is SAB_Error, then trying to
330- * acquire an owned slot is an error. If SAB_Block, we sleep until the
331- * slot is released by the owning process.
330+ * Return the replication slot if found, otherwise NULL.
331+ *
332+ * The caller must hold ReplicationSlotControlLock in shared mode.
332333 */
333- int
334- ReplicationSlotAcquire (const char * name , SlotAcquireBehavior behavior )
334+ static ReplicationSlot *
335+ SearchNamedReplicationSlot (const char * name )
335336{
336- ReplicationSlot * slot ;
337- int active_pid ;
338337 int i ;
338+ ReplicationSlot * slot = NULL ;
339339
340- retry :
341- Assert ( MyReplicationSlot == NULL );
340+ Assert ( LWLockHeldByMeInMode ( ReplicationSlotControlLock ,
341+ LW_SHARED ) );
342342
343- /*
344- * Search for the named slot and mark it active if we find it. If the
345- * slot is already active, we exit the loop with active_pid set to the PID
346- * of the backend that owns it.
347- */
348- active_pid = 0 ;
349- slot = NULL ;
350- LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
351343 for (i = 0 ; i < max_replication_slots ; i ++ )
352344 {
353345 ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
354346
355347 if (s -> in_use && strcmp (name , NameStr (s -> data .name )) == 0 )
356348 {
357- /*
358- * This is the slot we want; check if it's active under some other
359- * process. In single user mode, we don't need this check.
360- */
361- if (IsUnderPostmaster )
362- {
363- /*
364- * Get ready to sleep on it in case it is active. (We may end
365- * up not sleeping, but we don't want to do this while holding
366- * the spinlock.)
367- */
368- ConditionVariablePrepareToSleep (& s -> active_cv );
369-
370- SpinLockAcquire (& s -> mutex );
371-
372- active_pid = s -> active_pid ;
373- if (active_pid == 0 )
374- active_pid = s -> active_pid = MyProcPid ;
375-
376- SpinLockRelease (& s -> mutex );
377- }
378- else
379- active_pid = MyProcPid ;
380349 slot = s ;
381-
382350 break ;
383351 }
384352 }
385- LWLockRelease (ReplicationSlotControlLock );
386353
387- /* If we did not find the slot, error out. */
388- if (slot == NULL )
354+ return slot ;
355+ }
356+
357+ /*
358+ * Find a previously created slot and mark it as used by this process.
359+ *
360+ * The return value is only useful if behavior is SAB_Inquire, in which
361+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
362+ * exists, or the PID of the owning process otherwise. If behavior is
363+ * SAB_Error, then trying to acquire an owned slot is an error.
364+ * If SAB_Block, we sleep until the slot is released by the owning process.
365+ */
366+ int
367+ ReplicationSlotAcquire (const char * name , SlotAcquireBehavior behavior )
368+ {
369+ return ReplicationSlotAcquireInternal (NULL , name , behavior );
370+ }
371+
372+ /*
373+ * Mark the specified slot as used by this process.
374+ *
375+ * Only one of slot and name can be specified.
376+ * If slot == NULL, search for the slot with the given name.
377+ *
378+ * See comments about the return value in ReplicationSlotAcquire().
379+ */
380+ static int
381+ ReplicationSlotAcquireInternal (ReplicationSlot * slot , const char * name ,
382+ SlotAcquireBehavior behavior )
383+ {
384+ ReplicationSlot * s ;
385+ int active_pid ;
386+
387+ AssertArg ((slot == NULL ) ^ (name == NULL ));
388+
389+ retry :
390+ Assert (MyReplicationSlot == NULL );
391+
392+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
393+
394+ /*
395+ * Search for the slot with the specified name if the slot to acquire is
396+ * not given. If the slot is not found, we either return -1 or error out.
397+ */
398+ s = slot ? slot : SearchNamedReplicationSlot (name );
399+ if (s == NULL || !s -> in_use )
400+ {
401+ LWLockRelease (ReplicationSlotControlLock );
402+
403+ if (behavior == SAB_Inquire )
404+ return -1 ;
389405 ereport (ERROR ,
390406 (errcode (ERRCODE_UNDEFINED_OBJECT ),
391- errmsg ("replication slot \"%s\" does not exist" , name )));
407+ errmsg ("replication slot \"%s\" does not exist" ,
408+ name ? name : NameStr (slot -> data .name ))));
409+ }
392410
393411 /*
394- * If we found the slot but it's already active in another backend, we
395- * either error out or retry after a short wait, as caller specified.
412+ * This is the slot we want; check if it's active under some other
413+ * process. In single user mode, we don't need this check.
414+ */
415+ if (IsUnderPostmaster )
416+ {
417+ /*
418+ * Get ready to sleep on the slot in case it is active if SAB_Block.
419+ * (We may end up not sleeping, but we don't want to do this while
420+ * holding the spinlock.)
421+ */
422+ if (behavior == SAB_Block )
423+ ConditionVariablePrepareToSleep (& s -> active_cv );
424+
425+ SpinLockAcquire (& s -> mutex );
426+ if (s -> active_pid == 0 )
427+ s -> active_pid = MyProcPid ;
428+ active_pid = s -> active_pid ;
429+ SpinLockRelease (& s -> mutex );
430+ }
431+ else
432+ active_pid = MyProcPid ;
433+ LWLockRelease (ReplicationSlotControlLock );
434+
435+ /*
436+ * If we found the slot but it's already active in another process, we
437+ * either error out, return the PID of the owning process, or retry
438+ * after a short wait, as caller specified.
396439 */
397440 if (active_pid != MyProcPid )
398441 {
399442 if (behavior == SAB_Error )
400443 ereport (ERROR ,
401444 (errcode (ERRCODE_OBJECT_IN_USE ),
402445 errmsg ("replication slot \"%s\" is active for PID %d" ,
403- name , active_pid )));
446+ NameStr ( s -> data . name ) , active_pid )));
404447 else if (behavior == SAB_Inquire )
405448 return active_pid ;
406449
407450 /* Wait here until we get signaled, and then restart */
408- ConditionVariableSleep (& slot -> active_cv ,
451+ ConditionVariableSleep (& s -> active_cv ,
409452 WAIT_EVENT_REPLICATION_SLOT_DROP );
410453 ConditionVariableCancelSleep ();
411454 goto retry ;
412455 }
413- else
414- ConditionVariableCancelSleep (); /* no sleep needed after all */
456+ else if ( behavior == SAB_Block )
457+ ConditionVariableCancelSleep (); /* no sleep needed after all */
415458
416459 /* Let everybody know we've modified this slot */
417- ConditionVariableBroadcast (& slot -> active_cv );
460+ ConditionVariableBroadcast (& s -> active_cv );
418461
419462 /* We made this slot active, so it's ours now. */
420- MyReplicationSlot = slot ;
463+ MyReplicationSlot = s ;
421464
422465 /* success */
423466 return 0 ;
@@ -1100,43 +1143,82 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
11001143 ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
11011144 XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
11021145 NameData slotname ;
1146+ int wspid ;
1147+ int last_signaled_pid = 0 ;
11031148
11041149 if (!s -> in_use )
11051150 continue ;
11061151
11071152 SpinLockAcquire (& s -> mutex );
1108- if (s -> data .restart_lsn == InvalidXLogRecPtr ||
1109- s -> data .restart_lsn >= oldestLSN )
1110- {
1111- SpinLockRelease (& s -> mutex );
1112- continue ;
1113- }
1114-
11151153 slotname = s -> data .name ;
11161154 restart_lsn = s -> data .restart_lsn ;
1117-
11181155 SpinLockRelease (& s -> mutex );
1156+
1157+ if (XLogRecPtrIsInvalid (restart_lsn ) || restart_lsn >= oldestLSN )
1158+ continue ;
11191159 LWLockRelease (ReplicationSlotControlLock );
11201160
1161+ /* Get ready to sleep on the slot in case it is active */
1162+ ConditionVariablePrepareToSleep (& s -> active_cv );
1163+
11211164 for (;;)
11221165 {
1123- int wspid = ReplicationSlotAcquire (NameStr (slotname ),
1124- SAB_Inquire );
1166+ /*
1167+ * Try to mark this slot as used by this process.
1168+ *
1169+ * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1170+ * should not cancel the prepared condition variable
1171+ * if this slot is active in other process. Because in this case
1172+ * we have to wait on that CV for the process owning
1173+ * the slot to be terminated, later.
1174+ */
1175+ wspid = ReplicationSlotAcquireInternal (s , NULL , SAB_Inquire );
11251176
1126- /* no walsender? success! */
1127- if (wspid == 0 )
1177+ /*
1178+ * Exit the loop if we successfully acquired the slot or
1179+ * the slot was dropped during waiting for the owning process
1180+ * to be terminated. For example, the latter case is likely to
1181+ * happen when the slot is temporary because it's automatically
1182+ * dropped by the termination of the owning process.
1183+ */
1184+ if (wspid <= 0 )
11281185 break ;
11291186
1130- ereport (LOG ,
1131- (errmsg ("terminating walsender %d because replication slot \"%s\" is too far behind" ,
1132- wspid , NameStr (slotname ))));
1133- (void ) kill (wspid , SIGTERM );
1187+ /*
1188+ * Signal to terminate the process that owns the slot.
1189+ *
1190+ * There is the race condition where other process may own
1191+ * the slot after the process using it was terminated and before
1192+ * this process owns it. To handle this case, we signal again
1193+ * if the PID of the owning process is changed than the last.
1194+ *
1195+ * XXX This logic assumes that the same PID is not reused
1196+ * very quickly.
1197+ */
1198+ if (last_signaled_pid != wspid )
1199+ {
1200+ ereport (LOG ,
1201+ (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1202+ wspid , NameStr (slotname ))));
1203+ (void ) kill (wspid , SIGTERM );
1204+ last_signaled_pid = wspid ;
1205+ }
11341206
11351207 ConditionVariableTimedSleep (& s -> active_cv , 10 ,
11361208 WAIT_EVENT_REPLICATION_SLOT_DROP );
11371209 }
11381210 ConditionVariableCancelSleep ();
11391211
1212+ /*
1213+ * Do nothing here and start from scratch if the slot has
1214+ * already been dropped.
1215+ */
1216+ if (wspid == -1 )
1217+ {
1218+ CHECK_FOR_INTERRUPTS ();
1219+ goto restart ;
1220+ }
1221+
11401222 ereport (LOG ,
11411223 (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
11421224 NameStr (slotname ),
0 commit comments