Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batching): v6 synchronize executions when attempting to remove an entry #2014

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,25 @@ fun <V> CompletableFuture<V>.dispatchIfNeeded(
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()

if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
val cantContinueExecution = when {
when {
environment.graphQlContext.hasKey(ExecutionLevelDispatchedState::class) -> {
environment
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
val cantContinueExecution =
environment
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
if (cantContinueExecution) {
dataLoaderRegistry.dispatchAll()
}
}
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
environment
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
.allSyncExecutionsExhausted()
.ifAllSyncExecutionsExhausted {
dataLoaderRegistry.dispatchAll()
}
}
else -> throw MissingInstrumentationStateException()
}

if (cantContinueExecution) {
dataLoaderRegistry.dispatchAll()
}
}
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,6 @@ class SyncExecutionExhaustedState(
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()

/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
private fun removeExecution(executionId: ExecutionId) {
if (executions.containsKey(executionId)) {
executions.remove(executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
}

/**
* Create the [ExecutionBatchState] When a specific [ExecutionInput] starts his execution
*
Expand All @@ -84,11 +70,12 @@ class SyncExecutionExhaustedState(
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
synchronized(executions) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
}
Expand Down Expand Up @@ -147,9 +134,8 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
override fun onCompleted(result: Any?, t: Throwable?) {
Expand All @@ -158,26 +144,26 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
}
}

/**
* Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution
* execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution.
* A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until
* a scalar leaf or a [DataFetcher] that returns a [CompletableFuture]
*/
fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) {
val operationsToExecute = totalExecutions.get()
when {
executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false
else -> {
executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)
fun ifAllSyncExecutionsExhausted(predicate: (List<ExecutionId>) -> Unit) =
synchronized(executions) {
val operationsToExecute = totalExecutions.get()
if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled())
return@synchronized

if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) {
predicate(executions.keys().toList())
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
fun `Instrumentation should not consider executions that thrown exceptions`() {
val executions = listOf(
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
Expand All @@ -633,7 +633,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics

assertEquals(1, astronautStatistics?.batchInvokeCount)
assertEquals(2, astronautStatistics?.batchLoadCount)
assertEquals(1, astronautStatistics?.batchLoadCount)

assertEquals(1, missionStatistics?.batchInvokeCount)
assertEquals(1, missionStatistics?.batchLoadCount)
Expand Down
Loading