Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/agent_framework] Add Get Workflow API to retrieve a stored template by workflow id #273

Merged
merged 6 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
Expand Down Expand Up @@ -125,6 +128,7 @@ public List<RestHandler> getRestHandlers(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}
Expand All @@ -135,6 +139,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowRequest;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
Expand All @@ -34,7 +34,7 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to get a workflow status
* Rest Action to facilitate requests to get a stored template
*/
public class RestGetWorkflowAction extends BaseRestHandler {

Expand All @@ -55,6 +55,11 @@ public String getName() {
return GET_WORKFLOW_ACTION;
}

@Override
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
public List<Route> routes() {
return ImmutableList.of(new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID)));
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

Expand All @@ -68,17 +73,16 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST);
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}

boolean all = request.paramAsBoolean("all", false);
GetWorkflowRequest getWorkflowRequest = new GetWorkflowRequest(workflowId, all);
return channel -> client.execute(GetWorkflowAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> {
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(GetWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
Expand All @@ -88,7 +92,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
logger.error("Failed to send back get template exception", e);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));
Expand All @@ -99,12 +103,4 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
);
}
}

@Override
public List<Route> routes() {
return ImmutableList.of(
// Provision workflow from indexed use case template
new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_status"))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to get a workflow status
*/
public class RestGetWorkflowStateAction extends BaseRestHandler {

private static final String GET_WORKFLOW_STATE_ACTION = "get_workflow_state";
private static final Logger logger = LogManager.getLogger(RestGetWorkflowStateAction.class);
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestGetWorkflowStateAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestGetWorkflowStateAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return GET_WORKFLOW_STATE_ACTION;
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}

boolean all = request.paramAsBoolean("all", false);
GetWorkflowStateRequest getWorkflowRequest = new GetWorkflowStateRequest(workflowId, all);
return channel -> client.execute(GetWorkflowStateAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_status"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,52 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.model.Template;

import java.io.IOException;

/**
* Transport Response from getting a workflow status
* Transport Response from getting a template
*/
public class GetWorkflowResponse extends ActionResponse implements ToXContentObject {

/** The workflow state */
public WorkflowState workflowState;
/** Flag to indicate if the entire state should be returned */
public boolean allStatus;
/** The template */
private Template template;

/**
* Instantiates a new GetWorkflowResponse from an input stream
* @param in the input stream to read from
* @throws IOException if the workflowId cannot be read from the input stream
* @throws IOException if the template json cannot be read from the input stream
*/
public GetWorkflowResponse(StreamInput in) throws IOException {
super(in);
workflowState = new WorkflowState(in);
allStatus = in.readBoolean();
this.template = Template.parse(in.readString());
}

/**
* Instatiates a new GetWorkflowResponse from an input stream
* @param workflowState the workflow state object
* @param allStatus whether to return all fields in state index
* Instantiates a new GetWorkflowResponse
* @param template the template
*/
public GetWorkflowResponse(WorkflowState workflowState, boolean allStatus) {
if (allStatus) {
this.workflowState = workflowState;
} else {
this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId())
.error(workflowState.getError())
.state(workflowState.getState())
.resourcesCreated(workflowState.resourcesCreated())
.build();
}
public GetWorkflowResponse(Template template) {
this.template = template;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
workflowState.writeTo(out);
out.writeString(template.toJson());
}

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException {
return workflowState.toXContent(xContentBuilder, params);
return this.template.toXContent(xContentBuilder, params);
}

/**
* Gets the template
* @return the template
*/
public Template getTemplate() {
return this.template;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetWorkflowStateAction
*/
public class GetWorkflowStateAction extends ActionType<GetWorkflowStateResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow_state/get";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
/** An instance of this action */
public static final GetWorkflowStateAction INSTANCE = new GetWorkflowStateAction();

private GetWorkflowStateAction() {
super(NAME, GetWorkflowStateResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import java.io.IOException;

/**
* Transport Request to get a workflow or workflow status
* Transport Request to get a workflow status
*/
public class GetWorkflowRequest extends ActionRequest {
public class GetWorkflowStateRequest extends ActionRequest {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved

/**
* The documentId of the workflow entry within the Global Context index
Expand All @@ -33,21 +33,21 @@ public class GetWorkflowRequest extends ActionRequest {
private boolean all;

/**
* Instantiates a new GetWorkflowRequest
* Instantiates a new GetWorkflowStateRequest
* @param workflowId the documentId of the workflow
* @param all whether the get request is looking for all fields in status
*/
public GetWorkflowRequest(@Nullable String workflowId, boolean all) {
public GetWorkflowStateRequest(@Nullable String workflowId, boolean all) {
this.workflowId = workflowId;
this.all = all;
}

/**
* Instantiates a new GetWorkflowRequest request
* Instantiates a new GetWorkflowStateRequest request
* @param in The input stream to read from
* @throws IOException If the stream cannot be read properly
*/
public GetWorkflowRequest(StreamInput in) throws IOException {
public GetWorkflowStateRequest(StreamInput in) throws IOException {
super(in);
this.workflowId = in.readString();
this.all = in.readBoolean();
Expand Down
Loading