[RFC] Asynchronous Offline Batch Inference and Ingestion to OpenSearch #2891

Zhangxunmt opened this issue Sep 4, 2024 · 1 comment

Zhangxunmt commented Sep 4, 2024

Problem Statement

Remote model servers such as AWS SageMaker, Bedrock, OpenAI, and Cohere all support batch predict APIs, which allow users to submit a large number of requests in a file (for example, in S3) and receive the results asynchronously in an output file. Different platforms use different terminology, e.g. SageMaker uses "batch transform" and Bedrock uses "batch inference", but they are the same in the sense that they accept requests in batch mode and process them asynchronously. Some use cases require synchronous requests, but in many cases requests do not need an immediate response, or rate limits prevent executing a large number of queries quickly. For example, in this reported case the customer experienced throttling by the remote model and the data ingestion could not finish.

In this RFC, "batch inference" is used as the term for these "batch" operations across all remote servers. The benefits of batch inference include, but are not limited to: 1) better cost efficiency, e.g. a 50% cost discount compared to synchronous APIs (the number varies across servers), and 2) higher rate limits compared to the synchronous APIs.

For OpenSearch users, the most common use case of batch inference is ingesting embedding data into a k-NN index for vector search. However, the entire Ingest API in OpenSearch was designed for stream data processing, so the data processing capabilities supported by our ingest processors are not optimized for batch ingestion, and they do not accept files as inputs for ingestion.

This RFC focuses on these two improvements:

  1. Enhance our AI connector framework to support these batch APIs.
  2. Provide a new ingest API to ingest data into OpenSearch from files, such as S3 objects or OpenAI files, which are the outputs of these batch APIs.

Proposed Solution


Phase 1: Add a new “batch_predict” action type in the AI Connector framework (released in OpenSearch 2.16)
Add a new action type “batch_predict” in the connector blueprint. This action type runs batch prediction using the connector in ML Commons. An example connector for SageMaker is given below. Different model frameworks require different connector configurations, and we will define multiple ConnectorExecutors to easily extend batch prediction to other AI models.

# Example API request
POST /_plugins/_ml/connectors/_create
{
  "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2",
  "version": "1",
  "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2",
  "protocol": "aws_sigv4",
  "credential": {
    "access_key": "<your access key>",
    "secret_key": "<your secret key>",
    "session_token": "<your session token>"
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "sagemaker",
    "DataProcessing": {
        "InputFilter": "$.content",
        "JoinSource": "Input",
        "OutputFilter": "$"
    },
    "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines",
    "TransformInput": { 
      "ContentType": "application/json",
      "DataSource": { 
         "S3DataSource": { 
            "S3DataType": "S3Prefix",
            "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json"
         }
      },
      "SplitType": "Line"
    },
    "TransformJobName": "SM-offline-batch-transform-07-12-13-30",
    "TransformOutput": { 
      "AssembleWith": "Line",
      "Accept": "application/json",
      "S3OutputPath": "s3://offlinebatch/output"
   },
   "TransformResources": { 
      "InstanceCount": 1,
      "InstanceType": "ml.c5.xlarge"
   },
   "BatchStrategy": "SingleRecord"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
        "content-type": "application/json"
      },
      "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations",
      "request_body": "${parameters.input}",
      "pre_process_function": "connector.pre_process.default.embedding",
      "post_process_function": "connector.post_process.default.embedding"
    },
    {
        "action_type": "batch_predict",
        "method": "POST",
        "headers": {
            "content-type": "application/json"
        },
        "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob",
        "request_body": "{ \"BatchStrategy\": \"${parameters.BatchStrategy}\", \"ModelName\": \"${parameters.ModelName}\", \"DataProcessing\" : ${parameters.DataProcessing}, \"TransformInput\": ${parameters.TransformInput}, \"TransformJobName\" : \"${parameters.TransformJobName}\", \"TransformOutput\" : ${parameters.TransformOutput}, \"TransformResources\" : ${parameters.TransformResources}}"
    }
  ]
}

After the new connector action type is added, add a new API for batch inference jobs. To be consistent with the current “Predict” API in ML Commons, the new API is named batch-prediction-job. This API maps to the “create_transform_job” API in SageMaker and the “model-invocation-job” API in Bedrock. An example request of the new batch inference API:

POST /_plugins/_ml/models/dBK3t5ABrxVhHgFYhg7Q/_batch_predict
{
  "parameters": {
    "TransformJobName": "SM-offline-batch-transform-07-15-11-30"
  }
}
# Response
{
   "job_arn": "arn:aws:sagemaker:us-east-1:802041417063:transform-job/SM-offline-batch-transform"
}
{
    "task_id": "xxxxxxxx",
    "state": "InProgress"
}
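
The returned task_id can then be used with the ML Commons Task API to track the remote transform job, as shown in detail in the real-case example later in this RFC. For illustration, with the hypothetical task id above:

# Check the status of the batch prediction job (hypothetical task id)
GET /_plugins/_ml/tasks/xxxxxxxx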

Phase 2: Add offline batch inference task management and a new batch ingestion API.
Add task management for batch transform jobs, mapping to the “list-prediction-jobs”, “describe-prediction-job”, “cancel-prediction-job”, etc. APIs from the remote model servers.

Add a new API for offline batch ingestion jobs. This API reads the batch inference results from phase 1, as well as other input files, and bulk-ingests the vectors into the k-NN index. It reads the vector data from the S3 file and organizes the bulk ingestion request based on the field mapping. The bulk ingestion into the k-NN index is called asynchronously, so a task id is returned. An example request is shown below, followed by a sketch of the resulting document.

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index",
  "field_map": {
    "chapter": "$.content[0]",
    "title": "$.content[1]",
    "chapter_embedding": "$.SageMakerOutput[0]"
     "title_embedding": "$.SageMakerOutput[1]",
    "ingest_fields": ["$.id"]
  },
  "credential": {
    "region": "us-east-1",
    "access_key": "<your access key>",
    "secret_key": "<your secret key>",
    "session_token": "<your session token>"
  },
  "data_source": {
    "type": "s3",
    "source": ["s3://offlinebatch/output/sagemaker_djl_batch_input.json.out"]
  }
}

# expected response
{
  "task_id": "cbsPlpEBMHcagzGbOQOx",
  "task_type": "BATCH_INGEST",
  "status": "CREATED"
}
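
To make the field mapping concrete, here is a sketch of how a single line in the S3 output file maps to a document that the batch ingestion task would bulk-index into my-nlp-index. The output line below is hypothetical; the exact format depends on how the SageMaker transform job is configured (here, the JoinSource setting in the connector joins each input record with its SageMakerOutput).

# Hypothetical line in s3://offlinebatch/output/sagemaker_djl_batch_input.json.out
{"id": "doc-1", "content": ["Chapter text ...", "Title text ..."], "SageMakerOutput": [[0.12, 0.34, ...], [0.56, 0.78, ...]]}

# Document constructed from the field_map above (a sketch)
{
  "chapter": "Chapter text ...",
  "title": "Title text ...",
  "chapter_embedding": [0.12, 0.34, ...],
  "title_embedding": [0.56, 0.78, ...],
  "id": "doc-1"
}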

Phase 3: Integrate the offline batch inference and ingestion engine into Data Prepper
Data Prepper is a server-side data collector capable of filtering, enriching, transforming, normalizing, and aggregating data for downstream analytics and visualization. Data Prepper lets users build custom pipelines to improve the operational view of applications. We can integrate the AI connector into Data Prepper so that customers can set up custom pipelines to run offline batch ingestion.

A sample Data Prepper config is presented below. Integration with Data Prepper can be implemented after phases 1 and 2. This means users will have the option to use Data Prepper to set up offline batch ingestion without directly calling any plugin APIs.

sample-pipeline:
  workers: 4 #Number of workers
  delay: 100 # in milliseconds, how often the workers should run
  source:
    file:
        path: <path/to/input-file>  # input file in S3
  buffer:
    bounded_blocking:
      buffer_size: 1024 # max number of events the buffer will accept
      batch_size: 256 # max number of events the buffer will drain for each read
  processor:
    - string_converter:
       upper_case: true
    - text_embedding: 
       model_id: <xxxxx>  # the model in ML Commons used for batch inference
  sink:
    - file:
       path: <path/to/output-file>  # output file in S3

In Scope

  • Offline Batch Inference was released in OpenSearch 2.16
  • Offline Batch Ingestion is scheduled to be released in OpenSearch 2.17
  • Offline Batch Inference and Ingestion task/job management is scheduled to be released in OpenSearch 2.17
  • The integration of Offline Batch Inference with OSI is in planning

Out of Scope

  • Integrating the offline batch engine into the OpenSearch ingest processors and pipelines - not in planning
  • Integrating the offline batch engine with other remote ingestion engines such as Apache Airflow or Apache Spark (Flint). These external engines can be used for offline batch; this will be discussed in a separate RFC if prioritized.

Real-World Example Using a Public ML Model Service

Given an input file for batch inference using the OpenAI embedding model "text-embedding-ada-002":

{"custom_id": "request-1", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of life?", "The food was delicious and the waiter..."]}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of work?", "The travel was fantastic and the view..."]}}
{"custom_id": "request-3", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of friend?", "The old friend was far away and the time..."]}}
... ...
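
The OpenAI connector below references an input_file_id. As an illustrative sketch (not part of the connector itself), one way to obtain this id is to upload the JSONL file through the OpenAI Files API with purpose "batch", for example:

# Upload the JSONL input file to OpenAI to obtain the input_file_id (illustrative)
curl https://api.openai.com/v1/files \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -F purpose="batch" \
  -F file="@batch_input.jsonl"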

Use the connector below to kick off batch inference through the OpenAI Batch API:

POST /_plugins/_ml/connectors/_create
{
  "name": "OpenAI Embedding model",
  "description": "OpenAI embedding model for offline batch",
  "version": "1",
  "protocol": "http",
  "parameters": {
    "model": "text-embedding-ada-002",
    "input_file_id": "<your input file id in OpenAI>",
    "endpoint": "/v1/embeddings"
  },
  "credential": {
    "openAI_key": "<your key>"
  },
  "actions": [
    {
         ...
    },
    {
      "action_type": "batch_predict",
      "method": "POST",
      "url": "https://api.openai.com/v1/batches",
      "headers": {"Authorization": "Bearer ${credential.openAI_key}"},
      "request_body": "{ \"input_file_id\": \"${parameters.input_file_id}\", \"endpoint\": \"${parameters.endpoint}\", \"completion_window\": \"24h\" }"
    },
    {
      "action_type": "batch_predict_status",
      "method": "GET",
      "url": "https://api.openai.com/v1/batches/${parameters.id}",
      "headers": {
        "Authorization": "Bearer ${credential.openAI_key}"
      }
    }
  ]
}

# Invoke the model with "batch_predict" action type
POST /_plugins/_ml/models/<your model associated with the connector>/_batch_predict
{
  "parameters": {
    "input_file_id": "<your file id>"
  }
}

# Response
{
 "task_id": "KYZSv5EBqL2d0mFvs80C",
 "status": "CREATED"
}

Once you receive the task_id, you can use the Task API to check the status of the remote job in OpenAI.

// Get batch job status using get task API
GET /_plugins/_ml/tasks/KYZSv5EBqL2d0mFvs80C

// Response
{
  "model_id": "JYZRv5EBqL2d0mFvKs1E",
  "task_type": "BATCH_PREDICTION",
  "function_name": "REMOTE",
  "state": "COMPLETED",
  "input_type": "REMOTE",
  "worker_node": [
    "Ee5OCIq0RAy05hqQsNI1rg"
  ],
  "create_time": 1725491751455,
  "last_update_time": 1725491751455,
  "is_async": false,
  "remote_job": {
    "cancelled_at": null,
    "metadata": null,
    "request_counts": {
      "total": 3.0,
      "completed": 3.0,
      "failed": 0.0
    },
    "input_file_id": "file-5gXEtbKjHnYrKrdtv69IeRN2",
    "output_file_id": "file-Wux0Pk80dhkxi98Z5iKNjB4n",
    "error_file_id": null,
    "created_at": 1.725491753E9,
    "in_progress_at": 1.725491753E9,
    "expired_at": null,
    "finalizing_at": 1.725491757E9,
    "completed_at": 1.725491758E9,
    "endpoint": "/v1/embeddings",
    "expires_at": 1.725578153E9,
    "cancelling_at": null,
    "completion_window": "24h",
    "id": "batch_yz4YzfPfajDgcWFk4CqfxYox",
    "failed_at": null,
    "errors": null,
    "object": "batch",
    "status": "completed"
  }
}

After the job status shows "completed", you can double-check your output in the file with id "file-Wux0Pk80dhkxi98Z5iKNjB4n" in this example. It is a file in JSONL format in which each line represents a single inference result for a request in the input file.
The output file content is:

{"id": "batch_req_ITKQn29igorXCAGp6wzYs5IS", "custom_id": "request-1", "response": {"status_code": 200, "request_id": "10845755592510080d13054c3776aef4", "body": {"object": "list", "data": [{"object": "embedding", "index": 0, "embedding": [0.0044326545, ... ...]}, {"object": "embedding", "index": 1, "embedding": [0.002297497, ... ... ]}], "model": "text-embedding-ada-002", "usage": {"prompt_tokens": 15, "total_tokens": 15}}}, "error": null}
...
...

To ingest your embedding data and other fields into OpenSearch, you can use the batch ingestion API given below. The field map defines the actual fields you want to ingest into your k-NN cluster: the key is the field name, and the value is the JsonPath used to locate the data in the source files. For example, source[1].$.body.input[1] means using the JsonPath $.body.input[1] to fetch the element body.input[1] from the second file in the "source" array of the batch request. A sketch of a resulting document is shown after the status-check example below.

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "question": "source[1].$.body.input[0]",
    "answer": "source[1].$.body.input[1]",
    "question_embedding":"source[0].$.response.body.data[0].embedding",
    "answer_embedding":"source[0].$.response.body.data[1].embedding",
    "_id": ["source[0].$.custom_id", "source[1].$.custom_id"],
  },
  "ingest_fields": ["source[2].$.custom_field1", "source[2].$.custom_field2", "source[2].$.custom_field3" ], 
  "credential": {
    "openAI_key": "<your openAI key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["<your batch inference output file id>", "<your batch inference input file id>", "<your file id that contains other fields data>"]
  }
}

# expected response
{
  "task_id": "b5Yym5EBdxfqV0JWKrT6",
  "task_type": "BATCH_INGEST",
  "status": "CREATED"
}

# Once you obtain the task_id, use it to check the job status
GET /_plugins/_ml/tasks/b5Yym5EBdxfqV0JWKrT6

# expected response; "state": "COMPLETED" means the task was successful
{
  "task_type": "BATCH_INGEST",
  "state": "COMPLETED",
  "create_time": 1724885641978,
  "last_update_time": 1724885642658,
  "is_async": true
}
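
For reference, applying the field map above to the first request ("custom_id": "request-1") in the input and output files shown earlier would produce a document along these lines in my-nlp-index-openai (a sketch; embedding values truncated, and any ingest_fields would be added from the third source file):

{
  "_id": "request-1",
  "question": "What is the meaning of life?",
  "answer": "The food was delicious and the waiter...",
  "question_embedding": [0.0044326545, ...],
  "answer_embedding": [0.002297497, ...]
}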

Request for Comments

This offline batch engine is implemented in the machine learning space. It is designed to work with the remote model batch APIs to facilitate batch inference with LLMs and to ingest the model output data into the OpenSearch k-NN index for vector search. However, the batch ingestion API can also be used for general ingestion purposes when the inputs live in a file system, so please do not limit your thoughts to machine learning when you review this feature.

Please leave your suggestions and concerns in this RFC; your input is appreciated.


dylan-tong-aws commented Sep 11, 2024

@Zhangxunmt, I think it's important for us to continue supporting Ingest API compatibility. It's been in use since 2016, and we have major feature sets like neural search that are dependent on it.

Today, users can use the Ingest API to define an ingest pipeline.

PUT /_ingest/pipeline/my_pipeline
{
  "description": "pipeline with inference processing",
  "processors": [
    {
      "ml_inference": {
        "model_id": "<model_id>",
        "function_name": "<function_name>",
        "full_response_path": "<full_response_path>",
        "model_config": {
          "<model_config_field>": "<config_value>"
        },
        "model_input": "<model_input>",
        "input_map": [
          {
            "<model_input_field>": "<document_field>"
          }
        ],
        "output_map": [
          {
            "<new_document_field>": "<model_output_field>"
          }
        ],
        "override": ""
      }
    }
  ]
}

They can run a bulk streaming ingestion job like my-index/_bulk?pipeline=my-pipeline.

The user should be able to execute this pipeline in batch mode using a command like my-index/_batch?pipeline=my-pipeline source="s3://..." temp_stage="s3://..." (optionally overriding the defaults).

Batch processing support for each ingestion processor can be incrementally added.
