Skip to content

Commit

Permalink
Refactor remote writeable entity and store to make it more reusable (o…
Browse files Browse the repository at this point in the history
…pensearch-project#15210)

* Refactor remote writeable entity and store to make it more reusable

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar authored and akolarkunnu committed Sep 10, 2024
1 parent 7faca61 commit 5623cb3
Show file tree
Hide file tree
Showing 27 changed files with 176 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
Expand Down Expand Up @@ -262,12 +263,13 @@ protected void doStart() {
clusterSettings
);

this.remoteRoutingTableDiffStore = new RemoteClusterStateBlobStore<>(
this.remoteRoutingTableDiffStore = new RemoteWriteableEntityBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractClusterMetadataWriteableBlobEntity<T> extends RemoteWriteableBlobEntity<T> {

protected final NamedXContentRegistry namedXContentRegistry;

public AbstractClusterMetadataWriteableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor);
this.namedXContentRegistry = namedXContentRegistry;
}

public AbstractClusterMetadataWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
super(clusterUUID, compressor);
this.namedXContentRegistry = null;
}

public abstract UploadedMetadata getUploadedMetadata();

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita
* @return the remote writable entity store for the given entity
* @throws IllegalArgumentException if the entity type is unknown
*/
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
protected RemoteWritableEntityStore getStore(AbstractClusterMetadataWriteableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
Expand All @@ -49,7 +49,7 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en
*/
protected abstract ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

Expand All @@ -64,21 +64,21 @@ protected abstract ActionListener<Void> getWrappedWriteListener(
*/
protected abstract ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
);

@Override
public void writeAsync(
String component,
AbstractRemoteWritableBlobEntity entity,
AbstractClusterMetadataWriteableBlobEntity entity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
}

@Override
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface RemoteWritableEntityManager {
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the read operation fails.
*/
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);
void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener);

/**
* Performs an asynchronous write operation for the specified component and entity.
Expand All @@ -43,5 +43,5 @@ public interface RemoteWritableEntityManager {
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the write operation fails.
*/
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
void writeAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<UploadedMetadata> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,25 @@

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
* The abstract class which represents a {@link RemoteWriteableEntity} that can be written to a store
* @param <T> the entity to be written
*/
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {
public abstract class RemoteWriteableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;

protected String blobName;
private final String clusterUUID;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private String[] pathTokens;

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
public RemoteWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
this.clusterUUID = clusterUUID;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) {
this(clusterUUID, compressor, null);
}

public abstract BlobPathParameters getBlobPathParameters();
Expand Down Expand Up @@ -80,16 +67,10 @@ public String clusterUUID() {
return clusterUUID;
}

public abstract UploadedMetadata getUploadedMetadata();

public void setFullBlobName(BlobPath blobPath) {
this.blobName = blobPath.buildAsString() + blobFileName;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}

protected Compressor getCompressor() {
return compressor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,48 @@
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;
package org.opensearch.common.remote;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ExecutorService;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;

/**
* Abstract class for a blob type storage
*
* @param <T> The entity which can be uploaded to / downloaded from blob store
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
*/
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {
public class RemoteWriteableEntityBlobStore<T, U extends RemoteWriteableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;
private final String pathToken;

public RemoteClusterStateBlobStore(
public RemoteWriteableEntityBlobStore(
final BlobStoreTransferService blobStoreTransferService,
final BlobStoreRepository blobStoreRepository,
final String clusterName,
final ThreadPool threadPool,
final String executor
final String executor,
final String pathToken
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(executor);
this.pathToken = pathToken;
}

@Override
Expand Down Expand Up @@ -95,21 +94,18 @@ public String getClusterName() {
}

public BlobPath getBlobPathPrefix(String clusterUUID) {
return blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(getClusterName()))
.add(CLUSTER_STATE_PATH_TOKEN)
.add(clusterUUID);
return blobStoreRepository.basePath().add(encodeString(getClusterName())).add(pathToken).add(clusterUUID);
}

public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID());
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
String[] pathTokens = obj.getBlobPathTokens();
BlobPath blobPath = new BlobPath();
if (pathTokens == null || pathTokens.length < 1) {
Expand All @@ -122,4 +118,8 @@ public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T>
return blobPath;
}

private static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;
Expand All @@ -28,7 +28,7 @@
import java.util.Map;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteWriteableEntityBlobStore}
*
* @opensearch.internal
*/
Expand All @@ -47,40 +47,43 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE
) {
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
this.remoteWritableEntityStores.put(
RemoteClusterBlocks.CLUSTER_BLOCKS,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
this.remoteWritableEntityStores.put(
RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
}

@Override
protected ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
Expand All @@ -92,7 +95,7 @@ protected ActionListener<Void> getWrappedWriteListener(
@Override
protected ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
Expand Down
Loading

0 comments on commit 5623cb3

Please sign in to comment.