Skip to content

Commit

Permalink
Implemented throttling for max workflows to be created (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
owaiskazi19 authored Nov 13, 2023
1 parent 1de0ca5 commit 56ccb1d
Show file tree
Hide file tree
Showing 17 changed files with 400 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@

import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;
private ClusterService clusterService;

/**
* Instantiate this plugin.
Expand All @@ -84,6 +88,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
Expand All @@ -106,7 +111,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
Expand All @@ -123,7 +128,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = ImmutableList.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED);
List<Setting<?>> settings = ImmutableList.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT);
return settings;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,15 @@
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Controls enabling or disabling features of this plugin
*/
public class FlowFrameworkFeatureEnabledSetting {

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile Boolean isFlowFrameworkEnabled;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

/** The common settings of flow framework */
public class FlowFrameworkSettings {

private FlowFrameworkSettings() {}

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;

/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
1000,
0,
MAX_WORKFLOWS_LIMIT,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,32 +373,30 @@ public void updateTemplateInGlobalContext(String documentId, Template template,

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* @param documentId the document ID
* @param updatedFields the fields to update the global state index with
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDoc(
String indexName,
String documentId,
Map<String, Object> updatedFields,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(exceptionMessage);
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
String errorMessage = "Failed to update " + indexName + " entry : " + documentId;
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,23 @@
import java.util.List;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

/**
* Abstract class to handle search request.
*/
public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> extends BaseRestHandler {

/** Url Paths of the routes*/
protected final List<String> urlPaths;
/** Index on search operation needs to be performed*/
protected final String index;
/** Search class name*/
protected final Class<T> clazz;
/** Search action type*/
protected final ActionType<SearchResponse> actionType;
/** Settings to enable FlowFramework API*/
protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BaseRestHandler;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
* Abstract action for the rest actions
*/
public abstract class AbstractWorkflowAction extends BaseRestHandler {
/** Timeout for the request*/
protected volatile TimeValue requestTimeout;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;

/**
* Instantiates a new AbstractWorkflowAction
*
* @param settings Environment settings
* @param clusterService clusterService
*/
public AbstractWorkflowAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);

clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -21,7 +23,6 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -32,12 +33,12 @@
import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends BaseRestHandler {
public class RestCreateWorkflowAction extends AbstractWorkflowAction {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";
Expand All @@ -48,8 +49,15 @@ public class RestCreateWorkflowAction extends BaseRestHandler {
* Instantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param settings Environment settings
* @param clusterService clusterService
*/
public RestCreateWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
public RestCreateWorkflowAction(
FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting,
Settings settings,
ClusterService clusterService
) {
super(settings, clusterService);
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

Expand Down Expand Up @@ -85,7 +93,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
Template template = Template.parse(request.content().utf8ToString());
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun, requestTimeout, maxWorkflows);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template
Expand Down
Loading

0 comments on commit 56ccb1d

Please sign in to comment.