3838#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
3939#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
4040
41+ /* fd for currently open WAL file */
42+ static int walfile = -1 ;
43+
44+
4145/*
4246 * Open a new WAL file in the specified directory. Store the name
4347 * (not including the full directory) in namebuf. Assumes there is
@@ -96,6 +100,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
96100 {
97101 fprintf (stderr , _ ("%s: could not pad WAL segment %s: %s\n" ),
98102 progname , fn , strerror (errno ));
103+ free (zerobuf );
99104 close (f );
100105 unlink (fn );
101106 return -1 ;
@@ -120,7 +125,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
120125 * completed writing the whole segment.
121126 */
122127static bool
123- close_walfile (int walfile , char * basedir , char * walname , bool segment_complete )
128+ close_walfile (char * basedir , char * walname , bool segment_complete )
124129{
125130 off_t currpos = lseek (walfile , 0 , SEEK_CUR );
126131
@@ -142,8 +147,10 @@ close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
142147 {
143148 fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
144149 progname , walname , strerror (errno ));
150+ walfile = -1 ;
145151 return false;
146152 }
153+ walfile = -1 ;
147154
148155 /*
149156 * Rename the .partial file only if we've completed writing the whole
@@ -270,7 +277,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
270277 char current_walfile_name [MAXPGPATH ];
271278 PGresult * res ;
272279 char * copybuf = NULL ;
273- int walfile = -1 ;
274280 int64 last_status = -1 ;
275281 XLogRecPtr blockpos = InvalidXLogRecPtr ;
276282
@@ -315,6 +321,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
315321 {
316322 fprintf (stderr , _ ("%s: could not start replication: %s\n" ),
317323 progname , PQresultErrorMessage (res ));
324+ PQclear (res );
318325 return false;
319326 }
320327 PQclear (res );
@@ -341,9 +348,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
341348 */
342349 if (stream_stop && stream_stop (blockpos , timeline , false))
343350 {
344- if (walfile != -1 )
351+ if (walfile != -1 && ! close_walfile ( basedir , current_walfile_name , rename_partial ) )
345352 /* Potential error message is written by close_walfile */
346- return close_walfile ( walfile , basedir , current_walfile_name , rename_partial ) ;
353+ goto error ;
347354 return true;
348355 }
349356
@@ -370,7 +377,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
370377 {
371378 fprintf (stderr , _ ("%s: could not send feedback packet: %s" ),
372379 progname , PQerrorMessage (conn ));
373- return false ;
380+ goto error ;
374381 }
375382
376383 last_status = now ;
@@ -421,14 +428,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
421428 {
422429 fprintf (stderr , _ ("%s: select() failed: %s\n" ),
423430 progname , strerror (errno ));
424- return false ;
431+ goto error ;
425432 }
426433 /* Else there is actually data on the socket */
427434 if (PQconsumeInput (conn ) == 0 )
428435 {
429436 fprintf (stderr , _ ("%s: could not receive data from WAL stream: %s\n" ),
430437 progname , PQerrorMessage (conn ));
431- return false ;
438+ goto error ;
432439 }
433440 continue ;
434441 }
@@ -439,7 +446,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
439446 {
440447 fprintf (stderr , _ ("%s: could not read copy data: %s\n" ),
441448 progname , PQerrorMessage (conn ));
442- return false ;
449+ goto error ;
443450 }
444451 if (copybuf [0 ] == 'k' )
445452 {
@@ -451,21 +458,21 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
451458 {
452459 fprintf (stderr , _ ("%s: keepalive message is incorrect size: %d\n" ),
453460 progname , r );
454- return false ;
461+ goto error ;
455462 }
456463 continue ;
457464 }
458465 else if (copybuf [0 ] != 'w' )
459466 {
460467 fprintf (stderr , _ ("%s: unrecognized streaming header: \"%c\"\n" ),
461468 progname , copybuf [0 ]);
462- return false ;
469+ goto error ;
463470 }
464471 if (r < STREAMING_HEADER_SIZE + 1 )
465472 {
466473 fprintf (stderr , _ ("%s: streaming header too small: %d\n" ),
467474 progname , r );
468- return false ;
475+ goto error ;
469476 }
470477
471478 /* Extract WAL location for this block */
@@ -483,7 +490,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
483490 {
484491 fprintf (stderr , _ ("%s: received xlog record for offset %u with no file open\n" ),
485492 progname , xlogoff );
486- return false ;
493+ goto error ;
487494 }
488495 }
489496 else
@@ -494,7 +501,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
494501 {
495502 fprintf (stderr , _ ("%s: got WAL data offset %08x, expected %08x\n" ),
496503 progname , xlogoff , (int ) lseek (walfile , 0 , SEEK_CUR ));
497- return false ;
504+ goto error ;
498505 }
499506 }
500507
@@ -520,7 +527,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
520527 basedir , current_walfile_name );
521528 if (walfile == -1 )
522529 /* Error logged by open_walfile */
523- return false ;
530+ goto error ;
524531 }
525532
526533 if (write (walfile ,
@@ -532,7 +539,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
532539 bytes_to_write ,
533540 current_walfile_name ,
534541 strerror (errno ));
535- return false ;
542+ goto error ;
536543 }
537544
538545 /* Write was successful, advance our position */
@@ -544,11 +551,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
544551 /* Did we reach the end of a WAL segment? */
545552 if (blockpos % XLOG_SEG_SIZE == 0 )
546553 {
547- if (!close_walfile (walfile , basedir , current_walfile_name , false))
554+ if (!close_walfile (basedir , current_walfile_name , false))
548555 /* Error message written in close_walfile() */
549- return false ;
556+ goto error ;
550557
551- walfile = -1 ;
552558 xlogoff = 0 ;
553559
554560 if (stream_stop != NULL )
@@ -577,8 +583,22 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
577583 {
578584 fprintf (stderr , _ ("%s: unexpected termination of replication stream: %s\n" ),
579585 progname , PQresultErrorMessage (res ));
580- return false ;
586+ goto error ;
581587 }
582588 PQclear (res );
589+
590+ if (copybuf != NULL )
591+ PQfreemem (copybuf );
592+ if (walfile != -1 && close (walfile ) != 0 )
593+ fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
594+ progname , current_walfile_name , strerror (errno ));
583595 return true;
596+
597+ error :
598+ if (copybuf != NULL )
599+ PQfreemem (copybuf );
600+ if (walfile != -1 && close (walfile ) != 0 )
601+ fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
602+ progname , current_walfile_name , strerror (errno ));
603+ return false;
584604}
0 commit comments