@@ -163,47 +163,44 @@ create_partial_tempscan_plan(PlannerInfo *root, RelOptInfo *rel,
163163 cscan -> custom_plans = custom_plans ;
164164 cscan -> methods = & plan_methods ;
165165 cscan -> flags = best_path -> flags ;
166- cscan -> custom_private = best_path -> custom_private ;
166+ cscan -> custom_private = list_make1 ( makeInteger ( best_path -> path . parallel_workers )) ;
167167
168168 return & cscan -> scan .plan ;
169169}
170170
171171typedef struct SharedTempScanInfo
172172{
173- int nworkers ;
173+ int nworkers_launched ;
174174 dsm_handle handle ;
175175} SharedTempScanInfo ;
176176
177- #define SharedTempScanInfoHeaderSize offsetof(SharedTempScanInfo, data)
178-
179- typedef struct TempScanInfo
180- {
181- shm_mq_handle * * tqueue ;
182- DestReceiver * * receiver ;
183- } TempScanInfo ;
184-
185177typedef struct ParallelTempScanState
186178{
187179 CustomScanState node ;
188180
189181 bool initialized ;
182+ int nworkers ; /* workers planned. Needed to know how much resources to free */
190183 DestReceiver * * receiver ; /* Must be NULL for workers */
191- TempScanInfo ptsi ;
184+ shm_mq_handle * * tqueue ;
185+ ParallelContext * pcxt ;
192186 SharedTempScanInfo * shared ;
193187
194188 TupleQueueReader * reader ;
189+ bool parallelMode ;
195190} ParallelTempScanState ;
196191
197192static Node *
198193create_tempscan_state (CustomScan * cscan )
199194{
200195 ParallelTempScanState * ts = palloc0 (sizeof (ParallelTempScanState ));
201196 CustomScanState * cstate = (CustomScanState * ) ts ;
197+ int path_workers = linitial_node (Integer , cscan -> custom_private )-> ival ;
202198
203199 Assert (list_length (cscan -> custom_plans ) == 1 );
204200
205201 cstate -> ss .ps .type = T_CustomScanState ;
206202 cstate -> methods = & exec_methods ;
203+ ts -> parallelMode = (path_workers > 0 );
207204
208205 /*
209206 * Setup slotOps manually. Although we just put incoming tuple to the result
@@ -213,7 +210,9 @@ create_tempscan_state(CustomScan *cscan)
213210 cstate -> slotOps = & TTSOpsMinimalTuple ;
214211
215212 ts -> receiver = NULL ;
213+ ts -> tqueue = NULL ;
216214 ts -> initialized = false;
215+
217216 ts -> shared = NULL ;
218217
219218 if (!IsParallelWorker ())
@@ -270,76 +269,115 @@ ExecTempScan(CustomScanState *node)
270269{
271270 ParallelTempScanState * ts = (ParallelTempScanState * ) node ;
272271 TupleTableSlot * result = ts -> node .ss .ss_ScanTupleSlot ;
272+ TupleTableSlot * slot ;
273+ bool should_free ;
274+ MinimalTuple tup ;
275+ int i ;
273276
274277 /*
275278 * HACK. At this point Custom DSM already initialised and we can switch off
276279 * this parameter.
277280 */
278- ts -> node .ss .ps .plan -> parallel_aware = false;
279-
280- /* Forbid rescanning */
281- ts -> initialized = true;
281+ if (ts -> pcxt -> nworkers_launched == 0 )
282+ ts -> node .ss .ps .plan -> parallel_aware = false;
282283
283- if (! IsParallelWorker ())
284+ if (IsParallelWorker ())
284285 {
285- TupleTableSlot * slot ;
286- bool should_free ;
287- MinimalTuple tuple ;
288- int i ;
286+ MinimalTuple tup ;
287+ bool done ;
289288
290- Assert (list_length (node -> custom_ps ) == 1 );
289+ /* Parallel worker should receive something from the tqueue */
290+ tup = TupleQueueReaderNext (ts -> reader , false, & done );
291291
292- slot = ExecProcNode ((PlanState * ) linitial (node -> custom_ps ));
293- if (TupIsNull (slot ))
292+ if (done )
294293 {
295- if (ts -> ptsi .receiver != NULL )
296- {
297- for (i = 0 ; i < ts -> shared -> nworkers ; i ++ )
298- {
299- ts -> ptsi .receiver [i ]-> rDestroy (ts -> ptsi .receiver [i ]);
300- ts -> ptsi .receiver [i ] = NULL ;
301- ts -> ptsi .tqueue [i ] = NULL ;
302- }
303- pfree (ts -> ptsi .receiver );
304- ts -> ptsi .receiver = NULL ;
305- }
306-
307- /* The end of the table is achieved, Return empty tuple to all */
294+ Assert (tup == NULL );
308295 return NULL ;
309296 }
310297
298+ /* TODO: should free ? */
299+ ExecStoreMinimalTuple (tup , result , false);
300+ result -> tts_ops -> copyslot (result , result );
301+ return result ;
302+ }
303+
304+ Assert (list_length (node -> custom_ps ) == 1 );
305+
306+ if (!ts -> initialized )
307+ {
308+ /*
309+ * Save number of workers because we will need it on later
310+ * stages of the execution.
311+ */
312+ ts -> shared -> nworkers_launched = ts -> pcxt -> nworkers_launched ;
313+ ts -> initialized = true;
314+ }
315+
316+ slot = ExecProcNode ((PlanState * ) linitial (node -> custom_ps ));
317+ if (ts -> receiver == NULL )
318+ return slot ;
319+
320+ if (TupIsNull (slot ))
321+ {
322+ /* Parallel workers case */
323+ for (i = 0 ; i < ts -> shared -> nworkers_launched ; i ++ )
324+ {
325+ ts -> receiver [i ]-> rDestroy (ts -> receiver [i ]);
326+ ts -> receiver [i ] = NULL ;
327+ ts -> tqueue [i ] = NULL ;
328+ }
329+ pfree (ts -> receiver );
330+ ts -> receiver = NULL ;
331+ /* The end of the table is achieved, Return empty tuple to all */
332+ return NULL ;
333+ }
334+
335+ if (!ts -> parallelMode )
336+ {
311337 /* Prepare mimimal tuple to send all workers and upstream locally. */
312- tuple = ExecFetchSlotMinimalTuple (slot , & should_free );
313- ExecStoreMinimalTuple (tuple , result , should_free );
338+ tup = ExecFetchSlotMinimalTuple (slot , & should_free );
339+ ExecStoreMinimalTuple (tup , result , should_free );
314340
315- if (ts -> ptsi .receiver != NULL )
341+ /* Send the same tuple to each of worker. Don't forget myself */
342+ for (i = 0 ; i < ts -> shared -> nworkers_launched ; ++ i )
316343 {
317- for ( i = 0 ; i < ts -> shared -> nworkers ; ++ i )
318- {
319- ts -> ptsi . receiver [i ]-> receiveSlot (result , ts -> ptsi . receiver [i ]);
320- }
344+ bool ret ;
345+
346+ ret = ts -> receiver [i ]-> receiveSlot (result , ts -> receiver [i ]);
347+ Assert ( ret );
321348 }
349+ return result ;
322350 }
323351 else
324352 {
325- MinimalTuple tup ;
326- bool done ;
353+ int nworkers = ts -> pcxt -> nworkers_launched ;
354+ /* Overwise we should tuple only to one of the workers */
327355
328- /* Parallel worker should receive something from the tqueue */
329- tup = TupleQueueReaderNext (ts -> reader , false, & done );
356+ typedef struct TQueueDestReceiver
357+ {
358+ DestReceiver pub ; /* public fields */
359+ shm_mq_handle * queue ; /* shm_mq to send to */
360+ } TQueueDestReceiver ;
330361
331- if (done )
362+ TQueueDestReceiver * rec ;
363+
364+ while (nworkers > 0 )
332365 {
333- Assert ( tup == NULL );
334- return NULL ;
335- }
366+ /* Prepare mimimal tuple */
367+ tup = ExecFetchSlotMinimalTuple ( slot , & should_free ) ;
368+ ExecStoreMinimalTuple ( tup , result , should_free );
336369
337- /* TODO: should free ? */
338- ExecStoreMinimalTuple (tup , result , false);
339- result -> tts_ops -> copyslot (result , result );
370+ for (i = 0 ; i < nworkers ; i ++ )
371+ {
372+ rec = (TQueueDestReceiver * ) ts -> receiver [i ];
373+ result = shm_mq_send (tqueue -> queue , tuple -> t_len , tuple , false, false);
374+ (void ) WaitLatch (MyLatch ,
375+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
376+ (nap .tv_sec * 1000L ) + (nap .tv_usec / 1000L ),
377+ WAIT_EVENT_AUTOVACUUM_MAIN );
378+ }
379+ }
340380 }
341-
342- return result ;
343381}
344382
345383static void
@@ -355,13 +393,13 @@ EndTempScan(CustomScanState *node)
355393 ExecEndNode ((PlanState * ) linitial (node -> custom_ps ));
356394
357395 /* Can happen if not all tuples needed */
358- if (ts -> ptsi . receiver != NULL )
396+ if (ts -> receiver != NULL )
359397 {
360398 int i ;
361399
362- for (i = 0 ; i < ts -> shared -> nworkers ; ++ i )
400+ for (i = 0 ; i < ts -> nworkers ; ++ i )
363401 {
364- ts -> ptsi . receiver [i ]-> rDestroy (ts -> ptsi . receiver [i ]);
402+ ts -> receiver [i ]-> rDestroy (ts -> receiver [i ]);
365403 }
366404 }
367405 }
@@ -454,14 +492,29 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
454492 create_index_paths (root , rel );
455493 create_tidscan_paths (root , rel );
456494
495+ if (rel -> consider_parallel && rel -> lateral_relids == NULL )
496+ {
497+ int parallel_workers ;
498+
499+ parallel_workers = compute_parallel_worker (rel , rel -> pages , -1 ,
500+ max_parallel_workers_per_gather );
501+
502+ /* If any limit was set to zero, the user doesn't want a parallel scan. */
503+ if (parallel_workers <= 0 )
504+ return ;
505+
506+ /* Add an unordered partial path based on a parallel sequential scan. */
507+ add_partial_path (rel , create_seqscan_path (root , rel , NULL , parallel_workers ));
508+ }
509+
457510 /*
458511 * Dangerous zone. But we assume it is strictly local. What about extension
459512 * which could call ours and may have desire to add some partial paths after
460513 * us?
461514 */
462515
463- list_free (rel -> partial_pathlist );
464- rel -> partial_pathlist = NIL ;
516+ // list_free(rel->partial_pathlist);
517+ // rel->partial_pathlist = NIL;
465518
466519 /*
467520 * Set guard over each parallel_safe path
@@ -488,8 +541,8 @@ try_partial_tempscan(PlannerInfo *root, RelOptInfo *rel, Index rti,
488541 * lateral references guarantees we don't need to change any parameters
489542 * on a ReScan?
490543 */
491- add_path (rel , (Path * )
492- create_material_path (cpath -> parent , (Path * ) cpath ));
544+ add_path (rel , (Path * ) cpath
545+ /* create_material_path(cpath->parent, (Path *) cpath)*/ );
493546 }
494547
495548 list_free (parallel_safe_lst );
@@ -607,33 +660,44 @@ InitializeDSMTempScan(CustomScanState *node, ParallelContext *pcxt,
607660 DSM_CREATE_NULL_IF_MAXSEGMENTS );
608661 Assert (seg != NULL ); /* Don't process this case so far */
609662
663+ ts -> pcxt = pcxt ;
664+
610665 /* Save shared data for common usage in parallel workers */
611666 ts -> shared = (SharedTempScanInfo * ) coordinate ;
612667 ts -> shared -> handle = dsm_segment_handle (seg );
668+ ts -> nworkers = pcxt -> nworkers ;
613669
614670 /*
615- * Save number of workers because we will need it on later stages of the
616- * execution.
671+ * We can't initialise queues to workers here because not sure about real
672+ * number of workers will be launched (depends on the number of free slots
673+ * for background workers - see max_worker_processes).
617674 */
618- ts -> shared -> nworkers = pcxt -> nworkers ;
619675
620- if (ts -> shared -> nworkers > 0 )
621- {
622- int i ;
623- dsm_segment * seg = dsm_find_mapping (ts -> shared -> handle );
676+ /*
677+ * Initialise receivers here.
678+ * We don't do it earlier because real number of launched workers
679+ * will be known only after the Gather node launch them.
680+ * Anyway, in the case of any troubles we can initialise them
681+ * earlier and just not use the tail of them during the execution.
682+ */
683+ if (ts -> shared && ts -> nworkers > 0 )
684+ {
685+ int i ;
686+ dsm_segment * seg = dsm_find_mapping (ts -> shared -> handle );
624687
625- ts -> ptsi . tqueue =
626- ExecParallelSetupTupleQueues (ts -> shared -> nworkers ,
688+ ts -> tqueue =
689+ ExecParallelSetupTupleQueues (ts -> nworkers ,
627690 (char * ) dsm_segment_address (seg ),
628691 seg );
629692
630- ts -> ptsi .receiver = palloc (ts -> shared -> nworkers * sizeof (DestReceiver * ));
631- for (i = 0 ; i < ts -> shared -> nworkers ; i ++ )
632- {
633- ts -> ptsi .receiver [i ] =
634- CreateTupleQueueDestReceiver (ts -> ptsi .tqueue [i ]);
635- }
693+ ts -> receiver = palloc (ts -> nworkers * sizeof (DestReceiver * ));
694+ for (i = 0 ; i < ts -> nworkers ; i ++ )
695+ {
696+ ts -> receiver [i ] = CreateTupleQueueDestReceiver (ts -> tqueue [i ]);
636697 }
698+ }
699+ else
700+ elog (WARNING , "Workers do not needed" );
637701}
638702
639703static void
@@ -662,6 +726,7 @@ InitializeWorkerTempScan(CustomScanState *node, shm_toc *toc,
662726 shm_mq_set_receiver (mq , MyProc );
663727
664728 ts -> reader = CreateTupleQueueReader (shm_mq_attach (mq , seg , NULL ));
729+ ts -> initialized = true;
665730}
666731
667732static void
0 commit comments