3434#include "lib/stringinfo.h"
3535#include "libpq/pqsignal.h"
3636#include "miscadmin.h"
37+ #include "nodes/pg_list.h"
3738#include "pgtime.h"
3839#include "postmaster/fork_process.h"
3940#include "postmaster/postmaster.h"
@@ -93,11 +94,14 @@ static char *last_file_name = NULL;
9394static char * last_csv_file_name = NULL ;
9495
9596/*
96- * Buffers for saving partial messages from different backends. We don't expect
97- * that there will be very many outstanding at one time, so 20 seems plenty of
98- * leeway. If this array gets full we won't lose messages, but we will lose
99- * the protocol protection against them being partially written or interleaved.
97+ * Buffers for saving partial messages from different backends.
10098 *
99+ * Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
100+ * being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
101+ * the number of entries we have to examine for any one incoming message.
102+ * There must never be more than one entry for the same source pid.
103+ *
104+ * An inactive buffer is not removed from its list, just held for re-use.
101105 * An inactive buffer has pid == 0 and undefined contents of data.
102106 */
103107typedef struct
@@ -106,8 +110,8 @@ typedef struct
106110 StringInfoData data ; /* accumulated data, as a StringInfo */
107111} save_buffer ;
108112
109- #define CHUNK_SLOTS 20
110- static save_buffer saved_chunks [ CHUNK_SLOTS ];
113+ #define NBUFFER_LISTS 256
114+ static List * buffer_lists [ NBUFFER_LISTS ];
111115
112116/* These must be exported for EXEC_BACKEND case ... annoying */
113117#ifndef WIN32
@@ -592,7 +596,7 @@ SysLogger_Start(void)
592596 * Now we are done with the write end of the pipe.
593597 * CloseHandle() must not be called because the preceding
594598 * close() closes the underlying handle.
595- */
599+ */
596600 syslogPipe [1 ] = 0 ;
597601#endif
598602 redirection_done = true;
@@ -734,6 +738,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
734738 (p .is_last == 't' || p .is_last == 'f' ||
735739 p .is_last == 'T' || p .is_last == 'F' ))
736740 {
741+ List * buffer_list ;
742+ ListCell * cell ;
743+ save_buffer * existing_slot = NULL ,
744+ * free_slot = NULL ;
745+ StringInfo str ;
746+
737747 chunklen = PIPE_HEADER_SIZE + p .len ;
738748
739749 /* Fall out of loop if we don't have the whole chunk yet */
@@ -743,80 +753,70 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
743753 dest = (p .is_last == 'T' || p .is_last == 'F' ) ?
744754 LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR ;
745755
746- if (p .is_last == 'f' || p .is_last == 'F' )
756+ /* Locate any existing buffer for this source pid */
757+ buffer_list = buffer_lists [p .pid % NBUFFER_LISTS ];
758+ foreach (cell , buffer_list )
747759 {
748- /*
749- * Save a complete non-final chunk in the per-pid buffer if
750- * possible - if not just write it out.
751- */
752- int free_slot = -1 ,
753- existing_slot = -1 ;
754- int i ;
755- StringInfo str ;
760+ save_buffer * buf = (save_buffer * ) lfirst (cell );
756761
757- for ( i = 0 ; i < CHUNK_SLOTS ; i ++ )
762+ if ( buf -> pid == p . pid )
758763 {
759- if (saved_chunks [i ].pid == p .pid )
760- {
761- existing_slot = i ;
762- break ;
763- }
764- if (free_slot < 0 && saved_chunks [i ].pid == 0 )
765- free_slot = i ;
764+ existing_slot = buf ;
765+ break ;
766766 }
767- if (existing_slot >= 0 )
767+ if (buf -> pid == 0 && free_slot == NULL )
768+ free_slot = buf ;
769+ }
770+
771+ if (p .is_last == 'f' || p .is_last == 'F' )
772+ {
773+ /*
774+ * Save a complete non-final chunk in a per-pid buffer
775+ */
776+ if (existing_slot != NULL )
768777 {
769- str = & (saved_chunks [existing_slot ].data );
778+ /* Add chunk to data from preceding chunks */
779+ str = & (existing_slot -> data );
770780 appendBinaryStringInfo (str ,
771781 cursor + PIPE_HEADER_SIZE ,
772782 p .len );
773783 }
774- else if ( free_slot >= 0 )
784+ else
775785 {
776- saved_chunks [free_slot ].pid = p .pid ;
777- str = & (saved_chunks [free_slot ].data );
786+ /* First chunk of message, save in a new buffer */
787+ if (free_slot == NULL )
788+ {
789+ /*
790+ * Need a free slot, but there isn't one in the list,
791+ * so create a new one and extend the list with it.
792+ */
793+ free_slot = palloc (sizeof (save_buffer ));
794+ buffer_list = lappend (buffer_list , free_slot );
795+ buffer_lists [p .pid % NBUFFER_LISTS ] = buffer_list ;
796+ }
797+ free_slot -> pid = p .pid ;
798+ str = & (free_slot -> data );
778799 initStringInfo (str );
779800 appendBinaryStringInfo (str ,
780801 cursor + PIPE_HEADER_SIZE ,
781802 p .len );
782803 }
783- else
784- {
785- /*
786- * If there is no free slot we'll just have to take our
787- * chances and write out a partial message and hope that
788- * it's not followed by something from another pid.
789- */
790- write_syslogger_file (cursor + PIPE_HEADER_SIZE , p .len ,
791- dest );
792- }
793804 }
794805 else
795806 {
796807 /*
797808 * Final chunk --- add it to anything saved for that pid, and
798809 * either way write the whole thing out.
799810 */
800- int existing_slot = -1 ;
801- int i ;
802- StringInfo str ;
803-
804- for (i = 0 ; i < CHUNK_SLOTS ; i ++ )
805- {
806- if (saved_chunks [i ].pid == p .pid )
807- {
808- existing_slot = i ;
809- break ;
810- }
811- }
812- if (existing_slot >= 0 )
811+ if (existing_slot != NULL )
813812 {
814- str = & (saved_chunks [ existing_slot ]. data );
813+ str = & (existing_slot -> data );
815814 appendBinaryStringInfo (str ,
816815 cursor + PIPE_HEADER_SIZE ,
817816 p .len );
818817 write_syslogger_file (str -> data , str -> len , dest );
819- saved_chunks [existing_slot ].pid = 0 ;
818+ /* Mark the buffer unused, and reclaim string storage */
819+ existing_slot -> pid = 0 ;
820820 pfree (str -> data );
821821 }
822822 else
@@ -872,17 +872,27 @@ static void
872872flush_pipe_input (char * logbuffer , int * bytes_in_logbuffer )
873873{
874874 int i ;
875- StringInfo str ;
876875
877876 /* Dump any incomplete protocol messages */
878- for (i = 0 ; i < CHUNK_SLOTS ; i ++ )
877+ for (i = 0 ; i < NBUFFER_LISTS ; i ++ )
879878 {
880- if (saved_chunks [i ].pid != 0 )
879+ List * list = buffer_lists [i ];
880+ ListCell * cell ;
881+
882+ foreach (cell , list )
881883 {
882- str = & (saved_chunks [i ].data );
883- write_syslogger_file (str -> data , str -> len , LOG_DESTINATION_STDERR );
884- saved_chunks [i ].pid = 0 ;
885- pfree (str -> data );
884+ save_buffer * buf = (save_buffer * ) lfirst (cell );
885+
886+ if (buf -> pid != 0 )
887+ {
888+ StringInfo str = & (buf -> data );
889+
890+ write_syslogger_file (str -> data , str -> len ,
891+ LOG_DESTINATION_STDERR );
892+ /* Mark the buffer unused, and reclaim string storage */
893+ buf -> pid = 0 ;
894+ pfree (str -> data );
895+ }
886896 }
887897 }
888898
0 commit comments