🌐 AI搜索 & 代理 主页
Skip to content

Commit 8eeb4a0

Browse files
committed
Fix bug where we truncated CLOG that was still needed by LISTEN/NOTIFY
The async notification queue contains the XID of the sender, and when processing notifications we call TransactionIdDidCommit() on the XID. But we had no safeguards to prevent the CLOG segments containing those XIDs from being truncated away. As a result, if a backend didn't for some reason process its notifications for a long time, or when a new backend issued LISTEN, you could get an error like: test=# listen c21; ERROR: 58P01: could not access status of transaction 14279685 DETAIL: Could not open file "pg_xact/000D": No such file or directory. LOCATION: SlruReportIOError, slru.c:1087 To fix, make VACUUM "freeze" the XIDs in the async notification queue before truncating the CLOG. Old XIDs are replaced with FrozenTransactionId or InvalidTransactionId. Note: This commit is not a full fix. A race condition remains, where a backend is executing asyncQueueReadAllNotifications() and has just made a local copy of an async SLRU page which contains old XIDs, while vacuum concurrently truncates the CLOG covering those XIDs. When the backend then calls TransactionIdDidCommit() on those XIDs from the local copy, you still get the error. The next commit will fix that remaining race condition. This was first reported by Sergey Zhuravlev in 2021, with many other people hitting the same issue later. Thanks to: - Alexandra Wang, Daniil Davydov, Andrei Varashen and Jacques Combrink for investigating and providing reproducable test cases, - Matheus Alcantara and Arseniy Mukhin for review and earlier proposed patches to fix this, - Álvaro Herrera and Masahiko Sawada for reviews, - Yura Sokolov aka funny-falcon for the idea of marking transactions as committed in the notification queue, and - Joel Jacobson for the final patch version. I hope I didn't forget anyone. Backpatch to all supported versions. I believe the bug goes back all the way to commit d1e0272, which introduced the SLRU-based async notification queue. Discussion: https://www.postgresql.org/message-id/16961-25f29f95b3604a8a@postgresql.org Discussion: https://www.postgresql.org/message-id/18804-bccbbde5e77a68c2@postgresql.org Discussion: https://www.postgresql.org/message-id/CAK98qZ3wZLE-RZJN_Y%2BTFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw@mail.gmail.com Backpatch-through: 14
1 parent 1b46990 commit 8eeb4a0

File tree

5 files changed

+196
-0
lines changed

5 files changed

+196
-0
lines changed

src/backend/commands/async.c

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2176,6 +2176,120 @@ asyncQueueAdvanceTail(void)
21762176
LWLockRelease(NotifyQueueTailLock);
21772177
}
21782178

2179+
/*
2180+
* AsyncNotifyFreezeXids
2181+
*
2182+
* Prepare the async notification queue for CLOG truncation by freezing
2183+
* transaction IDs that are about to become inaccessible.
2184+
*
2185+
* This function is called by VACUUM before advancing datfrozenxid. It scans
2186+
* the notification queue and replaces XIDs that would become inaccessible
2187+
* after CLOG truncation with special markers:
2188+
* - Committed transactions are set to FrozenTransactionId
2189+
* - Aborted/crashed transactions are set to InvalidTransactionId
2190+
*
2191+
* Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2192+
* pages will be truncated. If XID < newFrozenXid, it cannot still be running
2193+
* (or it would have held back newFrozenXid through ProcArray).
2194+
* Therefore, if TransactionIdDidCommit returns false, we know the transaction
2195+
* either aborted explicitly or crashed, and we can safely mark it invalid.
2196+
*/
2197+
void
2198+
AsyncNotifyFreezeXids(TransactionId newFrozenXid)
2199+
{
2200+
QueuePosition pos;
2201+
QueuePosition head;
2202+
int64 curpage = -1;
2203+
int slotno = -1;
2204+
char *page_buffer = NULL;
2205+
bool page_dirty = false;
2206+
2207+
/*
2208+
* Acquire locks in the correct order to avoid deadlocks. As per the
2209+
* locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2210+
* bank locks.
2211+
*
2212+
* We only need SHARED mode since we're just reading the head/tail
2213+
* positions, not modifying them.
2214+
*/
2215+
LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
2216+
LWLockAcquire(NotifyQueueLock, LW_SHARED);
2217+
2218+
pos = QUEUE_TAIL;
2219+
head = QUEUE_HEAD;
2220+
2221+
/* Release NotifyQueueLock early, we only needed to read the positions */
2222+
LWLockRelease(NotifyQueueLock);
2223+
2224+
/*
2225+
* Scan the queue from tail to head, freezing XIDs as needed. We hold
2226+
* NotifyQueueTailLock throughout to ensure the tail doesn't move while
2227+
* we're working.
2228+
*/
2229+
while (!QUEUE_POS_EQUAL(pos, head))
2230+
{
2231+
AsyncQueueEntry *qe;
2232+
TransactionId xid;
2233+
int64 pageno = QUEUE_POS_PAGE(pos);
2234+
int offset = QUEUE_POS_OFFSET(pos);
2235+
2236+
/* If we need a different page, release old lock and get new one */
2237+
if (pageno != curpage)
2238+
{
2239+
LWLock *lock;
2240+
2241+
/* Release previous page if any */
2242+
if (slotno >= 0)
2243+
{
2244+
if (page_dirty)
2245+
{
2246+
NotifyCtl->shared->page_dirty[slotno] = true;
2247+
page_dirty = false;
2248+
}
2249+
LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
2250+
}
2251+
2252+
lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2253+
LWLockAcquire(lock, LW_EXCLUSIVE);
2254+
slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2255+
InvalidTransactionId);
2256+
page_buffer = NotifyCtl->shared->page_buffer[slotno];
2257+
curpage = pageno;
2258+
}
2259+
2260+
qe = (AsyncQueueEntry *) (page_buffer + offset);
2261+
xid = qe->xid;
2262+
2263+
if (TransactionIdIsNormal(xid) &&
2264+
TransactionIdPrecedes(xid, newFrozenXid))
2265+
{
2266+
if (TransactionIdDidCommit(xid))
2267+
{
2268+
qe->xid = FrozenTransactionId;
2269+
page_dirty = true;
2270+
}
2271+
else
2272+
{
2273+
qe->xid = InvalidTransactionId;
2274+
page_dirty = true;
2275+
}
2276+
}
2277+
2278+
/* Advance to next entry */
2279+
asyncQueueAdvance(&pos, qe->length);
2280+
}
2281+
2282+
/* Release final page lock if we acquired one */
2283+
if (slotno >= 0)
2284+
{
2285+
if (page_dirty)
2286+
NotifyCtl->shared->page_dirty[slotno] = true;
2287+
LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
2288+
}
2289+
2290+
LWLockRelease(NotifyQueueTailLock);
2291+
}
2292+
21792293
/*
21802294
* ProcessIncomingNotify
21812295
*

src/backend/commands/vacuum.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "catalog/namespace.h"
3838
#include "catalog/pg_database.h"
3939
#include "catalog/pg_inherits.h"
40+
#include "commands/async.h"
4041
#include "commands/cluster.h"
4142
#include "commands/defrem.h"
4243
#include "commands/progress.h"
@@ -1941,6 +1942,12 @@ vac_truncate_clog(TransactionId frozenXID,
19411942
return;
19421943
}
19431944

1945+
/*
1946+
* Freeze any old transaction IDs in the async notification queue before
1947+
* CLOG truncation.
1948+
*/
1949+
AsyncNotifyFreezeXids(frozenXID);
1950+
19441951
/*
19451952
* Advance the oldest value for commit timestamps before truncating, so
19461953
* that if a user requests a timestamp for a transaction we're truncating

src/include/commands/async.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
4646
/* process interrupts */
4747
extern void ProcessNotifyInterrupt(bool flush);
4848

49+
/* freeze old transaction IDs in notify queue (called by VACUUM) */
50+
extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid);
51+
4952
#endif /* ASYNC_H */

src/test/modules/xid_wraparound/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ tests += {
3030
't/001_emergency_vacuum.pl',
3131
't/002_limits.pl',
3232
't/003_wraparounds.pl',
33+
't/004_notify_freeze.pl',
3334
],
3435
},
3536
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright (c) 2024-2025, PostgreSQL Global Development Group
2+
#
3+
# Test freezing XIDs in the async notification queue. This isn't
4+
# really wraparound-related, but the test depends on the
5+
# consume_xids() helper function.
6+
7+
use strict;
8+
use warnings FATAL => 'all';
9+
use PostgreSQL::Test::Cluster;
10+
use Test::More;
11+
12+
my $node = PostgreSQL::Test::Cluster->new('node');
13+
$node->init;
14+
$node->start;
15+
16+
if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\bxid_wraparound\b/)
17+
{
18+
plan skip_all => "test xid_wraparound not enabled in PG_TEST_EXTRA";
19+
}
20+
21+
# Setup
22+
$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
23+
$node->safe_psql('postgres',
24+
'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
25+
26+
# Start Session 1 and leave it idle in transaction
27+
my $psql_session1 = $node->background_psql('postgres');
28+
$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
29+
$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
30+
31+
# Send some notifys from other sessions
32+
for my $i (1 .. 10)
33+
{
34+
$node->safe_psql('postgres', "NOTIFY s, '$i'");
35+
}
36+
37+
# Consume enough XIDs to trigger truncation, and one more with
38+
# 'txid_current' to bump up the freeze horizon.
39+
$node->safe_psql('postgres', 'select consume_xids(10000000);');
40+
$node->safe_psql('postgres', 'select txid_current()');
41+
42+
# Remember current datfrozenxid before vacuum freeze so that we can
43+
# check that it is advanced. (Taking the min() this way assumes that
44+
# XID wraparound doesn't happen.)
45+
my $datafronzenxid = $node->safe_psql('postgres',
46+
"select min(datfrozenxid::text::bigint) from pg_database");
47+
48+
# Execute vacuum freeze on all databases
49+
$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
50+
"vacuumdb --all --freeze");
51+
52+
# Check that vacuumdb advanced datfrozenxid
53+
my $datafronzenxid_freeze = $node->safe_psql('postgres',
54+
"select min(datfrozenxid::text::bigint) from pg_database");
55+
ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid advanced');
56+
57+
# On Session 1, commit and ensure that the all the notifications are
58+
# received. This depends on correctly freezing the XIDs in the pending
59+
# notification entries.
60+
my $res = $psql_session1->query_safe('commit;', "commit listen s;");
61+
my $notifications_count = 0;
62+
foreach my $i (split('\n', $res))
63+
{
64+
$notifications_count++;
65+
like($i,
66+
qr/Asynchronous notification "s" with payload "$notifications_count" received/
67+
);
68+
}
69+
is($notifications_count, 10, 'received all committed notifications');
70+
71+
done_testing();

0 commit comments

Comments
 (0)