🌐 AI搜索 & 代理 主页
Skip to content

Commit 6e9e023

Browse files
authored
Merge pull request #3980 from graphql-java/defer-dataloader
make dataloader work inside defer blocks
2 parents f4d3857 + a7cc794 commit 6e9e023

16 files changed

+457
-407
lines changed

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public AsyncExecutionStrategy(DataFetcherExceptionHandler exceptionHandler) {
3939
@SuppressWarnings("FutureReturnValueIgnored")
4040
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
4141
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
42-
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters);
4342
Instrumentation instrumentation = executionContext.getInstrumentation();
4443
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
4544

@@ -54,6 +53,9 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
5453
}
5554

5655
DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters);
56+
57+
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters, deferredExecutionSupport.getNonDeferredFieldNames(fieldNames).size());
58+
5759
Async.CombinedBuilder<FieldValueInfo> futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport);
5860

5961
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
@@ -72,14 +74,14 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7274
for (FieldValueInfo completeValueInfo : completeValueInfos) {
7375
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
7476
}
75-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
77+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos, parameters);
7678
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
7779
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
7880
}).exceptionally((ex) -> {
7981
// if there are any issues with combining/handling the field results,
8082
// complete the future at all costs and bubble up any thrown exception so
8183
// the execution does not hang.
82-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
84+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex, parameters);
8385
executionStrategyCtx.onFieldValuesException();
8486
overallResult.completeExceptionally(ex);
8587
return null;

src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ private Object resolveSerialField(ExecutionContext executionContext,
7474
if (fieldWithInfo instanceof CompletableFuture) {
7575
//noinspection unchecked
7676
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
77-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
77+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters);
7878
CompletableFuture<Object> fieldValueFuture = fvi.getFieldValueFuture();
7979
return fieldValueFuture;
8080
});
8181
} else {
8282
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
83-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
83+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters);
8484
return fvi.getFieldValueObject();
8585
}
8686
}

src/main/java/graphql/execution/DataLoaderDispatchStrategy.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,35 @@ public interface DataLoaderDispatchStrategy {
1414
};
1515

1616

17-
default void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
17+
default void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) {
1818

1919
}
2020

2121
default void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
2222

2323
}
2424

25-
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
25+
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
2626

2727
}
2828

29-
default void executionStrategyOnFieldValuesException(Throwable t) {
29+
default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
3030

3131
}
3232

3333

34-
default void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters) {
34+
default void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters, int fieldCount) {
3535

3636
}
3737

3838
default void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
3939

4040
}
4141

42+
default void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) {
43+
44+
}
45+
4246
default void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
4347

4448
}
@@ -55,8 +59,4 @@ default void fieldFetched(ExecutionContext executionContext,
5559
default DataFetcher<?> modifyDataFetcher(DataFetcher<?> dataFetcher) {
5660
return dataFetcher;
5761
}
58-
59-
default void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) {
60-
61-
}
6262
}

src/main/java/graphql/execution/Execution.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import graphql.ExecutionInput;
77
import graphql.ExecutionResult;
88
import graphql.ExecutionResultImpl;
9-
import graphql.ExperimentalApi;
109
import graphql.GraphQLContext;
1110
import graphql.GraphQLError;
1211
import graphql.Internal;
@@ -16,7 +15,6 @@
1615
import graphql.execution.instrumentation.InstrumentationState;
1716
import graphql.execution.instrumentation.dataloader.FallbackDataLoaderDispatchStrategy;
1817
import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy;
19-
import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch;
2018
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters;
2119
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters;
2220
import graphql.extensions.ExtensionsBuilder;
@@ -37,7 +35,6 @@
3735
import java.util.Collections;
3836
import java.util.List;
3937
import java.util.Map;
40-
import java.util.Optional;
4138
import java.util.concurrent.CompletableFuture;
4239
import java.util.function.Supplier;
4340

@@ -258,12 +255,7 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon
258255
return DataLoaderDispatchStrategy.NO_OP;
259256
}
260257
if (!executionContext.isSubscriptionOperation()) {
261-
boolean deferEnabled = executionContext.hasIncrementalSupport();
262-
263-
// Dedicated strategy for defer support, for safety purposes.
264-
return deferEnabled ?
265-
new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) :
266-
new PerLevelDataLoaderDispatchStrategy(executionContext);
258+
return new PerLevelDataLoaderDispatchStrategy(executionContext);
267259
} else {
268260
return new FallbackDataLoaderDispatchStrategy(executionContext);
269261
}

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import graphql.EngineRunningState;
66
import graphql.ExecutionResult;
77
import graphql.ExecutionResultImpl;
8-
import graphql.ExperimentalApi;
98
import graphql.GraphQLError;
109
import graphql.Internal;
1110
import graphql.PublicSpi;
@@ -50,7 +49,6 @@
5049
import java.util.Collections;
5150
import java.util.List;
5251
import java.util.Map;
53-
import java.util.Optional;
5452
import java.util.OptionalInt;
5553
import java.util.concurrent.CompletableFuture;
5654
import java.util.concurrent.CompletionException;
@@ -197,7 +195,6 @@ public static String mkNameForPath(List<Field> currentField) {
197195
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>")
198196
protected Object executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
199197
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
200-
dataLoaderDispatcherStrategy.executeObject(executionContext, parameters);
201198
Instrumentation instrumentation = executionContext.getInstrumentation();
202199
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
203200

@@ -212,6 +209,7 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat
212209

213210
CompletableFuture<Map<String, Object>> overallResult = new CompletableFuture<>();
214211
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);
212+
dataLoaderDispatcherStrategy.executeObject(executionContext, parameters, fieldsExecutedOnInitialResult.size());
215213
BiConsumer<List<Object>, Throwable> handleResultsConsumer = buildFieldValueMap(fieldsExecutedOnInitialResult, overallResult, executionContext);
216214

217215
resolveObjectCtx.onDispatched();
@@ -300,7 +298,7 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
300298
) {
301299
MergedSelectionSet fields = parameters.getFields();
302300

303-
executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls(parameters));
301+
executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls());
304302

305303
// Only non-deferred fields should be considered for calculating the expected size of futures.
306304
Async.CombinedBuilder<FieldValueInfo> futures = Async
@@ -400,7 +398,6 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
400398
}
401399

402400
MergedField field = parameters.getField();
403-
String pathString = parameters.getPath().toString();
404401
GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType();
405402

406403
// if the DF (like PropertyDataFetcher) does not use the arguments or execution step info then dont build any
@@ -435,6 +432,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
435432
.parentType(parentType)
436433
.selectionSet(fieldCollector)
437434
.queryDirectives(queryDirectives)
435+
.deferredCallContext(parameters.getDeferredCallContext())
438436
.build();
439437
});
440438

src/main/java/graphql/execution/ExecutionStrategyParameters.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public ExecutionStrategyParameters getParent() {
9494
* @return the deferred call context or null if we're not in the scope of a deferred call
9595
*/
9696
@Nullable
97+
@Internal
9798
public DeferredCallContext getDeferredCallContext() {
9899
return deferredCallContext;
99100
}

src/main/java/graphql/execution/incremental/DeferredCallContext.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import graphql.GraphQLError;
44
import graphql.Internal;
5+
import graphql.VisibleForTesting;
56

67
import java.util.List;
78
import java.util.concurrent.CopyOnWriteArrayList;
@@ -18,8 +19,31 @@
1819
@Internal
1920
public class DeferredCallContext {
2021

22+
private final int startLevel;
23+
private final int fields;
24+
2125
private final List<GraphQLError> errors = new CopyOnWriteArrayList<>();
2226

27+
public DeferredCallContext(int startLevel, int fields) {
28+
this.startLevel = startLevel;
29+
this.fields = fields;
30+
}
31+
32+
@VisibleForTesting
33+
public DeferredCallContext() {
34+
this.startLevel = 0;
35+
this.fields = 0;
36+
}
37+
38+
public int getStartLevel() {
39+
return startLevel;
40+
}
41+
42+
public int getFields() {
43+
return fields;
44+
}
45+
46+
2347
public void addErrors(List<GraphQLError> errors) {
2448
this.errors.addAll(errors);
2549
}
@@ -34,4 +58,6 @@ public void addError(GraphQLError graphqlError) {
3458
public List<GraphQLError> getErrors() {
3559
return errors;
3660
}
61+
62+
3763
}

src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public interface DeferredExecutionSupport {
4545

4646
List<String> getNonDeferredFieldNames(List<String> allFieldNames);
4747

48-
Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters);
48+
Set<IncrementalCall<? extends IncrementalPayload>> createCalls();
4949

5050
DeferredExecutionSupport NOOP = new DeferredExecutionSupport.NoOp();
5151

@@ -106,23 +106,24 @@ public List<String> getNonDeferredFieldNames(List<String> allFieldNames) {
106106
}
107107

108108
@Override
109-
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters) {
109+
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls() {
110110
ImmutableSet<DeferredExecution> deferredExecutions = deferredExecutionToFields.keySet();
111111
Set<IncrementalCall<? extends IncrementalPayload>> set = new HashSet<>(deferredExecutions.size());
112112
for (DeferredExecution deferredExecution : deferredExecutions) {
113-
set.add(this.createDeferredFragmentCall(deferredExecution, executionStrategyParameters));
113+
set.add(this.createDeferredFragmentCall(deferredExecution));
114114
}
115115
return set;
116116
}
117117

118-
private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution, ExecutionStrategyParameters executionStrategyParameters) {
119-
DeferredCallContext deferredCallContext = new DeferredCallContext();
118+
private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution) {
119+
int level = parameters.getPath().getLevel() + 1;
120+
DeferredCallContext deferredCallContext = new DeferredCallContext(level, deferredFields.size());
120121

121122
List<MergedField> mergedFields = deferredExecutionToFields.get(deferredExecution);
122123

123124
List<Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult>>> calls = FpKit.arrayListSizedTo(mergedFields);
124125
for (MergedField currentField : mergedFields) {
125-
calls.add(this.createResultSupplier(currentField, deferredCallContext, executionStrategyParameters));
126+
calls.add(this.createResultSupplier(currentField, deferredCallContext));
126127
}
127128

128129
return new DeferredFragmentCall(
@@ -135,13 +136,12 @@ private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferr
135136

136137
private Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult>> createResultSupplier(
137138
MergedField currentField,
138-
DeferredCallContext deferredCallContext,
139-
ExecutionStrategyParameters executionStrategyParameters
139+
DeferredCallContext deferredCallContext
140140
) {
141141
Map<String, MergedField> fields = new LinkedHashMap<>();
142142
fields.put(currentField.getResultKey(), currentField);
143143

144-
ExecutionStrategyParameters callParameters = parameters.transform(builder ->
144+
ExecutionStrategyParameters executionStrategyParameters = parameters.transform(builder ->
145145
{
146146
MergedSelectionSet mergedSelectionSet = MergedSelectionSet.newMergedSelectionSet().subFields(fields).build();
147147
ResultPath path = parameters.getPath().segment(currentField.getResultKey());
@@ -158,22 +158,23 @@ private Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult
158158

159159
instrumentation.beginDeferredField(executionContext.getInstrumentationState());
160160

161+
// todo: handle cached computations
161162
return dfCache.computeIfAbsent(
162163
currentField.getResultKey(),
163164
// The same field can be associated with multiple defer executions, so
164165
// we memoize the field resolution to avoid multiple calls to the same data fetcher
165166
key -> FpKit.interThreadMemoize(() -> {
166-
CompletableFuture<FieldValueInfo> fieldValueResult = resolveFieldWithInfoFn
167-
.apply(executionContext, callParameters);
167+
CompletableFuture<FieldValueInfo> fieldValueResult = resolveFieldWithInfoFn.apply(executionContext, executionStrategyParameters);
168+
169+
fieldValueResult.whenComplete((fieldValueInfo, throwable) -> {
170+
executionContext.getDataLoaderDispatcherStrategy().deferredOnFieldValue(currentField.getResultKey(), fieldValueInfo, throwable, executionStrategyParameters);
171+
});
168172

169-
CompletableFuture<ExecutionResult> executionResultCF = fieldValueResult
170-
.thenCompose(fvi -> {
171-
executionContext.getDataLoaderDispatcherStrategy().executeDeferredOnFieldValueInfo(fvi, executionStrategyParameters);
172173

173-
return fvi
174-
.getFieldValueFuture()
175-
.thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build());
176-
}
174+
CompletableFuture<ExecutionResult> executionResultCF = fieldValueResult
175+
.thenCompose(fvi -> fvi
176+
.getFieldValueFuture()
177+
.thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build())
177178
);
178179

179180
return executionResultCF
@@ -207,7 +208,7 @@ public List<String> getNonDeferredFieldNames(List<String> allFieldNames) {
207208
}
208209

209210
@Override
210-
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters) {
211+
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls() {
211212
return Collections.emptySet();
212213
}
213214
}

src/main/java/graphql/execution/incremental/IncrementalCallState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,5 @@ private Supplier<SingleSubscriberPublisher<DelayedIncrementalPartialResult>> cre
103103
public Publisher<DelayedIncrementalPartialResult> startDeferredCalls() {
104104
return publisher.get();
105105
}
106+
106107
}

0 commit comments

Comments
 (0)