Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(orchestration,workflows-sdk): Skip step #8334

Merged
merged 7 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/admin-next/dashboard/src/i18n/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,8 @@
"runningState": "Running...",
"awaitingState": "Awaiting",
"failedState": "Failed",
"skippedState": "Skipped",
"skippedFailureState": "Skipped (Failure)",
"definitionLabel": "Definition",
"outputLabel": "Output",
"compensateInputLabel": "Compensate input",
Expand All @@ -2085,6 +2087,7 @@
"step": {
"state": {
"skipped": "Skipped",
"skippedFailure": "Skipped (Failure)",
"dormant": "Dormant",
"timeout": "Timeout"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ export const STEP_IN_PROGRESS_STATES = [
TransactionStepState.COMPENSATING,
TransactionStepState.INVOKING,
]

export const STEP_SKIPPED_STATES = [
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
]
export const STEP_OK_STATES = [TransactionStepState.DONE]

export const STEP_ERROR_STATES = [
TransactionStepState.FAILED,
TransactionStepState.REVERTED,
TransactionStepState.TIMEOUT,
TransactionStepState.DORMANT,
TransactionStepState.SKIPPED,
]
export const STEP_INACTIVE_STATES = [TransactionStepState.NOT_STARTED]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@ export enum TransactionStepState {
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
SKIPPED_FAILURE = "skipped_failure",
TIMEOUT = "timeout",
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ export const getStepState = (
return t("workflowExecutions.state.notStarted")
case TransactionStepState.SKIPPED:
return t("workflowExecutions.step.state.skipped")
case TransactionStepState.SKIPPED_FAILURE:
return t("workflowExecutions.step.state.skippedFailure")
case TransactionStepState.DORMANT:
return t("workflowExecutions.step.state.dormant")
case TransactionStepState.TIMEOUT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
STEP_INACTIVE_STATES,
STEP_IN_PROGRESS_STATES,
STEP_OK_STATES,
STEP_SKIPPED_STATES,
} from "../../../constants"
import {
StepError,
Expand Down Expand Up @@ -132,6 +133,9 @@ const Event = ({
<div className="bg-ui-bg-base shadow-borders-base flex size-2.5 items-center justify-center rounded-full">
<div
className={clx("size-1.5 rounded-full", {
"bg-ui-tag-neutral-bg": STEP_SKIPPED_STATES.includes(
step.invoke.state
),
"bg-ui-tag-green-icon": STEP_OK_STATES.includes(
step.invoke.state
),
Expand Down Expand Up @@ -204,7 +208,8 @@ const Event = ({
snippets={[
{
code: JSON.stringify(
stepInvokeContext.output.output,
// TODO: Apply resolve value: packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts
stepInvokeContext?.output?.output ?? {},
null,
2
),
Expand All @@ -227,8 +232,9 @@ const Event = ({
<CodeBlock
snippets={[
{
// TODO: Apply resolve value: packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts
code: JSON.stringify(
stepInvokeContext.output.compensateInput,
stepInvokeContext?.output?.compensateInput ?? {},
null,
2
),
Expand Down Expand Up @@ -290,6 +296,8 @@ const StepState = ({

const isFailed = state === TransactionStepState.FAILED
const isRunning = state === TransactionStepState.INVOKING
const isSkipped = state === TransactionStepState.SKIPPED
const isSkippedFailure = state === TransactionStepState.SKIPPED_FAILURE

if (isUnreachable) {
return null
Expand All @@ -306,10 +314,20 @@ const StepState = ({
)
}

if (isFailed) {
let stateText: string | undefined

if (isSkipped) {
stateText = t("workflowExecutions.history.skippedState")
} else if (isSkippedFailure) {
stateText = t("workflowExecutions.history.skippedFailureState")
} else if (isFailed) {
stateText = t("workflowExecutions.history.failedState")
}

if (stateText !== null) {
return (
<Text size="small" leading="compact" className="text-ui-fg-subtle">
{t("workflowExecutions.history.failedState")}
{stateText}
</Text>
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
STEP_INACTIVE_STATES,
STEP_IN_PROGRESS_STATES,
STEP_OK_STATES,
STEP_SKIPPED_STATES,
} from "../../../constants"
import { WorkflowExecutionDTO, WorkflowExecutionStep } from "../../../types"

Expand Down Expand Up @@ -405,6 +406,9 @@ const Node = ({ step }: { step: WorkflowExecutionStep }) => {
className={clx(
"size-2 rounded-sm shadow-[inset_0_0_0_1px_rgba(0,0,0,0.12)]",
{
"bg-ui-tag-neutral-bg": STEP_SKIPPED_STATES.includes(
step.invoke.state
),
"bg-ui-tag-green-icon": STEP_OK_STATES.includes(
step.invoke.state
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ describe("Transaction Orchestrator", () => {
expect(
transaction.getFlow().steps["_root.action1.action2.action4"].invoke
.state
).toBe(TransactionStepState.SKIPPED)
).toBe(TransactionStepState.SKIPPED_FAILURE)
expect(
transaction.getFlow().steps["_root.action1.action2.action4"].invoke
.status
Expand Down
13 changes: 13 additions & 0 deletions packages/core/orchestration/src/transaction/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ export class PermanentStepFailureError extends Error {
}
}

export class SkipStepResponse extends Error {
static isSkipStepResponse(error: Error): error is SkipStepResponse {
return (
error instanceof SkipStepResponse || error?.name === "SkipStepResponse"
)
}

constructor(message?: string) {
super(message)
this.name = "SkipStepResponse"
}
}

export class TransactionStepTimeoutError extends Error {
static isTransactionStepTimeoutError(
error: Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
isErrorLike,
PermanentStepFailureError,
serializeError,
SkipStepResponse,
TransactionStepTimeoutError,
TransactionTimeoutError,
} from "./errors"
Expand All @@ -36,6 +37,7 @@ export type TransactionFlow = {
}
hasAsyncSteps: boolean
hasFailedSteps: boolean
hasSkippedOnFailureSteps: boolean
hasWaitingSteps: boolean
hasSkippedSteps: boolean
hasRevertedSteps: boolean
Expand Down Expand Up @@ -125,6 +127,7 @@ export class TransactionOrchestrator extends EventEmitter {
TransactionStepState.FAILED,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
]

const siblings = this.getPreviousStep(flow, previousStep).next.map(
Expand Down Expand Up @@ -253,6 +256,7 @@ export class TransactionOrchestrator extends EventEmitter {
completed: number
}> {
let hasSkipped = false
let hasSkippedOnFailure = false
let hasIgnoredFailure = false
let hasFailed = false
let hasWaiting = false
Expand Down Expand Up @@ -328,7 +332,9 @@ export class TransactionOrchestrator extends EventEmitter {
} else {
completedSteps++

if (curState.state === TransactionStepState.SKIPPED) {
if (curState.state === TransactionStepState.SKIPPED_FAILURE) {
hasSkippedOnFailure = true
} else if (curState.state === TransactionStepState.SKIPPED) {
hasSkipped = true
} else if (curState.state === TransactionStepState.REVERTED) {
hasReverted = true
Expand Down Expand Up @@ -358,6 +364,9 @@ export class TransactionOrchestrator extends EventEmitter {

return await this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkippedOnFailure) {
flow.hasSkippedOnFailureSteps = true
}
if (hasSkipped) {
flow.hasSkippedSteps = true
}
Expand Down Expand Up @@ -453,6 +462,39 @@ export class TransactionOrchestrator extends EventEmitter {
transaction.emit(eventName, { step, transaction })
}

private static async skipStep(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
const hasStepTimedOut =
step.getStates().state === TransactionStepState.TIMEOUT

const flow = transaction.getFlow()
const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId)

if (!hasStepTimedOut) {
step.changeStatus(TransactionStepStatus.OK)
step.changeState(TransactionStepState.SKIPPED)
}

if (step.definition.async || options?.storeExecution) {
await transaction.saveCheckpoint()
}

const cleaningUp: Promise<unknown>[] = []
if (step.hasRetryScheduled()) {
cleaningUp.push(transaction.clearRetry(step))
}
if (step.hasTimeout()) {
cleaningUp.push(transaction.clearStepTimeout(step))
}

await promiseAll(cleaningUp)

const eventName = DistributedTransactionEvent.STEP_SKIPPED
transaction.emit(eventName, { step, transaction })
}

private static async setStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep,
Expand Down Expand Up @@ -539,7 +581,7 @@ export class TransactionOrchestrator extends EventEmitter {
) {
for (const childStep of step.next) {
const child = flow.steps[childStep]
child.changeState(TransactionStepState.SKIPPED)
child.changeState(TransactionStepState.SKIPPED_FAILURE)
}
} else {
flow.state = TransactionState.WAITING_TO_COMPENSATE
Expand Down Expand Up @@ -701,6 +743,11 @@ export class TransactionOrchestrator extends EventEmitter {
)
}

if (SkipStepResponse.isSkipStepResponse(response?.output)) {
await TransactionOrchestrator.skipStep(transaction, step)
return
}

await TransactionOrchestrator.setStepSuccess(
transaction,
step,
Expand Down Expand Up @@ -754,6 +801,11 @@ export class TransactionOrchestrator extends EventEmitter {
)
}

if (SkipStepResponse.isSkipStepResponse(response)) {
await TransactionOrchestrator.skipStep(transaction, step)
return
}

await TransactionOrchestrator.setStepSuccess(
transaction,
step,
Expand Down Expand Up @@ -912,6 +964,7 @@ export class TransactionOrchestrator extends EventEmitter {
metadata: flowMetadata,
hasAsyncSteps: features.hasAsyncSteps,
hasFailedSteps: false,
hasSkippedOnFailureSteps: false,
hasSkippedSteps: false,
hasWaitingSteps: false,
hasRevertedSteps: false,
Expand Down Expand Up @@ -1176,6 +1229,41 @@ export class TransactionOrchestrator extends EventEmitter {
return [transaction, step]
}

/** Skip the execution of a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
*/
public async skipStep(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction
)

if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction,
})

await TransactionOrchestrator.skipStep(curTransaction, step)

await this.executeNext(curTransaction)
} else {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot skip a step when status is ${step.getStates().status}`
)
}

return curTransaction
}

/** Register a step success for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ export class TransactionStep {
TransactionStepState.COMPENSATING,
TransactionStepState.FAILED,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
],
[TransactionStepState.INVOKING]: [
TransactionStepState.FAILED,
TransactionStepState.DONE,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
],
[TransactionStepState.COMPENSATING]: [
TransactionStepState.REVERTED,
Expand Down
8 changes: 7 additions & 1 deletion packages/core/orchestration/src/transaction/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export type TransactionStepsDefinition = {

/**
* Indicates whether the workflow should continue even if there is a permanent failure in this step.
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED.
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE.
*/
continueOnPermanentFailure?: boolean

Expand Down Expand Up @@ -164,6 +164,7 @@ export enum DistributedTransactionEvent {
TIMEOUT = "timeout",
STEP_BEGIN = "stepBegin",
STEP_SUCCESS = "stepSuccess",
STEP_SKIPPED = "stepSkipped",
STEP_FAILURE = "stepFailure",
STEP_AWAITING = "stepAwaiting",
COMPENSATE_STEP_SUCCESS = "compensateStepSuccess",
Expand Down Expand Up @@ -211,6 +212,11 @@ export type DistributedTransactionEvents = {
step: TransactionStep
transaction: DistributedTransaction
}) => void

onStepSkipped?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
}

export type StepFeatures = {
Expand Down
Loading
Loading