1515#include "postgres.h"
1616
1717#include "access/gin_private.h"
18+ #include "miscadmin.h"
1819#include "utils/rel.h"
1920
20- /*
21- * Merge two ordered arrays of itempointers, eliminating any duplicates.
22- * Returns the number of items in the result.
23- * Caller is responsible that there is enough space at *dst.
24- */
25- uint32
26- ginMergeItemPointers (ItemPointerData * dst ,
27- ItemPointerData * a , uint32 na ,
28- ItemPointerData * b , uint32 nb )
29- {
30- ItemPointerData * dptr = dst ;
31- ItemPointerData * aptr = a ,
32- * bptr = b ;
33-
34- while (aptr - a < na && bptr - b < nb )
35- {
36- int cmp = ginCompareItemPointers (aptr , bptr );
37-
38- if (cmp > 0 )
39- * dptr ++ = * bptr ++ ;
40- else if (cmp == 0 )
41- {
42- /* we want only one copy of the identical items */
43- * dptr ++ = * bptr ++ ;
44- aptr ++ ;
45- }
46- else
47- * dptr ++ = * aptr ++ ;
48- }
49-
50- while (aptr - a < na )
51- * dptr ++ = * aptr ++ ;
52-
53- while (bptr - b < nb )
54- * dptr ++ = * bptr ++ ;
55-
56- return dptr - dst ;
57- }
58-
5921/*
6022 * Checks, should we move to right link...
6123 * Compares inserting itemp pointer with right bound of current page
@@ -387,9 +349,12 @@ dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
387349/*
388350 * Places keys to page and fills WAL record. In case leaf page and
389351 * build mode puts all ItemPointers to page.
352+ *
353+ * If none of the keys fit, returns false without modifying the page.
390354 */
391- static void
392- dataPlaceToPage (GinBtree btree , Buffer buf , OffsetNumber off , XLogRecData * * prdata )
355+ static bool
356+ dataPlaceToPage (GinBtree btree , Buffer buf , OffsetNumber off ,
357+ XLogRecData * * prdata )
393358{
394359 Page page = BufferGetPage (buf );
395360 int sizeofitem = GinSizeOfDataPageItem (page );
@@ -399,6 +364,10 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prda
399364 static XLogRecData rdata [3 ];
400365 static ginxlogInsert data ;
401366
367+ /* quick exit if it doesn't fit */
368+ if (!dataIsEnoughSpace (btree , buf , off ))
369+ return false;
370+
402371 * prdata = rdata ;
403372 Assert (GinPageIsData (page ));
404373
@@ -464,6 +433,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prda
464433 }
465434 else
466435 GinDataPageAddPostingItem (page , & (btree -> pitem ), off );
436+
437+ return true;
467438}
468439
469440/*
@@ -545,8 +516,8 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
545516 }
546517
547518 /*
548- * we suppose that during index creation table scaned from begin to end,
549- * so ItemPointers are monotonically increased. .
519+ * we assume that during index creation the table scanned from beginning
520+ * to end, so ItemPointers are in monotonically increasing order .
550521 */
551522 if (btree -> isBuild && GinPageRightMost (lpage ))
552523 separator = freeSpace / sizeofitem ;
@@ -575,15 +546,6 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
575546
576547 GinPageGetOpaque (rpage )-> maxoff = maxoff - separator ;
577548
578- PostingItemSetBlockNumber (& (btree -> pitem ), BufferGetBlockNumber (lbuf ));
579- if (GinPageIsLeaf (lpage ))
580- btree -> pitem .key = * GinDataPageGetItemPointer (lpage ,
581- GinPageGetOpaque (lpage )-> maxoff );
582- else
583- btree -> pitem .key = GinDataPageGetPostingItem (lpage ,
584- GinPageGetOpaque (lpage )-> maxoff )-> key ;
585- btree -> rightblkno = BufferGetBlockNumber (rbuf );
586-
587549 /* set up right bound for left page */
588550 bound = GinDataPageGetRightBound (lpage );
589551 * bound = btree -> pitem .key ;
@@ -613,6 +575,16 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
613575 rdata [1 ].len = MAXALIGN (maxoff * sizeofitem );
614576 rdata [1 ].next = NULL ;
615577
578+ /* Prepare a downlink tuple for insertion to the parent */
579+ PostingItemSetBlockNumber (& (btree -> pitem ), BufferGetBlockNumber (lbuf ));
580+ if (GinPageIsLeaf (lpage ))
581+ btree -> pitem .key = * GinDataPageGetItemPointer (lpage ,
582+ GinPageGetOpaque (lpage )-> maxoff );
583+ else
584+ btree -> pitem .key = GinDataPageGetPostingItem (lpage ,
585+ GinPageGetOpaque (lpage )-> maxoff )-> key ;
586+ btree -> rightblkno = BufferGetBlockNumber (rbuf );
587+
616588 return lpage ;
617589}
618590
@@ -638,6 +610,92 @@ ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
638610 GinDataPageAddPostingItem (page , & ri , InvalidOffsetNumber );
639611}
640612
613+ /*
614+ * Creates new posting tree containing the given TIDs. Returns the page
615+ * number of the root of the new posting tree.
616+ *
617+ * items[] must be in sorted order with no duplicates.
618+ */
619+ BlockNumber
620+ createPostingTree (Relation index , ItemPointerData * items , uint32 nitems ,
621+ GinStatsData * buildStats )
622+ {
623+ BlockNumber blkno ;
624+ Buffer buffer ;
625+ Page page ;
626+ int itemsCount ;
627+
628+ /* Calculate how many TIDs will fit on first page. */
629+ itemsCount = Min (nitems , GinMaxLeafDataItems );
630+
631+ /*
632+ * Create the root page.
633+ */
634+ buffer = GinNewBuffer (index );
635+ page = BufferGetPage (buffer );
636+ blkno = BufferGetBlockNumber (buffer );
637+
638+ START_CRIT_SECTION ();
639+
640+ GinInitBuffer (buffer , GIN_DATA | GIN_LEAF );
641+ memcpy (GinDataPageGetData (page ), items , sizeof (ItemPointerData ) * nitems );
642+ GinPageGetOpaque (page )-> maxoff = nitems ;
643+
644+ MarkBufferDirty (buffer );
645+
646+ if (RelationNeedsWAL (index ))
647+ {
648+ XLogRecPtr recptr ;
649+ XLogRecData rdata [2 ];
650+ ginxlogCreatePostingTree data ;
651+
652+ data .node = index -> rd_node ;
653+ data .blkno = blkno ;
654+ data .nitem = nitems ;
655+
656+ rdata [0 ].buffer = InvalidBuffer ;
657+ rdata [0 ].data = (char * ) & data ;
658+ rdata [0 ].len = sizeof (ginxlogCreatePostingTree );
659+ rdata [0 ].next = & rdata [1 ];
660+
661+ rdata [1 ].buffer = InvalidBuffer ;
662+ rdata [1 ].data = (char * ) items ;
663+ rdata [1 ].len = sizeof (ItemPointerData ) * itemsCount ;
664+ rdata [1 ].next = NULL ;
665+
666+ recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_CREATE_PTREE , rdata );
667+ PageSetLSN (page , recptr );
668+ }
669+
670+ UnlockReleaseBuffer (buffer );
671+
672+ END_CRIT_SECTION ();
673+
674+ /* During index build, count the newly-added data page */
675+ if (buildStats )
676+ buildStats -> nDataPages ++ ;
677+
678+ /*
679+ * Add any remaining TIDs to the newly-created posting tree.
680+ */
681+ if (itemsCount < nitems )
682+ {
683+ GinPostingTreeScan * gdi ;
684+
685+ gdi = ginPrepareScanPostingTree (index , blkno , FALSE);
686+ gdi -> btree .isBuild = (buildStats != NULL );
687+
688+ ginInsertItemPointers (gdi ,
689+ items + itemsCount ,
690+ nitems - itemsCount ,
691+ buildStats );
692+
693+ pfree (gdi );
694+ }
695+
696+ return blkno ;
697+ }
698+
641699void
642700ginPrepareDataScan (GinBtree btree , Relation index )
643701{
@@ -650,7 +708,6 @@ ginPrepareDataScan(GinBtree btree, Relation index)
650708 btree -> findItem = dataLocateLeafItem ;
651709 btree -> findChildPtr = dataFindChildPtr ;
652710 btree -> getLeftMostPage = dataGetLeftMostPage ;
653- btree -> isEnoughSpace = dataIsEnoughSpace ;
654711 btree -> placeToPage = dataPlaceToPage ;
655712 btree -> splitPage = dataSplitPage ;
656713 btree -> fillRoot = ginDataFillRoot ;
0 commit comments