From a63f7a9af64a3e3862e739d25dcd099be1dc2827 Mon Sep 17 00:00:00 2001 From: James Judd Date: Thu, 1 May 2025 12:06:16 -0600 Subject: [PATCH 1/2] Attempt to fix race by removing request from book keeping before responding to Bazel We were previously responding to Bazel about the request before removing the request from book keeping. That leaves a window for Bazel to send us another request with the same request ID before we've removed the original request from the worker's book keeping. --- .../rules_scala/common/worker/WorkerMain.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala index 09a399b3..deaf7027 100644 --- a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala +++ b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala @@ -69,6 +69,16 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream maybeExitCode: Option[Int], wasCancelled: Boolean = false, ): Unit = { + // Remove the request from our book keeping right before we respond to Bazel. If + // we respond to Bazel about the request before removing it, then there is a race: + // Bazel could make a request with the same requestId to this worker before the + // requestId is removed from the worker's book keeping. + // + // Ideally Bazel will not send a request to this worker with the same requestId + // as another request before we've responded to the original request. If that + // happens, then there's a race regardless of what we do. 
+ activeRequests.remove(requestId) + // Defined here so all writes to stdout are synchronized stdout.synchronized { val builder = WorkerProtocol.WorkResponse.newBuilder @@ -88,8 +98,6 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream .build() .writeDelimitedTo(stdout) } - - activeRequests.remove(requestId) } /** From d1b542c26886335cb80427125d6d45c08bb10542 Mon Sep 17 00:00:00 2001 From: James Judd Date: Thu, 1 May 2025 13:14:36 -0600 Subject: [PATCH 2/2] Add additional logging when a request is received with an already active ID --- .../rules_scala/common/worker/WorkerMain.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala index deaf7027..04c02fb9 100644 --- a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala +++ b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala @@ -61,7 +61,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream val ec = ExecutionContext.fromExecutor(fjp) // Map of request id to the runnable responsible for executing that request id - val activeRequests = new ConcurrentHashMap[Int, CancellableTask[Int]](poolSize) + val activeRequests = new ConcurrentHashMap[Int, (WorkerProtocol.WorkRequest, CancellableTask[Int])](poolSize) def writeResponse( requestId: Int, @@ -136,10 +136,10 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // From the Bazel doc: "The server may send cancel requests for requests that the worker // has already responded to, in which case the cancel request must be ignored." 
- Option(activeRequests.get(requestId)).foreach { activeRequest => + Option(activeRequests.get(requestId)).foreach { case (_, workTask) => // Cancel will wait for the thread to complete or be interrupted, so we do it in a future // to prevent blocking the worker from processing more requests - Future(activeRequest.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))( + Future(workTask.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))( scala.concurrent.ExecutionContext.global, ) } @@ -235,9 +235,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // for this requestId. If that's the case, we have a book keeping error or there are // two active requests with the same ID. Either of which is not good and something we // should just crash on. - if (activeRequests.putIfAbsent(requestId, workTask) != null) { + val alreadyActiveRequest = activeRequests.putIfAbsent(requestId, (request, workTask)) + if (alreadyActiveRequest != null) { + val (activeRequest, _) = alreadyActiveRequest throw new AnnexDuplicateActiveRequestException( - s"Received a WorkRequest with an already active request id: ${requestId}", + s"""Received a WorkRequest with an already active request id: ${requestId}. + Currently active request: ${activeRequest.toString} + New request with the same id: ${request.toString} + """, ) } else { workTask.execute(ec)