Skip to content

Commit

Permalink
Create new IndexInput for multi part upload
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jul 23, 2024
1 parent 2d8c68c commit 6694ff6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,18 @@ public void uploadBlob(
}
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
byte[] bytes = inputStream.readAllBytes();
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
long expectedChecksum = computeChecksum(input, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener,
null
);
}
long expectedChecksum = computeChecksum(bytes, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
expectedChecksum,
listener,
null
);
}

// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
Expand Down Expand Up @@ -335,10 +333,10 @@ public void listAllInSortedOrderAsync(
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
}

private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException {
private static long computeChecksum(byte[] bytes, String resourceDescription) throws ChecksumCombinationException {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH);
try (IndexInput indexInput = new ByteArrayIndexInput(resourceDescription, bytes)) {
expectedChecksum = checksumOfChecksum(indexInput, CHECKSUM_BYTES_LENGTH);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr
* @param compressor whether to use compression
*/
public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException {
BlobStoreRepository blobStoreRepository = new BlobStoreRepository();
write(obj, blobContainer, name, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS, XContentType.SMILE, codec, VERSION);
}

Expand Down Expand Up @@ -223,35 +224,35 @@ private void writeAsyncWithPriority(
return;
}
final String blobName = blobName(name);
final BytesReference bytes = serialize(obj, blobName, compressor, params);
final BytesReference bytesReference = serialize(obj, blobName, compressor, params);
final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
long expectedChecksum;
byte[] bytes = BytesReference.toBytes(bytesReference);
long expectedChecksum;
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
throw new ChecksumCombinationException("Potentially corrupted file: Checksum combination failed while combining stored checksum "
+ "and calculated checksum of stored checksum",
resourceDescription,
e
);
}
}

try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length(),
true,
priority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
priority,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.xcontent.ToXContent;

import java.io.IOException;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;

/**
* Format for writing short configurations to remote. Read interface does not exist as it not yet required. This format
Expand Down Expand Up @@ -51,23 +52,22 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str
return;
}
String blobName = blobName(name);
BytesReference bytes = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null);
BytesReference bytesReference = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null);
String resourceDescription = "BlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
byte[] bytes = BytesReference.toBytes(bytesReference);
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
}
}

0 comments on commit 6694ff6

Please sign in to comment.