@@ -109,6 +109,12 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_send_pending is the number of bytes written to the queue but not yet
+ * reflected in mq_bytes_written in shared memory.  We do not update the
+ * shared counter until the pending data amounts to more than 1/4th of the
+ * ring size or the queue is full.  This reduces CPU cache misses and avoids
+ * frequent SetLatch() calls, which are quite expensive.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
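
As a rough usage sketch (not part of this commit), a sender that streams many
small messages would pass force_flush = false for every message except the
last one, so that mq_bytes_written and the receiver's latch are touched only
when the 1/4-ring threshold is crossed, the queue fills up, or the stream
ends.  The handle mqh and the msgs/lens/nmsgs variables below are assumed to
be supplied by the caller:

    for (int i = 0; i < nmsgs; i++)
    {
        bool        force_flush = (i == nmsgs - 1);  /* flush only the last message */
        shm_mq_result result;

        result = shm_mq_send(mqh, lens[i], msgs[i], false /* nowait */, force_flush);
        if (result != SHM_MQ_SUCCESS)
            break;                                   /* receiver detached */
    }
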
@@ -137,6 +143,7 @@ struct shm_mq_handle
     char       *mqh_buffer;
     Size        mqh_buflen;
     Size        mqh_consume_pending;
+    Size        mqh_send_pending;
     Size        mqh_partial_bytes;
     Size        mqh_expected_bytes;
     bool        mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     mqh->mqh_buffer = NULL;
     mqh->mqh_buflen = 0;
     mqh->mqh_consume_pending = 0;
+    mqh->mqh_send_pending = 0;
     mqh->mqh_partial_bytes = 0;
     mqh->mqh_expected_bytes = 0;
     mqh->mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
  * Write a message into a shared message queue.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+            bool force_flush)
 {
     shm_mq_iovec iov;
 
     iov.data = data;
     iov.len = nbytes;
 
-    return shm_mq_sendv(mqh, &iov, 1, nowait);
+    return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver (if it is already attached).  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+             bool force_flush)
 {
     shm_mq_result res;
     shm_mq     *mq = mqh->mqh_queue;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
         mqh->mqh_counterparty_attached = true;
     }
 
-    /* Notify receiver of the newly-written data, and return. */
-    SetLatch(&receiver->procLatch);
+    /*
+     * If the caller has requested a forced flush, or the pending bytes
+     * exceed 1/4 of the ring size, mark them as written in shared memory
+     * and notify the receiver.
+     */
+    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+    {
+        shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+        SetLatch(&receiver->procLatch);
+        mqh->mqh_send_pending = 0;
+    }
+
     return SHM_MQ_SUCCESS;
 }
 
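
The new flush test reads naturally in isolation: mq->mq_ring_size >> 2 is
simply a quarter of the ring, so with a 65536-byte ring (an illustrative size,
not something fixed by this patch) the shared counter is updated once more
than 16384 bytes are pending, or immediately when force_flush is passed.  A
minimal sketch of the same decision (need_flush is not a real function in
shm_mq.c):

    static inline bool
    need_flush(Size send_pending, Size ring_size, bool force_flush)
    {
        /* Same test as in shm_mq_sendv(): flush on request or past 1/4 of the ring. */
        return force_flush || send_pending > (ring_size >> 2);
    }
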
@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+    /* Before detaching, make any locally pending writes visible to the receiver. */
+    if (mqh->mqh_send_pending > 0)
+    {
+        shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+        mqh->mqh_send_pending = 0;
+    }
+
     /* Notify counterparty that we're outta here. */
     shm_mq_detach_internal(mqh->mqh_queue);
 
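
Given this, a sender that is about to detach does not strictly need to force a
flush on its final shm_mq_send(): the pending bytes are pushed into shared
memory by shm_mq_detach() itself, and the detach notification wakes the
receiver.  A hypothetical shutdown path (mqh, last_msg, and last_len are
assumed to exist) might therefore look like:

    (void) shm_mq_send(mqh, last_len, last_msg, false, false); /* may leave bytes pending */
    shm_mq_detach(mqh);     /* flushes mqh_send_pending, then signals the detach */
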
@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
         /* Compute number of ring buffer bytes used and available. */
         rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-        wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+        wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
         Assert(wb >= rb);
         used = wb - rb;
         Assert(used <= ringsize);
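
A quick worked example of why mqh_send_pending has to be counted here, with
purely illustrative numbers: if mq_bytes_read is 2048, the shared
mq_bytes_written is 4096, and the sender is holding back another 1024 bytes,
then the ring really contains 3072 bytes, not 2048, and the free-space
calculation must reflect that:

    uint64      rb = 2048;          /* pg_atomic_read_u64(&mq->mq_bytes_read) */
    uint64      wb = 4096 + 1024;   /* shared mq_bytes_written + mqh_send_pending */
    Size        used = wb - rb;     /* 3072 bytes in use, 1024 of them not yet
                                     * visible to the receiver */
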
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
         }
         else if (available == 0)
         {
+            /* The queue is full, so flush the pending send bytes to shared memory. */
+            shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
             /*
              * Since mq->mqh_counterparty_attached is known to be true at this
              * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
             Assert(mqh->mqh_counterparty_attached);
             SetLatch(&mq->mq_receiver->procLatch);
 
+            /*
+             * We have just pushed the mqh_send_pending bytes into shared
+             * memory, so reset the local counter.
+             */
+            mqh->mqh_send_pending = 0;
+
             /* Skip manipulation of our latch if nowait = true. */
             if (nowait)
             {
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
          * MAXIMUM_ALIGNOF, and each read is as well.
          */
         Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-        shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
         /*
-         * For efficiency, we don't set the reader's latch here.  We'll do
-         * that only when the buffer fills up or after writing an entire
-         * message.
+         * For efficiency, we don't update the bytes-written counter in
+         * shared memory, nor do we set the reader's latch here.  Refer to
+         * the comments atop the shm_mq_handle structure for more
+         * information.
          */
+        mqh->mqh_send_pending += MAXALIGN(sendnow);
     }
 }
 
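
One last detail worth noting: mqh_send_pending advances in MAXALIGN'ed steps,
exactly as mq_bytes_written did before this change, so the pending count can
exceed the raw payload size for the final, unaligned chunk of a message.  For
example, assuming MAXIMUM_ALIGNOF is 8 (platform-dependent, stated here only
for illustration):

    Size        sendnow = 5;                    /* final 5-byte chunk of a message */
    mqh->mqh_send_pending += MAXALIGN(sendnow); /* MAXALIGN(5) == 8, so 8 bytes pending */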