3939 * BufFile infrastructure supports temporary files that exceed the OS file size
4040 * limit, (b) provides a way for automatic clean up on the error and (c) provides
4141 * a way to survive these files across local transactions and allow to open and
42- * close at stream start and close. We decided to use SharedFileSet
42+ * close at stream start and close. We decided to use FileSet
4343 * infrastructure as without that it deletes the files on the closure of the
4444 * file and if we decide to keep stream files open across the start/stop stream
4545 * then it will consume a lot of memory (more than 8K for each BufFile and
4646 * there could be multiple such BufFiles as the subscriber could receive
4747 * multiple start/stop streams for different transactions before getting the
48- * commit). Moreover, if we don't use SharedFileSet then we also need to invent
48+ * commit). Moreover, if we don't use FileSet then we also need to invent
4949 * a new way to pass filenames to BufFile APIs so that we are allowed to open
5050 * the file we desired across multiple stream-open calls for the same
5151 * transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
246246typedef struct StreamXidHash
247247{
248248 TransactionId xid ; /* xid is the hash key and must be first */
249- SharedFileSet * stream_fileset ; /* shared file set for stream data */
250- SharedFileSet * subxact_fileset ; /* shared file set for subxact info */
249+ FileSet * stream_fileset ; /* file set for stream data */
250+ FileSet * subxact_fileset ; /* file set for subxact info */
251251} StreamXidHash ;
252252
253253static MemoryContext ApplyMessageContext = NULL ;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
270270static TransactionId stream_xid = InvalidTransactionId ;
271271
272272/*
273- * Hash table for storing the streaming xid information along with shared file
274- * set for streaming and subxact files.
273+ * Hash table for storing the streaming xid information along with filesets
274+ * for streaming and subxact files.
275275 */
276276static HTAB * xidhash = NULL ;
277277
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
12971297
12981298 /* open the changes file */
12991299 changes_filename (path , MyLogicalRepWorker -> subid , xid );
1300- fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDWR );
1300+ fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDWR );
13011301
13021302 /* OK, truncate the file at the right offset */
1303- BufFileTruncateShared (fd , subxact_data .subxacts [subidx ].fileno ,
1304- subxact_data .subxacts [subidx ].offset );
1303+ BufFileTruncateFileSet (fd , subxact_data .subxacts [subidx ].fileno ,
1304+ subxact_data .subxacts [subidx ].offset );
13051305 BufFileClose (fd );
13061306
13071307 /* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
13551355 errmsg_internal ("transaction %u not found in stream XID hash table" ,
13561356 xid )));
13571357
1358- fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDONLY );
1358+ fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDONLY );
13591359
13601360 buffer = palloc (BLCKSZ );
13611361 initStringInfo (& s2 );
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
25412541 }
25422542}
25432543
2544+ /*
2545+ * Cleanup filesets.
2546+ */
2547+ void
2548+ logicalrep_worker_cleanupfileset (void )
2549+ {
2550+ HASH_SEQ_STATUS status ;
2551+ StreamXidHash * hentry ;
2552+
2553+ /* Remove all the pending stream and subxact filesets. */
2554+ if (xidhash )
2555+ {
2556+ hash_seq_init (& status , xidhash );
2557+ while ((hentry = (StreamXidHash * ) hash_seq_search (& status )) != NULL )
2558+ {
2559+ FileSetDeleteAll (hentry -> stream_fileset );
2560+
2561+ /* Delete the subxact fileset iff it is created. */
2562+ if (hentry -> subxact_fileset )
2563+ FileSetDeleteAll (hentry -> subxact_fileset );
2564+ }
2565+ }
2566+ }
2567+
25442568/*
25452569 * Apply main loop.
25462570 */
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
30243048 if (ent -> subxact_fileset )
30253049 {
30263050 cleanup_subxact_info ();
3027- SharedFileSetDeleteAll (ent -> subxact_fileset );
3051+ FileSetDeleteAll (ent -> subxact_fileset );
30283052 pfree (ent -> subxact_fileset );
30293053 ent -> subxact_fileset = NULL ;
30303054 }
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
30423066 MemoryContext oldctx ;
30433067
30443068 /*
3045- * We need to maintain shared fileset across multiple stream
3046- * start/stop calls. So, need to allocate it in a persistent context.
3069+ * We need to maintain fileset across multiple stream start/stop
3070+ * calls. So, need to allocate it in a persistent context.
30473071 */
30483072 oldctx = MemoryContextSwitchTo (ApplyContext );
3049- ent -> subxact_fileset = palloc (sizeof (SharedFileSet ));
3050- SharedFileSetInit (ent -> subxact_fileset , NULL );
3073+ ent -> subxact_fileset = palloc (sizeof (FileSet ));
3074+ FileSetInit (ent -> subxact_fileset );
30513075 MemoryContextSwitchTo (oldctx );
30523076
3053- fd = BufFileCreateShared (ent -> subxact_fileset , path );
3077+ fd = BufFileCreateFileSet (ent -> subxact_fileset , path );
30543078 }
30553079 else
3056- fd = BufFileOpenShared (ent -> subxact_fileset , path , O_RDWR );
3080+ fd = BufFileOpenFileSet (ent -> subxact_fileset , path , O_RDWR );
30573081
30583082 len = sizeof (SubXactInfo ) * subxact_data .nsubxacts ;
30593083
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
31073131
31083132 subxact_filename (path , subid , xid );
31093133
3110- fd = BufFileOpenShared (ent -> subxact_fileset , path , O_RDONLY );
3134+ fd = BufFileOpenFileSet (ent -> subxact_fileset , path , O_RDONLY );
31113135
31123136 /* read number of subxact items */
31133137 if (BufFileRead (fd , & subxact_data .nsubxacts ,
@@ -3264,15 +3288,15 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32643288
32653289 /* Delete the change file and release the stream fileset memory */
32663290 changes_filename (path , subid , xid );
3267- SharedFileSetDeleteAll (ent -> stream_fileset );
3291+ FileSetDeleteAll (ent -> stream_fileset );
32683292 pfree (ent -> stream_fileset );
32693293 ent -> stream_fileset = NULL ;
32703294
32713295 /* Delete the subxact file and release the memory, if it exist */
32723296 if (ent -> subxact_fileset )
32733297 {
32743298 subxact_filename (path , subid , xid );
3275- SharedFileSetDeleteAll (ent -> subxact_fileset );
3299+ FileSetDeleteAll (ent -> subxact_fileset );
32763300 pfree (ent -> subxact_fileset );
32773301 ent -> subxact_fileset = NULL ;
32783302 }
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32883312 *
32893313 * Open a file for streamed changes from a toplevel transaction identified
32903314 * by stream_xid (global variable). If it's the first chunk of streamed
3291- * changes for this transaction, initialize the shared fileset and create the
3292- * buffile, otherwise open the previously created file.
3315+ * changes for this transaction, initialize the fileset and create the buffile,
3316+ * otherwise open the previously created file.
32933317 *
32943318 * This can only be called at the beginning of a "streaming" block, i.e.
32953319 * between stream_start/stream_stop messages from the upstream.
@@ -3330,24 +3354,24 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33303354 if (first_segment )
33313355 {
33323356 MemoryContext savectx ;
3333- SharedFileSet * fileset ;
3357+ FileSet * fileset ;
33343358
33353359 if (found )
33363360 ereport (ERROR ,
33373361 (errcode (ERRCODE_PROTOCOL_VIOLATION ),
33383362 errmsg_internal ("incorrect first-segment flag for streamed replication transaction" )));
33393363
33403364 /*
3341- * We need to maintain shared fileset across multiple stream
3342- * start/stop calls. So, need to allocate it in a persistent context.
3365+ * We need to maintain fileset across multiple stream start/stop
3366+ * calls. So, need to allocate it in a persistent context.
33433367 */
33443368 savectx = MemoryContextSwitchTo (ApplyContext );
3345- fileset = palloc (sizeof (SharedFileSet ));
3369+ fileset = palloc (sizeof (FileSet ));
33463370
3347- SharedFileSetInit (fileset , NULL );
3371+ FileSetInit (fileset );
33483372 MemoryContextSwitchTo (savectx );
33493373
3350- stream_fd = BufFileCreateShared (fileset , path );
3374+ stream_fd = BufFileCreateFileSet (fileset , path );
33513375
33523376 /* Remember the fileset for the next stream of the same transaction */
33533377 ent -> xid = xid ;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33653389 * Open the file and seek to the end of the file because we always
33663390 * append the changes file.
33673391 */
3368- stream_fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDWR );
3392+ stream_fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDWR );
33693393 BufFileSeek (stream_fd , 0 , 0 , SEEK_END );
33703394 }
33713395
0 commit comments