@@ -33,6 +33,16 @@ extern void seg_scanner_finish(void);
3333extern int seg_yydebug;
3434*/
3535
36+ /*
37+ * Auxiliary data structure for picksplit method.
38+ */
39+ typedef struct
40+ {
41+ float center ;
42+ OffsetNumber index ;
43+ SEG * data ;
44+ } gseg_picksplit_item ;
45+
3646/*
3747** Input/Output routines
3848*/
@@ -292,152 +302,104 @@ gseg_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result)
292302 return (result );
293303}
294304
305+ /*
306+ * Compare function for gseg_picksplit_item: sort by center.
307+ */
308+ static int
309+ gseg_picksplit_item_cmp (const void * a , const void * b )
310+ {
311+ const gseg_picksplit_item * i1 = (const gseg_picksplit_item * ) a ;
312+ const gseg_picksplit_item * i2 = (const gseg_picksplit_item * ) b ;
295313
314+ if (i1 -> center < i2 -> center )
315+ return -1 ;
316+ else if (i1 -> center == i2 -> center )
317+ return 0 ;
318+ else
319+ return 1 ;
320+ }
296321
297322/*
298- ** The GiST PickSplit method for segments
299- ** We use Guttman's poly time split algorithm
300- */
323+ * The GiST PickSplit method for segments
324+ *
325+ * We used to use Guttman's split algorithm here, but since the data is 1-D
326+ * it's easier and more robust to just sort the segments by center-point and
327+ * split at the middle.
328+ */
301329GIST_SPLITVEC *
302330gseg_picksplit (GistEntryVector * entryvec ,
303331 GIST_SPLITVEC * v )
304332{
305- OffsetNumber i ,
306- j ;
307- SEG * datum_alpha ,
308- * datum_beta ;
333+ int i ;
309334 SEG * datum_l ,
310- * datum_r ;
311- SEG * union_d ,
312- * union_dl ,
313- * union_dr ;
314- SEG * inter_d ;
315- bool firsttime ;
316- float size_alpha ,
317- size_beta ,
318- size_union ,
319- size_inter ;
320- float size_waste ,
321- waste ;
322- float size_l ,
323- size_r ;
324- int nbytes ;
325- OffsetNumber seed_1 = 1 ,
326- seed_2 = 2 ;
335+ * datum_r ,
336+ * seg ;
337+ gseg_picksplit_item * sort_items ;
327338 OffsetNumber * left ,
328339 * right ;
329340 OffsetNumber maxoff ;
341+ OffsetNumber firstright ;
330342
331343#ifdef GIST_DEBUG
332344 fprintf (stderr , "picksplit\n" );
333345#endif
334346
335- maxoff = entryvec -> n - 2 ;
336- nbytes = (maxoff + 2 ) * sizeof (OffsetNumber );
337- v -> spl_left = (OffsetNumber * ) palloc (nbytes );
338- v -> spl_right = (OffsetNumber * ) palloc (nbytes );
347+ /* Valid items in entryvec->vector[] are indexed 1..maxoff */
348+ maxoff = entryvec -> n - 1 ;
339349
340- firsttime = true;
341- waste = 0.0 ;
342-
343- for (i = FirstOffsetNumber ; i < maxoff ; i = OffsetNumberNext (i ))
350+ /*
351+ * Prepare the auxiliary array and sort it.
352+ */
353+ sort_items = (gseg_picksplit_item * )
354+ palloc (maxoff * sizeof (gseg_picksplit_item ));
355+ for (i = 1 ; i <= maxoff ; i ++ )
344356 {
345- datum_alpha = (SEG * ) DatumGetPointer (entryvec -> vector [i ].key );
346- for (j = OffsetNumberNext (i ); j <= maxoff ; j = OffsetNumberNext (j ))
347- {
348- datum_beta = (SEG * ) DatumGetPointer (entryvec -> vector [j ].key );
349-
350- /* compute the wasted space by unioning these guys */
351- /* size_waste = size_union - size_inter; */
352- union_d = seg_union (datum_alpha , datum_beta );
353- rt_seg_size (union_d , & size_union );
354- inter_d = seg_inter (datum_alpha , datum_beta );
355- rt_seg_size (inter_d , & size_inter );
356- size_waste = size_union - size_inter ;
357-
358- /*
359- * are these a more promising split that what we've already seen?
360- */
361- if (size_waste > waste || firsttime )
362- {
363- waste = size_waste ;
364- seed_1 = i ;
365- seed_2 = j ;
366- firsttime = false;
367- }
368- }
357+ seg = (SEG * ) DatumGetPointer (entryvec -> vector [i ].key );
358+ /* center calculation is done this way to avoid possible overflow */
359+ sort_items [i - 1 ].center = seg -> lower * 0.5f + seg -> upper * 0.5f ;
360+ sort_items [i - 1 ].index = i ;
361+ sort_items [i - 1 ].data = seg ;
369362 }
363+ qsort (sort_items , maxoff , sizeof (gseg_picksplit_item ),
364+ gseg_picksplit_item_cmp );
370365
366+ /* sort items below "firstright" will go into the left side */
367+ firstright = maxoff / 2 ;
368+
369+ v -> spl_left = (OffsetNumber * ) palloc (maxoff * sizeof (OffsetNumber ));
370+ v -> spl_right = (OffsetNumber * ) palloc (maxoff * sizeof (OffsetNumber ));
371371 left = v -> spl_left ;
372372 v -> spl_nleft = 0 ;
373373 right = v -> spl_right ;
374374 v -> spl_nright = 0 ;
375375
376- datum_alpha = (SEG * ) DatumGetPointer (entryvec -> vector [seed_1 ].key );
377- datum_l = seg_union (datum_alpha , datum_alpha );
378- rt_seg_size (datum_l , & size_l );
379- datum_beta = (SEG * ) DatumGetPointer (entryvec -> vector [seed_2 ].key );
380- datum_r = seg_union (datum_beta , datum_beta );
381- rt_seg_size (datum_r , & size_r );
382-
383376 /*
384- * Now split up the regions between the two seeds. An important property
385- * of this split algorithm is that the split vector v has the indices of
386- * items to be split in order in its left and right vectors. We exploit
387- * this property by doing a merge in the code that actually splits the
388- * page.
389- *
390- * For efficiency, we also place the new index tuple in this loop. This is
391- * handled at the very end, when we have placed all the existing tuples
392- * and i == maxoff + 1.
377+ * Emit segments to the left output page, and compute its bounding box.
393378 */
394-
395- maxoff = OffsetNumberNext (maxoff );
396- for (i = FirstOffsetNumber ; i <= maxoff ; i = OffsetNumberNext (i ))
379+ datum_l = (SEG * ) palloc (sizeof (SEG ));
380+ memcpy (datum_l , sort_items [0 ].data , sizeof (SEG ));
381+ * left ++ = sort_items [0 ].index ;
382+ v -> spl_nleft ++ ;
383+ for (i = 1 ; i < firstright ; i ++ )
397384 {
398- /*
399- * If we've already decided where to place this item, just put it on
400- * the right list. Otherwise, we need to figure out which page needs
401- * the least enlargement in order to store the item.
402- */
403-
404- if (i == seed_1 )
405- {
406- * left ++ = i ;
407- v -> spl_nleft ++ ;
408- continue ;
409- }
410- else if (i == seed_2 )
411- {
412- * right ++ = i ;
413- v -> spl_nright ++ ;
414- continue ;
415- }
416-
417- /* okay, which page needs least enlargement? */
418- datum_alpha = (SEG * ) DatumGetPointer (entryvec -> vector [i ].key );
419- union_dl = seg_union (datum_l , datum_alpha );
420- union_dr = seg_union (datum_r , datum_alpha );
421- rt_seg_size (union_dl , & size_alpha );
422- rt_seg_size (union_dr , & size_beta );
385+ datum_l = seg_union (datum_l , sort_items [i ].data );
386+ * left ++ = sort_items [i ].index ;
387+ v -> spl_nleft ++ ;
388+ }
423389
424- /* pick which page to add it to */
425- if (size_alpha - size_l < size_beta - size_r )
426- {
427- datum_l = union_dl ;
428- size_l = size_alpha ;
429- * left ++ = i ;
430- v -> spl_nleft ++ ;
431- }
432- else
433- {
434- datum_r = union_dr ;
435- size_r = size_alpha ;
436- * right ++ = i ;
437- v -> spl_nright ++ ;
438- }
390+ /*
391+ * Likewise for the right page.
392+ */
393+ datum_r = (SEG * ) palloc (sizeof (SEG ));
394+ memcpy (datum_r , sort_items [firstright ].data , sizeof (SEG ));
395+ * right ++ = sort_items [firstright ].index ;
396+ v -> spl_nright ++ ;
397+ for (i = firstright + 1 ; i < maxoff ; i ++ )
398+ {
399+ datum_r = seg_union (datum_r , sort_items [i ].data );
400+ * right ++ = sort_items [i ].index ;
401+ v -> spl_nright ++ ;
439402 }
440- * left = * right = FirstOffsetNumber ; /* sentinel value, see dosplit() */
441403
442404 v -> spl_ldatum = PointerGetDatum (datum_l );
443405 v -> spl_rdatum = PointerGetDatum (datum_r );
0 commit comments