|
23 | 23 | #include "access/xlog_internal.h" |
24 | 24 | #include "replication/walprotocol.h" |
25 | 25 | #include "utils/datetime.h" |
| 26 | +#include "utils/timestamp.h" |
26 | 27 |
|
27 | 28 | #include "receivelog.h" |
28 | 29 | #include "streamutil.h" |
@@ -195,6 +196,51 @@ localGetCurrentTimestamp(void) |
195 | 196 | return result; |
196 | 197 | } |
197 | 198 |
|
| 199 | +/* |
| 200 | + * Local version of TimestampDifference(), since we are not |
| 201 | + * linked with backend code. |
| 202 | + */ |
| 203 | +static void |
| 204 | +localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, |
| 205 | + long *secs, int *microsecs) |
| 206 | +{ |
| 207 | + TimestampTz diff = stop_time - start_time; |
| 208 | + |
| 209 | + if (diff <= 0) |
| 210 | + { |
| 211 | + *secs = 0; |
| 212 | + *microsecs = 0; |
| 213 | + } |
| 214 | + else |
| 215 | + { |
| 216 | +#ifdef HAVE_INT64_TIMESTAMP |
| 217 | + *secs = (long) (diff / USECS_PER_SEC); |
| 218 | + *microsecs = (int) (diff % USECS_PER_SEC); |
| 219 | +#else |
| 220 | + *secs = (long) diff; |
| 221 | + *microsecs = (int) ((diff - *secs) * 1000000.0); |
| 222 | +#endif |
| 223 | + } |
| 224 | +} |
| 225 | + |
| 226 | +/* |
| 227 | + * Local version of TimestampDifferenceExceeds(), since we are not |
| 228 | + * linked with backend code. |
| 229 | + */ |
| 230 | +static bool |
| 231 | +localTimestampDifferenceExceeds(TimestampTz start_time, |
| 232 | + TimestampTz stop_time, |
| 233 | + int msec) |
| 234 | +{ |
| 235 | + TimestampTz diff = stop_time - start_time; |
| 236 | + |
| 237 | +#ifdef HAVE_INT64_TIMESTAMP |
| 238 | + return (diff >= msec * INT64CONST(1000)); |
| 239 | +#else |
| 240 | + return (diff * 1000.0 >= msec); |
| 241 | +#endif |
| 242 | +} |
| 243 | + |
198 | 244 | /* |
199 | 245 | * Receive a log stream starting at the specified position. |
200 | 246 | * |
@@ -306,7 +352,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi |
306 | 352 | */ |
307 | 353 | now = localGetCurrentTimestamp(); |
308 | 354 | if (standby_message_timeout > 0 && |
309 | | - last_status < now - standby_message_timeout * 1000000) |
| 355 | + localTimestampDifferenceExceeds(last_status, now, |
| 356 | + standby_message_timeout)) |
310 | 357 | { |
311 | 358 | /* Time to send feedback! */ |
312 | 359 | char replybuf[sizeof(StandbyReplyMessage) + 1]; |
@@ -345,10 +392,16 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi |
345 | 392 | FD_SET(PQsocket(conn), &input_mask); |
346 | 393 | if (standby_message_timeout) |
347 | 394 | { |
348 | | - timeout.tv_sec = last_status + standby_message_timeout - now - 1; |
| 395 | + TimestampTz targettime; |
| 396 | + |
| 397 | + targettime = TimestampTzPlusMilliseconds(last_status, |
| 398 | + standby_message_timeout - 1); |
| 399 | + localTimestampDifference(now, |
| 400 | + targettime, |
| 401 | + &timeout.tv_sec, |
| 402 | + (int *)&timeout.tv_usec); |
349 | 403 | if (timeout.tv_sec <= 0) |
350 | 404 | timeout.tv_sec = 1; /* Always sleep at least 1 sec */ |
351 | | - timeout.tv_usec = 0; |
352 | 405 | timeoutptr = &timeout; |
353 | 406 | } |
354 | 407 | else |
|
0 commit comments