Create new IndexInput for multi part upload (#14888) (#14937)
* Create new IndexInput for multi part upload


(cherry picked from commit 2def4fd)

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 06698dd commit 769fdc8
Showing 4 changed files with 86 additions and 54 deletions.
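The heart of the change is in the OffsetRangeInputStreamSupplier lambdas below. Previously a single IndexInput was opened once, shared by every part stream of the multi-part upload, and closed by the surrounding try-with-resources as soon as the asynchronous upload had been kicked off; now each supplier invocation wraps the shared byte[] in a fresh ByteArrayIndexInput, so every part reads through its own independent file pointer (the new test asserts exactly that). Below is a minimal, self-contained sketch of the pattern using plain java.io streams and an invented PartStreamSupplier interface in place of the real OpenSearch types (OffsetRangeIndexInputStream, ByteArrayIndexInput); it illustrates the idea only, not the actual API.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PerPartStreamSketch {

    // Stand-in for OffsetRangeInputStreamSupplier: supplies the stream for one
    // part of a multi-part upload, given the part's size and starting offset.
    interface PartStreamSupplier {
        InputStream get(long size, long position) throws IOException;
    }

    public static void main(String[] args) throws IOException {
        byte[] blob = "0123456789".getBytes();

        // Fixed pattern: every invocation builds a fresh stream over the shared
        // byte[], so concurrent parts never share a read position. (The buggy
        // variant captured one shared stream, so every get() call advanced the
        // same position.)
        PartStreamSupplier perPart = (size, position) -> {
            InputStream in = new ByteArrayInputStream(blob); // fresh per invocation
            long skipped = in.skip(position);                // seek to this part's offset
            if (skipped != position) {
                throw new IOException("could not seek to " + position);
            }
            return in;
        };

        try (InputStream part1 = perPart.get(5, 0); InputStream part2 = perPart.get(5, 5)) {
            // Each part reads its own range; interleaving the reads is safe.
            System.out.println((char) part1.read()); // prints '0'
            System.out.println((char) part2.read()); // prints '5'
        }
    }
}

The same per-invocation construction is applied in all three production classes touched here (BlobStoreTransferService, ChecksumBlobStoreFormat, and the urgent-priority BlobStoreFormat path), and the new test captures the supplier to verify that two calls yield distinct streams with independent file pointers.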
@@ -131,20 +131,18 @@ public void uploadBlob(
}
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
byte[] bytes = inputStream.readAllBytes();
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
long expectedChecksum = computeChecksum(input, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener,
null
);
}
long expectedChecksum = computeChecksum(bytes, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
expectedChecksum,
listener,
null
);
}

// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
@@ -220,7 +218,8 @@ private void uploadBlob(

}

private void uploadBlobAsyncInternal(
// package private for testing
void uploadBlobAsyncInternal(
String fileName,
String remoteFileName,
long contentLength,
@@ -335,10 +334,10 @@ public void listAllInSortedOrderAsync(
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
}

private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException {
private static long computeChecksum(byte[] bytes, String resourceDescription) throws ChecksumCombinationException {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH);
try (IndexInput indexInput = new ByteArrayIndexInput(resourceDescription, bytes)) {
expectedChecksum = checksumOfChecksum(indexInput, CHECKSUM_BYTES_LENGTH);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
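One more note on the hunk above: computeChecksum now takes the raw byte[] and opens, uses, and closes its own short-lived ByteArrayIndexInput, so no input created for checksumming is left around for the asynchronous upload to trip over. The sketch below only mirrors that shape (bytes in, long out, every temporary resource scoped to the call); the CRC32 over the whole payload is a placeholder, not the real checksumOfChecksum logic, which works from the checksum already stored in the file footer.

import java.util.zip.CRC32;

public class ComputeChecksumSketch {

    // Same shape as the refactored helper: accept the raw bytes, compute the
    // value locally, return only a long. resourceDescription is kept to mirror
    // the real signature, where it is used only for error messages.
    static long computeChecksum(byte[] bytes, String resourceDescription) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        System.out.println(computeChecksum("hello".getBytes(), "demo-blob"));
    }
}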
@@ -223,10 +223,11 @@ private void writeAsyncWithPriority(
return;
}
final String blobName = blobName(name);
final BytesReference bytes = serialize(obj, blobName, compressor, params);
final BytesReference bytesReference = serialize(obj, blobName, compressor, params);
final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
long expectedChecksum;
byte[] bytes = BytesReference.toBytes(bytesReference);
long expectedChecksum;
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
} catch (Exception e) {
@@ -237,21 +238,21 @@
e
);
}
}

try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length(),
true,
priority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
priority,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
}

@@ -8,7 +8,6 @@

package org.opensearch.repositories.blobstore;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -51,23 +50,30 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str
return;
}
String blobName = blobName(name);
BytesReference bytes = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null);
BytesReference bytesReference = serialize(
obj,
blobName,
new NoneCompressor(),
ToXContent.EMPTY_PARAMS,
XContentType.JSON,
null,
null
);
String resourceDescription = "BlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
byte[] bytes = BytesReference.toBytes(bytesReference);
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
}
}
@@ -22,6 +22,8 @@
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
@@ -54,9 +56,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class BlobStoreTransferServiceTests extends OpenSearchTestCase {
@@ -139,8 +145,28 @@ public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, Inter
FsBlobStore fsBlobStore = mock(FsBlobStore.class);
when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer);

TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool);
uploadBlobFromInputStream(transferService);
BlobStoreTransferService transferServiceSpy = Mockito.spy(new BlobStoreTransferService(fsBlobStore, threadPool));
uploadBlobFromInputStream(transferServiceSpy);

ArgumentCaptor<RemoteTransferContainer.OffsetRangeInputStreamSupplier> inputStreamCaptor = ArgumentCaptor.forClass(
RemoteTransferContainer.OffsetRangeInputStreamSupplier.class
);
verify(transferServiceSpy).uploadBlobAsyncInternal(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
Mockito.any(),
Mockito.any(),
inputStreamCaptor.capture(),
Mockito.anyLong(),
Mockito.any(),
Mockito.any()
);
RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier = inputStreamCaptor.getValue();
OffsetRangeInputStream inputStream1 = inputStreamSupplier.get(1, 0);
OffsetRangeInputStream inputStream2 = inputStreamSupplier.get(1, 2);
assertNotEquals(inputStream1, inputStream2);
assertNotEquals(inputStream1.getFilePointer(), inputStream2.getFilePointer());
}

private IndexMetadata getIndexMetadata() {