Clear caches from the runtime hook (#10954)
close #10897

Changelog:
- add: implement `ExecutionHooks` to defer selected logic until the program is actually executed (see the sketch below).
4e6 committed Sep 5, 2024
1 parent 3be8770 commit 6c80f8f
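For context, the changelog entry above describes a deferred-hook pattern: rather than performing an action (here, cache invalidation) at the moment a request is handled, a command records it as a `Runnable`, and the runtime drains and runs everything that was recorded just before the program executes. Below is a minimal, self-contained Java sketch of that idea; the class and method names are illustrative only, not the Enso API.

import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the deferred-hook idea (hypothetical names):
// a command records work as a Runnable, and the execution entry point
// drains and runs everything that was recorded before running the program.
public class DeferredWorkSketch {

  private final Queue<Runnable> pending = new ArrayDeque<>();

  // Producer side: while handling a request, remember the work
  // instead of doing it immediately.
  void deferCacheInvalidation(String expressionId) {
    pending.add(() -> System.out.println("invalidate caches for " + expressionId));
  }

  // Consumer side: at the start of program execution, run all deferred work.
  void execute() {
    Runnable hook;
    while ((hook = pending.poll()) != null) {
      hook.run();
    }
    System.out.println("running the program");
  }

  public static void main(String[] args) {
    DeferredWorkSketch runtime = new DeferredWorkSketch();
    runtime.deferCacheInvalidation("expr-1");
    runtime.execute(); // prints the invalidation first, then runs the program
  }
}

In the commit itself the producer side is `UpsertVisualizationJob`, which registers an `InvalidateCaches` hook, and the consumer side is `ExecuteJob`, which calls `ctx.state.executionHooks.run()` before taking the execution locks.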
Showing 9 changed files with 153 additions and 55 deletions.
ExecutionHooks.java (new file)
@@ -0,0 +1,14 @@
package org.enso.interpreter.instrument.execution;

public interface ExecutionHooks {

/**
* Add a hook to run before the execution.
*
* @param hook the runnable hook
*/
void add(Runnable hook);

/** Consume and run all the stored execution hooks. */
void run();
}
RuntimeExecutionHooks.java (new file)
@@ -0,0 +1,35 @@
package org.enso.interpreter.instrument.execution;

import java.util.LinkedHashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RuntimeExecutionHooks implements ExecutionHooks {

private final Logger logger = LoggerFactory.getLogger(getClass());
private final Set<Runnable> hooks = new LinkedHashSet<>();

public RuntimeExecutionHooks() {}

@Override
public void add(Runnable hook) {
synchronized (hooks) {
hooks.add(hook);
}
}

@Override
public void run() {
synchronized (hooks) {
for (Runnable hook : hooks) {
try {
hook.run();
} catch (Exception e) {
logger.error("Failed to run execution hook.", e);
}
}
hooks.clear();
}
}
}
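A short usage sketch against the two classes above, assuming they are on the classpath together with an SLF4J binding (the `InvalidateCaches` record and the expression ids below are illustrative stand-ins, not the Scala case class added later in this commit). It exercises two properties of the implementation: the `LinkedHashSet` de-duplicates value-equal hooks while preserving insertion order, and a hook that throws is logged without stopping the remaining hooks.

import org.enso.interpreter.instrument.execution.ExecutionHooks;
import org.enso.interpreter.instrument.execution.RuntimeExecutionHooks;

public class ExecutionHooksDemo {

  // Value-equal hooks: two InvalidateCaches("expr-1") instances compare equal,
  // so the LinkedHashSet inside RuntimeExecutionHooks stores them only once.
  record InvalidateCaches(String expressionId) implements Runnable {
    @Override
    public void run() {
      System.out.println("invalidating caches for " + expressionId);
    }
  }

  public static void main(String[] args) {
    ExecutionHooks hooks = new RuntimeExecutionHooks();

    hooks.add(new InvalidateCaches("expr-1"));
    hooks.add(new InvalidateCaches("expr-1")); // value-equal: stored once, runs once
    hooks.add(() -> { throw new IllegalStateException("boom"); }); // logged, not rethrown
    hooks.add(new InvalidateCaches("expr-2"));

    hooks.run(); // runs the hooks in insertion order, then clears the set
    hooks.run(); // no-op: the previous call consumed all stored hooks
  }
}

This de-duplication is presumably why the runtime-side hook added below (`InvalidateCaches` in `UpsertVisualizationJob`) is a case class keyed by the expression id: scheduling the same invalidation several times before the next execution collapses into a single run.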
EditFileCmd.scala
@@ -7,7 +7,6 @@ import org.enso.logger.masking.MaskedPath
import org.enso.polyglot.runtime.Runtime.Api

import java.util.logging.Level

import scala.concurrent.ExecutionContext

/** A command that performs edition of a file.
@@ -53,17 +52,21 @@ class EditFileCmd(request: Api.EditFileNotification)
if (request.execute) {
ctx.jobControlPlane.abortAllJobs()
ctx.jobProcessor
-        .run(new EnsureCompiledJob(Seq(request.path)))
+        .run(compileJob())
.foreach(_ => executeJobs.foreach(ctx.jobProcessor.run))
} else if (request.idMap.isDefined) {
-      ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
+      ctx.jobProcessor.run(compileJob())
}
}
)
)
}

-  private def executeJobs(implicit
+  protected def compileJob(): EnsureCompiledJob = {
+    new EnsureCompiledJob(Seq(request.path))
+  }
+
+  protected def executeJobs(implicit
ctx: RuntimeContext
): Iterable[ExecuteJob] = {
ctx.contextManager.getAllContexts
ExecutionState.scala
@@ -6,5 +6,7 @@ final class ExecutionState {
/** The storage for pending file edits */
val pendingEdits: PendingEdits = new PendingFileEdits()

+  val executionHooks: ExecutionHooks = new RuntimeExecutionHooks

val suggestions: ModuleIndexing = ModuleIndexing.createInstance()
}
EnsureCompiledJob.scala
@@ -44,7 +44,7 @@ import scala.jdk.OptionConverters._
* @param files a files to compile
* @param isCancellable a flag indicating if the job is cancellable
*/
-final class EnsureCompiledJob(
+class EnsureCompiledJob(
protected val files: Iterable[File],
isCancellable: Boolean = true
) extends Job[EnsureCompiledJob.CompilationStatus](
ExecuteJob.scala
@@ -27,63 +27,69 @@ class ExecuteJob(

/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
+    try {
+      runImpl
+    } catch {
+      case t: Throwable =>
+        ctx.endpoint.sendToClient(
+          Api.Response(
+            Api.ExecutionFailed(
+              contextId,
+              Api.ExecutionResult.Failure(t.getMessage, None)
+            )
+          )
+        )
+    }
+  }
+
+  private def runImpl(implicit ctx: RuntimeContext): Unit = {
+    ctx.state.executionHooks.run()
+
    ctx.locking.withContextLock(
      ctx.locking.getOrCreateContextLock(contextId),
      this.getClass,
      () =>
        ctx.locking.withReadCompilationLock(
          this.getClass,
          () => {
-            try {
-              val context = ctx.executionService.getContext
-              val originalExecutionEnvironment =
-                executionEnvironment.map(_ => context.getExecutionEnvironment)
-              executionEnvironment.foreach(env =>
-                context.setExecutionEnvironment(
-                  ExecutionEnvironment.forName(env.name)
-                )
-              )
-              val outcome =
-                try ProgramExecutionSupport.runProgram(contextId, stack)
-                finally {
-                  originalExecutionEnvironment.foreach(
-                    context.setExecutionEnvironment
-                  )
-                }
-              outcome match {
-                case Some(diagnostic: Api.ExecutionResult.Diagnostic) =>
-                  if (diagnostic.isError) {
-                    ctx.endpoint.sendToClient(
-                      Api.Response(Api.ExecutionFailed(contextId, diagnostic))
-                    )
-                  } else {
-                    ctx.endpoint.sendToClient(
-                      Api.Response(
-                        Api.ExecutionUpdate(contextId, Seq(diagnostic))
-                      )
-                    )
-                    ctx.endpoint.sendToClient(
-                      Api.Response(Api.ExecutionComplete(contextId))
-                    )
-                  }
-                case Some(failure: Api.ExecutionResult.Failure) =>
-                  ctx.endpoint.sendToClient(
-                    Api.Response(Api.ExecutionFailed(contextId, failure))
-                  )
-                case None =>
-                  ctx.endpoint.sendToClient(
-                    Api.Response(Api.ExecutionComplete(contextId))
-                  )
-              }
-            } catch {
-              case t: Throwable =>
-                ctx.endpoint.sendToClient(
-                  Api.Response(
-                    Api.ExecutionFailed(
-                      contextId,
-                      Api.ExecutionResult.Failure(t.getMessage, None)
-                    )
-                  )
-                )
-            }
+            val context = ctx.executionService.getContext
+            val originalExecutionEnvironment =
+              executionEnvironment.map(_ => context.getExecutionEnvironment)
+            executionEnvironment.foreach(env =>
+              context.setExecutionEnvironment(
+                ExecutionEnvironment.forName(env.name)
+              )
+            )
+            val outcome =
+              try ProgramExecutionSupport.runProgram(contextId, stack)
+              finally {
+                originalExecutionEnvironment.foreach(
+                  context.setExecutionEnvironment
+                )
+              }
+            outcome match {
+              case Some(diagnostic: Api.ExecutionResult.Diagnostic) =>
+                if (diagnostic.isError) {
+                  ctx.endpoint.sendToClient(
+                    Api.Response(Api.ExecutionFailed(contextId, diagnostic))
+                  )
+                } else {
+                  ctx.endpoint.sendToClient(
+                    Api.Response(
+                      Api.ExecutionUpdate(contextId, Seq(diagnostic))
+                    )
+                  )
+                  ctx.endpoint.sendToClient(
+                    Api.Response(Api.ExecutionComplete(contextId))
+                  )
+                }
+              case Some(failure: Api.ExecutionResult.Failure) =>
+                ctx.endpoint.sendToClient(
+                  Api.Response(Api.ExecutionFailed(contextId, failure))
+                )
+              case None =>
+                ctx.endpoint.sendToClient(
+                  Api.Response(Api.ExecutionComplete(contextId))
+                )
+            }
          }
UpsertVisualizationJob.scala
@@ -27,6 +27,7 @@ import org.enso.polyglot.runtime.Runtime.Api

import java.util.UUID
import java.util.logging.Level

import scala.annotation.unused
import scala.util.Try

@@ -157,6 +158,20 @@ class UpsertVisualizationJob(

object UpsertVisualizationJob {

+  /** Invalidate caches for a particular expression id. */
+  sealed private case class InvalidateCaches(
+    expressionId: Api.ExpressionId
+  )(implicit ctx: RuntimeContext)
+      extends Runnable {
+
+    override def run(): Unit = {
+      ctx.locking.withWriteCompilationLock(
+        classOf[UpsertVisualizationJob],
+        () => invalidateCaches(expressionId)
+      )
+    }
+  }

/** The number of times to retry the expression evaluation. */
private val MaxEvaluationRetryCount: Int = 5

@@ -494,10 +509,8 @@ object UpsertVisualizationJob {
callback,
arguments
)
-      ctx.locking.withWriteCompilationLock(
-        this.getClass,
-        () => invalidateCaches(visualization)
-      )
+      setCacheWeights(visualization)
+      ctx.state.executionHooks.add(InvalidateCaches(expressionId))
ctx.contextManager.upsertVisualization(
visualizationConfig.executionContextId,
visualization
@@ -550,9 +563,8 @@

/** Update the caches. */
private def invalidateCaches(
-    visualization: Visualization
+    expressionId: Api.ExpressionId
)(implicit ctx: RuntimeContext): Unit = {
-    setCacheWeights(visualization)
val stacks = ctx.contextManager.getAllContexts.values
/* The invalidation of the first cached dependent node is required for
* attaching the visualizations to sub-expressions. Consider the example
@@ -567,8 +579,8 @@ object UpsertVisualizationJob {
* visualized expression is a sub-expression and invalidate the first parent
* expression accordingly.
*/
-    if (!stacks.exists(isExpressionCached(visualization.expressionId, _))) {
-      invalidateFirstDependent(visualization.expressionId)
+    if (!stacks.exists(isExpressionCached(expressionId, _))) {
+      invalidateFirstDependent(expressionId)
}
}

SlowEditFileCmd.scala
@@ -1,6 +1,10 @@
package org.enso.interpreter.instrument.command

import org.enso.interpreter.instrument.execution.RuntimeContext
+import org.enso.interpreter.instrument.job.{
+  EnsureCompiledJob,
+  SlowEnsureCompiledJob
+}
import org.enso.polyglot.runtime.Runtime.Api

import scala.concurrent.ExecutionContext
@@ -23,4 +27,8 @@ class SlowEditFileCmd(request: Api.EditFileNotification, delay: Boolean)
}
super.executeSynchronously(ctx, ec)
}

+  override protected def compileJob(): EnsureCompiledJob = {
+    new SlowEnsureCompiledJob(Seq(request.path))
+  }
}
SlowEnsureCompiledJob.scala (new file)
@@ -0,0 +1,18 @@
package org.enso.interpreter.instrument.job

import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.interpreter.instrument.job.EnsureCompiledJob.CompilationStatus

import java.io.File

class SlowEnsureCompiledJob(
files: Iterable[File],
isCancellable: Boolean = true
) extends EnsureCompiledJob(files, isCancellable) {

override def run(implicit ctx: RuntimeContext): CompilationStatus = {
Thread.sleep(1000)
super.run(ctx)
}

}
