@@ -301,23 +301,16 @@ pglogical_receiver_main(Datum main_arg)
301301 }
302302
303303 query = createPQExpBuffer ();
304- if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm -> nodes [nodeId - 1 ].timeline )
305- || mode == REPLMODE_CREATE_NEW )
306- { /* recreate slot */
307- timestamp_t start = MtmGetSystemTime ();
308- appendPQExpBuffer (query , "DROP_REPLICATION_SLOT \"%s\"" , slotName );
309- res = PQexec (conn , query -> data );
310- elog (LOG , "Drop replication slot %s: %ld milliseconds" , slotName , (long )USEC_TO_MSEC (MtmGetSystemTime () - start ));
311- PQclear (res );
312- resetPQExpBuffer (query );
313- timeline = Mtm -> nodes [nodeId - 1 ].timeline ;
314- }
315- /* My original assumption was that we can perfrom recovery only from existed slot,
316- * but unfortunately looks like slots can "disapear" together with WAL-sender.
317- * So let's try to recreate slot always. */
318- /* if (mode != REPLMODE_REPLICATION) */
319- {
320- timestamp_t start = MtmGetSystemTime ();
304+
305+ /* Start logical replication at specified position */
306+ originStartPos = replorigin_get_progress (originId , false);
307+ if (originStartPos == INVALID_LSN ) {
308+ /*
309+ * We are just creating new replication slot.
310+ * It is assumed that state of local and remote nodes is the same at this moment.
311+ * They are either empty, either new node is synchronized using base_backup.
312+ * So we assume that LSNs are the same for local and remote node
313+ */
321314 appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , slotName , MULTIMASTER_NAME );
322315 res = PQexec (conn , query -> data );
323316 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
@@ -331,30 +324,14 @@ pglogical_receiver_main(Datum main_arg)
331324 goto OnError ;
332325 }
333326 }
334- elog (LOG , "Recreate replication slot %s: %ld milliseconds" , slotName , (long )USEC_TO_MSEC (MtmGetSystemTime () - start ));
335327 PQclear (res );
336328 resetPQExpBuffer (query );
337- }
338-
339- /* Start logical replication at specified position */
340- if (originStartPos == INVALID_LSN ) {
341- originStartPos = replorigin_get_progress (originId , false);
342- if (originStartPos == INVALID_LSN ) {
343- /*
344- * We are just creating new replication slot.
345- * It is assumed that state of local and remote nodes is the same at this moment.
346- * Them are either empty, either new node is synchronized using base_backup.
347- * So we assume that LSNs are the same for local and remote node
348- */
349- originStartPos = INVALID_LSN ;
350- MTM_LOG1 ("Start logical receiver at position %llx from node %d" , originStartPos , nodeId );
351- } else {
352- if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
353- MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
354- Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
355- }
356- MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" , originStartPos , originId , nodeId );
329+ } else {
330+ if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
331+ MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" , nodeId , Mtm -> nodes [nodeId - 1 ].restartLSN , originStartPos );
332+ Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
357333 }
334+ MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" , originStartPos , originId , nodeId );
358335 }
359336
360337 MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
@@ -373,10 +350,21 @@ pglogical_receiver_main(Datum main_arg)
373350 res = PQexec (conn , query -> data );
374351 if (PQresultStatus (res ) != PGRES_COPY_BOTH )
375352 {
376- PQclear (res );
377- ereport (WARNING , (MTM_ERRMSG ("%s: Could not start logical replication" ,
378- worker_proc )));
379- goto OnError ;
353+ int i , n_deleted_slots = 0 ;
354+
355+ elog (WARNING , "Can't find slot on node%d. Shutting down receiver." , nodeId );
356+ Mtm -> nodes [nodeId - 1 ].slotDeleted = true;
357+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
358+ {
359+ if (Mtm -> nodes [i ].slotDeleted )
360+ n_deleted_slots ++ ;
361+ }
362+ if (n_deleted_slots == Mtm -> nAllNodes - 1 )
363+ {
364+ elog (WARNING , "All neighbour nopes have no replication slot for us. Exiting." );
365+ kill (PostmasterPid , SIGTERM );
366+ }
367+ proc_exit (1 );
380368 }
381369 PQclear (res );
382370 resetPQExpBuffer (query );
0 commit comments