Skip to content

Commit

Permalink
Merge branch '3.3.1_fixes' into '3.3'
Browse files Browse the repository at this point in the history
3.3.1 fixes

See merge request wdc/compss/framework!542
  • Loading branch information
jorgee committed Jun 3, 2024
2 parents d9f310a + 7990266 commit 567b41a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
tmpList.addAll(t.getSuccessors());
synchronized (t) {
tmpList.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
tmpList.clear();
Expand Down Expand Up @@ -158,7 +160,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
tmpList.addAll(t.getSuccessors());
synchronized (t) {
tmpList.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
tmpList.clear();
Expand Down Expand Up @@ -203,7 +207,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
tmpList.addAll(t.getSuccessors());
synchronized (t) {
tmpList.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
tmpList.clear();
Expand Down Expand Up @@ -238,7 +244,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
tmpList.addAll(t.getSuccessors());
synchronized (t) {
tmpList.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
tmpList.clear();
Expand Down Expand Up @@ -284,7 +292,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
tmpList.addAll(t.getSuccessors());
synchronized (t) {
tmpList.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
tmpList.clear();
Expand All @@ -304,7 +314,9 @@ public void process(TaskScheduler ts) throws ShutdownException {
boolean done = false;
while (!done) {
try {
successors.addAll(t.getSuccessors());
synchronized (t) {
successors.addAll(t.getSuccessors());
}
done = true;
} catch (ConcurrentModificationException cme) {
successors.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@
import es.bsc.compss.types.parameter.Parameter;
import es.bsc.compss.types.resources.Worker;
import es.bsc.compss.types.resources.WorkerResourceDescription;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

import org.json.JSONObject;


Expand All @@ -39,14 +45,19 @@
public abstract class OrderStrictTS extends TaskScheduler {

protected final PriorityQueue<ObjectValue<AllocatableAction>> readyQueue;
protected Set<AllocatableAction> upgradedActions;
protected final Map<AllocatableAction, ObjectValue<AllocatableAction>> addedActions;


/**
* Constructs a new Ready Scheduler instance.
*/
public OrderStrictTS() {
super();
readyQueue = new PriorityQueue<>();
LOGGER.debug("[OrderStrict] Loading OrderStrict TS");
this.readyQueue = new PriorityQueue<>();
this.upgradedActions = new HashSet<>();
this.addedActions = new HashMap<>();
}

/*
Expand All @@ -66,34 +77,87 @@ public abstract <T extends WorkerResourceDescription> ResourceScheduler<T> gener
return new SchedulingInformation(rs);
}

@Override
public void upgradeAction(AllocatableAction action) {
if (DEBUG) {
LOGGER.debug("[OrderStrict] Upgrading action " + action);
}
upgradedActions.add(action);
ObjectValue<AllocatableAction> obj = addedActions.remove(action);
readyQueue.remove(obj);

}

/*
* *********************************************************************************************************
* *********************************************************************************************************
* ********************************* SCHEDULING OPERATIONS *************************************************
* *********************************************************************************************************
* *********************************************************************************************************
*/
private PriorityQueue<ObjectValue<AllocatableAction>> sortActions(Iterable<AllocatableAction> actions) {

if (DEBUG) {
LOGGER.debug("[OrcerStrict] Managing " + upgradedActions.size() + " upgraded actions.");
}
PriorityQueue<ObjectValue<AllocatableAction>> sortedActions = new PriorityQueue<>();
for (AllocatableAction action : actions) {
Score fullScore = this.generateActionScore(action);
ObjectValue<AllocatableAction> obj = new ObjectValue<>(action, fullScore);
sortedActions.add(obj);
}

return sortedActions;
}

private void manageUpgradedActions(ResourceScheduler<?> resource) {
if (!upgradedActions.isEmpty()) {
if (DEBUG) {
LOGGER.debug("[OrderStrict] Managing " + upgradedActions.size() + " upgraded actions.");
}

PriorityQueue<ObjectValue<AllocatableAction>> executableActions = sortActions(upgradedActions);

while (!executableActions.isEmpty()) {
ObjectValue<AllocatableAction> obj = executableActions.poll();
AllocatableAction freeAction = obj.getObject();
if (freeAction.getCompatibleWorkers().contains(resource) && resource.canRunSomething()) {
try {
freeAction.schedule(resource, obj.getScore());
tryToLaunch(freeAction);
upgradedActions.remove(freeAction);

} catch (UnassignedActionException e) {
// Nothing to do it could be scheduled in another resource
}
}
}
}
}

private void addActionToReadyQueue(AllocatableAction action, Score actionScore) {
ObjectValue<AllocatableAction> obj = new ObjectValue<>(action, actionScore);
addedActions.put(action, obj);
readyQueue.add(obj);
}

@Override
protected void scheduleAction(AllocatableAction action, Score actionScore) throws BlockedActionException {
if (!action.hasDataPredecessors()) {
ObjectValue<AllocatableAction> topReady = readyQueue.peek();
if (topReady == null || actionScore.isBetter(topReady.getScore())) {
try {
action.schedule(actionScore);
} catch (UnassignedActionException uae) {
if (upgradedActions.isEmpty()) {
ObjectValue<AllocatableAction> topReady = readyQueue.peek();
if (topReady == null || actionScore.isBetter(topReady.getScore())) {
try {
action.schedule(actionScore);
} catch (UnassignedActionException uae) {
addActionToReadyQueue(action, actionScore);
}
} else {
if (action.getCompatibleWorkers().isEmpty()) {
throw new BlockedActionException();
}
addActionToReadyQueue(action, actionScore);
}
} else {
if (action.getCompatibleWorkers().isEmpty()) {
throw new BlockedActionException();
}
addActionToReadyQueue(action, actionScore);
}
}
}
Expand All @@ -102,6 +166,7 @@ protected void scheduleAction(AllocatableAction action, Score actionScore) throw
public final <T extends WorkerResourceDescription> void handleDependencyFreeActions(
List<AllocatableAction> dataFreeActions, List<AllocatableAction> resourceFreeActions,
List<AllocatableAction> blockedCandidates, ResourceScheduler<T> resource) {
manageUpgradedActions(resource);

PriorityQueue<ObjectValue<AllocatableAction>> executableActions = new PriorityQueue<>();
for (AllocatableAction freeAction : dataFreeActions) {
Expand Down Expand Up @@ -139,6 +204,7 @@ public final <T extends WorkerResourceDescription> void handleDependencyFreeActi

if (topReadyQueue == topPriority) {
readyQueue.poll();
addedActions.remove(aa);
readyQueueEmpty = readyQueue.isEmpty();
} else {
executableActions.poll();
Expand Down

0 comments on commit 567b41a

Please sign in to comment.