1818#include <time.h>
1919#include <fcntl.h>
2020
21+ #ifdef WITH_RSOCKET
22+ #include <rdma/rsocket.h>
23+ #endif
24+
2125#include "postgres.h"
2226#include "fmgr.h"
2327#include "miscadmin.h"
28+ #include "pg_socket.h"
2429#include "postmaster/postmaster.h"
2530#include "postmaster/bgworker.h"
2631#include "storage/s_lock.h"
5863#include "tcop/utility.h"
5964#include "libpq/ip.h"
6065
66+
6167#ifndef USE_EPOLL
6268#ifdef __linux__
6369#define USE_EPOLL 0
@@ -185,7 +191,7 @@ static void MtmUnregisterSocket(int fd)
/*
 * Drop the connection held in sockets[node]: unregister the descriptor via
 * MtmUnregisterSocket, close it, mark the slot as unused (-1) and then call
 * MtmOnNodeDisconnect with node + 1.
 * In this diff the plain close() is replaced by pg_closesocket(), which
 * additionally receives the MtmUseRDMA flag.
 */
185191static void MtmDisconnect (int node )
186192{
187193 MtmUnregisterSocket (sockets [node ]);
188- close (sockets [node ]);
194+ pg_closesocket (sockets [node ], MtmUseRDMA );
189195 sockets [node ] = -1 ;
190196 MtmOnNodeDisconnect (node + 1 );
191197}
@@ -208,7 +214,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208214 FD_SET (sd , & set );
209215 tv .tv_sec = (deadline - now )/USECS_PER_SEC ;
210216 tv .tv_usec = (deadline - now )%USECS_PER_SEC ;
211- } while ((rc = select ( sd + 1 , forWrite ? NULL : & set , forWrite ? & set : NULL , NULL , & tv )) < 0 && errno == EINTR );
217+ } while ((rc = pg_select ( sd + 1 , forWrite ? NULL : & set , forWrite ? & set : NULL , NULL , & tv , MtmUseRDMA )) < 0 && errno == EINTR );
212218
213219 return rc ;
214220}
@@ -219,7 +225,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219225 while (size != 0 ) {
220226 int rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
221227 if (rc == 1 ) {
222- while ((rc = send (sd , src , size , 0 )) < 0 && errno == EINTR );
228+ while ((rc = pg_send (sd , src , size , 0 , MtmUseRDMA )) < 0 && errno == EINTR );
223229 if (rc < 0 ) {
224230 if (errno == EINPROGRESS ) {
225231 continue ;
@@ -238,11 +244,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238244static int MtmReadSocket (int sd , void * buf , int buf_size )
239245{
240246 int rc ;
241- while ((rc = recv (sd , buf , buf_size , 0 )) < 0 && errno == EINTR );
247+ while ((rc = pg_recv (sd , buf , buf_size , 0 , MtmUseRDMA )) < 0 && errno == EINTR );
242248 if (rc <= 0 && (errno == EAGAIN || errno == EINPROGRESS )) {
243249 rc = MtmWaitSocket (sd , false, MtmHeartbeatSendTimeout );
244250 if (rc == 1 ) {
245- while ((rc = recv (sd , buf , buf_size , 0 )) < 0 && errno == EINTR );
251+ while ((rc = pg_recv (sd , buf , buf_size , 0 , MtmUseRDMA )) < 0 && errno == EINTR );
246252 }
247253 }
248254 return rc ;
@@ -254,25 +260,25 @@ static void MtmSetSocketOptions(int sd)
254260{
255261#ifdef TCP_NODELAY
256262 int on = 1 ;
257- if (setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& on , sizeof (on )) < 0 ) {
263+ if (pg_setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& on , sizeof (on ), MtmUseRDMA ) < 0 ) {
258264 MTM_ELOG (WARNING , "Failed to set TCP_NODELAY: %m" );
259265 }
260266#endif
261- if (setsockopt (sd , SOL_SOCKET , SO_KEEPALIVE , (char const * )& on , sizeof (on )) < 0 ) {
267+ if (pg_setsockopt (sd , SOL_SOCKET , SO_KEEPALIVE , (char const * )& on , sizeof (on ), MtmUseRDMA ) < 0 ) {
262268 MTM_ELOG (WARNING , "Failed to set SO_KEEPALIVE: %m" );
263269 }
264270
265271 if (tcp_keepalives_idle ) {
266272#ifdef TCP_KEEPIDLE
267- if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPIDLE ,
268- (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle )) < 0 )
273+ if (pg_setsockopt (sd , IPPROTO_TCP , TCP_KEEPIDLE ,
274+ (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle ), MtmUseRDMA ) < 0 )
269275 {
270276 MTM_ELOG (WARNING , "Failed to set TCP_KEEPIDLE: %m" );
271277 }
272278#else
273279#ifdef TCP_KEEPALIVE
274- if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPALIVE ,
275- (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle )) < 0 )
280+ if (pg_setsockopt (sd , IPPROTO_TCP , TCP_KEEPALIVE ,
281+ (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle ), MtmUseRDMA ) < 0 )
276282 {
277283 MTM_ELOG (WARNING , "Failed to set TCP_KEEPALIVE: %m" );
278284 }
@@ -281,17 +287,17 @@ static void MtmSetSocketOptions(int sd)
281287 }
282288#ifdef TCP_KEEPINTVL
283289 if (tcp_keepalives_interval ) {
284- if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPINTVL ,
285- (char * ) & tcp_keepalives_interval , sizeof (tcp_keepalives_interval )) < 0 )
290+ if (pg_setsockopt (sd , IPPROTO_TCP , TCP_KEEPINTVL ,
291+ (char * ) & tcp_keepalives_interval , sizeof (tcp_keepalives_interval ), MtmUseRDMA ) < 0 )
286292 {
287293 MTM_ELOG (WARNING , "Failed to set TCP_KEEPINTVL: %m" );
288294 }
289295 }
290296#endif
291297#ifdef TCP_KEEPCNT
292298 if (tcp_keepalives_count ) {
293- if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPCNT ,
294- (char * ) & tcp_keepalives_count , sizeof (tcp_keepalives_count )) < 0 )
299+ if (pg_setsockopt (sd , IPPROTO_TCP , TCP_KEEPCNT ,
300+ (char * ) & tcp_keepalives_count , sizeof (tcp_keepalives_count ), MtmUseRDMA ) < 0 )
295301 {
296302 MTM_ELOG (WARNING , "Failed to set TCP_KEEPCNT: %m" );
297303 }
@@ -375,7 +381,7 @@ static void MtmSendHeartbeat()
375381 /* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
376382 if (BIT_CHECK (SELF_CONNECTIVITY_MASK , i )) {
377383 MTM_LOG1 ("Force reconnect to node %d" , i + 1 );
378- close (sockets [i ]);
384+ pg_closesocket (sockets [i ], MtmUseRDMA );
379385 sockets [i ] = -1 ;
380386 MtmReconnectNode (i + 1 ); /* set reconnect mask to force node reconnect */
381387 }
@@ -436,20 +442,20 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
436442 Retry :
437443 while (1 ) {
438444 int rc = -1 ;
439- sd = socket (AF_INET , SOCK_STREAM , 0 );
445+ sd = pg_socket (AF_INET , SOCK_STREAM , 0 , MtmUseRDMA );
440446 if (sd < 0 ) {
441447 MTM_ELOG (LOG , "Arbiter failed to create socket: %d" , errno );
442448 goto Error ;
443449 }
444- rc = fcntl (sd , F_SETFL , O_NONBLOCK );
450+ rc = pg_fcntl (sd , F_SETFL , O_NONBLOCK , MtmUseRDMA );
445451 if (rc < 0 ) {
446452 MTM_ELOG (LOG , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
447453 goto Error ;
448454 }
449455 for (addr = addrs ; addr != NULL ; addr = addr -> ai_next )
450456 {
451457 do {
452- rc = connect (sd , addr -> ai_addr , addr -> ai_addrlen );
458+ rc = pg_connect (sd , addr -> ai_addr , addr -> ai_addrlen , MtmUseRDMA );
453459 } while (rc < 0 && errno == EINTR );
454460
455461 if (rc >= 0 || errno == EINPROGRESS ) {
@@ -479,7 +485,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
479485 } else {
480486 MTM_ELOG (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
481487 }
482- close (sd );
488+ pg_closesocket (sd , MtmUseRDMA );
483489 afterWait = MtmGetSystemTime ();
484490 if (afterWait < beforeWait + MSEC_TO_USEC (MtmHeartbeatSendTimeout )) {
485491 MtmSleep (beforeWait + MSEC_TO_USEC (MtmHeartbeatSendTimeout ) - afterWait );
@@ -495,17 +501,17 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
495501 strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
496502 if (!MtmWriteSocket (sd , & req , sizeof req )) {
497503 MTM_ELOG (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
498- close (sd );
504+ pg_closesocket (sd , MtmUseRDMA );
499505 goto Retry ;
500506 }
501507 if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
502508 MTM_ELOG (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
503- close (sd );
509+ pg_closesocket (sd , MtmUseRDMA );
504510 goto Retry ;
505511 }
506512 if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
507513 MTM_ELOG (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
508- close (sd );
514+ pg_closesocket (sd , MtmUseRDMA );
509515 goto Retry ;
510516 }
511517 if (addrs )
@@ -524,7 +530,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
524530 Error :
525531 busy_mask = save_mask ;
526532 if (sd >= 0 ) {
527- close (sd );
533+ pg_closesocket (sd , MtmUseRDMA );
528534 }
529535 if (addrs ) {
530536 pg_freeaddrinfo_all (hint .ai_family , addrs );
@@ -572,7 +578,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
572578 */
573579 if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
574580 MTM_ELOG (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
575- close (sockets [node ]);
581+ pg_closesocket (sockets [node ], MtmUseRDMA );
576582 sockets [node ] = -1 ;
577583 }
578584#endif
@@ -584,7 +590,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
584590 if (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
585591 if (sockets [node ] >= 0 ) {
586592 MTM_ELOG (WARNING , "Arbiter fail to write to node %d: %d" , node + 1 , errno );
587- close (sockets [node ]);
593+ pg_closesocket (sockets [node ], MtmUseRDMA );
588594 sockets [node ] = -1 ;
589595 }
590596 sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort , reconnectTimeout );
@@ -615,23 +621,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
615621
616622static void MtmAcceptOneConnection ()
617623{
618- int fd = accept (gateway , NULL , NULL );
624+ int fd = pg_accept (gateway , NULL , NULL , MtmUseRDMA );
619625 if (fd < 0 ) {
620626 MTM_ELOG (WARNING , "Arbiter failed to accept socket: %d" , errno );
621627 } else {
622628 MtmHandshakeMessage req ;
623629 MtmArbiterMessage resp ;
624- int rc = fcntl (fd , F_SETFL , O_NONBLOCK );
630+ int rc = pg_fcntl (fd , F_SETFL , O_NONBLOCK , MtmUseRDMA );
625631 if (rc < 0 ) {
626632 MTM_ELOG (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
627633 }
628634 rc = MtmReadSocket (fd , & req , sizeof req );
629635 if (rc < sizeof (req )) {
630636 MTM_ELOG (WARNING , "Arbiter failed to handshake socket: %d, errno=%d" , rc , errno );
631- close (fd );
637+ pg_closesocket (fd , MtmUseRDMA );
632638 } else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
633639 MTM_ELOG (WARNING , "Arbiter get unexpected handshake message %d" , req .hdr .code );
634- close (fd );
640+ pg_closesocket (fd , MtmUseRDMA );
635641 } else {
636642 int node = req .hdr .node - 1 ;
637643 Assert (node >= 0 && node < Mtm -> nAllNodes && node + 1 != MtmNodeId );
@@ -648,7 +654,7 @@ static void MtmAcceptOneConnection()
648654 MtmUpdateNodeConnectionInfo (& Mtm -> nodes [node ].con , req .connStr );
649655 if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
650656 MTM_ELOG (WARNING , "Arbiter failed to write response for handshake message to node %d" , node + 1 );
651- close (fd );
657+ pg_closesocket (fd , MtmUseRDMA );
652658 } else {
653659 MTM_LOG1 ("Arbiter established connection with node %d" , node + 1 );
654660 if (sockets [node ] >= 0 ) {
@@ -678,18 +684,18 @@ static void MtmAcceptIncomingConnections()
678684 sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
679685 sock_inet .sin_port = htons (MtmArbiterPort );
680686
681- gateway = socket (sock_inet .sin_family , SOCK_STREAM , 0 );
687+ gateway = pg_socket (sock_inet .sin_family , SOCK_STREAM , 0 , MtmUseRDMA );
682688 if (gateway < 0 ) {
683689 MTM_ELOG (ERROR , "Arbiter failed to create socket: %s" , strerror (errno ));
684690 }
685- if (setsockopt (gateway , SOL_SOCKET , SO_REUSEADDR , (char * )& on , sizeof on ) < 0 ) {
691+ if (pg_setsockopt (gateway , SOL_SOCKET , SO_REUSEADDR , (char * )& on , sizeof on , MtmUseRDMA ) < 0 ) {
686692 MTM_ELOG (ERROR , "Arbiter failed to set options for socket: %s" , strerror (errno ));
687693 }
688694
689- if (bind (gateway , (struct sockaddr * )& sock_inet , sizeof (sock_inet )) < 0 ) {
695+ if (pg_bind (gateway , (struct sockaddr * )& sock_inet , sizeof (sock_inet ), MtmUseRDMA ) < 0 ) {
690696 MTM_ELOG (ERROR , "Arbiter failed to bind socket: %s" , strerror (errno ));
691697 }
692- if (listen (gateway , nNodes ) < 0 ) {
698+ if (pg_listen (gateway , nNodes , MtmUseRDMA ) < 0 ) {
693699 MTM_ELOG (ERROR , "Arbiter failed to listen socket: %s" , strerror (errno ));
694700 }
695701
@@ -790,7 +796,7 @@ static bool MtmRecovery()
790796 fd_set tryset ;
791797 FD_ZERO (& tryset );
792798 FD_SET (sd , & tryset );
793- if (select (sd + 1 , & tryset , NULL , NULL , & tm ) < 0 ) {
799+ if (pg_select (sd + 1 , & tryset , NULL , NULL , & tm , MtmUseRDMA ) < 0 ) {
794800 MTM_ELOG (WARNING , "Arbiter lost connection with node %d" , i + 1 );
795801 MtmDisconnect (i );
796802 recovered = true;
@@ -883,7 +889,7 @@ static void MtmReceiver(Datum arg)
883889 tv .tv_sec = selectTimeout /1000 ;
884890 tv .tv_usec = selectTimeout %1000 * 1000 ;
885891 do {
886- n = select (max_fd + 1 , & events , NULL , NULL , & tv );
892+ n = pg_select (max_fd + 1 , & events , NULL , NULL , & tv , MtmUseRDMA );
887893 } while (n < 0 && errno == EINTR );
888894 } while (n < 0 && MtmRecovery ());
889895
0 commit comments