@@ -61,10 +61,11 @@ typedef enum {
6161} aqo_queries_cols ;
6262
6363typedef void * (* form_record_t ) (void * ctx , size_t * size );
64- typedef void (* deform_record_t ) (void * data , size_t size );
64+ typedef bool (* deform_record_t ) (void * data , size_t size );
6565
6666
6767int querytext_max_size = 1000 ;
68+ int dsm_size_max = 100 ; /* in MB */
6869
6970HTAB * stat_htab = NULL ;
7071HTAB * queries_htab = NULL ;
@@ -642,7 +643,7 @@ data_store(const char *filename, form_record_t callback,
642643 return -1 ;
643644}
644645
645- static void
646+ static bool
646647_deform_stat_record_cb (void * data , size_t size )
647648{
648649 bool found ;
@@ -656,24 +657,35 @@ _deform_stat_record_cb(void *data, size_t size)
656657 entry = (StatEntry * ) hash_search (stat_htab , & queryid , HASH_ENTER , & found );
657658 Assert (!found );
658659 memcpy (entry , data , sizeof (StatEntry ));
660+ return true;
659661}
660662
661663void
662664aqo_stat_load (void )
663665{
664- long entries ;
665-
666666 Assert (!LWLockHeldByMe (& aqo_state -> stat_lock ));
667667
668668 LWLockAcquire (& aqo_state -> stat_lock , LW_EXCLUSIVE );
669- entries = hash_get_num_entries (stat_htab );
670- Assert (entries == 0 );
669+
670+ /* Load on postmaster sturtup. So no any concurrent actions possible here. */
671+ Assert (hash_get_num_entries (stat_htab ) == 0 );
672+
671673 data_load (PGAQO_STAT_FILE , _deform_stat_record_cb , NULL );
672674
673675 LWLockRelease (& aqo_state -> stat_lock );
674676}
675677
676- static void
678+ static bool
679+ _check_dsa_validity (dsa_pointer ptr )
680+ {
681+ if (DsaPointerIsValid (ptr ))
682+ return true;
683+
684+ elog (LOG , "[AQO] DSA Pointer isn't valid. Is the memory limit exceeded?" );
685+ return false;
686+ }
687+
688+ static bool
677689_deform_qtexts_record_cb (void * data , size_t size )
678690{
679691 bool found ;
@@ -690,9 +702,19 @@ _deform_qtexts_record_cb(void *data, size_t size)
690702 Assert (!found );
691703
692704 entry -> qtext_dp = dsa_allocate (qtext_dsa , len );
693- Assert (DsaPointerIsValid (entry -> qtext_dp ));
705+ if (!_check_dsa_validity (entry -> qtext_dp ))
706+ {
707+ /*
708+ * DSA stuck into problems. Rollback changes. Return false in belief
709+ * that caller recognize it and don't try to call us more.
710+ */
711+ (void ) hash_search (qtexts_htab , & queryid , HASH_REMOVE , NULL );
712+ return false;
713+ }
714+
694715 strptr = (char * ) dsa_get_address (qtext_dsa , entry -> qtext_dp );
695716 strlcpy (strptr , query_string , len );
717+ return true;
696718}
697719
698720void
@@ -705,7 +727,15 @@ aqo_qtexts_load(void)
705727 Assert (qtext_dsa != NULL );
706728
707729 LWLockAcquire (& aqo_state -> qtexts_lock , LW_EXCLUSIVE );
708- Assert (hash_get_num_entries (qtexts_htab ) == 0 );
730+
731+ if (hash_get_num_entries (qtexts_htab ) != 0 )
732+ {
733+ /* Someone have done it concurrently. */
734+ elog (LOG , "[AQO] Another backend have loaded query texts concurrently." );
735+ LWLockRelease (& aqo_state -> qtexts_lock );
736+ return ;
737+ }
738+
709739 data_load (PGAQO_TEXT_FILE , _deform_qtexts_record_cb , NULL );
710740
711741 /* Check existence of default feature space */
@@ -725,7 +755,7 @@ aqo_qtexts_load(void)
725755 * Getting a data chunk from a caller, add a record into the 'ML data'
726756 * shmem hash table. Allocate and fill DSA chunk for variadic part of the data.
727757 */
728- static void
758+ static bool
729759_deform_data_record_cb (void * data , size_t size )
730760{
731761 bool found ;
@@ -737,7 +767,7 @@ _deform_data_record_cb(void *data, size_t size)
737767
738768 Assert (LWLockHeldByMeInMode (& aqo_state -> data_lock , LW_EXCLUSIVE ));
739769 entry = (DataEntry * ) hash_search (data_htab , & fentry -> key ,
740- HASH_ENTER , & found );
770+ HASH_ENTER , & found );
741771 Assert (!found );
742772
743773 /* Copy fixed-size part of entry byte-by-byte even with caves */
@@ -747,9 +777,20 @@ _deform_data_record_cb(void *data, size_t size)
747777 sz = _compute_data_dsa (entry );
748778 Assert (sz + offsetof(DataEntry , data_dp ) == size );
749779 entry -> data_dp = dsa_allocate (data_dsa , sz );
750- Assert (DsaPointerIsValid (entry -> data_dp ));
780+
781+ if (!_check_dsa_validity (entry -> data_dp ))
782+ {
783+ /*
784+ * DSA stuck into problems. Rollback changes. Return false in belief
785+ * that caller recognize it and don't try to call us more.
786+ */
787+ (void ) hash_search (data_htab , & fentry -> key , HASH_REMOVE , NULL );
788+ return false;
789+ }
790+
751791 dsa_ptr = (char * ) dsa_get_address (data_dsa , entry -> data_dp );
752792 memcpy (dsa_ptr , ptr , sz );
793+ return true;
753794}
754795
755796void
@@ -759,14 +800,22 @@ aqo_data_load(void)
759800 Assert (data_dsa != NULL );
760801
761802 LWLockAcquire (& aqo_state -> data_lock , LW_EXCLUSIVE );
762- Assert (hash_get_num_entries (data_htab ) == 0 );
803+
804+ if (hash_get_num_entries (data_htab ) != 0 )
805+ {
806+ /* Someone have done it concurrently. */
807+ elog (LOG , "[AQO] Another backend have loaded query data concurrently." );
808+ LWLockRelease (& aqo_state -> data_lock );
809+ return ;
810+ }
811+
763812 data_load (PGAQO_DATA_FILE , _deform_data_record_cb , NULL );
764813
765814 aqo_state -> data_changed = false; /* mem data is consistent with disk */
766815 LWLockRelease (& aqo_state -> data_lock );
767816}
768817
769- static void
818+ static bool
770819_deform_queries_record_cb (void * data , size_t size )
771820{
772821 bool found ;
@@ -780,20 +829,22 @@ _deform_queries_record_cb(void *data, size_t size)
780829 entry = (QueriesEntry * ) hash_search (queries_htab , & queryid , HASH_ENTER , & found );
781830 Assert (!found );
782831 memcpy (entry , data , sizeof (QueriesEntry ));
832+ return true;
783833}
784834
785835void
786836aqo_queries_load (void )
787837{
788- long entries ;
789838 bool found ;
790839 uint64 queryid = 0 ;
791840
792841 Assert (!LWLockHeldByMe (& aqo_state -> queries_lock ));
793842
794843 LWLockAcquire (& aqo_state -> queries_lock , LW_EXCLUSIVE );
795- entries = hash_get_num_entries (queries_htab );
796- Assert (entries == 0 );
844+
845+ /* Load on postmaster sturtup. So no any concurrent actions possible here. */
846+ Assert (hash_get_num_entries (queries_htab ) == 0 );
847+
797848 data_load (PGAQO_QUERIES_FILE , _deform_queries_record_cb , NULL );
798849
799850 /* Check existence of default feature space */
@@ -836,14 +887,23 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
836887 {
837888 void * data ;
838889 size_t size ;
890+ bool res ;
839891
840892 if (fread (& size , sizeof (size ), 1 , file ) != 1 )
841893 goto read_error ;
842894 data = palloc (size );
843895 if (fread (data , size , 1 , file ) != 1 )
844896 goto read_error ;
845- callback (data , size );
897+ res = callback (data , size );
846898 pfree (data );
899+
900+ if (!res )
901+ {
902+ /* Error detected. Do not try to read tails of the storage. */
903+ elog (LOG , "[AQO] Because of an error skip %ld storage records." ,
904+ num - i );
905+ break ;
906+ }
847907 }
848908
849909 FreeFile (file );
@@ -896,11 +956,15 @@ dsa_init()
896956 Assert (aqo_state -> data_dsa_handler == DSM_HANDLE_INVALID );
897957
898958 qtext_dsa = dsa_create (aqo_state -> qtext_trancheid );
959+ Assert (qtext_dsa != NULL );
960+
961+ if (dsm_size_max > 0 )
962+ dsa_set_size_limit (qtext_dsa , dsm_size_max * 1024 * 1024 );
963+
899964 dsa_pin (qtext_dsa );
900965 aqo_state -> qtexts_dsa_handler = dsa_get_handle (qtext_dsa );
901966
902- data_dsa = dsa_create (aqo_state -> data_trancheid );
903- dsa_pin (data_dsa );
967+ data_dsa = qtext_dsa ;
904968 aqo_state -> data_dsa_handler = dsa_get_handle (data_dsa );
905969
906970 /* Load and initialize query texts hash table */
@@ -910,11 +974,10 @@ dsa_init()
910974 else
911975 {
912976 qtext_dsa = dsa_attach (aqo_state -> qtexts_dsa_handler );
913- data_dsa = dsa_attach ( aqo_state -> data_dsa_handler ) ;
977+ data_dsa = qtext_dsa ;
914978 }
915979
916980 dsa_pin_mapping (qtext_dsa );
917- dsa_pin_mapping (data_dsa );
918981 MemoryContextSwitchTo (old_context );
919982 LWLockRelease (& aqo_state -> lock );
920983
@@ -973,7 +1036,17 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
9731036 entry -> queryid = queryid ;
9741037 size = size > querytext_max_size ? querytext_max_size : size ;
9751038 entry -> qtext_dp = dsa_allocate (qtext_dsa , size );
976- Assert (DsaPointerIsValid (entry -> qtext_dp ));
1039+
1040+ if (!_check_dsa_validity (entry -> qtext_dp ))
1041+ {
1042+ /*
1043+ * DSA stuck into problems. Rollback changes. Return false in belief
1044+ * that caller recognize it and don't try to call us more.
1045+ */
1046+ (void ) hash_search (qtexts_htab , & queryid , HASH_REMOVE , NULL );
1047+ return false;
1048+ }
1049+
9771050 strptr = (char * ) dsa_get_address (qtext_dsa , entry -> qtext_dp );
9781051 strlcpy (strptr , query_string , size );
9791052 aqo_state -> qtexts_changed = true;
@@ -1173,7 +1246,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11731246
11741247 size = _compute_data_dsa (entry );
11751248 entry -> data_dp = dsa_allocate0 (data_dsa , size );
1176- Assert (DsaPointerIsValid (entry -> data_dp ));
1249+
1250+ if (!_check_dsa_validity (entry -> data_dp ))
1251+ {
1252+ /*
1253+ * DSA stuck into problems. Rollback changes. Return false in belief
1254+ * that caller recognize it and don't try to call us more.
1255+ */
1256+ (void ) hash_search (data_htab , & key , HASH_REMOVE , NULL );
1257+ return false;
1258+ }
11771259 }
11781260
11791261 Assert (DsaPointerIsValid (entry -> data_dp ));
@@ -1195,7 +1277,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11951277 /* Need to re-allocate DSA chunk */
11961278 dsa_free (data_dsa , entry -> data_dp );
11971279 entry -> data_dp = dsa_allocate0 (data_dsa , size );
1198- Assert (DsaPointerIsValid (entry -> data_dp ));
1280+
1281+ if (!_check_dsa_validity (entry -> data_dp ))
1282+ {
1283+ /*
1284+ * DSA stuck into problems. Rollback changes. Return false in belief
1285+ * that caller recognize it and don't try to call us more.
1286+ */
1287+ (void ) hash_search (data_htab , & key , HASH_REMOVE , NULL );
1288+ return false;
1289+ }
11991290 }
12001291 ptr = (char * ) dsa_get_address (data_dsa , entry -> data_dp );
12011292
0 commit comments