Skip to content

Commit

Permalink
Fix build failures.
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <singhlhs@amazon.com>
  • Loading branch information
Shailendra Singh committed Jul 17, 2024
1 parent 0365ec6 commit c19f565
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.core.compress.Compressor;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand All @@ -35,6 +34,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
Expand Down Expand Up @@ -319,7 +319,11 @@ private void uploadIndexRoutingDiff(
) {
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
RemoteIndexRoutingTableDiff remoteIndexRoutingTableDiff = new RemoteIndexRoutingTableDiff(indexRoutingTableDiff, clusterUUID, compressor);
RemoteIndexRoutingTableDiff remoteIndexRoutingTableDiff = new RemoteIndexRoutingTableDiff(
indexRoutingTableDiff,
clusterUUID,
compressor
);
remoteIndexRoutingTableDiff.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
Expand Down Expand Up @@ -414,9 +418,10 @@ private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, I
}
}

//@Override
// @Override
public CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
String clusterUUID, String uploadedFilename,
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
Expand All @@ -428,10 +433,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
clusterUUID,
blobContainer,
blobFileName,
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.getDiffs()),
latchedActionListener::onFailure
)
ActionListener.wrap(response -> latchedActionListener.onResponse(response.getDiffs()), latchedActionListener::onFailure)
);
}

Expand All @@ -452,7 +454,10 @@ private RemoteIndexRoutingTableDiff readDiff(String clusterUUID, BlobContainer b
try {
return new RemoteIndexRoutingTableDiff(blobContainer.readBlob(path), clusterUUID, compressor);
} catch (IOException | AssertionError e) {
logger.error(() -> new ParameterizedMessage("RoutingTableDiff read failed for path {}", blobContainer.path().buildAsString() + path), e);
logger.error(
() -> new ParameterizedMessage("RoutingTableDiff read failed for path {}", blobContainer.path().buildAsString() + path),
e
);
throw new RemoteStateTransferException("Failed to read RemoteRoutingTableDiff from Manifest with error ", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER);
return DiffableUtils.diff(
Map.of(),
Map.of(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER
);
}

@Override
Expand All @@ -58,7 +63,7 @@ public CheckedRunnable<IOException> getIndexRoutingDiffAsyncAction(
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
){
) {
// noop
return () -> {};
}
Expand All @@ -84,9 +89,9 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
}

public CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
) {
// noop
return () -> {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public AbstractRemoteWritableBlobEntity(
this.namedXContentRegistry = namedXContentRegistry;
}

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor
) {
public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) {
this(clusterUUID, compressor, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import static org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer.getAbstractInstance;
import static org.opensearch.cluster.DiffableUtils.getStringKeySerializer;
import static org.opensearch.cluster.routing.remote.RemoteRoutingTableService.CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand Down Expand Up @@ -77,7 +76,11 @@ public class ClusterStateDiffManifest implements ToXContentFragment, Writeable {
private final List<String> clusterStateCustomUpdated;
private final List<String> clusterStateCustomDeleted;

public ClusterStateDiffManifest(ClusterState state, ClusterState previousState, DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableIncrementalDiff) {
public ClusterStateDiffManifest(
ClusterState state,
ClusterState previousState,
DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableIncrementalDiff
) {
fromStateUUID = previousState.stateUUID();
toStateUUID = state.stateUUID();
coordinationMetadataUpdated = !Metadata.isCoordinationMetadataEqual(state.metadata(), previousState.metadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
routingTableIncrementalDiff.getDeletes()
);

ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(clusterState, previousClusterState, routingTableIncrementalDiff);
ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(
clusterState,
previousClusterState,
routingTableIncrementalDiff
);
if (uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null) {
clusterStateDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath());
clusterStateDiffManifest.setIndicesRoutingDiffPath(
uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath()
);
}
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
Expand Down Expand Up @@ -687,23 +693,13 @@ UploadedMetadataResults writeMetadataInParallel(
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
remoteRoutingTableService.getIndexRoutingAsyncAction(
clusterState,
indexRoutingTable,
listener,
clusterBasePath
)
remoteRoutingTableService.getIndexRoutingAsyncAction(clusterState, indexRoutingTable, listener, clusterBasePath)
);
});
if (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty()) {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_DIFF_FILE_PREFIX,
remoteRoutingTableService.getIndexRoutingDiffAsyncAction(
clusterState,
indexRoutingTableDiff,
listener,
clusterBasePath
)
remoteRoutingTableService.getIndexRoutingDiffAsyncAction(clusterState, indexRoutingTableDiff, listener, clusterBasePath)
);
}

Expand Down Expand Up @@ -1078,7 +1074,9 @@ private ClusterState readClusterStateInParallel(
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> routingTableDiffLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(response -> {
logger.debug("Successfully read cluster state diff component from remote");
readIndexRoutingTableDiffResults.set(new RemoteIndexRoutingTableDiff(response, clusterUUID, blobStoreRepository.getCompressor()));
readIndexRoutingTableDiffResults.set(
new RemoteIndexRoutingTableDiff(response, clusterUUID, blobStoreRepository.getCompressor())
);
}, ex -> {
logger.error("Failed to read cluster state diff from remote", ex);
exceptionList.add(ex);
Expand All @@ -1088,9 +1086,11 @@ private ClusterState readClusterStateInParallel(

if (readIndexRoutingTableDiff) {
asyncMetadataReadActions.add(
remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction(clusterUUID,
remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction(
clusterUUID,
manifest.getDiffManifest().getIndicesRoutingDiffPath(),
routingTableDiffLatchedActionListener)
routingTableDiffLatchedActionListener
)
);
}

Expand Down Expand Up @@ -1354,7 +1354,10 @@ public ClusterState getClusterStateForManifest(
includeEphemeral ? manifest.getIndicesRouting() : emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(),
includeEphemeral && manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
&& manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
);
} else {
Expand Down Expand Up @@ -1436,7 +1439,9 @@ public ClusterState getClusterStateUsingDiff(
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom,
manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
true
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ RemoteClusterStateManifestInfo uploadManifest(
) {
synchronized (this) {
if (uploadedMetadataResult.uploadedIndicesRoutingDiffMetadata != null) {
clusterDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResult.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath());
clusterDiffManifest.setIndicesRoutingDiffPath(
uploadedMetadataResult.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath()
);
}
ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder();
manifestBuilder.clusterTerm(clusterState.term())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.gateway.remote.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -21,18 +21,25 @@
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;

public class RemoteIndexRoutingTableDiff extends AbstractRemoteWritableBlobEntity<IndexRoutingTable> implements Diff<IndexRoutingTable>, Writeable {
/**
* Represents a difference between {@link IndexRoutingTable} objects that can be serialized and deserialized.
* This class is responsible for writing and reading the differences between IndexRoutingTables to and from an input/output stream.
*/
public class RemoteIndexRoutingTableDiff extends AbstractRemoteWritableBlobEntity<IndexRoutingTable>
implements
Diff<IndexRoutingTable>,
Writeable {

private final Map<String, Diff<IndexRoutingTable>> diffs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,75 +250,6 @@ public void testGetIndicesRoutingMapDiffIndexAdded() {
assertEquals(0, diff.getDeletes().size());
}

public void testGetIndicesRoutingMapDiffShardChanged() {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
int noOfShards = between(1, 1000);
int noOfReplicas = randomInt(10);
final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid")
.build()
).numberOfShards(noOfShards).numberOfReplicas(noOfReplicas).build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build();

final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(indexName).settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid")
.build()
).numberOfShards(noOfShards + 1).numberOfReplicas(noOfReplicas).build();
RoutingTable routingTable2 = RoutingTable.builder().addAsNew(indexMetadata2).build();

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> diff = remoteRoutingTableService
.getIndicesRoutingMapDiff(routingTable, routingTable2);
assertEquals(1, diff.getUpserts().size());
assertNotNull(diff.getUpserts().get(indexName));
assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size());
assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize());
assertEquals(0, diff.getDeletes().size());

final IndexMetadata indexMetadata3 = new IndexMetadata.Builder(indexName).settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid")
.build()
).numberOfShards(noOfShards + 1).numberOfReplicas(noOfReplicas + 1).build();
RoutingTable routingTable3 = RoutingTable.builder().addAsNew(indexMetadata3).build();

diff = remoteRoutingTableService.getIndicesRoutingMapDiff(routingTable2, routingTable3);
assertEquals(1, diff.getUpserts().size());
assertNotNull(diff.getUpserts().get(indexName));
assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size());
assertEquals(noOfReplicas + 2, diff.getUpserts().get(indexName).getShards().get(0).getSize());

assertEquals(0, diff.getDeletes().size());
}

public void testGetIndicesRoutingMapDiffShardDetailChanged() {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
int noOfShards = between(1, 1000);
int noOfReplicas = randomInt(10);
final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid")
.build()
).numberOfShards(noOfShards).numberOfReplicas(noOfReplicas).build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build();
RoutingTable routingTable2 = RoutingTable.builder().addAsRecovery(indexMetadata).build();

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> diff = remoteRoutingTableService
.getIndicesRoutingMapDiff(routingTable, routingTable2);
assertEquals(1, diff.getUpserts().size());
assertNotNull(diff.getUpserts().get(indexName));
assertEquals(noOfShards, diff.getUpserts().get(indexName).getShards().size());
assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize());
assertEquals(0, diff.getDeletes().size());
}

public void testGetIndicesRoutingMapDiffIndexDeleted() {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings(
Expand Down
Loading

0 comments on commit c19f565

Please sign in to comment.