@@ -70,6 +70,9 @@ typedef struct storeInfo
7070 AttInMetadata * attinmeta ;
7171 MemoryContext tmpcontext ;
7272 char * * cstrs ;
73+ /* temp storage for results to avoid leaks on exception */
74+ PGresult * last_res ;
75+ PGresult * cur_res ;
7376} storeInfo ;
7477
7578/*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
8386 const char * conname ,
8487 const char * sql ,
8588 bool fail );
86- static int storeHandler ( PGresult * res , const PGdataValue * columns ,
87- const char * * errmsgp , void * param );
89+ static PGresult * storeQueryResult ( storeInfo * sinfo , PGconn * conn , const char * sql );
90+ static void storeRow ( storeInfo * sinfo , PGresult * res , bool first );
8891static remoteConn * getConnectionByName (const char * name );
8992static HTAB * createConnHash (void );
9093static void createNewConnection (const char * name , remoteConn * rconn );
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
630633 /* async query send */
631634 retval = PQsendQuery (conn , sql );
632635 if (retval != 1 )
633- elog (NOTICE , "%s" , PQerrorMessage (conn ));
636+ elog (NOTICE , "could not send query: %s" , PQerrorMessage (conn ));
634637
635638 PG_RETURN_INT32 (retval );
636639}
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
927930/*
928931 * Execute the given SQL command and store its results into a tuplestore
929932 * to be returned as the result of the current function.
933+ *
930934 * This is equivalent to PQexec followed by materializeResult, but we make
931- * use of libpq's "row processor" API to reduce per-row overhead.
935+ * use of libpq's single-row mode to avoid accumulating the whole result
936+ * inside libpq before it gets transferred to the tuplestore.
932937 */
933938static void
934939materializeQueryResult (FunctionCallInfo fcinfo ,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
944949 /* prepTuplestoreResult must have been called previously */
945950 Assert (rsinfo -> returnMode == SFRM_Materialize );
946951
952+ /* initialize storeInfo to empty */
953+ memset (& sinfo , 0 , sizeof (sinfo ));
954+ sinfo .fcinfo = fcinfo ;
955+
947956 PG_TRY ();
948957 {
949- /* initialize storeInfo to empty */
950- memset (& sinfo , 0 , sizeof (sinfo ));
951- sinfo .fcinfo = fcinfo ;
952-
953- /* We'll collect tuples using storeHandler */
954- PQsetRowProcessor (conn , storeHandler , & sinfo );
955-
956- res = PQexec (conn , sql );
957-
958- /* We don't keep the custom row processor installed permanently */
959- PQsetRowProcessor (conn , NULL , NULL );
958+ /* execute query, collecting any tuples into the tuplestore */
959+ res = storeQueryResult (& sinfo , conn , sql );
960960
961961 if (!res ||
962962 (PQresultStatus (res ) != PGRES_COMMAND_OK &&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
975975 else if (PQresultStatus (res ) == PGRES_COMMAND_OK )
976976 {
977977 /*
978- * storeHandler didn't get called, so we need to convert the
979- * command status string to a tuple manually
978+ * storeRow didn't get called, so we need to convert the command
979+ * status string to a tuple manually
980980 */
981981 TupleDesc tupdesc ;
982982 AttInMetadata * attinmeta ;
@@ -1008,49 +1008,103 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10081008 tuplestore_puttuple (tupstore , tuple );
10091009
10101010 PQclear (res );
1011+ res = NULL ;
10111012 }
10121013 else
10131014 {
10141015 Assert (PQresultStatus (res ) == PGRES_TUPLES_OK );
1015- /* storeHandler should have created a tuplestore */
1016+ /* storeRow should have created a tuplestore */
10161017 Assert (rsinfo -> setResult != NULL );
10171018
10181019 PQclear (res );
1020+ res = NULL ;
10191021 }
1022+ PQclear (sinfo .last_res );
1023+ sinfo .last_res = NULL ;
1024+ PQclear (sinfo .cur_res );
1025+ sinfo .cur_res = NULL ;
10201026 }
10211027 PG_CATCH ();
10221028 {
1023- /* be sure to unset the custom row processor */
1024- PQsetRowProcessor (conn , NULL , NULL );
10251029 /* be sure to release any libpq result we collected */
1026- if (res )
1027- PQclear (res );
1030+ PQclear (res );
1031+ PQclear (sinfo .last_res );
1032+ PQclear (sinfo .cur_res );
10281033 /* and clear out any pending data in libpq */
1029- while ((res = PQskipResult (conn )) != NULL )
1034+ while ((res = PQgetResult (conn )) != NULL )
10301035 PQclear (res );
10311036 PG_RE_THROW ();
10321037 }
10331038 PG_END_TRY ();
10341039}
10351040
10361041/*
1037- * Custom row processor for materializeQueryResult.
1038- * Prototype of this function must match PQrowProcessor.
1042+ * Execute query, and send any result rows to sinfo->tuplestore.
10391043 */
1040- static int
1041- storeHandler (PGresult * res , const PGdataValue * columns ,
1042- const char * * errmsgp , void * param )
1044+ static PGresult *
1045+ storeQueryResult (storeInfo * sinfo , PGconn * conn , const char * sql )
1046+ {
1047+ bool first = true;
1048+ PGresult * res ;
1049+
1050+ if (!PQsendQuery (conn , sql ))
1051+ elog (ERROR , "could not send query: %s" , PQerrorMessage (conn ));
1052+
1053+ if (!PQsetSingleRowMode (conn )) /* shouldn't fail */
1054+ elog (ERROR , "failed to set single-row mode for dblink query" );
1055+
1056+ for (;;)
1057+ {
1058+ CHECK_FOR_INTERRUPTS ();
1059+
1060+ sinfo -> cur_res = PQgetResult (conn );
1061+ if (!sinfo -> cur_res )
1062+ break ;
1063+
1064+ if (PQresultStatus (sinfo -> cur_res ) == PGRES_SINGLE_TUPLE )
1065+ {
1066+ /* got one row from possibly-bigger resultset */
1067+ storeRow (sinfo , sinfo -> cur_res , first );
1068+
1069+ PQclear (sinfo -> cur_res );
1070+ sinfo -> cur_res = NULL ;
1071+ first = false;
1072+ }
1073+ else
1074+ {
1075+ /* if empty resultset, fill tuplestore header */
1076+ if (first && PQresultStatus (sinfo -> cur_res ) == PGRES_TUPLES_OK )
1077+ storeRow (sinfo , sinfo -> cur_res , first );
1078+
1079+ /* store completed result at last_res */
1080+ PQclear (sinfo -> last_res );
1081+ sinfo -> last_res = sinfo -> cur_res ;
1082+ sinfo -> cur_res = NULL ;
1083+ first = true;
1084+ }
1085+ }
1086+
1087+ /* return last_res */
1088+ res = sinfo -> last_res ;
1089+ sinfo -> last_res = NULL ;
1090+ return res ;
1091+ }
1092+
1093+ /*
1094+ * Send single row to sinfo->tuplestore.
1095+ *
1096+ * If "first" is true, create the tuplestore using PGresult's metadata
1097+ * (in this case the PGresult might contain either zero or one row).
1098+ */
1099+ static void
1100+ storeRow (storeInfo * sinfo , PGresult * res , bool first )
10431101{
1044- storeInfo * sinfo = (storeInfo * ) param ;
10451102 int nfields = PQnfields (res );
1046- char * * cstrs = sinfo -> cstrs ;
10471103 HeapTuple tuple ;
1048- char * pbuf ;
1049- int pbuflen ;
10501104 int i ;
10511105 MemoryContext oldcontext ;
10521106
1053- if (columns == NULL )
1107+ if (first )
10541108 {
10551109 /* Prepare for new result set */
10561110 ReturnSetInfo * rsinfo = (ReturnSetInfo * ) sinfo -> fcinfo -> resultinfo ;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
10981152 sinfo -> attinmeta = TupleDescGetAttInMetadata (tupdesc );
10991153
11001154 /* Create a new, empty tuplestore */
1101- oldcontext = MemoryContextSwitchTo (
1102- rsinfo -> econtext -> ecxt_per_query_memory );
1155+ oldcontext = MemoryContextSwitchTo (rsinfo -> econtext -> ecxt_per_query_memory );
11031156 sinfo -> tuplestore = tuplestore_begin_heap (true, false, work_mem );
11041157 rsinfo -> setResult = sinfo -> tuplestore ;
11051158 rsinfo -> setDesc = tupdesc ;
11061159 MemoryContextSwitchTo (oldcontext );
11071160
1161+ /* Done if empty resultset */
1162+ if (PQntuples (res ) == 0 )
1163+ return ;
1164+
11081165 /*
11091166 * Set up sufficiently-wide string pointers array; this won't change
11101167 * in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11211178 ALLOCSET_DEFAULT_MINSIZE ,
11221179 ALLOCSET_DEFAULT_INITSIZE ,
11231180 ALLOCSET_DEFAULT_MAXSIZE );
1124-
1125- return 1 ;
11261181 }
11271182
1128- CHECK_FOR_INTERRUPTS ();
1183+ /* Should have a single-row result if we get here */
1184+ Assert (PQntuples (res ) == 1 );
11291185
11301186 /*
11311187 * Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
11351191 oldcontext = MemoryContextSwitchTo (sinfo -> tmpcontext );
11361192
11371193 /*
1138- * The strings passed to us are not null-terminated, but the datatype
1139- * input functions we're about to call require null termination. Copy the
1140- * strings and add null termination. As a micro-optimization, allocate
1141- * all the strings with one palloc.
1194+ * Fill cstrs with null-terminated strings of column values.
11421195 */
1143- pbuflen = nfields ; /* count the null terminators themselves */
11441196 for (i = 0 ; i < nfields ; i ++ )
11451197 {
1146- int len = columns [i ].len ;
1147-
1148- if (len > 0 )
1149- pbuflen += len ;
1150- }
1151- pbuf = (char * ) palloc (pbuflen );
1152-
1153- for (i = 0 ; i < nfields ; i ++ )
1154- {
1155- int len = columns [i ].len ;
1156-
1157- if (len < 0 )
1158- cstrs [i ] = NULL ;
1198+ if (PQgetisnull (res , 0 , i ))
1199+ sinfo -> cstrs [i ] = NULL ;
11591200 else
1160- {
1161- cstrs [i ] = pbuf ;
1162- memcpy (pbuf , columns [i ].value , len );
1163- pbuf += len ;
1164- * pbuf ++ = '\0' ;
1165- }
1201+ sinfo -> cstrs [i ] = PQgetvalue (res , 0 , i );
11661202 }
11671203
11681204 /* Convert row to a tuple, and add it to the tuplestore */
1169- tuple = BuildTupleFromCStrings (sinfo -> attinmeta , cstrs );
1205+ tuple = BuildTupleFromCStrings (sinfo -> attinmeta , sinfo -> cstrs );
11701206
11711207 tuplestore_puttuple (sinfo -> tuplestore , tuple );
11721208
11731209 /* Clean up */
11741210 MemoryContextSwitchTo (oldcontext );
11751211 MemoryContextReset (sinfo -> tmpcontext );
1176-
1177- return 1 ;
11781212}
11791213
11801214/*
0 commit comments