Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] WorkflowStep Execution Improvements #210

Closed
Tracked by #88
joshpalis opened this issue Nov 29, 2023 · 10 comments
Closed
Tracked by #88

[FEATURE] WorkflowStep Execution Improvements #210

joshpalis opened this issue Nov 29, 2023 · 10 comments
Assignees
Labels
enhancement New feature or request

Comments

@joshpalis
Copy link
Member

joshpalis commented Nov 29, 2023

Is your feature request related to a problem?

Currently, WorkflowSteps are each given a List<WorkflowData> which is populated by its own user inputs, and outputs from predecessor nodes. From this data, a WorkflowStep then retrieves values from this list, based on a unique field name for the expected inputs.

There is a case in which there could be multiple entries of data with the same field name, which runs the risk of having a WorkflowStep consume the incorrect data.

What solution would you like?

It is necessary to modify the WorkflowStep::execute method to provide additional data, forgoing a List<WorkflowData> and instead providing a Map<String workflowStepID, WorkflowData output> which would allow us to provide multiple entries of the same type of data, uniquely identified by the workflow step ID that produced the output.

Additionally, given a Map<String workflowStepID, String fieldName> a WorkflowStep can then refer to this mapping to ascertain which keys to look for in the output map and the specific field to search for within that entries WorkflowData. The full definition is as follows :

public CompletableFuture<WorkflowData> execute(
    String currentNodeId,
    WorkflowData currentNodeInputs,
    Map<String, WorkflowData> outputs,
    Map<String, String> previousNodeInputs
)
@joshpalis joshpalis added enhancement New feature or request untriaged labels Nov 29, 2023
@joshpalis joshpalis self-assigned this Nov 29, 2023
@dbwiddis
Copy link
Member

Can you clarify what the Map<String, String> previousNodeInputs is and how we get it into the ProcessNode?

And I think this method returns a future, right?

@joshpalis
Copy link
Member Author

joshpalis commented Nov 29, 2023

Can you clarify what the Map<String, String> previousNodeInputs is and how we get it into the ProcessNode?

Previous Node inputs are already given to the process node here. This is provided by the user via the template and it denotes what inputs are coming from previous nodes in the graph. We can then just pass this in when executing a WorkflowStep, and it allows us to know which outputs entry we can retrieve a field value from

And I think this method returns a future, right?

Yes that's right, I have corrected the issue description

@owaiskazi19
Copy link
Member

Interface looks good. To avoid doing the validation again for previousNodeInputs, for immediate solution we can have the map of previousNodeInputs being passed in the interface and fetch the value and key associated from it.
The long term solution would be to go with the substitution/placeholder route for the params which needs output of previous steps.

@joshpalis joshpalis assigned dbwiddis and unassigned joshpalis Nov 29, 2023
@dbwiddis
Copy link
Member

dbwiddis commented Nov 29, 2023

Map<String, String> previousNodeInputs should be a List<String> :)

@owaiskazi19
Copy link
Member

Map<String, String> previousNodeInputs should be a List<String> :)

Map is required to store the nodeId:<output_needed_from_previous_step>

@dbwiddis
Copy link
Member

Will implement a util method as part of this issue:

Should we create a util method for this task?

Originally posted by @owaiskazi19 in #211 (comment)

@dbwiddis
Copy link
Member

@owaiskazi19 I've drafted the following util method to address this comment. WDYT?

    /**
     * Processes inputs from either a template or output from a previous node.
     * @param requiredInputKeys 
     * @param currentNodeInputs Input params and content for this node, from workflow parsing
     * @param outputs WorkflowData content of previous steps.
     * @param previousNodeInputs Input params for this node that come from previous steps
     * @return A map containing the requiredInputKeys with their corresponding values, 
     * and optionalInputKeys with their corresponding values if present. 
     * Throws a {@link FlowFrameworkException} if a required key is not present.
     */
    public static Map<String, Object> getInputsFromPreviousSteps(
        Set<String> requiredInputKeys,
        Set<String> optionalInputKeys,
        WorkflowData currentNodeInputs,
        Map<String, WorkflowData> outputs,
        Map<String, String> previousNodeInputs
    ) {
        // Evaluates steps in order for each key
        // On match, remove from set and put to output map with value
        //
        // Priority 1: specifically named prior step inputs
        // ... parse the previousNodeInputs map and fill in the specified keys
        //
        // Priority 2: inputs specified in template
        // ... fetch from currentNodeInputs
        //
        // Priority 3: other inputs explicit match
        // ... iterate outputs.entryset 
        // ... look for key by itself
        //
        // Priority 4: other inputs substitution
        // ... check if the key is of the form ${{foo.bar}} 
        // ... look for key in outputs.get(foo).get(bar)
        //
        // After iterating is complete, throw exception if requiredInputKeys is not empty
        return <the generated map>;
    }

@owaiskazi19
Copy link
Member

owaiskazi19 commented Dec 1, 2023

@dbwiddis if we go with the route of substitution

 // Priority 4: other inputs substitution
        // ... check if the key is of the form ${{foo.bar}} 
        // ... look for key in outputs.get(foo).get(bar)

It will fail our validation process here. That's why we thought of keeping

"previous_node_inputs": {
                        "workflow_step_1": "tools",
                        "workflow_step_2": "tools"
                    },

and fetch the values from there.

@dbwiddis
Copy link
Member

dbwiddis commented Dec 1, 2023

As I implemented it I realized the comment was wrong. I was referring to the value, not the key.

The validation you mention will match the "priority 1" choice and never get to that step unless where you have "tools" is a substitution.

See PR #234.

@dbwiddis
Copy link
Member

dbwiddis commented Dec 3, 2023

Closing this as it's been implemented in #234 and the bug fix #243

@dbwiddis dbwiddis closed this as completed Dec 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants