Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Block delete model requests if an index uses the model #1745

Merged
merged 2 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes/opensearch-knn.release-notes-2.15.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Compatible with OpenSearch 2.15.0
* Add stats for radial search [#1684](https://github.com/opensearch-project/k-NN/pull/1684)
* Support script score when doc value is disabled and fix misusing DISI [#1696](https://github.com/opensearch-project/k-NN/pull/1696)
* Add validation for pq m parameter before training starts [#1713](https://github.com/opensearch-project/k-NN/pull/1713)
* Block delete model requests if an index uses the model [#1722](https://github.com/opensearch-project/k-NN/pull/1722)
### Bug Fixes
* Block commas in model description [#1692](https://github.com/opensearch-project/k-NN/pull/1692)
* Update threshold value after new result is added [#1715](https://github.com/opensearch-project/k-NN/pull/1715)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class KNNConstants {
public static final String VECTOR = "vector";
public static final String K = "k";
public static final String TYPE_KNN_VECTOR = "knn_vector";
public static final String PROPERTIES = "properties";
public static final String METHOD_PARAMETER_EF_SEARCH = "ef_search";
public static final String METHOD_PARAMETER_EF_CONSTRUCTION = "ef_construction";
public static final String METHOD_PARAMETER_M = "m";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import org.opensearch.core.rest.RestStatus;

/**
* Exception thrown when a model is deleted while it is in the training state. The RestStatus associated with this
* Exception thrown when a model is deleted while it is in the training state or in use by an index. The RestStatus associated with this
* exception should be a {@link RestStatus#CONFLICT} because the request cannot be deleted due to the model being in
* the training state.
* the training state or in use by an index.
*/
public class DeleteModelWhenInTrainStateException extends OpenSearchException {
public class DeleteModelException extends OpenSearchException {
/**
* Constructor
*
* @param msg detailed exception message
* @param args arguments of the message
*/
public DeleteModelWhenInTrainStateException(String msg, Object... args) {
public DeleteModelException(String msg, Object... args) {
super(LoggerMessageFormat.format(msg, args));
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/opensearch/knn/indices/ModelDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.common.exception.DeleteModelWhenInTrainStateException;
import org.opensearch.knn.common.exception.DeleteModelException;
import org.opensearch.knn.index.MethodComponentContext;
import org.opensearch.knn.plugin.transport.DeleteModelResponse;
import org.opensearch.knn.plugin.transport.GetModelResponse;
Expand Down Expand Up @@ -84,7 +84,7 @@
public interface ModelDao {

/**
* Creates model index. It is possible that the 2 threads call this function simulateously. In this case, one
* Creates model index. It is possible that the 2 threads call this function simultaneously. In this case, one
* thread will throw a ResourceAlreadyExistsException. This should be caught and handled.
*
* @param actionListener CreateIndexResponse listener
Expand Down Expand Up @@ -527,7 +527,7 @@ public void delete(String modelId, ActionListener<DeleteModelResponse> listener)
// If model is in Training state, fail delete model request
if (ModelState.TRAINING == getModelResponse.getModel().getModelMetadata().getState()) {
String errorMessage = String.format("Cannot delete model [%s]. Model is still in training", modelId);
listener.onFailure(new DeleteModelWhenInTrainStateException(errorMessage));
listener.onFailure(new DeleteModelException(errorMessage));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lombok.Value;
import lombok.extern.log4j.Log4j2;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -22,16 +23,22 @@
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.knn.common.exception.DeleteModelException;
import org.opensearch.knn.indices.ModelGraveyard;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.stream.Collectors.toList;
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.common.KNNConstants.PLUGIN_NAME;

/**
Expand All @@ -42,14 +49,16 @@ public class UpdateModelGraveyardTransportAction extends TransportClusterManager
UpdateModelGraveyardRequest,
AcknowledgedResponse> {
private UpdateModelGraveyardExecutor updateModelGraveyardExecutor;
private final IndicesService indicesService;

@Inject
public UpdateModelGraveyardTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
) {
super(
UpdateModelGraveyardAction.NAME,
Expand All @@ -61,6 +70,7 @@ public UpdateModelGraveyardTransportAction(
indexNameExpressionResolver
);
this.updateModelGraveyardExecutor = new UpdateModelGraveyardExecutor();
this.indicesService = indicesService;
}

@Override
Expand All @@ -82,7 +92,7 @@ protected void clusterManagerOperation(
// ClusterManager updates model graveyard based on request parameters
clusterService.submitStateUpdateTask(
PLUGIN_NAME,
new UpdateModelGraveyardTask(request.getModelId(), request.isRemoveRequest()),
new UpdateModelGraveyardTask(request.getModelId(), request.isRemoveRequest(), indicesService),
ClusterStateTaskConfig.build(Priority.NORMAL),
updateModelGraveyardExecutor,
new ClusterStateTaskListener() {
Expand Down Expand Up @@ -111,6 +121,7 @@ protected ClusterBlockException checkBlock(UpdateModelGraveyardRequest request,
private static class UpdateModelGraveyardTask {
String modelId;
boolean isRemoveRequest;
IndicesService indicesService;
}

/**
Expand All @@ -123,7 +134,8 @@ private static class UpdateModelGraveyardExecutor implements ClusterStateTaskExe
* @return Represents the result of a batched execution of cluster state update tasks (UpdateModelGraveyardTasks)
*/
@Override
public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState clusterState, List<UpdateModelGraveyardTask> taskList) {
public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState clusterState, List<UpdateModelGraveyardTask> taskList)
throws IOException {

// Check if the objects are not null and throw a customized NullPointerException
Objects.requireNonNull(clusterState, "Cluster state must not be null");
Expand All @@ -146,6 +158,17 @@ public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState cluster
modelGraveyard.remove(task.getModelId());
continue;
}
List<String> indicesUsingModel = getIndicesUsingModel(clusterState, task);
// Throw exception if any indices are using the model
if (!indicesUsingModel.isEmpty()) {
throw new DeleteModelException(
String.format(
"Cannot delete model [%s]. Model is in use by the following indices %s, which must be deleted first.",
task.getModelId(),
indicesUsingModel
)
);
}
modelGraveyard.add(task.getModelId());
}

Expand All @@ -155,5 +178,50 @@ public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState cluster
ClusterState updatedClusterState = ClusterState.builder(clusterState).metadata(metaDataBuilder).build();
return new ClusterTasksResult.Builder<UpdateModelGraveyardTask>().successes(taskList).build(updatedClusterState);
}

private List<String> getIndicesUsingModel(ClusterState clusterState, UpdateModelGraveyardTask task) throws IOException {
Map<String, IndexMetadata> indices = clusterState.metadata().indices();
String[] knnIndicesList = indices.values()
.stream()
.filter(metadata -> "true".equals(metadata.getSettings().get("index.knn", "false")))
.map(metadata -> metadata.getIndex().getName())
.toArray(String[]::new);
if (knnIndicesList.length == 0) {
return Collections.emptyList();
}

return clusterState.metadata()
.findMappings(knnIndicesList, task.getIndicesService().getFieldFilter())
.entrySet()
.stream()
.filter(entry -> entry.getValue() != null)
.filter(entry -> {
Object properties = entry.getValue().getSourceAsMap().get("properties");
if (properties == null || properties instanceof Map == false) {
return false;
}
Map propertiesMap = (Map<String, Object>) properties;
return propertiesMapContainsModel(propertiesMap, task.getModelId());
})
.map(Map.Entry::getKey)
.collect(toList());
}

private boolean propertiesMapContainsModel(Map<String, Object> propertiesMap, String modelId) {
for (Map.Entry<String, Object> fieldsEntry : propertiesMap.entrySet()) {
if (fieldsEntry.getKey() != null && fieldsEntry.getValue() instanceof Map) {
Map<String, Object> innerMap = (Map<String, Object>) fieldsEntry.getValue();
for (Map.Entry<String, Object> innerEntry : innerMap.entrySet()) {
// If model is in use, fail delete model request
if (innerEntry.getKey().equals(MODEL_ID)
&& innerEntry.getValue() instanceof String
&& innerEntry.getValue().equals(modelId)) {
return true;
}
}
}
}
return false;
}
}
}
40 changes: 36 additions & 4 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryLoadStrategy;
import org.opensearch.knn.indices.Model;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.indices.ModelMetadata;
import org.opensearch.knn.indices.ModelState;
Expand All @@ -36,13 +37,12 @@
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.mockito.Mockito.when;
import static org.opensearch.knn.common.KNNConstants.*;
import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME;

public class KNNSingleNodeTestCase extends OpenSearchSingleNodeTestCase {
@Override
Expand Down Expand Up @@ -181,6 +181,38 @@ protected void addDoc(String index, String docId, String fieldName, String dummy
assertEquals(response.status(), RestStatus.CREATED);
}

/**
* Index a new model
*/
protected void addDoc(Model model) throws IOException, ExecutionException, InterruptedException {
ModelMetadata modelMetadata = model.getModelMetadata();

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field(MODEL_ID, model.getModelID())
.field(KNN_ENGINE, modelMetadata.getKnnEngine().getName())
.field(METHOD_PARAMETER_SPACE_TYPE, modelMetadata.getSpaceType().getValue())
.field(DIMENSION, modelMetadata.getDimension())
.field(MODEL_STATE, modelMetadata.getState().getName())
.field(MODEL_TIMESTAMP, modelMetadata.getTimestamp().toString())
.field(MODEL_DESCRIPTION, modelMetadata.getDescription())
.field(MODEL_ERROR, modelMetadata.getError());

if (model.getModelBlob() != null) {
builder.field(MODEL_BLOB_PARAMETER, Base64.getEncoder().encodeToString(model.getModelBlob()));
}

builder.endObject();

IndexRequest indexRequest = new IndexRequest().index(MODEL_INDEX_NAME)
.id(model.getModelID())
.source(builder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

IndexResponse response = client().index(indexRequest).get();
assertTrue(response.status() == RestStatus.CREATED || response.status() == RestStatus.OK);
}

/**
* Run a search against a k-NN index
*/
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/opensearch/knn/index/FaissIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ public void testSharedIndexState_whenOneIndexDeleted_thenSecondIndexIsStillSearc
// will give 15 second buffer from that
Thread.sleep(1000 * 45);
validateSearchWorkflow(secondIndexName, testData.queries, 10);
deleteKNNIndex(secondIndexName);
deleteModel(modelId);
}

Expand Down
Loading
Loading