diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..10ef831 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: "gradle" + directory: "/" + schedule: + interval: "weekly" + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 01b89bc..3be85c9 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -10,22 +10,24 @@ jobs: env: MAVEN_CENTRAL_USER: ${{ secrets.MAVEN_CENTRAL_USER }} MAVEN_CENTRAL_PASSWORD: ${{ secrets.MAVEN_CENTRAL_PASSWORD }} + MAVEN_CENTRAL_USER_NEW: ${{ secrets.MAVEN_CENTRAL_USER_NEW }} + MAVEN_CENTRAL_PASSWORD_NEW: ${{ secrets.MAVEN_CENTRAL_PASSWORD_NEW }} MAVEN_CENTRAL_PGP_KEY: ${{ secrets.MAVEN_CENTRAL_PGP_KEY }} steps: - - uses: actions/checkout@v4 - - uses: gradle/actions/wrapper-validation@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v4 + - uses: actions/checkout@v5 + - uses: gradle/actions/wrapper-validation@v5 + - name: Set up JDK 21 + uses: actions/setup-java@v5 with: - java-version: '11' - distribution: 'temurin' + java-version: '21' + distribution: 'corretto' check-latest: true # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. 
# See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md - name: Setup Gradle - uses: gradle/actions/setup-gradle@v4 + uses: gradle/actions/setup-gradle@v5 - name: build test and publish - run: ./gradlew assemble && ./gradlew check --info && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace + run: ./gradlew assemble && ./gradlew check --info && ./gradlew jcstress && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace env: CI: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index f16bf96..41a1e88 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -13,19 +13,19 @@ jobs: buildAndTest: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: gradle/actions/wrapper-validation@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v4 + - uses: actions/checkout@v5 + - uses: gradle/actions/wrapper-validation@v5 + - name: Set up JDK 21 + uses: actions/setup-java@v5 with: - java-version: '11' - distribution: 'temurin' + java-version: '21' + distribution: 'corretto' check-latest: true # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. 
# See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md - name: Setup Gradle - uses: gradle/actions/setup-gradle@v4 + uses: gradle/actions/setup-gradle@v5 - name: build and test - run: ./gradlew assemble && ./gradlew check --info --stacktrace + run: ./gradlew assemble && ./gradlew check --info --stacktrace && ./gradlew jcstress env: CI: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a574a68..ba43744 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,21 +14,23 @@ jobs: MAVEN_CENTRAL_PGP_KEY: ${{ secrets.MAVEN_CENTRAL_PGP_KEY }} MAVEN_CENTRAL_USER: ${{ secrets.MAVEN_CENTRAL_USER }} MAVEN_CENTRAL_PASSWORD: ${{ secrets.MAVEN_CENTRAL_PASSWORD }} + MAVEN_CENTRAL_USER_NEW: ${{ secrets.MAVEN_CENTRAL_USER_NEW }} + MAVEN_CENTRAL_PASSWORD_NEW: ${{ secrets.MAVEN_CENTRAL_PASSWORD_NEW }} RELEASE_VERSION: ${{ github.event.inputs.version }} steps: - - uses: actions/checkout@v4 - - uses: gradle/actions/wrapper-validation@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v4 + - uses: actions/checkout@v5 + - uses: gradle/actions/wrapper-validation@v5 + - name: Set up JDK 21 + uses: actions/setup-java@v5 with: - java-version: '11' - distribution: 'temurin' + java-version: '21' + distribution: 'corretto' check-latest: true # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. 
# See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md - name: Setup Gradle - uses: gradle/actions/setup-gradle@v4 + uses: gradle/actions/setup-gradle@v5 - name: build test and publish run: ./gradlew assemble && ./gradlew check --info && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace env: diff --git a/.github/workflows/stale-pr-issue.yml b/.github/workflows/stale-pr-issue.yml index d945402..0f385ee 100644 --- a/.github/workflows/stale-pr-issue.yml +++ b/.github/workflows/stale-pr-issue.yml @@ -16,7 +16,7 @@ jobs: close-pending: runs-on: ubuntu-latest steps: - - uses: actions/stale@v9 + - uses: actions/stale@v10 with: # GLOBAL ------------------------------------------------------------ # Exempt any PRs or issues already added to a milestone diff --git a/build.gradle b/build.gradle index eb57158..7d2c806 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,7 @@ +import net.ltgt.gradle.errorprone.CheckSeverity +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion + import java.text.SimpleDateFormat plugins { @@ -7,15 +11,33 @@ plugins { id 'maven-publish' id 'signing' id 'groovy' - id 'biz.aQute.bnd.builder' version '6.2.0' - id 'io.github.gradle-nexus.publish-plugin' version '1.0.0' - id 'com.github.ben-manes.versions' version '0.51.0' + id 'biz.aQute.bnd.builder' version '7.1.0' + id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' + id 'com.github.ben-manes.versions' version '0.53.0' id "me.champeau.jmh" version "0.7.3" + id "net.ltgt.errorprone" version '4.3.0' + id "io.github.reyerizo.gradle.jcstress" version "0.9.0" + + // Kotlin just for tests - not production code + id 'org.jetbrains.kotlin.jvm' version '2.2.21' } java { toolchain { - languageVersion = JavaLanguageVersion.of(11) + languageVersion = JavaLanguageVersion.of(21) + } +} + +kotlin { + compilerOptions { + apiVersion = KotlinVersion.KOTLIN_2_0 + languageVersion = 
KotlinVersion.KOTLIN_2_0 + jvmTarget = JvmTarget.JVM_11 + javaParameters = true + freeCompilerArgs = [ + '-Xemit-jvm-type-annotations', + '-Xjspecify-annotations=strict', + ] } } @@ -59,13 +81,15 @@ repositories { jar { manifest { - attributes('Automatic-Module-Name': 'org.dataloader', - '-exportcontents': 'org.dataloader.*', - '-removeheaders': 'Private-Package') + attributes('Automatic-Module-Name': 'org.dataloader') } - bnd(''' + bundle { + bnd(''' +-exportcontents: org.dataloader.* +-removeheaders: Private-Package Import-Package: org.jspecify.annotations;resolution:=optional,* ''') + } } dependencies { @@ -75,8 +99,35 @@ dependencies { // this is needed for the idea jmh plugin to work correctly jmh 'org.openjdk.jmh:jmh-core:1.37' jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37' + + errorprone 'com.uber.nullaway:nullaway:0.12.10' + errorprone 'com.google.errorprone:error_prone_core:2.43.0' + + // just tests + testImplementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' } +tasks.withType(JavaCompile) { + options.release = 11 + options.errorprone { + disableAllChecks = true + check("NullAway", CheckSeverity.ERROR) + // + // end state has us with this config turned on - eg all classes + // + //option("NullAway:AnnotatedPackages", "org.dataloader") + option("NullAway:OnlyNullMarked", "true") + option("NullAway:JSpecifyMode", "true") + } + // Include to disable NullAway on test code + if (name.toLowerCase().contains("test")) { + options.errorprone { + disable("NullAway") + } + } +} + + task sourcesJar(type: Jar) { dependsOn classes archiveClassifier.set('sources') @@ -92,10 +143,7 @@ task javadocJar(type: Jar, dependsOn: javadoc) { from javadoc.destinationDir } -artifacts { - archives sourcesJar - archives javadocJar -} + testing { suites { @@ -128,9 +176,9 @@ publishing { publications { graphqlJava(MavenPublication) { from components.java - groupId 'com.graphql-java' - artifactId 'java-dataloader' - version project.version + groupId = 'com.graphql-java' + 
artifactId = 'java-dataloader' + version = project.version artifact sourcesJar artifact javadocJar @@ -177,14 +225,18 @@ publishing { nexusPublishing { repositories { sonatype { - username = System.env.MAVEN_CENTRAL_USER - password = System.env.MAVEN_CENTRAL_PASSWORD + username = System.env.MAVEN_CENTRAL_USER_NEW + password = System.env.MAVEN_CENTRAL_PASSWORD_NEW + // https://central.sonatype.org/publish/publish-portal-ossrh-staging-api/#configuration + nexusUrl.set(uri("https://ossrh-staging-api.central.sonatype.com/service/local/")) + // GraphQL Java does not publish snapshots, but adding this URL for completeness + snapshotRepositoryUrl.set(uri("https://central.sonatype.com/repository/maven-snapshots/")) } } } signing { - required { !project.hasProperty('publishToMavenLocal') } + required = { !project.hasProperty('publishToMavenLocal') } def signingKey = System.env.MAVEN_CENTRAL_PGP_KEY useInMemoryPgpKeys(signingKey, "") sign publishing.publications @@ -208,3 +260,7 @@ tasks.named("dependencyUpdates").configure { isNonStable(it.candidate.version) } } + +jcstress { +// verbose = true +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 428b6e2..db4b332 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,4 +23,8 @@ hamcrest_version=2.2 awaitility_version=2.0.0 reactor_core_version=3.6.6 caffeine_version=3.1.8 -reactive_streams_version=1.0.3 \ No newline at end of file +reactive_streams_version=1.0.3 +# Prevents the Kotlin stdlib being a POM dependency +# +# https://kotlinlang.org/docs/gradle-configure-project.html#dependency-on-the-standard-library +kotlin.stdlib.default.dependency=false \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7454180..a4b76b9 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties 
index e2847c8..bad7c24 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 1b6c787..f5feea6 100755 --- a/gradlew +++ b/gradlew @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +82,12 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,22 +134,29 @@ location of your Java installation." 
fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,11 +201,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. 
set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ @@ -205,6 +217,12 @@ set -- \ org.gradle.wrapper.GradleWrapperMain \ "$@" +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. diff --git a/gradlew.bat b/gradlew.bat index ac1b06f..9b42019 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -13,8 +13,10 @@ @rem See the License for the specific language governing permissions and @rem limitations under the License. @rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,7 +27,8 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -40,13 +43,13 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute +if %ERRORLEVEL% equ 0 goto execute -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -56,11 +59,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. 
+echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -75,13 +78,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal diff --git a/settings.gradle b/settings.gradle index 47404e7..0d61ab8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,6 @@ plugins { - id 'com.gradle.develocity' version '3.19' - id 'org.gradle.toolchains.foojay-resolver-convention' version '0.9.0' + id 'com.gradle.develocity' version '4.2.2' + id 'org.gradle.toolchains.foojay-resolver-convention' version '1.0.0' } develocity { diff --git a/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java new file mode 100644 index 0000000..f0d8263 --- /dev/null +++ b/src/jcstress/java/org/dataloader/DataLoader_Batching_Caching_JCStress.java @@ -0,0 +1,108 @@ +package org.dataloader; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import 
static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +@JCStressTest +@State +@Outcome(id = "2000, 2000", expect = ACCEPTABLE, desc = "accepted") +public class DataLoader_Batching_Caching_JCStress { + + + AtomicInteger counter = new AtomicInteger(); + AtomicInteger batchLoaderCount = new AtomicInteger(); + volatile boolean finished1; + volatile boolean finished2; + + + BatchLoader batchLoader = keys -> { + return CompletableFuture.supplyAsync(() -> { + batchLoaderCount.getAndAdd(keys.size()); + return keys; + }); + }; + DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader); + + public DataLoader_Batching_Caching_JCStress() { + + } + + @Actor + public void load1() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + // we load the same keys again + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + finished1 = true; + } + + @Actor + public void load2() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-2-" + i); + } + // we load the same keys again + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-2-" + i); + } + finished2 = true; + } + + + @Actor + public void dispatch1() { + while (!finished1 || !finished2) { + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Actor + public void dispatch2() { + while (!finished1 || !finished2) { + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + try { + List dispatchedResult = dataLoader.dispatch().get(); + counter.getAndAdd(dispatchedResult.size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + 
@Arbiter + public void arbiter(II_Result r) { + r.r1 = counter.get(); + r.r2 = batchLoaderCount.get(); + } + + +} diff --git a/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java new file mode 100644 index 0000000..6b46ed1 --- /dev/null +++ b/src/jcstress/java/org/dataloader/DataLoader_NoBatching_Caching_JCStress.java @@ -0,0 +1,54 @@ +package org.dataloader; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE_INTERESTING; + +@JCStressTest +@State +@Outcome(id = "1000, 1000", expect = ACCEPTABLE, desc = "No keys loaded twice") +@Outcome(id = "1.*, 1000", expect = ACCEPTABLE_INTERESTING, desc = "Some keys loaded twice") +public class DataLoader_NoBatching_Caching_JCStress { + + + AtomicInteger batchLoaderCount = new AtomicInteger(); + + BatchLoader batchLoader = keys -> { + batchLoaderCount.getAndAdd(keys.size()); + return CompletableFuture.completedFuture(keys); + }; + + + DataLoader dataLoader = DataLoaderFactory.newDataLoader(batchLoader, DataLoaderOptions.newOptions().setBatchingEnabled(false).build()); + + @Actor + public void load1() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + } + + @Actor + public void load2() { + for (int i = 0; i < 1000; i++) { + dataLoader.load("load-1-" + i); + } + } + + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = batchLoaderCount.get(); + r.r2 = dataLoader.getCacheMap().size(); + } + +} diff --git 
a/src/jmh/java/performance/DataLoaderDispatchPerformance.java b/src/jmh/java/performance/DataLoaderDispatchPerformance.java index 0b4696d..ad2060c 100644 --- a/src/jmh/java/performance/DataLoaderDispatchPerformance.java +++ b/src/jmh/java/performance/DataLoaderDispatchPerformance.java @@ -12,6 +12,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; @@ -280,15 +281,20 @@ public void setup() { } + DataLoader ownerDL = DataLoaderFactory.newDataLoader(ownerBatchLoader); + DataLoader petDL = DataLoaderFactory.newDataLoader(petBatchLoader); + + } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) + @Threads(Threads.MAX) public void loadAndDispatch(MyState myState, Blackhole blackhole) { - DataLoader ownerDL = DataLoaderFactory.newDataLoader(ownerBatchLoader); - DataLoader petDL = DataLoaderFactory.newDataLoader(petBatchLoader); + DataLoader ownerDL = myState.ownerDL; + DataLoader petDL = myState.petDL; for (Owner owner : owners.values()) { ownerDL.load(owner.id); diff --git a/src/main/java/org/dataloader/BatchLoader.java b/src/main/java/org/dataloader/BatchLoader.java index 2b0c3c5..df11f89 100644 --- a/src/main/java/org/dataloader/BatchLoader.java +++ b/src/main/java/org/dataloader/BatchLoader.java @@ -17,8 +17,8 @@ package org.dataloader; import org.dataloader.annotations.PublicSpi; -import org.jspecify.annotations.NonNull; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.List; import java.util.concurrent.CompletionStage; @@ -40,7 +40,7 @@ * 2, 9, 6, 1 * ] * - * + *

* and loading from a back-end service returned this list of values: * *

@@ -50,7 +50,7 @@
  *      { id: 2, name: 'San Francisco' },
  *  ]
  * 
- * + *

* then the batch loader function contract has been broken. *

* The back-end service returned results in a different order than we requested, likely because it was more efficient for it to @@ -77,7 +77,7 @@ @FunctionalInterface @PublicSpi @NullMarked -public interface BatchLoader { +public interface BatchLoader { /** * Called to batch load the provided keys and return a promise to a list of values. @@ -85,7 +85,6 @@ public interface BatchLoader { * If you need calling context then implement {@link org.dataloader.BatchLoaderWithContext} * * @param keys the collection of keys to load - * * @return a promise of the values for those keys */ CompletionStage> load(List keys); diff --git a/src/main/java/org/dataloader/BatchLoaderEnvironment.java b/src/main/java/org/dataloader/BatchLoaderEnvironment.java index 6b84e70..c7a2ed8 100644 --- a/src/main/java/org/dataloader/BatchLoaderEnvironment.java +++ b/src/main/java/org/dataloader/BatchLoaderEnvironment.java @@ -19,11 +19,11 @@ @NullMarked public class BatchLoaderEnvironment { - private final Object context; + private final @Nullable Object context; private final Map keyContexts; private final List keyContextsList; - private BatchLoaderEnvironment(Object context, List keyContextsList, Map keyContexts) { + private BatchLoaderEnvironment(@Nullable Object context, List keyContextsList, Map keyContexts) { this.context = context; this.keyContexts = keyContexts; this.keyContextsList = keyContextsList; @@ -33,7 +33,6 @@ private BatchLoaderEnvironment(Object context, List keyContextsList, Map * Returns the overall context object provided by {@link org.dataloader.BatchLoaderContextProvider} * * @param the type you would like the object to be - * * @return a context object or null if there isn't one */ @SuppressWarnings("unchecked") @@ -68,7 +67,7 @@ public static Builder newBatchLoaderEnvironment() { } public static class Builder { - private Object context; + private @Nullable Object context; private Map keyContexts = Collections.emptyMap(); private List keyContextsList = 
Collections.emptyList(); diff --git a/src/main/java/org/dataloader/BatchLoaderWithContext.java b/src/main/java/org/dataloader/BatchLoaderWithContext.java index eba26e4..fb6ff71 100644 --- a/src/main/java/org/dataloader/BatchLoaderWithContext.java +++ b/src/main/java/org/dataloader/BatchLoaderWithContext.java @@ -2,6 +2,7 @@ import org.dataloader.annotations.PublicSpi; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.List; import java.util.concurrent.CompletionStage; @@ -16,7 +17,7 @@ */ @PublicSpi @NullMarked -public interface BatchLoaderWithContext { +public interface BatchLoaderWithContext { /** * Called to batch load the provided keys and return a promise to a list of values. This default * version can be given an environment object to that maybe be useful during the call. A typical use case diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index 54b1b49..3e6d895 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -28,7 +28,13 @@ * CacheMap is used by data loaders that use caching promises to values aka {@link CompletableFuture}<V>. A better name for this * class might have been FutureCache but that is history now. *

- * The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}. + * The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap} because + the data loader code requires the cache to provide atomic writes especially the {@link #putIfAbsentAtomically(Object, CompletableFuture)} + method. + *

+ The data loader code uses a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your + own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is + returned always. + *

* This is really a cache of completed {@link CompletableFuture}<V> values in memory. It is used, when caching is enabled, to * give back the same future to any code that may call it. If you need a cache of the underlying values that is possible external to the JVM @@ -42,7 +48,7 @@ */ @PublicSpi @NullMarked -public interface CacheMap { +public interface CacheMap { /** * Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}. @@ -74,23 +80,31 @@ static CacheMap simpleMap() { * * @return the cached value, or {@code null} if not found (depends on cache implementation) */ - @Nullable CompletableFuture get(K key); + @Nullable CompletableFuture get(K key); /** * Gets a collection of CompletableFutures from the cache map. + * * @return the collection of cached values */ Collection> getAll(); /** - * Creates a new cache map entry with the specified key and value, or updates the value if the key already exists. + * Atomically creates a new cache map entry with the specified key and value, or updates the value if the key already exists. + *

+ The data loader code uses a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your + own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is + returned always. + *

+ * The default implementation of this method uses {@link java.util.concurrent.ConcurrentHashMap} has its implementation so these CAS + * writes work as expected. * * @param key the key to cache * @param value the value to cache * - * @return the cache map for fluent coding + * @return atomically the previous value for the key or null if the value is not present. */ - CacheMap set(K key, CompletableFuture value); + @Nullable CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value); /** * Deletes the entry with the specified key from the cache map, if it exists. @@ -107,4 +121,13 @@ static CacheMap simpleMap() { * @return the cache map for fluent coding */ CacheMap clear(); + + /** + * Returns the current size of the cache. This is not used by DataLoader directly + * and intended for testing and debugging. + * If a cache doesn't support it, it can throw an Exception. + * + * @return the size of the cache + */ + int size(); } diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 7a50619..125efe1 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -63,12 +63,13 @@ * * @param type parameter indicating the type of the data load keys * @param type parameter indicating the type of the data that is returned + * * @author Arnold Schrijver * @author Brad Baker */ @PublicApi @NullMarked -public class DataLoader { +public class DataLoader { private final @Nullable String name; private final DataLoaderHelper helper; @@ -93,7 +94,6 @@ public class DataLoader { this.batchLoadFunction = nonNull(batchLoadFunction); this.options = loaderOptions; this.name = name; - this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.valueCache, this.stats, clock); } @@ -133,6 +133,7 @@ public Object getBatchLoadFunction() { * This allows you to change the current {@link DataLoader} and turn it into a new one * * @param 
builderConsumer the {@link DataLoaderFactory.Builder} consumer for changing the {@link DataLoader} + * * @return a newly built {@link DataLoader} instance */ public DataLoader transform(Consumer> builderConsumer) { @@ -159,20 +160,6 @@ public Duration getTimeSinceDispatch() { return Duration.between(helper.getLastDispatchTime(), helper.now()); } - /** - * Requests to load the data with the specified key asynchronously, and returns a future of the resulting value. - *

- * If batching is enabled (the default), you'll have to call {@link DataLoader#dispatch()} at a later stage to - * start batch execution. If you forget this call the future will never be completed (unless already completed, - * and returned from cache). - * - * @param key the key to load - * @return the future of the value - */ - public CompletableFuture load(K key) { - return load(key, null); - } - /** * This will return an optional promise to a value previously loaded via a {@link #load(Object)} call or empty if not call has been made for that key. *

@@ -184,6 +171,7 @@ public CompletableFuture load(K key) { * NOTE : This will NOT cause a data load to happen. You must call {@link #load(Object)} for that to happen. * * @param key the key to check + * * @return an Optional to the future of the value */ public Optional> getIfPresent(K key) { @@ -202,6 +190,7 @@ public Optional> getIfPresent(K key) { * NOTE : This will NOT cause a data load to happen. You must call {@link #load(Object)} for that to happen. * * @param key the key to check + * * @return an Optional to the future of the value */ public Optional> getIfCompleted(K key) { @@ -209,6 +198,24 @@ public Optional> getIfCompleted(K key) { } + private CompletableFuture loadImpl(@NonNull K key, @Nullable Object keyContext) { + return helper.load(nonNull(key), keyContext); + } + + /** + * Requests to load the data with the specified key asynchronously, and returns a future of the resulting value. + *

+ * If batching is enabled (the default), you'll have to call {@link DataLoader#dispatch()} at a later stage to + * start batch execution. If you forget this call the future will never be completed (unless already completed, + * and returned from cache). + * + * @param key the key to load + * @return the future of the value + */ + public CompletableFuture load(K key) { + return loadImpl(key, null); + } + /** * Requests to load the data with the specified key asynchronously, and returns a future of the resulting value. *

@@ -221,10 +228,11 @@ public Optional> getIfCompleted(K key) { * * @param key the key to load * @param keyContext a context object that is specific to this key + * * @return the future of the value */ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { - return helper.load(nonNull(key), keyContext); + return loadImpl(key, keyContext); } /** @@ -236,6 +244,7 @@ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { * and returned from cache). * * @param keys the list of keys to load + * * @return the composite future of the list of values */ public CompletableFuture> loadMany(List keys) { @@ -255,24 +264,23 @@ public CompletableFuture> loadMany(List keys) { * * @param keys the list of keys to load * @param keyContexts the list of key calling context objects + * * @return the composite future of the list of values */ public CompletableFuture> loadMany(List keys, List keyContexts) { nonNull(keys); nonNull(keyContexts); - synchronized (this) { - List> collect = new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - K key = keys.get(i); - Object keyContext = null; - if (i < keyContexts.size()) { - keyContext = keyContexts.get(i); - } - collect.add(load(key, keyContext)); + List> collect = new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + K key = keys.get(i); + Object keyContext = null; + if (i < keyContexts.size()) { + keyContext = keyContexts.get(i); } - return CompletableFutureKit.allOf(collect); + collect.add(loadImpl(key, keyContext)); } + return CompletableFutureKit.allOf(collect); } /** @@ -287,20 +295,19 @@ public CompletableFuture> loadMany(List keys, List keyContext * {@link org.dataloader.MappedBatchLoaderWithContext} to help retrieve data. 
* * @param keysAndContexts the map of keys to their respective contexts + * * @return the composite future of the map of keys and values */ public CompletableFuture> loadMany(Map keysAndContexts) { nonNull(keysAndContexts); - synchronized (this) { - Map> collect = new HashMap<>(keysAndContexts.size()); - for (Map.Entry entry : keysAndContexts.entrySet()) { - K key = entry.getKey(); - Object keyContext = entry.getValue(); - collect.put(key, load(key, keyContext)); - } - return CompletableFutureKit.allOf(collect); + Map> collect = new HashMap<>(keysAndContexts.size()); + for (Map.Entry entry : keysAndContexts.entrySet()) { + K key = entry.getKey(); + Object keyContext = entry.getValue(); + collect.put(key, loadImpl(key, keyContext)); } + return CompletableFutureKit.allOf(collect); } /** @@ -359,6 +366,7 @@ public int dispatchDepth() { * on the next load request. * * @param key the key to remove + * * @return the data loader for fluent coding */ public DataLoader clear(K key) { @@ -372,14 +380,13 @@ public DataLoader clear(K key) { * * @param key the key to remove * @param handler a handler that will be called after the async remote clear completes + * * @return the data loader for fluent coding */ public DataLoader clear(K key, BiConsumer handler) { Object cacheKey = getCacheKey(key); - synchronized (this) { - futureCache.delete(cacheKey); - valueCache.delete(key).whenComplete(handler); - } + futureCache.delete(cacheKey); + valueCache.delete(key).whenComplete(handler); return this; } @@ -397,13 +404,12 @@ public DataLoader clearAll() { * Clears the entire cache map of the loader, and of the cached value store. 
* * @param handler a handler that will be called after the async remote clear all completes + * * @return the data loader for fluent coding */ public DataLoader clearAll(BiConsumer handler) { - synchronized (this) { - futureCache.clear(); - valueCache.clear().whenComplete(handler); - } + futureCache.clear(); + valueCache.clear().whenComplete(handler); return this; } @@ -414,6 +420,7 @@ public DataLoader clearAll(BiConsumer handler) { * * @param key the key * @param value the value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, V value) { @@ -425,6 +432,7 @@ public DataLoader prime(K key, V value) { * * @param key the key * @param error the exception to prime instead of a value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, Exception error) { @@ -438,15 +446,12 @@ public DataLoader prime(K key, Exception error) { * * @param key the key * @param value the value + * * @return the data loader for fluent coding */ public DataLoader prime(K key, CompletableFuture value) { Object cacheKey = getCacheKey(key); - synchronized (this) { - if (!futureCache.containsKey(cacheKey)) { - futureCache.set(cacheKey, value); - } - } + futureCache.putIfAbsentAtomically(cacheKey, value); return this; } @@ -457,6 +462,7 @@ public DataLoader prime(K key, CompletableFuture value) { * If no cache key function is present in {@link DataLoaderOptions}, then the returned value equals the input key. 
* * @param key the input key + * * @return the cache key after the input is transformed with the cache key function */ public Object getCacheKey(K key) { @@ -495,8 +501,8 @@ public ValueCache getValueCache() { @Override public String toString() { return "DataLoader{" + - "name='" + name + '\'' + - ", stats=" + stats + - '}'; + "name='" + name + '\'' + + ", stats=" + stats + + '}'; } } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 7858780..feb6184 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -1,6 +1,5 @@ package org.dataloader; -import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.instrumentation.DataLoaderInstrumentation; @@ -13,11 +12,13 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Subscriber; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; @@ -48,23 +49,27 @@ @Internal class DataLoaderHelper { - static class LoaderQueueEntry { + private static class LoaderQueueEntry { final K key; - final V value; + final CompletableFuture value; final Object callContext; + final LoaderQueueEntry prev; + final int queueSize; - public LoaderQueueEntry(K key, V value, Object callContext) { + public LoaderQueueEntry(K key, CompletableFuture value, Object callContext, LoaderQueueEntry prev, int queueSize) { this.key = key; this.value = value; this.callContext = callContext; + this.prev = prev; + this.queueSize = queueSize; } K getKey() { 
return key; } - V getValue() { + CompletableFuture getValue() { return value; } @@ -78,7 +83,8 @@ Object getCallContext() { private final DataLoaderOptions loaderOptions; private final CacheMap futureCache; private final ValueCache valueCache; - private final List>> loaderQueue; + // private final List>> loaderQueue; + private final AtomicReference<@Nullable LoaderQueueEntry> loaderQueue = new AtomicReference<>(); private final StatisticsCollector stats; private final Clock clock; private final AtomicReference lastDispatchTime; @@ -95,7 +101,6 @@ Object getCallContext() { this.loaderOptions = loaderOptions; this.futureCache = futureCache; this.valueCache = valueCache; - this.loaderQueue = new ArrayList<>(); this.stats = stats; this.clock = clock; this.lastDispatchTime = new AtomicReference<>(); @@ -111,31 +116,27 @@ public Instant getLastDispatchTime() { } Optional> getIfPresent(K key) { - synchronized (dataLoader) { - boolean cachingEnabled = loaderOptions.cachingEnabled(); - if (cachingEnabled) { - Object cacheKey = getCacheKey(nonNull(key)); - try { - CompletableFuture cacheValue = futureCache.get(cacheKey); - if (cacheValue != null) { - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); - return Optional.of(cacheValue); - } - } catch (Exception ignored) { + boolean cachingEnabled = loaderOptions.cachingEnabled(); + if (cachingEnabled) { + Object cacheKey = getCacheKey(nonNull(key)); + try { + CompletableFuture cacheValue = futureCache.get(cacheKey); + if (cacheValue != null) { + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); + return Optional.of(cacheValue); } + } catch (Exception ignored) { } } return Optional.empty(); } Optional> getIfCompleted(K key) { - synchronized (dataLoader) { - Optional> cachedPromise = getIfPresent(key); - if (cachedPromise.isPresent()) { - CompletableFuture promise = cachedPromise.get(); - if (promise.isDone()) { - return cachedPromise; - } + Optional> cachedPromise = 
getIfPresent(key); + if (cachedPromise.isPresent()) { + CompletableFuture promise = cachedPromise.get(); + if (promise.isDone()) { + return cachedPromise; } } return Optional.empty(); @@ -143,21 +144,68 @@ Optional> getIfCompleted(K key) { CompletableFuture load(K key, Object loadContext) { - synchronized (dataLoader) { - boolean batchingEnabled = loaderOptions.batchingEnabled(); - boolean cachingEnabled = loaderOptions.cachingEnabled(); - - stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext)); - DataLoaderInstrumentationContext ctx = ctxOrNoopCtx(instrumentation().beginLoad(dataLoader, key,loadContext)); - CompletableFuture cf; - if (cachingEnabled) { - cf = loadFromCache(key, loadContext, batchingEnabled); - } else { - cf = queueOrInvokeLoader(key, loadContext, batchingEnabled, false); + boolean batchingEnabled = loaderOptions.batchingEnabled(); + boolean futureCachingEnabled = loaderOptions.cachingEnabled(); + + stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext)); + DataLoaderInstrumentationContext ctx = ctxOrNoopCtx(instrumentation().beginLoad(dataLoader, key, loadContext)); + Object cacheKey = null; + if (futureCachingEnabled) { + cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); + try { + CompletableFuture cachedFuture = futureCache.get(cacheKey); + if (cachedFuture != null) { + // We already have a promise for this key, no need to check value cache or queue this load + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); + } + } catch (Exception ignored) { + } + } + CompletableFuture loadCallFuture; + if (batchingEnabled) { + loadCallFuture = new CompletableFuture<>(); + if (futureCachingEnabled) { + CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); + if (cachedFuture != null) { + // another thread was faster and created a matching CF ... 
hence this is really a cache hit and we are done + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); + } + } + addEntryToLoaderQueue(key, loadCallFuture, loadContext); + } else { + stats.incrementBatchLoadCountBy(1, new IncrementBatchLoadCountByStatisticsContext<>(key, loadContext)); + // immediate execution of batch function + loadCallFuture = invokeLoaderImmediately(key, loadContext, true); + if (futureCachingEnabled) { + CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); + if (cachedFuture != null) { + // another thread was faster and the loader was invoked twice with the same key + // we are disregarding the result of our dispatch call and use the already cached value + // meaning this is a cache hit and we are done + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); + } + } + } + + ctx.onDispatched(); + loadCallFuture.whenComplete(ctx::onCompleted); + return loadCallFuture; + } + + private CompletableFuture incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext ctx, K key, Object loadContext, CompletableFuture cachedFuture) { + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + ctx.onDispatched(); + cachedFuture.whenComplete(ctx::onCompleted); + return cachedFuture; + } + + private void addEntryToLoaderQueue(K key, CompletableFuture future, Object loadContext) { + while (true) { + LoaderQueueEntry prev = loaderQueue.get(); + LoaderQueueEntry curr = new LoaderQueueEntry<>(key, future, loadContext, prev, prev != null ? 
prev.queueSize + 1 : 1); + if (loaderQueue.compareAndSet(prev, curr)) { + return; } - ctx.onDispatched(); - cf.whenComplete(ctx::onCompleted); - return cf; } } @@ -173,38 +221,49 @@ Object getCacheKeyWithContext(K key, Object context) { loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; } + @SuppressWarnings("unchecked") DispatchResult dispatch() { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader)); boolean batchingEnabled = loaderOptions.batchingEnabled(); - final List keys; - final List callContexts; - final List> queuedFutures; - synchronized (dataLoader) { - int queueSize = loaderQueue.size(); - if (queueSize == 0) { - lastDispatchTime.set(now()); - instrCtx.onDispatched(); - return endDispatchCtx(instrCtx, emptyDispatchResult()); - } - // we copy the pre-loaded set of futures ready for dispatch - keys = new ArrayList<>(queueSize); - callContexts = new ArrayList<>(queueSize); - queuedFutures = new ArrayList<>(queueSize); - - loaderQueue.forEach(entry -> { - keys.add(entry.getKey()); - queuedFutures.add(entry.getValue()); - callContexts.add(entry.getCallContext()); - }); - loaderQueue.clear(); + LoaderQueueEntry loaderQueueEntryHead; + while (true) { + loaderQueueEntryHead = loaderQueue.get(); + if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) { + // one or more threads competed and this one won. 
This thread holds + // the loader queue root in the local variable loaderQueueEntryHead + break; + } + } + if (loaderQueueEntryHead == null) { lastDispatchTime.set(now()); + instrCtx.onDispatched(); + return endDispatchCtx(instrCtx, emptyDispatchResult()); + } + int queueSize = loaderQueueEntryHead.queueSize; + // we copy the pre-loaded set of futures ready for dispatch + Object[] keysArray = new Object[queueSize]; + CompletableFuture[] queuedFuturesArray = new CompletableFuture[queueSize]; + Object[] callContextsArray = new Object[queueSize]; + int index = queueSize - 1; + while (loaderQueueEntryHead != null) { + keysArray[index] = loaderQueueEntryHead.getKey(); + queuedFuturesArray[index] = loaderQueueEntryHead.getValue(); + callContextsArray[index] = loaderQueueEntryHead.getCallContext(); + loaderQueueEntryHead = loaderQueueEntryHead.prev; + index--; } + final List keys = (List) Arrays.asList(keysArray); + final List> queuedFutures = Arrays.asList(queuedFuturesArray); + final List callContexts = Arrays.asList(callContextsArray); + + lastDispatchTime.set(now()); if (!batchingEnabled) { instrCtx.onDispatched(); return endDispatchCtx(instrCtx, emptyDispatchResult()); } + final int totalEntriesHandled = keys.size(); // // order of keys -> values matter in data loader hence the use of linked hash map @@ -334,38 +393,6 @@ private void possiblyClearCacheEntriesOnExceptions(List keys) { } } - @GuardedBy("dataLoader") - private CompletableFuture loadFromCache(K key, Object loadContext, boolean batchingEnabled) { - final Object cacheKey = loadContext == null ? 
getCacheKey(key) : getCacheKeyWithContext(key, loadContext); - - try { - CompletableFuture cacheValue = futureCache.get(cacheKey); - if (cacheValue != null) { - // We already have a promise for this key, no need to check value cache or queue up load - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - return cacheValue; - } - } catch (Exception ignored) { - } - - CompletableFuture loadCallFuture = queueOrInvokeLoader(key, loadContext, batchingEnabled, true); - futureCache.set(cacheKey, loadCallFuture); - return loadCallFuture; - } - - @GuardedBy("dataLoader") - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) { - if (batchingEnabled) { - CompletableFuture loadCallFuture = new CompletableFuture<>(); - loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); - return loadCallFuture; - } else { - stats.incrementBatchLoadCountBy(1, new IncrementBatchLoadCountByStatisticsContext<>(key, loadContext)); - // immediate execution of batch function - return invokeLoaderImmediately(key, loadContext, cachingEnabled); - } - } - CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); @@ -594,11 +621,11 @@ private boolean isMapLoader() { } private boolean isPublisher() { - return batchLoadFunction instanceof BatchPublisher; + return batchLoadFunction instanceof BatchPublisher || batchLoadFunction instanceof BatchPublisherWithContext; } private boolean isMappedPublisher() { - return batchLoadFunction instanceof MappedBatchPublisher; + return batchLoadFunction instanceof MappedBatchPublisher || batchLoadFunction instanceof MappedBatchPublisherWithContext; } private DataLoaderInstrumentation instrumentation() { @@ -606,11 +633,15 @@ private DataLoaderInstrumentation instrumentation() { } int dispatchDepth() { - synchronized 
(dataLoader) { - return loaderQueue.size(); + LoaderQueueEntry loaderQueueEntry = loaderQueue.get(); + if (loaderQueueEntry != null) { + return loaderQueueEntry.queueSize; + } else { + return 0; } } + private final List> NOT_SUPPORTED_LIST = emptyList(); private final CompletableFuture>> NOT_SUPPORTED = CompletableFuture.completedFuture(NOT_SUPPORTED_LIST); private final Try ALWAYS_FAILED = Try.alwaysFailed(); diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 0988697..6bc79f6 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -1,6 +1,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.impl.Assertions; import org.dataloader.instrumentation.ChainedDataLoaderInstrumentation; import org.dataloader.instrumentation.DataLoaderInstrumentation; import org.dataloader.instrumentation.DataLoaderInstrumentationHelper; @@ -14,6 +15,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -141,8 +143,7 @@ private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, * @return this registry */ public DataLoaderRegistry register(DataLoader dataLoader) { - String name = dataLoader.getName(); - assertState(name != null, () -> "The DataLoader must have a non null name"); + String name = Assertions.nonNull(dataLoader.getName(), () -> "The DataLoader must have a non null name"); dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader)); return this; } @@ -176,7 +177,7 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { */ public DataLoader registerAndGet(String key, DataLoader dataLoader) { dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader)); - 
return getDataLoader(key); + return Objects.requireNonNull(getDataLoader(key)); } /** @@ -251,10 +252,10 @@ public DataLoaderRegistry unregister(String key) { * @param key the key of the data loader * @param the type of keys * @param the type of values - * @return a data loader or null if its not present + * @return a data loader or null if it's not present */ @SuppressWarnings("unchecked") - public DataLoader getDataLoader(String key) { + public @Nullable DataLoader getDataLoader(String key) { return (DataLoader) dataLoaders.get(key); } diff --git a/src/main/java/org/dataloader/DelegatingDataLoader.java b/src/main/java/org/dataloader/DelegatingDataLoader.java index 7cffced..0175059 100644 --- a/src/main/java/org/dataloader/DelegatingDataLoader.java +++ b/src/main/java/org/dataloader/DelegatingDataLoader.java @@ -9,6 +9,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -66,19 +67,31 @@ public DataLoader getDelegate() { return delegate; } - /** - * The {@link DataLoader#load(Object)} and {@link DataLoader#loadMany(List)} type methods all call back - * to the {@link DataLoader#load(Object, Object)} and hence we don't override them. 
- * - * @param key the key to load - * @param keyContext a context object that is specific to this key - * @return the future of the value - */ + @Override + public CompletableFuture load(K key) { + return delegate.load(key); + } + @Override public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { return delegate.load(key, keyContext); } + @Override + public CompletableFuture> loadMany(List keys) { + return delegate.loadMany(keys); + } + + @Override + public CompletableFuture> loadMany(List keys, List keyContexts) { + return delegate.loadMany(keys, keyContexts); + } + + @Override + public CompletableFuture> loadMany(Map keysAndContexts) { + return delegate.loadMany(keysAndContexts); + } + @Override public DataLoader transform(Consumer> builderConsumer) { return delegate.transform(builderConsumer); diff --git a/src/main/java/org/dataloader/MappedBatchLoader.java b/src/main/java/org/dataloader/MappedBatchLoader.java index 1ad4c79..179d6a2 100644 --- a/src/main/java/org/dataloader/MappedBatchLoader.java +++ b/src/main/java/org/dataloader/MappedBatchLoader.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicSpi; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.Map; import java.util.Set; @@ -59,7 +60,7 @@ */ @PublicSpi @NullMarked -public interface MappedBatchLoader { +public interface MappedBatchLoader { /** * Called to batch load the provided keys and return a promise to a map of values. 
diff --git a/src/main/java/org/dataloader/MappedBatchLoaderWithContext.java b/src/main/java/org/dataloader/MappedBatchLoaderWithContext.java index 9559260..9f342d4 100644 --- a/src/main/java/org/dataloader/MappedBatchLoaderWithContext.java +++ b/src/main/java/org/dataloader/MappedBatchLoaderWithContext.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicSpi; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.Map; import java.util.Set; @@ -33,7 +34,7 @@ */ @PublicSpi @NullMarked -public interface MappedBatchLoaderWithContext { +public interface MappedBatchLoaderWithContext { /** * Called to batch load the provided keys and return a promise to a map of values. * diff --git a/src/main/java/org/dataloader/MappedBatchPublisher.java b/src/main/java/org/dataloader/MappedBatchPublisher.java index 493401f..6637157 100644 --- a/src/main/java/org/dataloader/MappedBatchPublisher.java +++ b/src/main/java/org/dataloader/MappedBatchPublisher.java @@ -2,6 +2,7 @@ import org.dataloader.annotations.PublicSpi; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Subscriber; import java.util.Map; @@ -20,7 +21,7 @@ */ @PublicSpi @NullMarked -public interface MappedBatchPublisher { +public interface MappedBatchPublisher { /** * Called to batch the provided keys into a stream of map entries of keys and values. *

diff --git a/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java index 7b862ca..dd8b5f9 100644 --- a/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java +++ b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java @@ -2,6 +2,7 @@ import org.dataloader.annotations.PublicSpi; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Subscriber; import java.util.List; @@ -17,7 +18,7 @@ */ @PublicSpi @NullMarked -public interface MappedBatchPublisherWithContext { +public interface MappedBatchPublisherWithContext { /** * Called to batch the provided keys into a stream of map entries of keys and values. diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 80c8402..b06fdb8 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.PublicSpi; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.impl.NoOpValueCache; +import org.jspecify.annotations.Nullable; import org.jspecify.annotations.NullMarked; import java.util.ArrayList; @@ -40,7 +41,7 @@ */ @PublicSpi @NullMarked -public interface ValueCache { +public interface ValueCache { /** * Creates a new value cache, using the default no-op implementation. 
diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index fa89bb0..e8db681 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -18,11 +18,12 @@ import org.dataloader.CacheMap; import org.dataloader.annotations.Internal; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /** * Default implementation of {@link CacheMap} that is based on a regular {@link java.util.HashMap}. @@ -33,15 +34,16 @@ * @author Arnold Schrijver */ @Internal +@NullMarked public class DefaultCacheMap implements CacheMap { - private final Map> cache; + private final ConcurrentHashMap> cache; /** * Default constructor */ public DefaultCacheMap() { - cache = new HashMap<>(); + cache = new ConcurrentHashMap<>(); } /** @@ -57,7 +59,7 @@ public boolean containsKey(K key) { * {@inheritDoc} */ @Override - public CompletableFuture get(K key) { + public @Nullable CompletableFuture get(K key) { return cache.get(key); } @@ -73,9 +75,8 @@ public Collection> getAll() { * {@inheritDoc} */ @Override - public CacheMap set(K key, CompletableFuture value) { - cache.put(key, value); - return this; + public @Nullable CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value) { + return cache.putIfAbsent(key, value); } /** @@ -95,4 +96,9 @@ public CacheMap clear() { cache.clear(); return this; } + + @Override + public int size() { + return cache.size(); + } } diff --git a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java index c2f5438..76c94b3 100644 --- a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java +++ 
b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.dataloader.impl.Assertions.assertState; @@ -25,6 +27,7 @@ abstract class AbstractBatchSubscriber implements Subscriber { final List callContexts; final List> queuedFutures; final ReactiveSupport.HelperIntegration helperIntegration; + final Lock lock = new ReentrantLock(); List clearCacheKeys = new ArrayList<>(); List completedValues = new ArrayList<>(); diff --git a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java index d0b8110..74a500a 100644 --- a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java +++ b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java @@ -29,58 +29,74 @@ class BatchSubscriberImpl extends AbstractBatchSubscriber { super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); } - // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // onNext may be called by multiple threads - for the time being, we use a ReentrantLock to guarantee // correctness (at the cost of speed). 
@Override - public synchronized void onNext(V value) { - super.onNext(value); + public void onNext(V value) { + try { + lock.lock(); - if (idx >= keys.size()) { - // hang on they have given us more values than we asked for in keys - // we cant handle this - return; - } - K key = keys.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture future = queuedFutures.get(idx); - onNextValue(key, value, callContext, List.of(future)); + super.onNext(value); + + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, List.of(future)); - completedValues.add(value); - idx++; + completedValues.add(value); + idx++; + } finally { + lock.unlock(); + } } @Override - public synchronized void onComplete() { - super.onComplete(); - if (keys.size() != completedValues.size()) { - // we have more or less values than promised - // we will go through all the outstanding promises and mark those that - // have not finished as failed - for (CompletableFuture queuedFuture : queuedFutures) { - if (!queuedFuture.isDone()) { - queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + public void onComplete() { + try { + lock.lock(); + super.onComplete(); + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } } } + possiblyClearCacheEntriesOnExceptions(); + 
valuesFuture.complete(completedValues); + } finally { + lock.unlock(); } - possiblyClearCacheEntriesOnExceptions(); - valuesFuture.complete(completedValues); } @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Set the remaining keys to the exception. - for (int i = idx; i < queuedFutures.size(); i++) { - K key = keys.get(i); - CompletableFuture future = queuedFutures.get(i); - if (!future.isDone()) { - future.completeExceptionally(ex); - // clear any cached view of this key because it failed - helperIntegration.clearCacheView(key); + public void onError(Throwable ex) { + try { + lock.lock(); + super.onError(ex); + ex = unwrapThrowable(ex); + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + if (!future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + helperIntegration.clearCacheView(key); + } } + valuesFuture.completeExceptionally(ex); + } finally { + lock.unlock(); } - valuesFuture.completeExceptionally(ex); } } diff --git a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java index d56efa0..3c937b0 100644 --- a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java @@ -43,61 +43,77 @@ class MappedBatchSubscriberImpl extends AbstractBatchSubscriber entry) { - super.onNext(entry); - K key = entry.getKey(); - V value = entry.getValue(); + public void onNext(Map.Entry entry) { + try { + lock.lock(); + super.onNext(entry); + K key = entry.getKey(); + V value = entry.getValue(); - Object callContext = callContextByKey.get(key); - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + Object callContext = callContextByKey.get(key); + List> 
futures = queuedFuturesByKey.getOrDefault(key, List.of()); - onNextValue(key, value, callContext, futures); + onNextValue(key, value, callContext, futures); - // did we have an actual key for this value - ignore it if they send us one outside the key set - if (!futures.isEmpty()) { - completedValuesByKey.put(key, value); + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } + } finally { + lock.unlock(); } + } @Override - public synchronized void onComplete() { - super.onComplete(); + public void onComplete() { + try { + lock.lock(); + super.onComplete(); - possiblyClearCacheEntriesOnExceptions(); - List values = new ArrayList<>(keys.size()); - for (K key : keys) { - V value = completedValuesByKey.get(key); - values.add(value); + possiblyClearCacheEntriesOnExceptions(); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); - for (CompletableFuture future : futures) { - if (!future.isDone()) { - // we have a future that never came back for that key - // but the publisher is done sending in data - it must be null - // e.g. for key X when found no value - future.complete(null); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (!future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } } } + valuesFuture.complete(values); + } finally { + lock.unlock(); } - valuesFuture.complete(values); } @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Complete the futures for the remaining keys with the exception. 
- for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - List> futures = queuedFuturesByKey.get(key); - if (!completedValuesByKey.containsKey(key)) { - for (CompletableFuture future : futures) { - future.completeExceptionally(ex); + public void onError(Throwable ex) { + try { + lock.lock(); + super.onError(ex); + ex = unwrapThrowable(ex); + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + List> futures = queuedFuturesByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + // clear any cached view of this key because they all failed + helperIntegration.clearCacheView(key); } - // clear any cached view of this key because they all failed - helperIntegration.clearCacheView(key); } + valuesFuture.completeExceptionally(ex); + } finally { + lock.unlock(); } - valuesFuture.completeExceptionally(ex); } } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index f391b80..6705cb8 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -265,7 +265,7 @@ public Collection> getAll() { } @Override - public CacheMap set(Object key, CompletableFuture value) { + public CompletableFuture putIfAbsentAtomically(Object key, CompletableFuture value) { return null; } @@ -278,6 +278,11 @@ public CacheMap delete(Object key) { public CacheMap clear() { return null; } + + @Override + public int size() { + return 0; + } } private void customCache() { diff --git a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java index df364a2..d3de4aa 100644 --- a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java +++ b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java @@ -14,6 +14,7 @@ /** * Tests for cacheMap functionality.. 
*/ +@SuppressWarnings("NullableProblems") public class DataLoaderCacheMapTest { private BatchLoader keysAsValues() { @@ -24,12 +25,33 @@ private BatchLoader keysAsValues() { public void should_provide_all_futures_from_cache() { DataLoader dataLoader = newDataLoader(keysAsValues()); - dataLoader.load(1); - dataLoader.load(2); - dataLoader.load(1); + CompletableFuture cf1 = dataLoader.load(1); + CompletableFuture cf2 = dataLoader.load(2); + CompletableFuture cf3 = dataLoader.load(3); + + CacheMap cacheMap = dataLoader.getCacheMap(); + Collection> futures = cacheMap.getAll(); + assertThat(futures.size(), equalTo(3)); + + + assertThat(cacheMap.get(1), equalTo(cf1)); + assertThat(cacheMap.get(2), equalTo(cf2)); + assertThat(cacheMap.get(3), equalTo(cf3)); + assertThat(cacheMap.containsKey(1), equalTo(true)); + assertThat(cacheMap.containsKey(2), equalTo(true)); + assertThat(cacheMap.containsKey(3), equalTo(true)); + assertThat(cacheMap.containsKey(4), equalTo(false)); + + cacheMap.delete(1); + assertThat(cacheMap.containsKey(1), equalTo(false)); + assertThat(cacheMap.containsKey(2), equalTo(true)); + + cacheMap.clear(); + assertThat(cacheMap.containsKey(1), equalTo(false)); + assertThat(cacheMap.containsKey(2), equalTo(false)); + assertThat(cacheMap.containsKey(3), equalTo(false)); + assertThat(cacheMap.containsKey(4), equalTo(false)); - Collection> futures = dataLoader.getCacheMap().getAll(); - assertThat(futures.size(), equalTo(2)); } @Test diff --git a/src/test/java/org/dataloader/DataLoaderRegistryTest.java b/src/test/java/org/dataloader/DataLoaderRegistryTest.java index 270bd50..89624d7 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryTest.java @@ -1,6 +1,5 @@ package org.dataloader; -import org.dataloader.impl.DataLoaderAssertionException; import org.dataloader.stats.SimpleStatisticsCollector; import org.dataloader.stats.Statistics; import org.junit.jupiter.api.Assertions; @@ -63,7 +62,7 @@ 
public void registration_works() { try { registry.register(dlUnnamed); Assertions.fail("Should have thrown an exception"); - } catch (DataLoaderAssertionException ignored) { + } catch (NullPointerException ignored) { } } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index 224b54d..6ec548a 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -60,6 +61,7 @@ import static org.dataloader.DataLoaderOptions.newDefaultOptions; import static org.dataloader.DataLoaderOptions.newOptions; import static org.dataloader.fixtures.TestKit.areAllDone; +import static org.dataloader.fixtures.TestKit.asSet; import static org.dataloader.fixtures.TestKit.listFrom; import static org.dataloader.impl.CompletableFutureKit.cause; import static org.hamcrest.MatcherAssert.assertThat; @@ -69,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; /** @@ -979,7 +982,7 @@ public void should_Accept_a_custom_cache_map_implementation(TestDataLoaderFactor assertThat(future2b.get(), equalTo("b")); assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c"), singletonList("b")))); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "c", "b").toArray()); + assertEquals(customMap.stash.keySet(), asSet("a", "c", "b")); // Supports clear all @@ -1230,6 +1233,31 @@ public void 
when_values_size_are_more_then_key_size(TestDataLoaderFactory factor } } + @ParameterizedTest + @MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get") + public void should_Support_loading_values_with_context(TestDataLoaderFactory factory) { + AtomicReference environmentREF = new AtomicReference<>(); + DataLoader identityLoader = factory.idLoaderWithContext(new DataLoaderOptions(), new ArrayList<>(), environmentREF); + + identityLoader.load(1, "ctx1"); + identityLoader.load(2, "ctx2"); + identityLoader.loadMany(List.of(3, 4), List.of("ctx3", "ctx4")); + + CompletableFuture> cf = identityLoader.dispatch(); + await().atMost(Duration.FIVE_SECONDS).until(cf::isDone); + + assertThat(cf.toCompletableFuture().join(), equalTo(asList(1, 2, 3, 4))); + + Map keyContexts = environmentREF.get().getKeyContexts(); + assertThat(keyContexts, equalTo(Map.of( + 1, "ctx1", + 2, "ctx2", + 3, "ctx3", + 4, "ctx4" + ))); + } + + @Test public void can_call_a_loader_from_a_loader() throws Exception { List> deepLoadCalls = new ArrayList<>(); diff --git a/src/test/java/org/dataloader/DelegatingDataLoaderTest.java b/src/test/java/org/dataloader/DelegatingDataLoaderTest.java index 8849752..0f51b5f 100644 --- a/src/test/java/org/dataloader/DelegatingDataLoaderTest.java +++ b/src/test/java/org/dataloader/DelegatingDataLoaderTest.java @@ -2,19 +2,21 @@ import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.parameterized.DelegatingDataLoaderFactory; -import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.NullMarked; import org.jspecify.annotations.Nullable; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static 
org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertNotNull; /** * There are WAY more tests via the {@link DelegatingDataLoaderFactory} @@ -32,14 +34,37 @@ void canUnwrapDataLoaders() { } @Test + @NullMarked void canCreateAClassOk() { DataLoader rawLoader = TestKit.idLoader(); DelegatingDataLoader delegatingDataLoader = new DelegatingDataLoader<>(rawLoader) { - @Override - public CompletableFuture load(@NonNull String key, @Nullable Object keyContext) { - CompletableFuture cf = super.load(key, keyContext); + private CompletableFuture enhance(CompletableFuture cf) { return cf.thenApply(v -> "|" + v + "|"); } + + private CompletableFuture> enhanceList(CompletableFuture> cf) { + return cf.thenApply(v -> v.stream().map(s -> "|" + s + "|").collect(Collectors.toList())); + } + + @Override + public CompletableFuture load(String key, @Nullable Object keyContext) { + return enhance(super.load(key, keyContext)); + } + + @Override + public CompletableFuture load(String key) { + return enhance(super.load(key)); + } + + @Override + public CompletableFuture> loadMany(List keys) { + return enhanceList(super.loadMany(keys)); + } + + @Override + public CompletableFuture> loadMany(List keys, List keyContexts) { + return enhanceList(super.loadMany(keys, keyContexts)); + } }; assertThat(delegatingDataLoader.getDelegate(), is(rawLoader)); @@ -73,8 +98,69 @@ void can_delegate_simple_properties() { DelegatingDataLoader delegate = new DelegatingDataLoader<>(rawLoader); assertNotNull(delegate.getName()); - assertThat(delegate.getName(),equalTo("name")); - assertThat(delegate.getOptions(),equalTo(options)); - assertThat(delegate.getBatchLoadFunction(),equalTo(loadFunction)); + assertThat(delegate.getName(), equalTo("name")); + assertThat(delegate.getOptions(), equalTo(options)); + assertThat(delegate.getBatchLoadFunction(), equalTo(loadFunction)); + } + + 
@NullMarked + @Test + void can_create_a_delegate_class_that_has_post_side_effects() { + DataLoaderOptions options = DataLoaderOptions.newOptions().build(); + BatchLoader loadFunction = CompletableFuture::completedFuture; + DataLoader rawLoader = DataLoaderFactory.newDataLoader("name", loadFunction, options); + + AtomicInteger loadCalled = new AtomicInteger(0); + AtomicInteger loadManyCalled = new AtomicInteger(0); + AtomicInteger loadManyMapCalled = new AtomicInteger(0); + DelegatingDataLoader delegate = new DelegatingDataLoader<>(rawLoader) { + + @Override + public CompletableFuture load(String key) { + CompletableFuture cf = super.load(key); + loadCalled.incrementAndGet(); + return cf; + } + + @Override + public CompletableFuture load(String key, @Nullable Object keyContext) { + CompletableFuture cf = super.load(key, keyContext); + loadCalled.incrementAndGet(); + return cf; + } + + @Override + public CompletableFuture> loadMany(List keys, List keyContexts) { + CompletableFuture> cf = super.loadMany(keys, keyContexts); + loadManyCalled.incrementAndGet(); + return cf; + } + + @Override + public CompletableFuture> loadMany(List keys) { + CompletableFuture> cf = super.loadMany(keys); + loadManyCalled.incrementAndGet(); + return cf; + } + + @Override + public CompletableFuture> loadMany(Map keysAndContexts) { + CompletableFuture> cf = super.loadMany(keysAndContexts); + loadManyMapCalled.incrementAndGet(); + return cf; + } + }; + + + delegate.load("L1"); + delegate.load("L2", null); + delegate.loadMany(List.of("LM1", "LM2"), List.of()); + delegate.loadMany(List.of("LM3")); + delegate.loadMany(Map.of("LMM1", "kc1", "LMM2", "kc2")); + + assertNotNull(delegate.getDelegate()); + assertThat(loadCalled.get(), equalTo(2)); + assertThat(loadManyCalled.get(), equalTo(2)); + assertThat(loadManyMapCalled.get(), equalTo(1)); } } \ No newline at end of file diff --git a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java 
b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java index 695da5e..6e20f68 100644 --- a/src/test/java/org/dataloader/fixtures/CustomCacheMap.java +++ b/src/test/java/org/dataloader/fixtures/CustomCacheMap.java @@ -3,16 +3,15 @@ import org.dataloader.CacheMap; import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class CustomCacheMap implements CacheMap { - public Map> stash; + public ConcurrentHashMap> stash; public CustomCacheMap() { - stash = new LinkedHashMap<>(); + stash = new ConcurrentHashMap<>(); } @Override @@ -31,9 +30,8 @@ public Collection> getAll() { } @Override - public CacheMap set(String key, CompletableFuture value) { - stash.put(key, value); - return this; + public CompletableFuture putIfAbsentAtomically(String key, CompletableFuture value) { + return stash.putIfAbsent(key, value); } @Override @@ -47,4 +45,9 @@ public CacheMap clear() { stash.clear(); return this; } + + @Override + public int size() { + return stash.size(); + } } diff --git a/src/test/java/org/dataloader/fixtures/parameterized/DelegatingDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/DelegatingDataLoaderFactory.java index 0cbd3f3..8d1f815 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/DelegatingDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/DelegatingDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; +import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.DelegatingDataLoader; @@ -8,6 +9,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; public class DelegatingDataLoaderFactory implements TestDataLoaderFactory { // its delegates all the way down to the 
turtles @@ -38,6 +40,11 @@ public DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF) { + return mkDelegateDataLoader(delegateFactory.idLoaderWithContext(options, loadCalls, environmentREF)); + } + @Override public DataLoader idLoaderDelayed(DataLoaderOptions options, List> loadCalls, Duration delay) { return mkDelegateDataLoader(delegateFactory.idLoaderDelayed(options, loadCalls, delay)); diff --git a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java index 0644d3c..8ec69d7 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; +import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.fixtures.TestKit; @@ -9,6 +10,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -23,6 +25,15 @@ public DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF) { + return newDataLoader((keys, env) -> { + environmentREF.set(env); + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys); + }, options); + } + @Override public DataLoader idLoaderDelayed(DataLoaderOptions options, List> loadCalls, Duration delay) { return newDataLoader(keys -> CompletableFuture.supplyAsync(() -> { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java 
b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java index e7c47ec..f1c548e 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; +import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.fixtures.TestKit; @@ -11,10 +12,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newMappedDataLoader; import static org.dataloader.fixtures.TestKit.futureError; @@ -31,6 +32,17 @@ public DataLoader idLoader( }, options); } + @Override + public DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF) { + return newMappedDataLoader((keys, environment) -> { + environmentREF.set(environment); + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + return completedFuture(map); + }, options); + } + @Override public DataLoader idLoaderDelayed( DataLoaderOptions options, List> loadCalls, Duration delay) { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java index fa920cf..3a0f54e 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; 
+import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.Try; @@ -12,16 +13,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; -import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; public class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { @@ -36,6 +34,18 @@ public DataLoader idLoader( }, options); } + @Override + public DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF) { + return newMappedPublisherDataLoader((keys, subscriber, environment) -> { + environmentREF.set(environment); + + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + Flux.fromIterable(map.entrySet()).subscribe(subscriber); + }, options); + } + @Override public DataLoader idLoaderDelayed( DataLoaderOptions options, List> loadCalls, Duration delay) { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java index 2049719..c8e8b67 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; +import org.dataloader.BatchLoaderEnvironment; import 
org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.Try; @@ -11,9 +12,9 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; -import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; @@ -28,6 +29,15 @@ public DataLoader idLoader( }, options); } + @Override + public DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF) { + return newPublisherDataLoader((keys, subscriber, environment) -> { + environmentREF.set(environment); + loadCalls.add(new ArrayList<>(keys)); + Flux.fromIterable(keys).subscribe(subscriber); + }, options); + } + @Override public DataLoader idLoaderDelayed(DataLoaderOptions options, List> loadCalls, Duration delay) { return newPublisherDataLoader((keys, subscriber) -> { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java index 789b136..3c584fd 100644 --- a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java @@ -1,5 +1,6 @@ package org.dataloader.fixtures.parameterized; +import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; @@ -7,6 +8,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; public interface TestDataLoaderFactory { DataLoader idLoader(DataLoaderOptions options, List> loadCalls); @@ -23,6 +25,11 @@ public interface TestDataLoaderFactory { DataLoader idLoaderReturnsTooMany(int howManyMore, 
DataLoaderOptions options, ArrayList loadCalls); + // similar to above but batch loaders with context + + DataLoader idLoaderWithContext(DataLoaderOptions options, List> loadCalls, AtomicReference environmentREF); + + // Convenience methods default DataLoader idLoader(DataLoaderOptions options) { diff --git a/src/test/kotlin/org/dataloader/KotlinExamples.kt b/src/test/kotlin/org/dataloader/KotlinExamples.kt new file mode 100644 index 0000000..f53faf4 --- /dev/null +++ b/src/test/kotlin/org/dataloader/KotlinExamples.kt @@ -0,0 +1,110 @@ +package org.dataloader + +import java.util.concurrent.CompletableFuture +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import java.util.concurrent.CompletableFuture.completedFuture +import org.dataloader.impl.NoOpValueCache + +/** + * Some Kotlin code to prove that are JSpecify annotations work here + * as expected in Kotlin land. We don't intend to ue Kotlin in our tests + * or to deliver Kotlin code in the java + */ +class KotlinExamples { + + @Test + fun `basic kotlin test of non nullable value types`() { + val batchLoadFunction = BatchLoader + { keys -> completedFuture(keys.toList()) } + val dataLoader: DataLoader = + DataLoaderFactory.newDataLoader(batchLoadFunction) + + val cfA = dataLoader.load("A") + val cfB = dataLoader.load("B") + + dataLoader.dispatch() + + assert(cfA.join().equals("A")) + assert(cfB.join().equals("B")) + } + + @Test + fun `basic kotlin test of nullable value types`() { + val batchLoadFunction: BatchLoader = BatchLoader { keys -> completedFuture(keys.toList()) } + val dataLoader: DataLoader = DataLoaderFactory.newDataLoader(batchLoadFunction) + + standardNullableAsserts(dataLoader) + } + + @Test + fun `basic kotlin test of nullable value types in mapped batch loader`() { + val batchLoadFunction = MappedBatchLoader + { keys -> completedFuture(keys.associateBy({ it })) } + + val dataLoader: DataLoader = DataLoaderFactory.newMappedDataLoader(batchLoadFunction) + + 
standardNullableAsserts(dataLoader) + } + + @Test + fun `basic kotlin test of nullable value types in mapped batch loader with context`() { + val batchLoadFunction = MappedBatchLoaderWithContext + { keys, env -> completedFuture(keys.associateBy({ it })) } + + val dataLoader: DataLoader = DataLoaderFactory.newMappedDataLoader(batchLoadFunction) + + standardNullableAsserts(dataLoader) + } + + @Test + fun `basic kotlin test of nullable value types in mapped batch publisher`() { + val batchLoadFunction = MappedBatchPublisher + { keys, subscriber -> + val map: Map = keys.associateBy({ it }) + Flux.fromIterable(map.entries).subscribe(subscriber); + } + + val dataLoader: DataLoader = DataLoaderFactory.newMappedPublisherDataLoader(batchLoadFunction) + + standardNullableAsserts(dataLoader) + } + + @Test + fun `basic kotlin test of nullable value types in mapped batch publisher with context`() { + val batchLoadFunction = MappedBatchPublisherWithContext + { keys, subscriber, env -> + val map: Map = keys.associateBy({ it }) + Flux.fromIterable(map.entries).subscribe(subscriber); + } + + val dataLoader: DataLoader = DataLoaderFactory.newMappedPublisherDataLoader(batchLoadFunction) + + standardNullableAsserts(dataLoader) + } + + @Test + fun `basic kotlin test of nullable value types in value cache`() { + val valueCache = object : ValueCache by NoOpValueCache() { + override fun get(key: String): CompletableFuture = if (key == "null") + completedFuture(null) + else + completedFuture(key) + } + + assert(valueCache["key"].get() == "key") + assert(valueCache["null"].get() == null) + } + + private fun standardNullableAsserts(dataLoader: DataLoader) { + val cfA = dataLoader.load("A") + val cfB = dataLoader.load("B") + + dataLoader.dispatch() + + assert(cfA.join().equals("A")) + assert(cfB.join().equals("B")) + } + + +} \ No newline at end of file