@@ -97,7 +97,7 @@ static void MtmSender(Datum arg);
9797static void MtmReceiver (Datum arg );
9898static void MtmMonitor (Datum arg );
9999static void MtmSendHeartbeat (void );
100- static bool MtmSendToNode (int node , void const * buf , int size , time_t reconnectTimeout );
100+ static bool MtmSendToNode (int node , void const * buf , int size );
101101
102102char const * const MtmMessageKindMnem [] =
103103{
@@ -166,7 +166,7 @@ static void MtmRegisterSocket(int fd, int node)
166166 ev .events = EPOLLIN ;
167167 ev .data .u32 = node ;
168168 if (epoll_ctl (epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
169- MTM_ELOG (LOG , "Arbiter failed to add socket to epoll set: %d " , errno );
169+ MTM_ELOG (LOG , "Arbiter failed to add socket to epoll set: %s " , strerror ( errno ) );
170170 }
171171#else
172172 FD_SET (fd , & inset );
@@ -180,7 +180,7 @@ static void MtmUnregisterSocket(int fd)
180180{
181181#if USE_EPOLL
182182 if (epoll_ctl (epollfd , EPOLL_CTL_DEL , fd , NULL ) < 0 ) {
183- MTM_ELOG (LOG , "Arbiter failed to unregister socket from epoll set: %d " , errno );
183+ MTM_ELOG (LOG , "Arbiter failed to unregister socket from epoll set: %s " , strerror ( errno ) );
184184 }
185185#else
186186 FD_CLR (fd , & inset );
@@ -371,7 +371,7 @@ static void MtmSendHeartbeat()
371371 || !BIT_CHECK (Mtm -> disabledNodeMask , i )
372372 || BIT_CHECK (Mtm -> reconnectMask , i )))
373373 {
374- if (!MtmSendToNode (i , & msg , sizeof (msg ), MtmHeartbeatRecvTimeout )) {
374+ if (!MtmSendToNode (i , & msg , sizeof (msg ))) {
375375 MTM_ELOG (LOG , "Arbiter failed to send heartbeat to node %d" , i + 1 );
376376 } else {
377377 if (last_heartbeat_to_node [i ] + MSEC_TO_USEC (MtmHeartbeatSendTimeout )* 2 < now ) {
@@ -408,7 +408,7 @@ void MtmCheckHeartbeat()
408408}
409409
410410
411- static int MtmConnectSocket (int node , int port , time_t timeout )
411+ static int MtmConnectSocket (int node , int port )
412412{
413413 struct addrinfo * addrs = NULL ;
414414 struct addrinfo * addr ;
@@ -417,12 +417,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
417417 MtmHandshakeMessage req ;
418418 MtmArbiterMessage resp ;
419419 int sd = -1 ;
420- int ret ;
421- timestamp_t start = MtmGetSystemTime ();
420+ int rc ;
422421 char const * host = Mtm -> nodes [node ].con .hostName ;
423422 nodemask_t save_mask = busy_mask ;
424- timestamp_t afterWait ;
425- timestamp_t beforeWait ;
426423
427424 /* Initialize hint structure */
428425 MemSet (& hint , 0 , sizeof (hint ));
@@ -431,67 +428,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
431428
432429 snprintf (portstr , sizeof (portstr ), "%d" , port );
433430
434- ret = pg_getaddrinfo_all (host , portstr , & hint , & addrs );
435- if (ret != 0 )
431+ rc = pg_getaddrinfo_all (host , portstr , & hint , & addrs );
432+ if (rc != 0 )
436433 {
437- MTM_ELOG (LOG , "Arbiter failed to resolve host '%s' by name: (%d) % s" , host , ret , gai_strerror (ret ));
434+ MTM_ELOG (LOG , "Arbiter failed to resolve host '%s' by name: % s" , host , gai_strerror (rc ));
438435 return -1 ;
439436 }
440437 BIT_SET (busy_mask , node );
441438
442- Retry :
443- while (1 ) {
444- int rc = -1 ;
445- sd = pg_socket (AF_INET , SOCK_STREAM , 0 , MtmUseRDMA );
446- if (sd < 0 ) {
447- MTM_ELOG (LOG , "Arbiter failed to create socket: %d" , errno );
448- goto Error ;
449- }
450- rc = pg_fcntl (sd , F_SETFL , O_NONBLOCK , MtmUseRDMA );
451- if (rc < 0 ) {
452- MTM_ELOG (LOG , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
453- goto Error ;
454- }
455- for (addr = addrs ; addr != NULL ; addr = addr -> ai_next )
456- {
457- do {
458- rc = pg_connect (sd , addr -> ai_addr , addr -> ai_addrlen , MtmUseRDMA );
459- } while (rc < 0 && errno == EINTR );
439+ Retry :
460440
461- if (rc >= 0 || errno == EINPROGRESS ) {
462- break ;
463- }
464- }
465- if (rc == 0 ) {
441+ sd = socket (AF_INET , SOCK_STREAM , 0 );
442+ if (sd < 0 ) {
443+ MTM_ELOG (LOG , "Arbiter failed to create socket: %s" , strerror (errno ));
444+ goto Error ;
445+ }
446+ rc = fcntl (sd , F_SETFL , O_NONBLOCK );
447+ if (rc < 0 ) {
448+ MTM_ELOG (LOG , "Arbiter failed to switch socket to non-blocking mode: %s" , strerror (errno ));
449+ goto Error ;
450+ }
451+ for (addr = addrs ; addr != NULL ; addr = addr -> ai_next )
452+ {
453+ do {
454+ rc = connect (sd , addr -> ai_addr , addr -> ai_addrlen );
455+ } while (rc < 0 && errno == EINTR );
456+
457+ if (rc >= 0 || errno == EINPROGRESS ) {
466458 break ;
467459 }
468- beforeWait = MtmGetSystemTime ();
469- if (errno != EINPROGRESS || start + MSEC_TO_USEC (timeout ) < beforeWait ) {
470- MTM_ELOG (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
471- goto Error ;
472- } else {
473- rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
474- if (rc == 1 ) {
475- socklen_t optlen = sizeof (int );
476- if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
477- MTM_ELOG (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
478- goto Error ;
479- }
480- if (rc == 0 ) {
481- break ;
482- } else {
483- MTM_ELOG (WARNING , "Arbiter trying to connect to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
484- }
485- } else {
486- MTM_ELOG (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
460+ }
461+
462+ if (rc != 0 && errno == EINPROGRESS ) {
463+ rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
464+ if (rc == 1 ) {
465+ socklen_t optlen = sizeof (int );
466+ int errcode ;
467+
468+ if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& errcode , & optlen ) < 0 ) {
469+ MTM_ELOG (WARNING , "Arbiter failed to getsockopt for %s:%d: %s" , host , port , strerror (errcode ));
470+ goto Error ;
487471 }
488- pg_closesocket (sd , MtmUseRDMA );
489- afterWait = MtmGetSystemTime ();
490- if (afterWait < beforeWait + MSEC_TO_USEC (MtmHeartbeatSendTimeout )) {
491- MtmSleep (beforeWait + MSEC_TO_USEC (MtmHeartbeatSendTimeout ) - afterWait );
472+ if (errcode != 0 ) {
473+ MTM_ELOG (WARNING , "Arbiter trying to connect to %s:%d: %s" , host , port , strerror (errcode ));
474+ goto Error ;
492475 }
476+ } else {
477+ MTM_ELOG (WARNING , "Arbiter waiting socket to %s:%d: %s" , host , port , strerror (errno ));
493478 }
494479 }
480+ else if (rc != 0 ) {
481+ MTM_ELOG (WARNING , "Arbiter failed to connect to %s:%d: (%d) %s" , host , port , rc , strerror (errno ));
482+ goto Error ;
483+ }
484+
495485 MtmSetSocketOptions (sd );
496486 MtmInitMessage (& req .hdr , MSG_HANDSHAKE );
497487 req .hdr .node = MtmNodeId ;
@@ -500,13 +490,13 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
500490 req .hdr .csn = MtmGetCurrentTime ();
501491 strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
502492 if (!MtmWriteSocket (sd , & req , sizeof req )) {
503- MTM_ELOG (WARNING , "Arbiter failed to send handshake message to %s:%d: %d " , host , port , errno );
504- pg_closesocket (sd , MtmUseRDMA );
493+ MTM_ELOG (WARNING , "Arbiter failed to send handshake message to %s:%d: %s " , host , port , strerror ( errno ) );
494+ close (sd );
505495 goto Retry ;
506496 }
507497 if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
508- MTM_ELOG (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d " , host , port , errno );
509- pg_closesocket (sd , MtmUseRDMA );
498+ MTM_ELOG (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: %s " , host , port , strerror ( errno ) );
499+ close (sd );
510500 goto Retry ;
511501 }
512502 if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
@@ -527,7 +517,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
527517
528518 return sd ;
529519
530- Error :
520+ Error :
531521 busy_mask = save_mask ;
532522 if (sd >= 0 ) {
533523 pg_closesocket (sd , MtmUseRDMA );
@@ -551,7 +541,7 @@ static void MtmOpenConnections()
551541 }
552542 for (i = 0 ; i < nNodes ; i ++ ) {
553543 if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
554- sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort , MtmConnectTimeout );
544+ sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
555545 if (sockets [i ] < 0 ) {
556546 MtmOnNodeDisconnect (i + 1 );
557547 }
@@ -566,7 +556,7 @@ static void MtmOpenConnections()
566556}
567557
568558
569- static bool MtmSendToNode (int node , void const * buf , int size , time_t reconnectTimeout )
559+ static bool MtmSendToNode (int node , void const * buf , int size )
570560{
571561 bool result = true;
572562 nodemask_t save_mask = busy_mask ;
@@ -589,11 +579,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
589579 }
590580 if (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
591581 if (sockets [node ] >= 0 ) {
592- MTM_ELOG (WARNING , "Arbiter fail to write to node %d: %d " , node + 1 , errno );
593- pg_closesocket (sockets [node ], MtmUseRDMA );
582+ MTM_ELOG (WARNING , "Arbiter fail to write to node %d: %s " , node + 1 , strerror ( errno ) );
583+ close (sockets [node ]);
594584 sockets [node ] = -1 ;
595585 }
596- sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort , reconnectTimeout );
586+ sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
597587 if (sockets [node ] < 0 ) {
598588 MtmOnNodeDisconnect (node + 1 );
599589 result = false;
@@ -613,7 +603,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
613603{
614604 int rc = MtmReadSocket (sockets [node ], buf , buf_size );
615605 if (rc <= 0 ) {
616- MTM_ELOG (WARNING , "Arbiter failed to read from node=%d, rc=%d, errno=%d " , node + 1 , rc , errno );
606+ MTM_ELOG (WARNING , "Arbiter failed to read from node=%d: %s " , node + 1 , strerror ( errno ) );
617607 MtmDisconnect (node );
618608 }
619609 return rc ;
@@ -623,17 +613,17 @@ static void MtmAcceptOneConnection()
623613{
624614 int fd = pg_accept (gateway , NULL , NULL , MtmUseRDMA );
625615 if (fd < 0 ) {
626- MTM_ELOG (WARNING , "Arbiter failed to accept socket: %d " , errno );
616+ MTM_ELOG (WARNING , "Arbiter failed to accept socket: %s " , strerror ( errno ) );
627617 } else {
628618 MtmHandshakeMessage req ;
629619 MtmArbiterMessage resp ;
630620 int rc = pg_fcntl (fd , F_SETFL , O_NONBLOCK , MtmUseRDMA );
631621 if (rc < 0 ) {
632- MTM_ELOG (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d " , errno );
622+ MTM_ELOG (ERROR , "Arbiter failed to switch socket to non-blocking mode: %s " , strerror ( errno ) );
633623 }
634624 rc = MtmReadSocket (fd , & req , sizeof req );
635625 if (rc < sizeof (req )) {
636- MTM_ELOG (WARNING , "Arbiter failed to handshake socket: %d, errno=%d " , rc , errno );
626+ MTM_ELOG (WARNING , "Arbiter failed to handshake socket: %s " , strerror ( errno ) );
637627 pg_closesocket (fd , MtmUseRDMA );
638628 } else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
639629 MTM_ELOG (WARNING , "Arbiter get unexpected handshake message %d" , req .hdr .code );
@@ -770,7 +760,7 @@ static void MtmSender(Datum arg)
770760
771761 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
772762 if (txBuffer [i ].used != 0 ) {
773- MtmSendToNode (i , txBuffer [i ].data , txBuffer [i ].used * sizeof (MtmArbiterMessage ), MtmReconnectTimeout );
763+ MtmSendToNode (i , txBuffer [i ].data , txBuffer [i ].used * sizeof (MtmArbiterMessage ));
774764 txBuffer [i ].used = 0 ;
775765 }
776766 }
@@ -870,7 +860,7 @@ static void MtmReceiver(Datum arg)
870860 if (errno == EINTR ) {
871861 continue ;
872862 }
873- MTM_ELOG (ERROR , "Arbiter failed to poll sockets: %d " , errno );
863+ MTM_ELOG (ERROR , "Arbiter failed to poll sockets: %s " , strerror ( errno ) );
874864 }
875865 for (j = 0 ; j < n ; j ++ ) {
876866 i = events [j ].data .u32 ;
@@ -894,7 +884,7 @@ static void MtmReceiver(Datum arg)
894884 } while (n < 0 && MtmRecovery ());
895885
896886 if (n < 0 ) {
897- MTM_ELOG (ERROR , "Arbiter failed to select sockets: %d " , errno );
887+ MTM_ELOG (ERROR , "Arbiter failed to select sockets: %s " , strerror ( errno ) );
898888 }
899889 for (i = 0 ; i < nNodes ; i ++ ) {
900890 if (sockets [i ] >= 0 && FD_ISSET (sockets [i ], & events ))
0 commit comments