1717#include "access/gin_private.h"
1818#include "access/xloginsert.h"
1919#include "miscadmin.h"
20+ #include "utils/memutils.h"
2021#include "utils/rel.h"
2122
2223static void ginFindParents (GinBtree btree , GinBtreeStack * stack );
@@ -310,27 +311,45 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
310311 * Insert a new item to a page.
311312 *
312313 * Returns true if the insertion was finished. On false, the page was split and
313- * the parent needs to be updated. (a root split returns true as it doesn't
314- * need any further action by the caller to complete)
314+ * the parent needs to be updated. (A root split returns true as it doesn't
315+ * need any further action by the caller to complete.)
315316 *
316317 * When inserting a downlink to an internal page, 'childbuf' contains the
317318 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
318- * atomically with the insert. Also, the existing item at the given location
319- * is updated to point to ' updateblkno' .
319+ * atomically with the insert. Also, the existing item at offset stack->off
320+ * in the target page is updated to point to updateblkno.
320321 *
321322 * stack->buffer is locked on entry, and is kept locked.
323+ * Likewise for childbuf, if given.
322324 */
323325static bool
324326ginPlaceToPage (GinBtree btree , GinBtreeStack * stack ,
325327 void * insertdata , BlockNumber updateblkno ,
326328 Buffer childbuf , GinStatsData * buildStats )
327329{
328330 Page page = BufferGetPage (stack -> buffer );
331+ bool result ;
329332 GinPlaceToPageRC rc ;
330333 uint16 xlflags = 0 ;
331334 Page childpage = NULL ;
332335 Page newlpage = NULL ,
333336 newrpage = NULL ;
337+ void * ptp_workspace = NULL ;
338+ MemoryContext tmpCxt ;
339+ MemoryContext oldCxt ;
340+
341+ /*
342+ * We do all the work of this function and its subfunctions in a temporary
343+ * memory context. This avoids leakages and simplifies APIs, since some
344+ * subfunctions allocate storage that has to survive until we've finished
345+ * the WAL insertion.
346+ */
347+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
348+ "ginPlaceToPage temporary context" ,
349+ ALLOCSET_DEFAULT_MINSIZE ,
350+ ALLOCSET_DEFAULT_INITSIZE ,
351+ ALLOCSET_DEFAULT_MAXSIZE );
352+ oldCxt = MemoryContextSwitchTo (tmpCxt );
334353
335354 if (GinPageIsData (page ))
336355 xlflags |= GIN_INSERT_ISDATA ;
@@ -348,40 +367,42 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
348367 }
349368
350369 /*
351- * Try to put the incoming tuple on the page. placeToPage will decide if
352- * the page needs to be split.
353- *
354- * WAL-logging this operation is a bit funny:
355- *
356- * We're responsible for calling XLogBeginInsert() and XLogInsert().
357- * XLogBeginInsert() must be called before placeToPage, because
358- * placeToPage can register some data to the WAL record.
359- *
360- * If placeToPage returns INSERTED, placeToPage has already called
361- * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
362- * required to replay the operation, in block index 0. We're responsible
363- * for filling in the main data portion of the WAL record, calling
364- * XLogInsert(), and END_CRIT_SECTION.
365- *
366- * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
367- * Splits happen infrequently, so we just make a full-page image of all
368- * the pages involved.
370+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
371+ * decide if the page needs to be split, and will compute the split
372+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
373+ * functions for more details of the API here.
369374 */
370- rc = btree -> placeToPage (btree , stack -> buffer , stack ,
371- insertdata , updateblkno ,
372- & newlpage , & newrpage );
373- if (rc == UNMODIFIED )
375+ rc = btree -> beginPlaceToPage (btree , stack -> buffer , stack ,
376+ insertdata , updateblkno ,
377+ & ptp_workspace ,
378+ & newlpage , & newrpage );
379+
380+ if (rc == GPTP_NO_WORK )
374381 {
375- XLogResetInsertion ();
376- return true;
382+ /* Nothing to do */
383+ result = true;
377384 }
378- else if (rc == INSERTED )
385+ else if (rc == GPTP_INSERT )
379386 {
380- /* placeToPage did START_CRIT_SECTION() */
387+ /* It will fit, perform the insertion */
388+ START_CRIT_SECTION ();
389+
390+ if (RelationNeedsWAL (btree -> index ))
391+ {
392+ XLogBeginInsert ();
393+ XLogRegisterBuffer (0 , stack -> buffer , REGBUF_STANDARD );
394+ if (BufferIsValid (childbuf ))
395+ XLogRegisterBuffer (1 , childbuf , REGBUF_STANDARD );
396+ }
397+
398+ /* Perform the page update, and register any extra WAL data */
399+ btree -> execPlaceToPage (btree , stack -> buffer , stack ,
400+ insertdata , updateblkno , ptp_workspace );
401+
381402 MarkBufferDirty (stack -> buffer );
382403
383404 /* An insert to an internal page finishes the split of the child. */
384- if (childbuf != InvalidBuffer )
405+ if (BufferIsValid ( childbuf ) )
385406 {
386407 GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
387408 MarkBufferDirty (childbuf );
@@ -393,21 +414,15 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
393414 ginxlogInsert xlrec ;
394415 BlockIdData childblknos [2 ];
395416
396- /*
397- * placetopage already registered stack->buffer as block 0.
398- */
399417 xlrec .flags = xlflags ;
400418
401- if (childbuf != InvalidBuffer )
402- XLogRegisterBuffer (1 , childbuf , REGBUF_STANDARD );
403-
404419 XLogRegisterData ((char * ) & xlrec , sizeof (ginxlogInsert ));
405420
406421 /*
407422 * Log information about child if this was an insertion of a
408423 * downlink.
409424 */
410- if (childbuf != InvalidBuffer )
425+ if (BufferIsValid ( childbuf ) )
411426 {
412427 BlockIdSet (& childblknos [0 ], BufferGetBlockNumber (childbuf ));
413428 BlockIdSet (& childblknos [1 ], GinPageGetOpaque (childpage )-> rightlink );
@@ -417,23 +432,29 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
417432
418433 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_INSERT );
419434 PageSetLSN (page , recptr );
420- if (childbuf != InvalidBuffer )
435+ if (BufferIsValid ( childbuf ) )
421436 PageSetLSN (childpage , recptr );
422437 }
423438
424439 END_CRIT_SECTION ();
425440
426- return true;
441+ /* Insertion is complete. */
442+ result = true;
427443 }
428- else if (rc == SPLIT )
444+ else if (rc == GPTP_SPLIT )
429445 {
430- /* Didn't fit, had to split */
446+ /*
447+ * Didn't fit, need to split. The split has been computed in newlpage
448+ * and newrpage, which are pointers to palloc'd pages, not associated
449+ * with buffers. stack->buffer is not touched yet.
450+ */
431451 Buffer rbuffer ;
432452 BlockNumber savedRightLink ;
433453 ginxlogSplit data ;
434454 Buffer lbuffer = InvalidBuffer ;
435455 Page newrootpg = NULL ;
436456
457+ /* Get a new index page to become the right page */
437458 rbuffer = GinNewBuffer (btree -> index );
438459
439460 /* During index build, count the new page */
@@ -447,19 +468,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
447468
448469 savedRightLink = GinPageGetOpaque (page )-> rightlink ;
449470
450- /*
451- * newlpage and newrpage are pointers to memory pages, not associated
452- * with buffers. stack->buffer is not touched yet.
453- */
454-
471+ /* Begin setting up WAL record */
455472 data .node = btree -> index -> rd_node ;
456473 data .flags = xlflags ;
457- if (childbuf != InvalidBuffer )
474+ if (BufferIsValid ( childbuf ) )
458475 {
459- Page childpage = BufferGetPage (childbuf );
460-
461- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
462-
463476 data .leftChildBlkno = BufferGetBlockNumber (childbuf );
464477 data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
465478 }
@@ -469,12 +482,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
469482 if (stack -> parent == NULL )
470483 {
471484 /*
472- * split root, so we need to allocate new left page and place
473- * pointer on root to left and right page
485+ * splitting the root, so we need to allocate new left page and
486+ * place pointers to left and right page on root page.
474487 */
475488 lbuffer = GinNewBuffer (btree -> index );
476489
477- /* During index build, count the newly-added root page */
490+ /* During index build, count the new left page */
478491 if (buildStats )
479492 {
480493 if (btree -> isData )
@@ -491,9 +504,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
491504
492505 /*
493506 * Construct a new root page containing downlinks to the new left
494- * and right pages. (do this in a temporary copy first rather than
495- * overwriting the original page directly, so that we can still
496- * abort gracefully if this fails .)
507+ * and right pages. (Do this in a temporary copy rather than
508+ * overwriting the original page directly, since we're not in the
509+ * critical section yet.)
497510 */
498511 newrootpg = PageGetTempPage (newrpage );
499512 GinInitPage (newrootpg , GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF | GIN_COMPRESSED ), BLCKSZ );
@@ -504,7 +517,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
504517 }
505518 else
506519 {
507- /* split non-root page */
520+ /* splitting a non-root page */
508521 data .rrlink = savedRightLink ;
509522
510523 GinPageGetOpaque (newrpage )-> rightlink = savedRightLink ;
@@ -513,41 +526,44 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
513526 }
514527
515528 /*
516- * Ok , we have the new contents of the left page in a temporary copy
517- * now (newlpage), and the newly-allocated right block has been filled
518- * in . The original page is still unchanged.
529+ * OK, we have the new contents of the left page in a temporary copy
530+ * now (newlpage), and likewise for the new contents of the
531+ * newly-allocated right block. The original page is still unchanged.
519532 *
520533 * If this is a root split, we also have a temporary page containing
521- * the new contents of the root. Copy the new left page to a
522- * newly-allocated block, and initialize the (original) root page the
523- * new copy. Otherwise, copy over the temporary copy of the new left
524- * page over the old left page.
534+ * the new contents of the root.
525535 */
526536
527537 START_CRIT_SECTION ();
528538
529539 MarkBufferDirty (rbuffer );
530540 MarkBufferDirty (stack -> buffer );
531- if (BufferIsValid (childbuf ))
532- MarkBufferDirty (childbuf );
533541
534542 /*
535- * Restore the temporary copies over the real buffers. But don't free
536- * the temporary copies yet, WAL record data points to them.
543+ * Restore the temporary copies over the real buffers.
537544 */
538545 if (stack -> parent == NULL )
539546 {
547+ /* Splitting the root, three pages to update */
540548 MarkBufferDirty (lbuffer );
541- memcpy (BufferGetPage ( stack -> buffer ) , newrootpg , BLCKSZ );
549+ memcpy (page , newrootpg , BLCKSZ );
542550 memcpy (BufferGetPage (lbuffer ), newlpage , BLCKSZ );
543551 memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
544552 }
545553 else
546554 {
547- memcpy (BufferGetPage (stack -> buffer ), newlpage , BLCKSZ );
555+ /* Normal split, only two pages to update */
556+ memcpy (page , newlpage , BLCKSZ );
548557 memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
549558 }
550559
560+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
561+ if (BufferIsValid (childbuf ))
562+ {
563+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
564+ MarkBufferDirty (childbuf );
565+ }
566+
551567 /* write WAL record */
552568 if (RelationNeedsWAL (btree -> index ))
553569 {
@@ -572,12 +588,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
572588 XLogRegisterBuffer (1 , rbuffer , REGBUF_FORCE_IMAGE | REGBUF_STANDARD );
573589 }
574590 if (BufferIsValid (childbuf ))
575- XLogRegisterBuffer (3 , childbuf , 0 );
591+ XLogRegisterBuffer (3 , childbuf , REGBUF_STANDARD );
576592
577593 XLogRegisterData ((char * ) & data , sizeof (ginxlogSplit ));
578594
579595 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_SPLIT );
580- PageSetLSN (BufferGetPage (stack -> buffer ), recptr );
596+
597+ PageSetLSN (page , recptr );
581598 PageSetLSN (BufferGetPage (rbuffer ), recptr );
582599 if (stack -> parent == NULL )
583600 PageSetLSN (BufferGetPage (lbuffer ), recptr );
@@ -587,33 +604,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
587604 END_CRIT_SECTION ();
588605
589606 /*
590- * We can release the lock on the right page now, but keep the
591- * original buffer locked.
607+ * We can release the locks/pins on the new pages now, but keep
608+ * stack->buffer locked. childbuf doesn't get unlocked either.
592609 */
593610 UnlockReleaseBuffer (rbuffer );
594611 if (stack -> parent == NULL )
595612 UnlockReleaseBuffer (lbuffer );
596613
597- pfree (newlpage );
598- pfree (newrpage );
599- if (newrootpg )
600- pfree (newrootpg );
601-
602614 /*
603615 * If we split the root, we're done. Otherwise the split is not
604616 * complete until the downlink for the new page has been inserted to
605617 * the parent.
606618 */
607- if (stack -> parent == NULL )
608- return true;
609- else
610- return false;
619+ result = (stack -> parent == NULL );
611620 }
612621 else
613622 {
614- elog (ERROR , "unknown return code from GIN placeToPage method: %d" , rc );
615- return false; /* keep compiler quiet */
623+ elog (ERROR , "invalid return code from GIN placeToPage method: %d" , rc );
624+ result = false; /* keep compiler quiet */
616625 }
626+
627+ /* Clean up temp context */
628+ MemoryContextSwitchTo (oldCxt );
629+ MemoryContextDelete (tmpCxt );
630+
631+ return result ;
617632}
618633
619634/*
0 commit comments