@@ -303,7 +303,9 @@ aqo_stat_store(uint64 queryid, bool use_aqo,
303303 entry -> exec_time [pos ] = exec_time ;
304304 entry -> est_error [pos ] = est_error ;
305305 }
306+
306307 entry = memcpy (palloc (sizeof (StatEntry )), entry , sizeof (StatEntry ));
308+ aqo_state -> stat_changed = true;
307309 LWLockRelease (& aqo_state -> stat_lock );
308310 return entry ;
309311}
@@ -425,14 +427,24 @@ aqo_stat_flush(void)
425427 int ret ;
426428 long entries ;
427429
428- LWLockAcquire (& aqo_state -> stat_lock , LW_SHARED );
430+ /* Use exclusive lock to prevent concurrent flushing in different backends. */
431+ LWLockAcquire (& aqo_state -> stat_lock , LW_EXCLUSIVE );
432+
433+ if (!aqo_state -> stat_changed )
434+ /* Hash table wasn't changed, meaningless to store it in permanent storage */
435+ goto end ;
436+
429437 entries = hash_get_num_entries (stat_htab );
430438 hash_seq_init (& hash_seq , stat_htab );
431439 ret = data_store (PGAQO_STAT_FILE , _form_stat_record_cb , entries ,
432440 (void * ) & hash_seq );
433441 if (ret != 0 )
434442 hash_seq_term (& hash_seq );
443+ else
444+ /* Hash table and disk storage are now consistent */
445+ aqo_state -> stat_changed = false;
435446
447+ end :
436448 LWLockRelease (& aqo_state -> stat_lock );
437449}
438450
@@ -469,7 +481,7 @@ aqo_qtexts_flush(void)
469481 long entries ;
470482
471483 dsa_init ();
472- LWLockAcquire (& aqo_state -> qtexts_lock , LW_SHARED );
484+ LWLockAcquire (& aqo_state -> qtexts_lock , LW_EXCLUSIVE );
473485
474486 if (!aqo_state -> qtexts_changed )
475487 /* XXX: mull over forced mode. */
@@ -481,7 +493,9 @@ aqo_qtexts_flush(void)
481493 (void * ) & hash_seq );
482494 if (ret != 0 )
483495 hash_seq_term (& hash_seq );
484- aqo_state -> qtexts_changed = false;
496+ else
497+ /* Hash table and disk storage are now consistent */
498+ aqo_state -> qtexts_changed = false;
485499
486500end :
487501 LWLockRelease (& aqo_state -> qtexts_lock );
@@ -531,7 +545,7 @@ aqo_data_flush(void)
531545 long entries ;
532546
533547 dsa_init ();
534- LWLockAcquire (& aqo_state -> data_lock , LW_SHARED );
548+ LWLockAcquire (& aqo_state -> data_lock , LW_EXCLUSIVE );
535549
536550 if (!aqo_state -> data_changed )
537551 /* XXX: mull over forced mode. */
@@ -548,6 +562,7 @@ aqo_data_flush(void)
548562 */
549563 hash_seq_term (& hash_seq );
550564 else
565+ /* Hash table and disk storage are now consistent */
551566 aqo_state -> data_changed = false;
552567end :
553568 LWLockRelease (& aqo_state -> data_lock );
@@ -574,14 +589,22 @@ aqo_queries_flush(void)
574589 int ret ;
575590 long entries ;
576591
577- LWLockAcquire (& aqo_state -> queries_lock , LW_SHARED );
592+ LWLockAcquire (& aqo_state -> queries_lock , LW_EXCLUSIVE );
593+
594+ if (!aqo_state -> queries_changed )
595+ goto end ;
596+
578597 entries = hash_get_num_entries (queries_htab );
579598 hash_seq_init (& hash_seq , queries_htab );
580599 ret = data_store (PGAQO_QUERIES_FILE , _form_queries_record_cb , entries ,
581600 (void * ) & hash_seq );
582601 if (ret != 0 )
583602 hash_seq_term (& hash_seq );
603+ else
604+ /* Hash table and disk storage are now consistent */
605+ aqo_state -> queries_changed = false;
584606
607+ end :
585608 LWLockRelease (& aqo_state -> queries_lock );
586609}
587610
@@ -621,7 +644,8 @@ data_store(const char *filename, form_record_t callback,
621644 goto error ;
622645 }
623646
624- (void ) durable_rename (tmpfile , filename , LOG );
647+ /* Parallel (re)writing into a file haven't happen. */
648+ (void ) durable_rename (tmpfile , filename , PANIC );
625649 elog (LOG , "[AQO] %d records stored in file %s." , counter , filename );
626650 return 0 ;
627651
@@ -839,7 +863,7 @@ aqo_queries_load(void)
839863
840864 LWLockAcquire (& aqo_state -> queries_lock , LW_EXCLUSIVE );
841865
842- /* Load on postmaster sturtup . So no any concurrent actions possible here. */
866+ /* Load on postmaster startup . So no any concurrent actions possible here. */
843867 Assert (hash_get_num_entries (queries_htab ) == 0 );
844868
845869 data_load (PGAQO_QUERIES_FILE , _deform_queries_record_cb , NULL );
@@ -926,6 +950,9 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
926950static void
927951on_shmem_shutdown (int code , Datum arg )
928952{
953+ /*
954+ * XXX: It can be expensive to rewrite a file on each shutdown of a backend.
955+ */
929956 aqo_qtexts_flush ();
930957 aqo_data_flush ();
931958}
@@ -1201,6 +1228,7 @@ _aqo_data_remove(data_key *key)
12011228
12021229 if (hash_search (data_htab , key , HASH_REMOVE , NULL ) == NULL )
12031230 elog (PANIC , "[AQO] Inconsistent data hash table" );
1231+
12041232 aqo_state -> data_changed = true;
12051233 }
12061234
@@ -1270,8 +1298,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
12701298 char * ptr ;
12711299 ListCell * lc ;
12721300 size_t size ;
1273- bool tblOverflow ;
1274- HASHACTION action ;
1301+ bool tblOverflow ;
1302+ HASHACTION action ;
1303+ bool result ;
12751304
12761305 Assert (!LWLockHeldByMe (& aqo_state -> data_lock ));
12771306
@@ -1387,8 +1416,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13871416
13881417 aqo_state -> data_changed = true;
13891418end :
1419+ result = aqo_state -> data_changed ;
13901420 LWLockRelease (& aqo_state -> data_lock );
1391- return aqo_state -> data_changed ;
1421+ return result ;
13921422}
13931423
13941424static void
@@ -1496,7 +1526,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
14961526
14971527 dsa_init ();
14981528
1499- LWLockAcquire (& aqo_state -> data_lock , LW_EXCLUSIVE );
1529+ LWLockAcquire (& aqo_state -> data_lock , LW_SHARED );
15001530
15011531 if (!wideSearch )
15021532 {
@@ -1631,7 +1661,8 @@ aqo_data(PG_FUNCTION_ARGS)
16311661 ptr += sizeof (data_key );
16321662
16331663 if (entry -> cols > 0 )
1634- values [AD_FEATURES ] = PointerGetDatum (form_matrix ((double * )ptr , entry -> rows , entry -> cols ));
1664+ values [AD_FEATURES ] = PointerGetDatum (form_matrix ((double * ) ptr ,
1665+ entry -> rows , entry -> cols ));
16351666 else
16361667 nulls [AD_FEATURES ] = true;
16371668
@@ -1719,7 +1750,9 @@ aqo_data_reset(void)
17191750 elog (ERROR , "[AQO] hash table corrupted" );
17201751 num_remove ++ ;
17211752 }
1722- aqo_state -> data_changed = true;
1753+
1754+ if (num_remove > 0 )
1755+ aqo_state -> data_changed = true;
17231756 LWLockRelease (& aqo_state -> data_lock );
17241757 if (num_remove != num_entries )
17251758 elog (ERROR , "[AQO] Query ML memory storage is corrupted or parallel access without a lock has detected." );
@@ -1831,6 +1864,7 @@ aqo_queries_store(uint64 queryid,
18311864 entry -> use_aqo = use_aqo ;
18321865 entry -> auto_tuning = auto_tuning ;
18331866
1867+ aqo_state -> queries_changed = true;
18341868 LWLockRelease (& aqo_state -> queries_lock );
18351869 return true;
18361870}
@@ -1856,7 +1890,10 @@ aqo_queries_reset(void)
18561890 elog (ERROR , "[AQO] hash table corrupted" );
18571891 num_remove ++ ;
18581892 }
1859- aqo_state -> queries_changed = true;
1893+
1894+ if (num_remove > 0 )
1895+ aqo_state -> queries_changed = true;
1896+
18601897 LWLockRelease (& aqo_state -> queries_lock );
18611898
18621899 if (num_remove != num_entries - 1 )
0 commit comments