diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java index dab7b2474..0f5a378f8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java @@ -52,7 +52,7 @@ public StartResumableWriteResponse getRes() { @Override public BidiWriteObjectRequest.Builder newBuilder() { - return writeRequest.toBuilder(); + return writeRequest.toBuilder().clearWriteObjectSpec(); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java index 09e4177c2..a458c079c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java @@ -16,8 +16,6 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; - import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.storage.v2.BidiWriteObjectRequest; @@ -84,36 +82,5 @@ interface BidiWriteObjectRequestBuilderFactory { @Nullable String bucketName(); - - static BidiSimpleWriteObjectRequestBuilderFactory simple(BidiWriteObjectRequest req) { - return new BidiSimpleWriteObjectRequestBuilderFactory(req); - } - } - - static final class BidiSimpleWriteObjectRequestBuilderFactory - implements BidiWriteObjectRequestBuilderFactory { - private final BidiWriteObjectRequest req; - - private BidiSimpleWriteObjectRequestBuilderFactory(BidiWriteObjectRequest req) { - this.req = req; - } - - @Override - public BidiWriteObjectRequest.Builder newBuilder() { - return req.toBuilder(); - } - - @Override - public @Nullable String bucketName() { - if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) { - return req.getWriteObjectSpec().getResource().getBucket(); - } - return null; - } - - @Override - public String toString() { - return "SimpleBidiWriteObjectRequestBuilderFactory{" + "req=" + fmtProto(req) + '}'; - } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java index 19aba735e..15278616d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -21,23 +21,28 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.OutOfRangeException; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.ObjectChecksums; +import io.grpc.Status; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Semaphore; import java.util.function.Supplier; @@ -47,18 +52,19 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable private final BidiStreamingCallable write; private final RetryingDependencies deps; private final ResultRetryAlgorithm alg; - private final String bucketName; - private final Supplier baseContextSupplier; private final SettableApiFuture resultFuture; private final ChunkSegmenter chunkSegmenter; private final BidiWriteCtx writeCtx; + private final GrpcCallContext context; private final BidiObserver responseObserver; private volatile ApiStreamObserver stream; private boolean open = true; private boolean first = true; private boolean finished = false; + private volatile BidiWriteObjectRequest lastWrittenRequest; + private volatile RewindableContent currentContent; GapicBidiUnbufferedWritableByteChannel( BidiStreamingCallable write, @@ -66,18 +72,18 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable ResultRetryAlgorithm alg, SettableApiFuture resultFuture, ChunkSegmenter chunkSegmenter, - BidiResumableWrite requestFactory, + BidiWriteCtx writeCtx, Supplier baseContextSupplier) { this.write = write; this.deps = deps; this.alg = alg; - this.baseContextSupplier = baseContextSupplier; - this.bucketName = requestFactory.bucketName(); this.resultFuture = resultFuture; this.chunkSegmenter = chunkSegmenter; - this.writeCtx = new BidiWriteCtx<>(requestFactory); + this.writeCtx = writeCtx; this.responseObserver = new BidiObserver(); + String bucketName = writeCtx.getRequestFactory().bucketName(); + this.context = contextWithBucketName(bucketName, baseContextSupplier.get()); } @Override @@ -102,22 +108,22 @@ public void close() throws IOException { if (!open) { return; } - ApiStreamObserver openedStream = openedStream(); - if (!finished) { - BidiWriteObjectRequest message = finishMessage(); - try { - openedStream.onNext(message); - finished = true; - openedStream.onCompleted(); - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; + try { + if (!finished) { + BidiWriteObjectRequest message = finishMessage(); + lastWrittenRequest = message; + flush(Collections.singletonList(message)); + } else { + if (stream != null) { + stream.onCompleted(); + responseObserver.await(); + } } - } else { - openedStream.onCompleted(); + } finally { + open = false; + stream = null; + lastWrittenRequest = null; } - responseObserver.await(); - open = false; } @VisibleForTesting @@ -131,12 +137,18 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo throw new ClosedChannelException(); } - ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + long begin = writeCtx.getConfirmedBytes().get(); + currentContent = RewindableContent.of(srcs, srcsOffset, srcsLength); + ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, finalize); + if (data.length == 0) { + currentContent = null; + return 0; + } List messages = new ArrayList<>(); - int bytesConsumed = 0; - for (ChunkSegment datum : data) { + for (int i = 0; i < data.length; i++) { + ChunkSegment datum = data[i]; Crc32cLengthKnown crc32c = datum.getCrc32c(); ByteString b = datum.getB(); int contentSize = b.size(); @@ -149,11 +161,14 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo if (crc32c != null) { checksummedData.setCrc32C(crc32c.getValue()); } - BidiWriteObjectRequest.Builder builder = - writeCtx - .newRequestBuilder() - .setWriteOffset(offset) - .setChecksummedData(checksummedData.build()); + BidiWriteObjectRequest.Builder builder = writeCtx.newRequestBuilder(); + if (!first) { + builder.clearUploadId(); + builder.clearObjectChecksums(); + } else { + first = false; + } + builder.setWriteOffset(offset).setChecksummedData(checksummedData.build()); if (!datum.isOnlyFullBlocks()) { builder.setFinishWrite(true); if (cumulative != null) { @@ -163,10 +178,17 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo finished = true; } - BidiWriteObjectRequest build = possiblyPairDownBidiRequest(builder, first).build(); - first = false; + if (i == data.length - 1 && !finished) { + if (finalize) { + builder.setFinishWrite(true); + finished = true; + } else { + builder.setFlush(true).setStateLookup(true); + } + } + + BidiWriteObjectRequest build = builder.build(); messages.add(build); - bytesConsumed += contentSize; } if (finalize && !finished) { messages.add(finishMessage()); @@ -176,10 +198,14 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo try { flush(messages); } catch (RuntimeException e) { + open = false; resultFuture.setException(e); throw e; } + long end = writeCtx.getConfirmedBytes().get(); + + long bytesConsumed = end - begin; return bytesConsumed; } @@ -188,8 +214,11 @@ private BidiWriteObjectRequest finishMessage() { long offset = writeCtx.getTotalSentBytes().get(); Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); - BidiWriteObjectRequest.Builder b = - writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder(); + if (!first) { + b.clearUploadId().clearObjectChecksums(); + } + b.setFinishWrite(true).setWriteOffset(offset); if (crc32cValue != null) { b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); } @@ -201,13 +230,9 @@ private ApiStreamObserver openedStream() { if (stream == null) { synchronized (this) { if (stream == null) { - GrpcCallContext internalContext = - contextWithBucketName(bucketName, baseContextSupplier.get()); + responseObserver.reset(); stream = - this.write - .withDefaultCallContext(internalContext) - .bidiStreamingCall(responseObserver); - responseObserver.sem.drainPermits(); + new GracefulOutboundStream(this.write.bidiStreamingCall(responseObserver, context)); } } } @@ -223,47 +248,28 @@ private void flush(@NonNull List segments) { ApiStreamObserver opened = openedStream(); for (BidiWriteObjectRequest message : segments) { opened.onNext(message); + lastWrittenRequest = message; } - if (!finished) { - BidiWriteObjectRequest message = - BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build(); - opened.onNext(message); + if (lastWrittenRequest.getFinishWrite()) { + opened.onCompleted(); } responseObserver.await(); return null; - } catch (Exception e) { + } catch (Throwable t) { stream = null; first = true; - throw e; + t.addSuppressed(new AsyncStorageTaskException()); + throw t; } }, Decoder.identity()); } - private static BidiWriteObjectRequest.Builder possiblyPairDownBidiRequest( - BidiWriteObjectRequest.Builder b, boolean firstMessageOfStream) { - if (firstMessageOfStream && b.getWriteOffset() == 0) { - return b; - } - - if (!firstMessageOfStream) { - b.clearUploadId(); - } - - if (b.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b; - } - private class BidiObserver implements ApiStreamObserver { private final Semaphore sem; private volatile BidiWriteObjectResponse last; + private volatile StorageException clientDetectedError; private volatile RuntimeException previousError; private BidiObserver() { @@ -272,22 +278,96 @@ private BidiObserver() { @Override public void onNext(BidiWriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - writeCtx.getConfirmedBytes().set((value.getPersistedSize())); - } else if (value.hasResource()) { - writeCtx.getConfirmedBytes().set(value.getResource().getSize()); + boolean finalizing = lastWrittenRequest.getFinishWrite(); + if (!finalizing && value.hasPersistedSize()) { // incremental + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = value.getPersistedSize(); + + if (totalSentBytes == persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + ok(value); + } else if (persistedSize < totalSentBytes) { + long delta = totalSentBytes - persistedSize; + // rewind our content and any state that my have run ahead of the actual ack'd bytes + currentContent.rewindTo(delta); + writeCtx.getTotalSentBytes().set(persistedSize); + writeCtx.getConfirmedBytes().set(persistedSize); + ok(value); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_7.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else if (finalizing && value.hasResource()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long finalSize = value.getResource().getSize(); + if (totalSentBytes == finalSize) { + writeCtx.getConfirmedBytes().set(finalSize); + ok(value); + } else if (finalSize < totalSentBytes) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else if (!finalizing && value.hasResource()) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_1.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else if (finalizing && value.hasPersistedSize()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = value.getPersistedSize(); + // if a flush: true, state_lookup: true message is in the stream along with a + // finish_write: true, GCS can respond with the incremental update, gracefully handle this + // message + if (totalSentBytes == persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + } else if (persistedSize < totalSentBytes) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_3.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_2.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); + } + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ImmutableList.of(lastWrittenRequest), value, context, null)); } - sem.release(); - last = value; } @Override public void onError(Throwable t) { - if (t instanceof RuntimeException) { + if (t instanceof OutOfRangeException) { + OutOfRangeException oore = (OutOfRangeException) t; + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( + ImmutableList.of(lastWrittenRequest), null, context, oore)); + } else if (t instanceof ApiException) { + // use StorageExceptions logic to translate from ApiException to our status codes ensuring + // things fall in line with our retry handlers. + // This is suboptimal, as it will initialize a second exception, however this is the + // unusual case, and it should not cause a significant overhead given its rarity. + StorageException tmp = StorageException.asStorageException((ApiException) t); + previousError = + ResumableSessionFailureScenario.toStorageException( + tmp.getCode(), + tmp.getMessage(), + tmp.getReason(), + ImmutableList.of(lastWrittenRequest), + null, + context, + t); + sem.release(); + } else if (t instanceof RuntimeException) { previousError = (RuntimeException) t; + sem.release(); } - sem.release(); } @Override @@ -298,6 +378,25 @@ public void onCompleted() { sem.release(); } + private void ok(BidiWriteObjectResponse value) { + last = value; + sem.release(); + } + + private void clientDetectedError(StorageException storageException) { + open = false; + clientDetectedError = storageException; + // yes, check that previousError is not the same instance as e + if (previousError != null && previousError != storageException) { + storageException.addSuppressed(previousError); + previousError = null; + } + if (previousError == null) { + previousError = storageException; + } + sem.release(); + } + void await() { try { sem.acquire(); @@ -308,11 +407,69 @@ void await() { throw new RuntimeException(e); } } + StorageException e = clientDetectedError; RuntimeException err = previousError; + clientDetectedError = null; + previousError = null; + if ((e != null || err != null) && stream != null) { + if (lastWrittenRequest.getFinishWrite()) { + stream.onCompleted(); + } else { + stream.onError(Status.CANCELLED.asRuntimeException()); + } + } + if (e != null) { + throw e; + } if (err != null) { - previousError = null; throw err; } } + + public void reset() { + sem.drainPermits(); + last = null; + clientDetectedError = null; + previousError = null; + } + } + + /** + * Prevent "already half-closed" if we previously called onComplete but then detect an error and + * call onError + */ + private static final class GracefulOutboundStream + implements ApiStreamObserver { + + private final ApiStreamObserver delegate; + private volatile boolean closing; + + private GracefulOutboundStream(ApiStreamObserver delegate) { + this.delegate = delegate; + this.closing = false; + } + + @Override + public void onNext(BidiWriteObjectRequest value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable t) { + if (closing) { + return; + } + closing = true; + delegate.onError(t); + } + + @Override + public void onCompleted() { + if (closing) { + return; + } + closing = true; + delegate.onCompleted(); + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java index 536eba7fb..05387326a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java @@ -155,7 +155,7 @@ BufferedWritableByteChannelSession build() { resultFuture, new ChunkSegmenter( boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), - start, + new BidiWriteCtx<>(start), Retrying::newCallContext)) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java index 8c06d70dd..88dbaf7bd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java @@ -26,11 +26,11 @@ import com.google.cloud.BaseServiceException; import com.google.cloud.storage.StorageException.IOExceptionCallable; import com.google.common.io.CharStreams; -import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.Message; +import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.WriteObjectRequest; -import com.google.storage.v2.WriteObjectResponse; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.io.InputStreamReader; @@ -81,10 +81,6 @@ enum ResumableSessionFailureScenario { private static final String PREFIX_I = "\t|< "; private static final String PREFIX_O = "\t|> "; private static final String PREFIX_X = "\t| "; - // define some constants for tab widths that are more compressed that the literals - private static final String T1 = "\t"; - private static final String T2 = "\t\t"; - private static final String T3 = "\t\t\t"; private static final Predicate includedHeaders = matches("Content-Length") @@ -134,8 +130,10 @@ StorageException toStorageException( } StorageException toStorageException( - @NonNull List<@NonNull WriteObjectRequest> reqs, - @Nullable WriteObjectResponse resp, + /*In java List is not a sub-type of List despite WriteObjectRequest being a Message. + * intentionally only define List so the compiler doesn't complain */ + @SuppressWarnings("rawtypes") @NonNull List reqs, + @Nullable Message resp, @NonNull GrpcCallContext context, @Nullable Throwable cause) { return toStorageException(code, message, reason, reqs, resp, context, cause); @@ -161,8 +159,8 @@ static StorageException toStorageException( int code, String message, @Nullable String reason, - @NonNull List<@NonNull WriteObjectRequest> reqs, - @Nullable WriteObjectResponse resp, + @NonNull List reqs, + @Nullable Message resp, @NonNull GrpcCallContext context, @Nullable Throwable cause) { final StringBuilder sb = new StringBuilder(); @@ -177,35 +175,8 @@ static StorageException toStorageException( } else { sb.append(","); } - WriteObjectRequest req = reqs.get(i); - sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{"); - if (req.hasUploadId()) { - sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId()); - } - long writeOffset = req.getWriteOffset(); - if (req.hasChecksummedData()) { - ChecksummedData checksummedData = req.getChecksummedData(); - sb.append("\n").append(PREFIX_O).append(T2); - sb.append( - String.format( - "checksummed_data: {range: [%d:%d]", - writeOffset, writeOffset + checksummedData.getContent().size())); - if (checksummedData.hasCrc32C()) { - sb.append(", crc32c: ").append(checksummedData.getCrc32C()); - } - sb.append("}"); - } else { - sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset); - } - if (req.getFinishWrite()) { - sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true"); - } - if (req.hasObjectChecksums()) { - ObjectChecksums objectChecksums = req.getObjectChecksums(); - sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{"); - fmt(objectChecksums, PREFIX_O, T3, sb); - sb.append("\n").append(PREFIX_O).append(T2).append("}"); - } + Message req = (Message) reqs.get(i); + fmt(req, PREFIX_O, Indentation.T1, sb); sb.append("\n").append(PREFIX_O).append("\t}"); if (i == length - 1) { sb.append("\n").append(PREFIX_O).append("]"); @@ -217,7 +188,7 @@ static StorageException toStorageException( // response context if (resp != null) { sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{"); - fmt(resp, PREFIX_I, T1, sb); + fmt(resp, PREFIX_I, Indentation.T1, sb); sb.append("\n").append(PREFIX_I).append("}"); sb.append("\n").append(PREFIX_X); } @@ -250,7 +221,8 @@ static StorageException toStorageException( sb.append("\n").append(PREFIX_X); } } - return new StorageException(code, sb.toString(), reason, cause); + StorageException se = new StorageException(code, sb.toString(), reason, cause); + return se; } static StorageException toStorageException( @@ -359,16 +331,122 @@ private static String headerValueToString(Object o) { } private static void fmt( - MessageOrBuilder msg, + Message msg, @SuppressWarnings("SameParameterValue") String prefix, - String indentation, + Indentation indentation, StringBuilder sb) { - String string = msg.toString(); - // drop the final new line before prefixing - string = string.replaceAll("\n$", ""); - sb.append("\n") - .append(prefix) - .append(indentation) - .append(string.replaceAll("\r?\n", "\n" + prefix + indentation)); + if (msg instanceof WriteObjectRequest) { + WriteObjectRequest req = (WriteObjectRequest) msg; + fmtWriteObjectRequest(req, prefix, indentation, sb); + } else if (msg instanceof BidiWriteObjectRequest) { + BidiWriteObjectRequest req = (BidiWriteObjectRequest) msg; + fmtBidiWriteObjectRequest(req, prefix, indentation, sb); + } else { + String string = msg.toString(); + // drop the final new line before prefixing + string = string.replaceAll("\n$", ""); + sb.append("\n") + .append(prefix) + .append(indentation) + .append(string.replaceAll("\r?\n", "\n" + prefix + indentation.indentation)); + } + } + + private static void fmtWriteObjectRequest( + WriteObjectRequest req, String prefix, Indentation t1, StringBuilder sb) { + Indentation t2 = t1.indent(); + Indentation t3 = t2.indent(); + sb.append("\n").append(prefix).append(t1).append(req.getClass().getName()).append("{"); + if (req.hasUploadId()) { + sb.append("\n").append(prefix).append(t2).append("upload_id: ").append(req.getUploadId()); + } + long writeOffset = req.getWriteOffset(); + if (req.hasChecksummedData()) { + ChecksummedData checksummedData = req.getChecksummedData(); + sb.append("\n").append(prefix).append(t2); + sb.append( + String.format( + "checksummed_data: {range: [%d:%d]", + writeOffset, writeOffset + checksummedData.getContent().size())); + if (checksummedData.hasCrc32C()) { + sb.append(", crc32c: ").append(checksummedData.getCrc32C()); + } + sb.append("}"); + } else { + sb.append("\n").append(prefix).append(t2).append("write_offset: ").append(writeOffset); + } + if (req.getFinishWrite()) { + sb.append("\n").append(prefix).append(t2).append("finish_write: true"); + } + if (req.hasObjectChecksums()) { + ObjectChecksums objectChecksums = req.getObjectChecksums(); + sb.append("\n").append(prefix).append(t2).append("object_checksums: ").append("{"); + fmt(objectChecksums, prefix, t3, sb); + sb.append("\n").append(prefix).append(t2).append("}"); + } + } + + private static void fmtBidiWriteObjectRequest( + BidiWriteObjectRequest req, String prefix, Indentation t1, StringBuilder sb) { + Indentation t2 = t1.indent(); + Indentation t3 = t2.indent(); + sb.append("\n").append(prefix).append(t1).append(req.getClass().getName()).append("{"); + if (req.hasUploadId()) { + sb.append("\n").append(prefix).append(t2).append("upload_id: ").append(req.getUploadId()); + } + long writeOffset = req.getWriteOffset(); + if (req.hasChecksummedData()) { + ChecksummedData checksummedData = req.getChecksummedData(); + sb.append("\n").append(prefix).append(t2); + sb.append( + String.format( + "checksummed_data: {range: [%d:%d]", + writeOffset, writeOffset + checksummedData.getContent().size())); + if (checksummedData.hasCrc32C()) { + sb.append(", crc32c: ").append(checksummedData.getCrc32C()); + } + sb.append("}"); + } else { + sb.append("\n").append(prefix).append(t2).append("write_offset: ").append(writeOffset); + } + if (req.getFlush()) { + sb.append("\n").append(prefix).append(t2).append("flush: true"); + } + if (req.getStateLookup()) { + sb.append("\n").append(prefix).append(t2).append("state_lookup: true"); + } + if (req.getFinishWrite()) { + sb.append("\n").append(prefix).append(t2).append("finish_write: true"); + } + if (req.hasObjectChecksums()) { + ObjectChecksums objectChecksums = req.getObjectChecksums(); + sb.append("\n").append(prefix).append(t2).append("object_checksums: ").append("{"); + fmt(objectChecksums, prefix, t3, sb); + sb.append("\n").append(prefix).append(t2).append("}"); + } + } + + enum Indentation { + T1("\t"), + T2("\t\t"), + T3("\t\t\t"), + T4("\t\t\t\t"), + ; + + private final String indentation; + + Indentation(String indentation) { + this.indentation = indentation; + } + + Indentation indent() { + int ordinal = ordinal(); + return values()[ordinal + 1]; + } + + @Override + public String toString() { + return indentation; + } } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java index d27783425..578f6c7dc 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java @@ -17,6 +17,7 @@ package com.google.cloud.storage; import com.google.cloud.NoCredentials; +import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor; import com.google.storage.v2.StorageGrpc; import com.google.storage.v2.StorageSettings; import io.grpc.Server; @@ -58,6 +59,7 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { .setHost("http://" + endpoint) .setProjectId("test-proj") .setCredentials(NoCredentials.getInstance()) + .setGrpcInterceptorProvider(GrpcPlainRequestLoggingInterceptor.getInterceptorProvider()) .build(); return new FakeServer(server, grpcStorageOptions); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java new file mode 100644 index 000000000..7c3fb1fd7 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java @@ -0,0 +1,938 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.ByteSizeConstants._512KiB; +import static com.google.cloud.storage.ByteSizeConstants._768KiB; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertThrows; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.Object; +import com.google.storage.v2.StartResumableWriteRequest; +import com.google.storage.v2.StartResumableWriteResponse; +import com.google.storage.v2.StorageClient; +import com.google.storage.v2.StorageGrpc.StorageImplBase; +import io.grpc.Status.Code; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.logging.Logger; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; + +public final class ITGapicBidiUnbufferedWritableByteChannelTest { + + private static final ChunkSegmenter CHUNK_SEGMENTER = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + + /** + * + * + *

S.1

+ * + * Attempting to append to a session which has already been finalized should raise an error + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = { name = obj, size = 524288 }
+   *     
client state
+   * write_offset = 0, data = [0:262144]
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset= 0, checksummed_data.content.length = 262144 }
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 525288 } })
+   *     
+ */ + @Test + public void scenario1() throws Exception { + + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_512KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + SettableApiFuture done = SettableApiFuture.create(); + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.2

+ * + * Attempting to finalize a session with fewer bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 524288
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ persisted_size = 525288 })
+   *     
+ */ + @Test + public void scenario2() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.3

+ * + * Attempting to finalize a session with more bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 262144
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ persisted_size = 262144 })
+   *     
+ */ + @Test + public void scenario3() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4

+ * + * Attempting to finalize an already finalized session + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + channel.close(); + + BidiWriteObjectResponse BidiWriteObjectResponse = done.get(2, TimeUnit.SECONDS); + assertThat(BidiWriteObjectResponse).isEqualTo(resp1); + } + } + + /** + * + * + *

S.4.1

+ * + * Attempting to finalize an already finalized session (ack < expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4_1() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4.2

+ * + * Attempting to finalize an already finalized session (ack > expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 786432}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(BidiWriteObjectResponse{ resources = {name = obj, size = 786432 } })
+   *     
+ */ + @Test + public void scenario4_2() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_768KiB).build()) + .build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.5

+ * + * Attempt to append to a resumable session with an offset higher than GCS expects + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 0
+   *     
client state
+   * write_offset = 262144, data = [262144:524288]
+   *     
request
+   * BidiWriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, checksummed_data.content.length = 262144}
+   *     
response
+   * onError(Status{code=OUT_OF_RANGE, description="Upload request started at offset '262144', which is past expected offset '0'."})
+   *     
+ */ + @Test + public void scenario5() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB)))) + .setStateLookup(true) + .setFlush(true) + .build(); + StorageImplBase service1 = + new BidiWriteService( + (obs, requests) -> { + if (requests.equals(ImmutableList.of(req1))) { + obs.onError( + TestUtils.apiException( + Code.OUT_OF_RANGE, + "Upload request started at offset '262144', which is past expected offset '0'.")); + } else { + obs.onError( + TestUtils.apiException(Code.PERMISSION_DENIED, "Unexpected request chain.")); + } + }); + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.7

+ * + * GCS Acknowledges more bytes than were sent in the PUT + * + *

The client believes the server offset is N, it sends K bytes and the server responds that N + * + 2K bytes are now committed. + * + *

The client has detected data loss and should raise an error and prevent sending of more + * bytes. + */ + @Test + public void scenario7() throws Exception { + + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + @Test + public void incremental_success() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + CHUNK_SEGMENTER, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(0), + () -> assertThat(written).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + } + } + + @Test + public void incremental_partialSuccess() throws Exception { + String uploadId = "uploadId"; + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_512KiB))) + .build()) + .setStateLookup(true) + .setFlush(true) + .build(); + BidiWriteObjectResponse resp1 = + BidiWriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, BidiWriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + BidiWriteService service1 = new BidiWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + BidiResumableWrite resumableWrite = getResumableWrite(uploadId); + BidiWriteCtx writeCtx = new BidiWriteCtx<>(resumableWrite); + + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); + //noinspection resource + GapicBidiUnbufferedWritableByteChannel channel = + new GapicBidiUnbufferedWritableByteChannel( + storageClient.bidiWriteObjectCallable(), + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + done, + chunkSegmenter, + writeCtx, + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(_256KiB), + () -> assertThat(written).isEqualTo(_256KiB), + () -> + assertWithMessage("totalSentBytes") + .that(writeCtx.getTotalSentBytes().get()) + .isEqualTo(_256KiB), + () -> + assertWithMessage("confirmedBytes") + .that(writeCtx.getConfirmedBytes().get()) + .isEqualTo(_256KiB)); + } + } + + private static @NonNull BidiResumableWrite getResumableWrite(String uploadId) { + StartResumableWriteRequest req = StartResumableWriteRequest.getDefaultInstance(); + StartResumableWriteResponse resp = + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); + return new BidiResumableWrite( + req, resp, id -> BidiWriteObjectRequest.newBuilder().setUploadId(id).build()); + } + + static class BidiWriteService extends StorageImplBase { + private static final Logger LOGGER = Logger.getLogger(BidiWriteService.class.getName()); + private final BiConsumer, List> + c; + + private ImmutableList.Builder requests; + + BidiWriteService( + BiConsumer, List> c) { + this.c = c; + this.requests = new ImmutableList.Builder<>(); + } + + BidiWriteService(ImmutableMap, BidiWriteObjectResponse> writes) { + this( + (obs, build) -> { + if (writes.containsKey(build)) { + obs.onNext(writes.get(build)); + last(build) + .filter(BidiWriteObjectRequest::getFinishWrite) + .ifPresent(ignore -> obs.onCompleted()); + } else { + logUnexpectedRequest(writes.keySet(), build); + obs.onError( + TestUtils.apiException(Code.PERMISSION_DENIED, "Unexpected request chain.")); + } + }); + } + + private static Optional last(List l) { + if (l.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(l.get(l.size() - 1)); + } + } + + private static void logUnexpectedRequest( + Set> writes, List build) { + Collector joining = Collectors.joining(",\n\t", "[\n\t", "\n]"); + Collector oneLine = Collectors.joining(",", "[", "]"); + String msg = + String.format( + "Unexpected Request Chain.%nexpected one of: %s%n but was: %s", + writes.stream() + .map(l -> l.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine)) + .collect(joining), + build.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine)); + LOGGER.warning(msg); + } + + @Override + public StreamObserver bidiWriteObject( + StreamObserver obs) { + return new Adapter() { + @Override + public void onNext(BidiWriteObjectRequest value) { + requests.add(value); + if ((value.getFlush() && value.getStateLookup()) || value.getFinishWrite()) { + ImmutableList build = requests.build(); + c.accept(obs, build); + } + } + + @Override + public void onError(Throwable t) { + requests = new ImmutableList.Builder<>(); + } + + @Override + public void onCompleted() { + requests = new ImmutableList.Builder<>(); + } + }; + } + } + + private abstract static class Adapter extends CallStreamObserver { + + private Adapter() {} + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + + @Override + public void setMessageCompression(boolean enable) {} + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java index 7116108d7..611f7fd1c 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/GrpcPlainRequestLoggingInterceptor.java @@ -20,6 +20,8 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.protobuf.MessageOrBuilder; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.ReadObjectResponse; import com.google.storage.v2.WriteObjectRequest; import io.grpc.CallOptions; import io.grpc.Channel; @@ -109,6 +111,10 @@ public void sendMessage(ReqT message) { static String fmtProto(@NonNull Object obj) { if (obj instanceof WriteObjectRequest) { return fmtProto((WriteObjectRequest) obj); + } else if (obj instanceof BidiWriteObjectRequest) { + return fmtProto((BidiWriteObjectRequest) obj); + } else if (obj instanceof ReadObjectResponse) { + return fmtProto((ReadObjectResponse) obj); } else if (obj instanceof MessageOrBuilder) { return fmtProto((MessageOrBuilder) obj); } else { @@ -137,6 +143,38 @@ static String fmtProto(@NonNull WriteObjectRequest msg) { return msg.toString(); } + @NonNull + static String fmtProto(@NonNull BidiWriteObjectRequest msg) { + if (msg.hasChecksummedData()) { + ByteString content = msg.getChecksummedData().getContent(); + if (content.size() > 20) { + BidiWriteObjectRequest.Builder b = msg.toBuilder(); + ByteString snip = ByteString.copyFromUtf8(String.format("", content.size())); + ByteString trim = content.substring(0, 20).concat(snip); + b.getChecksummedDataBuilder().setContent(trim); + + return b.build().toString(); + } + } + return msg.toString(); + } + + @NonNull + static String fmtProto(@NonNull ReadObjectResponse msg) { + if (msg.hasChecksummedData()) { + ByteString content = msg.getChecksummedData().getContent(); + if (content.size() > 20) { + ReadObjectResponse.Builder b = msg.toBuilder(); + ByteString snip = ByteString.copyFromUtf8(String.format("", content.size())); + ByteString trim = content.substring(0, 20).concat(snip); + b.getChecksummedDataBuilder().setContent(trim); + + return b.build().toString(); + } + } + return msg.toString(); + } + private static final class InterceptorProvider implements GrpcInterceptorProvider { private static final InterceptorProvider INSTANCE = new InterceptorProvider();