diff --git a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala index 09a399b3..04c02fb9 100644 --- a/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala +++ b/src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala @@ -61,7 +61,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream val ec = ExecutionContext.fromExecutor(fjp) // Map of request id to the runnable responsible for executing that request id - val activeRequests = new ConcurrentHashMap[Int, CancellableTask[Int]](poolSize) + val activeRequests = new ConcurrentHashMap[Int, (WorkerProtocol.WorkRequest, CancellableTask[Int])](poolSize) def writeResponse( requestId: Int, @@ -69,6 +69,16 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream maybeExitCode: Option[Int], wasCancelled: Boolean = false, ): Unit = { + // Remove the request from our book keeping right before we respond to Bazel. If + // we respond to Bazel about the request before removing it,then there is a race: + // Bazel could make a request with the same requestId to this worker before the + // requestId is removed from the worker's book keeping. + // + // Ideally Bazel will not send a request to this worker with the same requestId + // as another request before we've responded to the original request. If that + // happens, then there's a race regardless of what we do. + activeRequests.remove(requestId) + // Defined here so all writes to stdout are synchronized stdout.synchronized { val builder = WorkerProtocol.WorkResponse.newBuilder @@ -88,8 +98,6 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream .build() .writeDelimitedTo(stdout) } - - activeRequests.remove(requestId) } /** @@ -128,10 +136,10 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // From the Bazel doc: "The server may send cancel requests for requests that the worker // has already responded to, in which case the cancel request must be ignored." - Option(activeRequests.get(requestId)).foreach { activeRequest => + Option(activeRequests.get(requestId)).foreach { case (_, workTask) => // Cancel will wait for the thread to complete or be interrupted, so we do it in a future // to prevent blocking the worker from processing more requests - Future(activeRequest.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))( + Future(workTask.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))( scala.concurrent.ExecutionContext.global, ) } @@ -227,9 +235,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream // for this requestId. If that's the case, we have a book keeping error or there are // two active requests with the same ID. Either of which is not good and something we // should just crash on. - if (activeRequests.putIfAbsent(requestId, workTask) != null) { + val alreadyActiveRequest = activeRequests.putIfAbsent(requestId, (request, workTask)) + if (alreadyActiveRequest != null) { + val (activeRequest, _) = alreadyActiveRequest throw new AnnexDuplicateActiveRequestException( - s"Received a WorkRequest with an already active request id: ${requestId}", + s"""Received a WorkRequest with an already active request id: ${requestId}. + Currently active request: ${activeRequest.toString} + New request with the same id: ${request.toString} + """, ) } else { workTask.execute(ec)