From 79902662436d4b87b41fbef3d9c7356545bd7832 Mon Sep 17 00:00:00 2001 From: jorgee Date: Mon, 3 Jun 2024 20:06:44 +0200 Subject: [PATCH] 3.3.1 fixes --- .../request/td/PrintCurrentGraphRequest.java | 24 +++-- .../scheduler/orderstrict/OrderStrictTS.java | 88 ++++++++++++++++--- 2 files changed, 95 insertions(+), 17 deletions(-) diff --git a/compss/runtime/engine/src/main/java/es/bsc/compss/types/request/td/PrintCurrentGraphRequest.java b/compss/runtime/engine/src/main/java/es/bsc/compss/types/request/td/PrintCurrentGraphRequest.java index 2080f23d6c..c56fef52d9 100644 --- a/compss/runtime/engine/src/main/java/es/bsc/compss/types/request/td/PrintCurrentGraphRequest.java +++ b/compss/runtime/engine/src/main/java/es/bsc/compss/types/request/td/PrintCurrentGraphRequest.java @@ -121,7 +121,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - tmpList.addAll(t.getSuccessors()); + synchronized (t) { + tmpList.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { tmpList.clear(); @@ -158,7 +160,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - tmpList.addAll(t.getSuccessors()); + synchronized (t) { + tmpList.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { tmpList.clear(); @@ -203,7 +207,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - tmpList.addAll(t.getSuccessors()); + synchronized (t) { + tmpList.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { tmpList.clear(); @@ -238,7 +244,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - tmpList.addAll(t.getSuccessors()); + synchronized (t) { + tmpList.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { tmpList.clear(); @@ -284,7 +292,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - tmpList.addAll(t.getSuccessors()); + synchronized (t) { + tmpList.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { tmpList.clear(); @@ -304,7 +314,9 @@ public void process(TaskScheduler ts) throws ShutdownException { boolean done = false; while (!done) { try { - successors.addAll(t.getSuccessors()); + synchronized (t) { + successors.addAll(t.getSuccessors()); + } done = true; } catch (ConcurrentModificationException cme) { successors.clear(); diff --git a/compss/runtime/scheduler/orderstrict/base/src/main/java/es/bsc/compss/scheduler/orderstrict/OrderStrictTS.java b/compss/runtime/scheduler/orderstrict/base/src/main/java/es/bsc/compss/scheduler/orderstrict/OrderStrictTS.java index d84ee88ee8..490feade61 100644 --- a/compss/runtime/scheduler/orderstrict/base/src/main/java/es/bsc/compss/scheduler/orderstrict/OrderStrictTS.java +++ b/compss/runtime/scheduler/orderstrict/base/src/main/java/es/bsc/compss/scheduler/orderstrict/OrderStrictTS.java @@ -28,8 +28,14 @@ import es.bsc.compss.types.parameter.Parameter; import es.bsc.compss.types.resources.Worker; import es.bsc.compss.types.resources.WorkerResourceDescription; + +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; + import org.json.JSONObject; @@ -39,6 +45,8 @@ public abstract class OrderStrictTS extends TaskScheduler { protected final PriorityQueue> readyQueue; + protected Set upgradedActions; + protected final Map> addedActions; /** @@ -46,7 +54,10 @@ public abstract class OrderStrictTS extends TaskScheduler { */ public OrderStrictTS() { super(); - readyQueue = new PriorityQueue<>(); + LOGGER.debug("[OrderStrict] Loading OrderStrict TS"); + this.readyQueue = new PriorityQueue<>(); + this.upgradedActions = new HashSet<>(); + this.addedActions = new HashMap<>(); } /* @@ -66,6 +77,17 @@ public abstract ResourceScheduler gener return new SchedulingInformation(rs); } + @Override + public void upgradeAction(AllocatableAction action) { + if (DEBUG) { + LOGGER.debug("[OrderStrict] Upgrading action " + action); + } + upgradedActions.add(action); + ObjectValue obj = addedActions.remove(action); + readyQueue.remove(obj); + + } + /* * ********************************************************************************************************* * ********************************************************************************************************* @@ -73,27 +95,69 @@ public abstract ResourceScheduler gener * ********************************************************************************************************* * ********************************************************************************************************* */ + private PriorityQueue> sortActions(Iterable actions) { + + if (DEBUG) { + LOGGER.debug("[OrcerStrict] Managing " + upgradedActions.size() + " upgraded actions."); + } + PriorityQueue> sortedActions = new PriorityQueue<>(); + for (AllocatableAction action : actions) { + Score fullScore = this.generateActionScore(action); + ObjectValue obj = new ObjectValue<>(action, fullScore); + sortedActions.add(obj); + } + + return sortedActions; + } + + private void manageUpgradedActions(ResourceScheduler resource) { + if (!upgradedActions.isEmpty()) { + if (DEBUG) { + LOGGER.debug("[OrderStrict] Managing " + upgradedActions.size() + " upgraded actions."); + } + + PriorityQueue> executableActions = sortActions(upgradedActions); + + while (!executableActions.isEmpty()) { + ObjectValue obj = executableActions.poll(); + AllocatableAction freeAction = obj.getObject(); + if (freeAction.getCompatibleWorkers().contains(resource) && resource.canRunSomething()) { + try { + freeAction.schedule(resource, obj.getScore()); + tryToLaunch(freeAction); + upgradedActions.remove(freeAction); + + } catch (UnassignedActionException e) { + // Nothing to do it could be scheduled in another resource + } + } + } + } + } private void addActionToReadyQueue(AllocatableAction action, Score actionScore) { ObjectValue obj = new ObjectValue<>(action, actionScore); + addedActions.put(action, obj); readyQueue.add(obj); } @Override protected void scheduleAction(AllocatableAction action, Score actionScore) throws BlockedActionException { if (!action.hasDataPredecessors()) { - ObjectValue topReady = readyQueue.peek(); - if (topReady == null || actionScore.isBetter(topReady.getScore())) { - try { - action.schedule(actionScore); - } catch (UnassignedActionException uae) { + if (upgradedActions.isEmpty()) { + ObjectValue topReady = readyQueue.peek(); + if (topReady == null || actionScore.isBetter(topReady.getScore())) { + try { + action.schedule(actionScore); + } catch (UnassignedActionException uae) { + addActionToReadyQueue(action, actionScore); + } + } else { + if (action.getCompatibleWorkers().isEmpty()) { + throw new BlockedActionException(); + } addActionToReadyQueue(action, actionScore); } - } else { - if (action.getCompatibleWorkers().isEmpty()) { - throw new BlockedActionException(); - } - addActionToReadyQueue(action, actionScore); } } } @@ -102,6 +166,7 @@ protected void scheduleAction(AllocatableAction action, Score actionScore) throw public final void handleDependencyFreeActions( List dataFreeActions, List resourceFreeActions, List blockedCandidates, ResourceScheduler resource) { + manageUpgradedActions(resource); PriorityQueue> executableActions = new PriorityQueue<>(); for (AllocatableAction freeAction : dataFreeActions) { @@ -139,6 +204,7 @@ public final void handleDependencyFreeActi if (topReadyQueue == topPriority) { readyQueue.poll(); + addedActions.remove(aa); readyQueueEmpty = readyQueue.isEmpty(); } else { executableActions.poll();