2828#include "optimizer/cost.h"
2929#include "optimizer/pathnode.h"
3030#include "partitioning/partbounds.h"
31+ #include "postgres_fdw.h"
3132#include "utils/lsyscache.h"
3233#include "utils/rel.h"
3334#include "utils/syscache.h"
@@ -514,9 +515,17 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
514515 break ;
515516
516517 case T_ForeignPath :
518+ {
519+ PgFdwRelationInfo * fpinfo =
520+ (PgFdwRelationInfo * ) subpath -> parent -> fdw_private ;
521+
517522 serverid = subpath -> parent -> serverid ;
518523 tmpPath = make_local_scan_path (tmpLocalScanPath ,
519524 subpath -> parent , & indexinfo );
525+ Assert (subpath -> parent -> fdw_private != NULL );
526+ tmpPath -> rows = fpinfo -> rows ;
527+ tmpPath -> total_cost += fpinfo -> total_cost - fpinfo -> startup_cost ;
528+ }
520529 break ;
521530
522531 default :
@@ -539,7 +548,6 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
539548 PATH_REQ_OUTER (tmpLocalScanPath ), 0 , false,
540549 ((AppendPath * ) path )-> partitioned_rels , -1 );
541550 path = (Path * ) create_exchange_path (root , rel , (Path * ) ap , EXCH_GATHER );
542-
543551 set_exchange_altrel (EXCH_GATHER , (ExchangePath * ) path , rel , NULL , NULL ,
544552 servers );
545553
@@ -559,6 +567,7 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
559567 path = (Path * ) create_distexec_path (root , rel , path , servers );
560568
561569 distributed_pathlist = lappend (distributed_pathlist , path );
570+ bms_free (servers );
562571 }
563572 return distributed_pathlist ;
564573}
@@ -607,13 +616,13 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
607616 * subtree M/N local tuples, send to network [M-M/N] tuples and same to
608617 * receive.
609618 */
610- path -> rows /= expath -> altrel .nparts ;
619+ // path->rows /= expath->altrel.nparts;
611620 instances = expath -> altrel .nparts ;
612621 send_rows = path -> rows - (path -> rows /instances );
613622 received_rows = send_rows ;
614623 local_rows = path -> rows /instances ;
615624 path -> total_cost += (send_rows + local_rows ) * cpu_tuple_cost ;
616- path -> total_cost += (received_rows ) * cpu_tuple_cost * 4 . ;
625+ path -> total_cost += (received_rows ) * cpu_tuple_cost * 10 . ;
617626 }
618627 break ;
619628 default :
@@ -1062,11 +1071,10 @@ init_state_ifany(ExchangeState *state)
10621071 state -> hasLocal = true;
10631072 state -> init = true;
10641073}
1065-
1074+ int print1 = 0 ;
10661075static TupleTableSlot *
10671076EXCHANGE_Execute (CustomScanState * node )
10681077{
1069- ScanState * ss = & node -> ss ;
10701078 ScanState * subPlanState = linitial (node -> custom_ps );
10711079 ExchangeState * state = (ExchangeState * ) node ;
10721080 bool readRemote = false;
@@ -1080,32 +1088,34 @@ EXCHANGE_Execute(CustomScanState *node)
10801088
10811089 readRemote = !readRemote ;
10821090
1083- if ((state -> activeRemotes > 0 ) && readRemote )
1091+ if ((state -> activeRemotes > 0 ) /* && readRemote */ )
10841092 {
10851093 int status ;
1094+ status = RecvTuple (state -> stream , node -> ss .ss_ScanTupleSlot );
10861095
1087- slot = RecvTuple (ss -> ss_ScanTupleSlot -> tts_tupleDescriptor ,
1088- state -> stream , & status );
10891096 switch (status )
10901097 {
10911098 case -1 :
10921099 /* No tuples currently */
10931100 break ;
10941101 case 0 :
1095- Assert (!TupIsNull (slot ));
1102+ Assert (!TupIsNull (node -> ss . ss_ScanTupleSlot ));
10961103 state -> rtuples ++ ;
1097- return slot ;
1104+ return node -> ss . ss_ScanTupleSlot ;
10981105 case 1 :
10991106 state -> activeRemotes -- ;
1100- // elog(LOG, "[%s] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d", state->stream,
1101- // state->activeRemotes, state->ltuples, state->rtuples, state->hasLocal, state->stuples);
1107+ // elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",\
1108+ // state->stream, state->mode, state->activeRemotes,
1109+ // state->ltuples,
1110+ // state->rtuples, state->hasLocal, state->stuples);
11021111 break ;
11031112 case 2 : /* Close EXCHANGE channel */
11041113 break ;
11051114 default :
11061115 /* Any system message */
11071116 break ;
11081117 }
1118+ slot = NULL ;
11091119 }
11101120
11111121 if ((state -> hasLocal ) && (!readRemote ))
@@ -1170,7 +1180,6 @@ EXCHANGE_Execute(CustomScanState *node)
11701180 {
11711181 state -> stuples ++ ;
11721182 SendTuple (dest , state -> stream , slot , false);
1173- // elog(LOG, "Send tuple: %d", state->stuples);
11741183 }
11751184 }
11761185 return NULL ;
@@ -1241,7 +1250,7 @@ EXCHANGE_Explain(CustomScanState *node, List *ancestors, ExplainState *es)
12411250 }
12421251
12431252 appendStringInfo (& str , "mode: %s, stream: %s. " , mode , state -> stream );
1244- appendStringInfo (& str , "qual: %s." , nodeToString (state -> partexprs ));
1253+ // appendStringInfo(&str, "qual: %s.", nodeToString(state->partexprs));
12451254 ExplainPropertyText ("Exchange" , str .data , es );
12461255}
12471256
0 commit comments