Skip to content

Commit

Permalink
feat!: add new compute tasks status linked to function status (#366)
Browse files Browse the repository at this point in the history
## Companion PR

- Substra/substra-backend#823
- Substra/substra-tests#319
- Substra/substra#397
- Substra/substra-frontend#297
- Substra/substra-documentation#390

## Description

<!-- Please reference issue if any. -->

<!-- Please include a summary of your changes. -->

Fixes of FL-1397

## How has this been tested?

<!-- Please describe the tests that you ran to verify your changes.  -->

## Checklist

- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Guilhem Barthés <guilhem.barthes@owkin.com>
  • Loading branch information
guilhem-barthes authored Feb 15, 2024
1 parent 637103c commit b71e4ee
Show file tree
Hide file tree
Showing 20 changed files with 399 additions and 305 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- BREAKING: Field `asset_type` of type `FailedAssetKind` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- BREAKING: Add `FunctionStatus` ([#263](https://github.com/Substra/orchestrator/pull/263))
- Add Function status event machine ([#263](https://github.com/Substra/orchestrator/pull/263))
- BREAKING: Add statuses `WAITING_FOR_BUILDER_SLOT` and `BUILDING` on tasks to reflect associated function status ([#366](https://github.com/Substra/orchestrator/pull/366))
- Add task actions `BUILD_STARTED` and `BUILD_FINISHED` to propagate status change from function to compute task ([#366](https://github.com/Substra/orchestrator/pull/366))

### Changed

Expand All @@ -29,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277))
- Logic to determine new compute task status takes in account the status of the function. A new task can now be created with the status `FAILED`or `CANCELLED` (if the function reached the corresponding status) ([#365](https://github.com/Substra/orchestrator/pull/365))
- BREAKING: Transition to status `TODO` for a given compute task is done after the function is built([#365](https://github.com/Substra/orchestrator/pull/365))
- BREAKING: Rename `TODO` to `WAITING_FOR_EXECUTOR_SLOT` and `WAITING` to `WAITING_FOR_PARENT_TASKS`([#366](https://github.com/Substra/orchestrator/pull/366))

### Fixed

Expand Down
32 changes: 20 additions & 12 deletions docs/assets/computetask.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ Since parents are set during task definition, the rank is an immutable property.

A task can have several status (see _States_ below for available transitions):

- WAITING: new task waiting for its parents to be DONE. In this state the task cannot be processed yet.
- TODO: all dependencies are built (all parents DONE) so the task can be picked up by a worker and processed.
- WAITING_FOR_BUILDER_SLOT: the function needed by the task is waiting to be built
- BUILDING: the function is currently being built.
- WAITING_FOR_PARENT_TASK: new task waiting for its parents to be DONE. In this state the task cannot be processed yet.
- WAITING_FOR_EXECUTOR_SLOT: all dependencies are built (all parents DONE) so the task can be picked up by a worker and processed.
- DOING: the task is being processed by a worker.
- DONE: task has been successfully completed.
- FAILED: task execution has failed.
Expand All @@ -68,10 +70,11 @@ This is an overview of a task's lifecycle:

![](./schemas/computetask.state.svg)

A task can be created in TODO or WAITING state depending on its parents.

A task can be created in `WAITING_FOR_BUILDING`, `BUILDING`, `CANCELED`, `FAILED`, `WAITING_FOR_PARENT_TASKS` and `WAITING_FOR_EXECUTOR_SLOT` state, based on its parents and the linked function. The 4 former are based on the function status, the 2 latter based on the parent status.

During the ComputePlan execution, as tasks are DONE, their statuses will be reflected to their children.
If all the parents of a child task are DONE, this task enters TODO state.
If all the parents of a child task are DONE, this task enters STATUS_WAITING_FOR_EXECUTOR_SLOT state.

When a parent task fails, children statuses are not changed.

Expand All @@ -83,20 +86,25 @@ This is to ensure that when a task starts (switch to DOING), all its inputs are
A status change is a reaction to an action.
Task actions should match the following restrictions:

| action ↓ / sender → | Owner | Worker | Other |
| ------------------- | ----- | ------ | ----- |
| DOING | n | y | n |
| CANCELED | y | n | n |
| FAILED | n | y | n |
| DONE | n | y | n |
| action ↓ / sender → | Owner | Worker | Other |
| -------------------- | ----- | ------ | ----- |
| BUILD_STARTED | n | n | y |
| BUILD_FINISHED | n | n | y |
| DOING | n | y | n |
| CANCELED | y | n | n |
| FAILED | y | y | n |
| DONE | n | y | n |

Basically:

- only the owner can cancel a task
- only the worker can act on a task processing (DOING/DONE/FAILED)
- BUILD_STARTED & BUILD_FINISHED are done internally
- only the owner can cancel a task or act on building (function being built on owner)
- only the worker can act on a task processing (DOING/DONE)
- both can fail a task

## Worker

A function associated with a task is built in the organization owning the function.
A task is processed on a specific worker.

Most of the time, the worker can be inferred from task inputs: it should be where the data is, ie. the datamanager's owner.
Expand Down
10 changes: 0 additions & 10 deletions docs/assets/function.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,3 @@ A compute task will go through different state during a compute plan execution.
This is an overview of a task's lifecycle:

![](./schemas/function.state.svg)

A task can be created in TODO or WAITING state depending on its parents.

During the ComputePlan execution, as tasks are DONE, their statuses will be reflected to their children.
If all the parents of a child task are DONE, this task enters TODO state.

When a parent task fails, children statuses are not changed.

A task may produce one or more [models](./model.md), they can only be registered when the task is in DOING.
This is to ensure that when a task starts (switch to DOING), all its inputs are available.
47 changes: 35 additions & 12 deletions docs/assets/schemas/computetask.state.mmd
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
stateDiagram-v2
WAITING --> TODO
WAITING --> CANCELED
WAITING --> FAILED
TODO --> FAILED
TODO --> DOING
TODO --> CANCELED
DOING --> FAILED
DOING --> DONE
DOING --> CANCELED
DONE --> [*]
FAILED --> [*]
CANCELED --> [*]
waitingBuilding: WAITING_FOR_BUILDER_SLOT
building: BUILDING
waitingParent: WAITING_FOR_PARENT_TASKS
waitingExecutor: WAITING_FOR_EXECUTOR_SLOT
doing: DOING
canceled: CANCELED
failed: FAILED
done: DONE


[*] --> waitingBuilding
[*] --> building
[*] --> waitingParent
[*] --> waitingExecutor
[*] --> failed
[*] --> canceled

waitingBuilding --> building
building --> waitingParent
waitingParent --> waitingExecutor
waitingExecutor --> doing
doing --> done
done --> [*]


building --> failed
doing --> failed
failed --> [*]

waitingBuilding --> canceled
building --> canceled
waitingParent --> canceled
waitingExecutor --> canceled
doing --> canceled
canceled --> [*]
2 changes: 1 addition & 1 deletion docs/assets/schemas/computetask.state.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 6 additions & 6 deletions e2e/computetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestPredictTaskLifecycle(t *testing.T) {
require.Equal(t, predictTask.Status, asset.ComputeTaskStatus_STATUS_DONE)

testTask := appClient.GetComputeTask("test")
require.Equal(t, testTask.Status, asset.ComputeTaskStatus_STATUS_TODO)
require.Equal(t, testTask.Status, asset.ComputeTaskStatus_STATUS_WAITING_FOR_EXECUTOR_SLOT)
}

// TestCascadeCancel registers 10 children tasks and cancel their parent
Expand All @@ -136,7 +136,7 @@ func TestCascadeCancel(t *testing.T) {

for i := 0; i < 10; i++ {
task := appClient.GetComputeTask(fmt.Sprintf("task%d", i))
require.Equal(t, asset.ComputeTaskStatus_STATUS_WAITING, task.Status, "child task should be WAITING")
require.Equal(t, asset.ComputeTaskStatus_STATUS_WAITING_FOR_PARENT_TASKS, task.Status, "child task should be WAITING_FOR_PARENT_TASKS ")
}

plan := appClient.GetComputePlan(client.DefaultPlanRef)
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestCascadeTodo(t *testing.T) {

for i := 0; i < 10; i++ {
task := appClient.GetComputeTask(fmt.Sprintf("task%d", i))
require.Equal(t, asset.ComputeTaskStatus_STATUS_TODO, task.Status, "child task should be TODO")
require.Equal(t, asset.ComputeTaskStatus_STATUS_WAITING_FOR_EXECUTOR_SLOT, task.Status, "child task should be STATUS_WAITING_FOR_EXECUTOR_SLOT")
}

plan := appClient.GetComputePlan(client.DefaultPlanRef)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestCascadeFailure(t *testing.T) {

for i := 0; i < 10; i++ {
task := appClient.GetComputeTask(fmt.Sprintf("task%d", i))
require.Equal(t, asset.ComputeTaskStatus_STATUS_WAITING, task.Status, "child task should be WAITING")
require.Equal(t, asset.ComputeTaskStatus_STATUS_WAITING_FOR_PARENT_TASKS, task.Status, "child task should be WAITING_FOR_PARENT_TASKS")
}

plan := appClient.GetComputePlan(client.DefaultPlanRef)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestGetTaskInputAssets(t *testing.T) {
WithFunctionRef("aggregate_function"),
)

// Fetching inputs for a task not in TODO should return an error
// Fetching inputs for a task not in STATUS_WAITING_FOR_EXECUTOR_SLOT should return an error
_, err := appClient.FailableGetTaskInputAssets("aggregate")
assert.Error(t, err)
assert.ErrorContains(t, err, "inputs may not be defined")
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestGetTaskInputAssetsFromComposite(t *testing.T) {
WithFunctionRef("aggregate_function"),
)

// Fetching inputs for a task not in TODO should return an error
// Fetching inputs for a task not in STATUS_WAITING_FOR_EXECUTOR_SLOT should return an error
_, err := appClient.FailableGetTaskInputAssets("aggregate")
assert.Error(t, err)
assert.ErrorContains(t, err, "inputs may not be defined")
Expand Down
8 changes: 6 additions & 2 deletions lib/asset/computetask.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ option go_package = "github.com/substra/orchestrator/lib/asset";

enum ComputeTaskStatus {
STATUS_UNKNOWN = 0;
STATUS_WAITING = 1;
STATUS_TODO = 2;
STATUS_WAITING_FOR_PARENT_TASKS = 1;
STATUS_WAITING_FOR_EXECUTOR_SLOT = 2;
STATUS_DOING = 3;
STATUS_DONE = 4;
STATUS_CANCELED = 5;
STATUS_FAILED = 6;
STATUS_WAITING_FOR_BUILDER_SLOT = 7;
STATUS_BUILDING = 8;
}

message ParentTaskOutputRef {
Expand Down Expand Up @@ -116,6 +118,8 @@ enum ComputeTaskAction {
TASK_ACTION_CANCELED = 2;
TASK_ACTION_FAILED = 3;
TASK_ACTION_DONE = 4;
TASK_ACTION_BUILD_STARTED = 5;
TASK_ACTION_BUILD_FINISHED = 6;
}

// ComputeTaskOutputAsset links an asset to a task output.
Expand Down
1 change: 1 addition & 0 deletions lib/asset/computetask_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (p *ApplyTaskActionParam) Validate() error {
return validation.ValidateStruct(p,
validation.Field(&p.ComputeTaskKey, validation.Required, is.UUID),
validation.Field(&p.Action, validation.Required, validation.In(
// TASK_ACTION_BUILDING, TASK_ACTION_WAITING_FOR_EXECUTION are managed internally based on function status
ComputeTaskAction_TASK_ACTION_DOING,
ComputeTaskAction_TASK_ACTION_FAILED,
ComputeTaskAction_TASK_ACTION_CANCELED,
Expand Down
9 changes: 9 additions & 0 deletions lib/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ var (

// ErrTerminatedComputePlan occurs when attempting to cancel or fail an already terminated compute plan
ErrTerminatedComputePlan = "OE0109"

// ErrTerminatedComputeTask occurs when attempting to cancel or fail an already terminated compute plan
ErrTerminatedComputeTask = "OE0110"
)

// OrcError represents an orchestration error.
Expand Down Expand Up @@ -184,6 +187,12 @@ func NewTerminatedComputePlan(planKey string) *OrcError {
return newErrorWithSource(ErrTerminatedComputePlan, msg)
}

// NewTerminatedComputeTask returns an ErrTerminatedComputeTask kind of OrcError with given message
func NewTerminatedComputeTask(taskKey string) *OrcError {
msg := fmt.Sprintf("compute task %s is already terminated", taskKey)
return newErrorWithSource(ErrTerminatedComputeTask, msg)
}

func NewIncompatibleTaskOutput(taskKey, identifier, expected, actual string) *OrcError {
return newErrorWithSource(
ErrIncompatibleKind,
Expand Down
4 changes: 2 additions & 2 deletions lib/service/computetask.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Task statuses in which the inputs are defined
var inputDefinedStatus = []asset.ComputeTaskStatus{
asset.ComputeTaskStatus_STATUS_DOING,
asset.ComputeTaskStatus_STATUS_TODO,
asset.ComputeTaskStatus_STATUS_WAITING_FOR_EXECUTOR_SLOT,
asset.ComputeTaskStatus_STATUS_FAILED,
}

Expand All @@ -37,7 +37,7 @@ type ComputeTaskAPI interface {
applyTaskAction(task *asset.ComputeTask, action taskTransition, reason string) error
addComputeTaskOutputAsset(output *asset.ComputeTaskOutputAsset) error
getTaskOutputCounter(taskKey string) (persistence.ComputeTaskOutputCounter, error)
propagateFunctionCancelation(functionKey string, requester string) error
PropagateActionFromFunction(functionKey string, action asset.ComputeTaskAction, reason string, requester string) error
GetTasksByFunction(functionKey string, statuses []asset.ComputeTaskStatus) ([]*asset.ComputeTask, error)
StartDependentTask(child *asset.ComputeTask, reason string) error
}
Expand Down
18 changes: 10 additions & 8 deletions lib/service/computetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func TestRegisterTrainTask(t *testing.T) {
dms.On("CheckDataManager", dataManager, dataSampleKeys, "testOwner").Once().Return(nil)

function := &asset.Function{
Key: "b09cc8eb-cb76-49ce-8f93-2f8b3185e7b7",
Key: "b09cc8eb-cb76-49ce-8f93-2f8b3185e7b7",
Status: asset.FunctionStatus_FUNCTION_STATUS_READY,
Permissions: &asset.Permissions{
Process: &asset.Permission{Public: false, AuthorizedIds: []string{"testOwner"}},
Download: &asset.Permission{Public: false, AuthorizedIds: []string{"testOwner"}},
Expand Down Expand Up @@ -228,7 +229,7 @@ func TestRegisterTrainTask(t *testing.T) {
Owner: "testOwner",
ComputePlanKey: newTrainTask.ComputePlanKey,
Metadata: newTrainTask.Metadata,
Status: asset.ComputeTaskStatus_STATUS_WAITING,
Status: asset.ComputeTaskStatus_STATUS_WAITING_FOR_EXECUTOR_SLOT,
Worker: dataManager.Owner,
Inputs: newTrainTask.Inputs,
CreationDate: timestamppb.New(time.Unix(1337, 0)),
Expand Down Expand Up @@ -313,14 +314,16 @@ func TestRegisterCompositeTaskWithCompositeParents(t *testing.T) {
}

functionParent1 := &asset.Function{
Key: "d29118a9-f989-41af-ae02-90f0c4aaffe3",
Key: "d29118a9-f989-41af-ae02-90f0c4aaffe3",
Status: asset.FunctionStatus_FUNCTION_STATUS_READY,
Outputs: map[string]*asset.FunctionOutput{
"local": {Kind: asset.AssetKind_ASSET_MODEL},
"shared": {Kind: asset.AssetKind_ASSET_MODEL},
},
}
functionParent2 := &asset.Function{
Key: "cc765417-1e14-41c8-9f7b-653ed335d30d",
Key: "cc765417-1e14-41c8-9f7b-653ed335d30d",
Status: asset.FunctionStatus_FUNCTION_STATUS_READY,
Outputs: map[string]*asset.FunctionOutput{
"local": {Kind: asset.AssetKind_ASSET_MODEL},
"shared": {Kind: asset.AssetKind_ASSET_MODEL},
Expand Down Expand Up @@ -378,7 +381,7 @@ func TestRegisterCompositeTaskWithCompositeParents(t *testing.T) {
// All parents already exist
dbal.On("GetExistingComputeTaskKeys", []string{parent1.Key, parent2.Key}).Once().Return([]string{parent1.Key, parent2.Key}, nil)

// TODO: we fetch the same data several times
// STATUS_WAITING_FOR_EXECUTOR_SLOT: we fetch the same data several times
// Since this will change with task category removal, let's revisit later
dbal.On("GetComputeTasks", []string{parent1.Key, parent2.Key}).Once().Return([]*asset.ComputeTask{parent1, parent2}, nil)
dbal.On("GetComputeTask", parent1.Key).Once().Return(parent1, nil)
Expand Down Expand Up @@ -432,7 +435,6 @@ func TestRegisterCompositeTaskWithCompositeParents(t *testing.T) {
Owner: "testOwner",
ComputePlanKey: newTask.ComputePlanKey,
Metadata: newTask.Metadata,
Status: asset.ComputeTaskStatus_STATUS_WAITING,
Worker: dataManager.Owner,
Rank: 1,
CreationDate: timestamppb.New(time.Unix(1337, 0)),
Expand Down Expand Up @@ -1485,7 +1487,7 @@ func TestGetInputAssetsTaskUnready(t *testing.T) {
Once().
Return(&asset.ComputeTask{
Key: "uuid",
Status: asset.ComputeTaskStatus_STATUS_WAITING,
Status: asset.ComputeTaskStatus_STATUS_WAITING_FOR_PARENT_TASKS,
}, nil)

_, err := service.GetInputAssets("uuid")
Expand Down Expand Up @@ -1525,7 +1527,7 @@ func TestGetInputAssets(t *testing.T) {
Once().
Return(&asset.ComputeTask{
Key: "uuid",
Status: asset.ComputeTaskStatus_STATUS_TODO,
Status: asset.ComputeTaskStatus_STATUS_WAITING_FOR_EXECUTOR_SLOT,
Inputs: []*asset.ComputeTaskInput{
{Identifier: "data", Ref: &asset.ComputeTaskInput_AssetKey{AssetKey: "uuid:ds"}},
{Identifier: "opener", Ref: &asset.ComputeTaskInput_AssetKey{AssetKey: "uuid:dm"}},
Expand Down
Loading

0 comments on commit b71e4ee

Please sign in to comment.