diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index d55abb40dec48..22bb4cf0514bf 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -131,20 +131,18 @@ public void uploadBlob( } final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")"; byte[] bytes = inputStream.readAllBytes(); - try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) { - long expectedChecksum = computeChecksum(input, resourceDescription); - uploadBlobAsyncInternal( - fileName, - fileName, - bytes.length, - blobPath, - writePriority, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - expectedChecksum, - listener, - null - ); - } + long expectedChecksum = computeChecksum(bytes, resourceDescription); + uploadBlobAsyncInternal( + fileName, + fileName, + bytes.length, + blobPath, + writePriority, + (size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position), + expectedChecksum, + listener, + null + ); } // Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file. @@ -220,7 +218,8 @@ private void uploadBlob( } - private void uploadBlobAsyncInternal( + // package private for testing + void uploadBlobAsyncInternal( String fileName, String remoteFileName, long contentLength, @@ -335,10 +334,10 @@ public void listAllInSortedOrderAsync( threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); }); } - private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException { + private static long computeChecksum(byte[] bytes, String resourceDescription) throws ChecksumCombinationException { long expectedChecksum; - try { - expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH); + try (IndexInput indexInput = new ByteArrayIndexInput(resourceDescription, bytes)) { + expectedChecksum = checksumOfChecksum(indexInput, CHECKSUM_BYTES_LENGTH); } catch (Exception e) { throw new ChecksumCombinationException( "Potentially corrupted file: Checksum combination failed while combining stored checksum " diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index e567e1b626c5a..3a49fed4be282 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -223,10 +223,11 @@ private void writeAsyncWithPriority( return; } final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor, params); + final BytesReference bytesReference = serialize(obj, blobName, compressor, params); final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")"; - try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { - long expectedChecksum; + byte[] bytes = BytesReference.toBytes(bytesReference); + long expectedChecksum; + try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) { try { expectedChecksum = checksumOfChecksum(input.clone(), 8); } catch (Exception e) { @@ -237,21 +238,21 @@ private void writeAsyncWithPriority( e ); } + } - try ( - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - blobName, - blobName, - bytes.length(), - true, - priority, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - expectedChecksum, - ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() - ) - ) { - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); - } + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + blobName, + blobName, + bytes.length, + true, + priority, + (size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position), + expectedChecksum, + ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() + ) + ) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java index 18c718ca2110e..8127bf8c2a2a2 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java @@ -8,7 +8,6 @@ package org.opensearch.repositories.blobstore; -import org.apache.lucene.store.IndexInput; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -51,23 +50,30 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str return; } String blobName = blobName(name); - BytesReference bytes = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null); + BytesReference bytesReference = serialize( + obj, + blobName, + new NoneCompressor(), + ToXContent.EMPTY_PARAMS, + XContentType.JSON, + null, + null + ); String resourceDescription = "BlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")"; - try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { - try ( - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - blobName, - blobName, - bytes.length(), - true, - WritePriority.URGENT, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - null, - false - ) - ) { - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); - } + byte[] bytes = BytesReference.toBytes(bytesReference); + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + blobName, + blobName, + bytes.length, + true, + WritePriority.URGENT, + (size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position), + null, + false + ) + ) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); } } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index cd78aead80923..10e4cc6cfb1ef 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -22,6 +22,8 @@ import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; @@ -54,9 +56,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class BlobStoreTransferServiceTests extends OpenSearchTestCase { @@ -139,8 +145,28 @@ public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, Inter FsBlobStore fsBlobStore = mock(FsBlobStore.class); when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer); - TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool); - uploadBlobFromInputStream(transferService); + BlobStoreTransferService transferServiceSpy = Mockito.spy(new BlobStoreTransferService(fsBlobStore, threadPool)); + uploadBlobFromInputStream(transferServiceSpy); + + ArgumentCaptor inputStreamCaptor = ArgumentCaptor.forClass( + RemoteTransferContainer.OffsetRangeInputStreamSupplier.class + ); + verify(transferServiceSpy).uploadBlobAsyncInternal( + Mockito.anyString(), + Mockito.anyString(), + Mockito.anyLong(), + Mockito.any(), + Mockito.any(), + inputStreamCaptor.capture(), + Mockito.anyLong(), + Mockito.any(), + Mockito.any() + ); + RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier = inputStreamCaptor.getValue(); + OffsetRangeInputStream inputStream1 = inputStreamSupplier.get(1, 0); + OffsetRangeInputStream inputStream2 = inputStreamSupplier.get(1, 2); + assertNotEquals(inputStream1, inputStream2); + assertNotEquals(inputStream1.getFilePointer(), inputStream2.getFilePointer()); } private IndexMetadata getIndexMetadata() {