|
26 | 26 | #include "preprocessing.h" |
27 | 27 | #include "learn_cache.h" |
28 | 28 | #include "storage.h" |
| 29 | +#include "postmaster/bgworker.h" |
| 30 | +#include "catalog/objectaccess.h" |
29 | 31 |
|
30 | 32 |
|
31 | 33 | PG_MODULE_MAGIC; |
@@ -77,6 +79,8 @@ int auto_tuning_infinite_loop = 8; |
77 | 79 | int aqo_k = 3; |
78 | 80 | double log_selectivity_lower_bound = -30; |
79 | 81 |
|
| 82 | +bool cleanup_bgworker = false; |
| 83 | + |
80 | 84 | /* |
81 | 85 | * Currently we use it only to store query_text string which is initialized |
82 | 86 | * after a query parsing and is used during the query planning. |
@@ -112,6 +116,7 @@ get_parameterized_joinrel_size_hook_type prev_get_parameterized_joinrel_size_hoo |
112 | 116 | ExplainOnePlan_hook_type prev_ExplainOnePlan_hook; |
113 | 117 | ExplainOneNode_hook_type prev_ExplainOneNode_hook; |
114 | 118 | static shmem_request_hook_type prev_shmem_request_hook = NULL; |
| 119 | +object_access_hook_type prev_object_access_hook; |
115 | 120 |
|
116 | 121 | /***************************************************************************** |
117 | 122 | * |
@@ -147,6 +152,93 @@ aqo_shmem_request(void) |
147 | 152 | RequestAddinShmemSpace(aqo_memsize()); |
148 | 153 | } |
149 | 154 |
|
| 155 | +/* |
| 156 | + * Entry point for CleanupWorker's process. |
| 157 | + */ |
| 158 | +void |
| 159 | +aqo_bgworker_cleanup(void) |
| 160 | +{ |
| 161 | + int fs_num; |
| 162 | + int fss_num; |
| 163 | + |
| 164 | + cleanup_aqo_database(true, &fs_num, &fss_num); |
| 165 | +} |
| 166 | + |
| 167 | +/* |
| 168 | + * Object access hook |
| 169 | + */ |
| 170 | +void |
| 171 | +aqo_drop_access_hook(ObjectAccessType access, |
| 172 | + Oid classId, |
| 173 | + Oid objectId, |
| 174 | + int subId, |
| 175 | + void *arg) |
| 176 | +{ |
| 177 | + if (prev_object_access_hook) |
| 178 | + (*prev_object_access_hook) (access, classId, objectId, subId, arg); |
| 179 | + |
| 180 | + if (access == OAT_DROP && cleanup_bgworker) |
| 181 | + { |
| 182 | + MemoryContext old_ctx; |
| 183 | + int status = BGWH_STOPPED; |
| 184 | + pid_t pid; |
| 185 | + |
| 186 | + old_ctx = MemoryContextSwitchTo(AQOTopMemCtx); |
| 187 | + LWLockAcquire(&aqo_state->lock, LW_EXCLUSIVE); |
| 188 | + if (aqo_state->bgw_handle != NULL) |
| 189 | + { |
| 190 | + status = GetBackgroundWorkerPid(aqo_state->bgw_handle, &pid); |
| 191 | + } |
| 192 | + LWLockRelease(&aqo_state->lock); |
| 193 | + if (status != BGWH_STARTED) |
| 194 | + { |
| 195 | + aqo_bgworker_startup(); |
| 196 | + } |
| 197 | + MemoryContextSwitchTo(old_ctx); |
| 198 | + } |
| 199 | +} |
| 200 | + |
| 201 | +void |
| 202 | +aqo_bgworker_startup(void) |
| 203 | +{ |
| 204 | + BackgroundWorker worker; |
| 205 | + BackgroundWorkerHandle *handle; |
| 206 | + BgwHandleStatus status; |
| 207 | + pid_t pid; |
| 208 | + |
| 209 | + MemSet(&worker, 0, sizeof(worker)); |
| 210 | + |
| 211 | + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | |
| 212 | + BGWORKER_BACKEND_DATABASE_CONNECTION; |
| 213 | + worker.bgw_start_time = BgWorkerStart_ConsistentState; |
| 214 | + worker.bgw_restart_time = BGW_NEVER_RESTART; |
| 215 | + worker.bgw_main_arg = Int32GetDatum(0); |
| 216 | + worker.bgw_extra[0] = 0; |
| 217 | + memcpy(worker.bgw_function_name, "aqo_bgworker_cleanup", 21); |
| 218 | + memcpy(worker.bgw_library_name, "aqo", 4); |
| 219 | + memcpy(worker.bgw_name, "aqo cleanup", 12); |
| 220 | + |
| 221 | + /* must set notify PID to wait for startup */ |
| 222 | + worker.bgw_notify_pid = MyProcPid; |
| 223 | + |
| 224 | + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) |
| 225 | + ereport(NOTICE, |
| 226 | + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
| 227 | + errmsg("could not register background process"), |
| 228 | + errhint("You might need to increase max_worker_processes."))); |
| 229 | + |
| 230 | + status = WaitForBackgroundWorkerStartup(handle, &pid); |
| 231 | + if (status != BGWH_STARTED) |
| 232 | + ereport(NOTICE, |
| 233 | + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| 234 | + errmsg("could not start background process"), |
| 235 | + errhint("More details may be available in the server log."))); |
| 236 | + |
| 237 | + LWLockAcquire(&aqo_state->lock, LW_EXCLUSIVE); |
| 238 | + aqo_state->bgw_handle = handle; |
| 239 | + LWLockRelease(&aqo_state->lock); |
| 240 | +} |
| 241 | + |
150 | 242 | void |
151 | 243 | _PG_init(void) |
152 | 244 | { |
@@ -309,6 +401,18 @@ _PG_init(void) |
309 | 401 | NULL |
310 | 402 | ); |
311 | 403 |
|
| 404 | + DefineCustomBoolVariable("aqo.cleanup_bgworker", |
| 405 | + "Enable bgworker which responsible for doing cleanup after drop", |
| 406 | + NULL, |
| 407 | + &cleanup_bgworker, |
| 408 | + false, |
| 409 | + PGC_USERSET, |
| 410 | + 0, |
| 411 | + NULL, |
| 412 | + NULL, |
| 413 | + NULL |
| 414 | + ); |
| 415 | + |
312 | 416 | prev_shmem_startup_hook = shmem_startup_hook; |
313 | 417 | shmem_startup_hook = aqo_init_shmem; |
314 | 418 | prev_planner_hook = planner_hook; |
@@ -349,6 +453,9 @@ _PG_init(void) |
349 | 453 | prev_shmem_request_hook = shmem_request_hook; |
350 | 454 | shmem_request_hook = aqo_shmem_request; |
351 | 455 |
|
| 456 | + prev_object_access_hook = object_access_hook; |
| 457 | + object_access_hook = aqo_drop_access_hook; |
| 458 | + |
352 | 459 | init_deactivated_queries_storage(); |
353 | 460 |
|
354 | 461 | /* |
|
0 commit comments