From af24360c62e60216412851617050e9e848ede5aa Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 12 Jul 2023 21:52:04 -0400 Subject: [PATCH] fix: update GrpcStorageImpl.createFrom(BlobInfo, Path) to use RewindableContent With the introduction of RewindableContent, we can now upload an entire file in a single WriteObjectRequest stream. * chore: rename RewindableHttpContent -> RewindableContent --- .../ApiaryUnbufferedWritableByteChannel.java | 6 +- .../cloud/storage/ByteSizeConstants.java | 3 + ...apicWritableByteChannelSessionBuilder.java | 19 ++- .../cloud/storage/GrpcResumableSession.java | 123 ++++++++++++++++++ .../google/cloud/storage/GrpcStorageImpl.java | 80 +++++------- .../cloud/storage/JsonResumableSession.java | 6 +- .../storage/JsonResumableSessionPutTask.java | 8 +- .../cloud/storage/ResumableSession.java | 27 ++-- ...ttpContent.java => RewindableContent.java} | 71 ++++++++-- .../com/google/cloud/storage/StorageImpl.java | 2 +- .../google/cloud/storage/FakeHttpServer.java | 7 + .../ITGrpcStorageImplUploadRetryTest.java | 14 +- .../ITJsonResumableSessionPutTaskTest.java | 64 +++++++-- .../storage/ITJsonResumableSessionTest.java | 10 +- .../RewindableByteBufferContentTest.java | 12 +- ...ava => RewindableContentPropertyTest.java} | 6 +- .../it/ITObjectChecksumSupportTest.java | 2 +- 17 files changed, 346 insertions(+), 114 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java rename google-cloud-storage/src/main/java/com/google/cloud/storage/{RewindableHttpContent.java => RewindableContent.java} (72%) rename google-cloud-storage/src/test/java/com/google/cloud/storage/{RewindableHttpContentPropertyTest.java => RewindableContentPropertyTest.java} (98%) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java index 5fce18ae5..35ad97ffe 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java @@ -31,7 +31,7 @@ @ParametersAreNonnullByDefault final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel { - private final ResumableSession session; + private final JsonResumableSession session; private final SettableApiFuture result; private final LongConsumer committedBytesCallback; @@ -57,7 +57,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException if (!open) { throw new ClosedChannelException(); } - RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length)); + RewindableContent content = RewindableContent.of(Utils.subArray(srcs, offset, length)); long available = content.getLength(); long newFinalByteOffset = cumulativeByteCount + available; final HttpContentRange header; @@ -96,7 +96,7 @@ public void close() throws IOException { if (!finished) { try { ResumableOperationResult<@Nullable StorageObject> operationResult = - session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount)); + session.put(RewindableContent.empty(), HttpContentRange.of(cumulativeByteCount)); long persistedSize = operationResult.getPersistedSize(); committedBytesCallback.accept(persistedSize); result.set(operationResult.getObject()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java index 98b31a3de..cbdbd94d6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java @@ -28,6 +28,9 @@ final class ByteSizeConstants { static final int _2MiB = 2 * _1MiB; static final int _16MiB = 16 * _1MiB; static final int _32MiB = 32 * _1MiB; + static final long _1GiB = 1024 * _1MiB; + static final long _1TiB = 1024 * _1GiB; + static final long _5TiB = 5 * _1TiB; static final long _128KiBL = 131072L; static final long _256KiBL = 262144L; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index e39bf85d3..80bca1c57 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ClientStreamingCallable; @@ -219,10 +220,12 @@ final class ResumableUploadBuilder { private RetryingDependencies deps; private ResultRetryAlgorithm alg; + private boolean fsyncEvery; ResumableUploadBuilder() { this.deps = RetryingDependencies.attemptOnce(); this.alg = Retrying.neverRetry(); + this.fsyncEvery = true; } ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlgorithm alg) { @@ -231,6 +234,12 @@ ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlg return this; } + @InternalApi + ResumableUploadBuilder setFsyncEvery(boolean fsyncEvery) { + this.fsyncEvery = fsyncEvery; + return this; + } + /** * Do not apply any intermediate buffering. Any call to {@link * java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to @@ -281,7 +290,10 @@ UnbufferedWritableByteChannelSession build() { return new UnbufferedWriteSession<>( requireNonNull(start, "start must be non null"), bindFunction( - WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext), + fsyncEvery + ? WriteFlushStrategy.fsyncEveryFlush( + write, deps, alg, Retrying::newCallContext) + : WriteFlushStrategy.fsyncOnClose(write), ResumableWrite::identity) .andThen(StorageByteChannels.writable()::createSynchronized)); } @@ -310,7 +322,10 @@ BufferedWritableByteChannelSession build() { return new BufferedWriteSession<>( requireNonNull(start, "start must be non null"), bindFunction( - WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext), + fsyncEvery + ? WriteFlushStrategy.fsyncEveryFlush( + write, deps, alg, Retrying::newCallContext) + : WriteFlushStrategy.fsyncOnClose(write), ResumableWrite::identity) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java new file mode 100644 index 000000000..8de44fb65 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java @@ -0,0 +1,123 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.storage.v2.Object; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class GrpcResumableSession { + + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final ClientStreamingCallable writeCallable; + private final UnaryCallable + queryWriteStatusCallable; + private final ResumableWrite resumableWrite; + private final Hasher hasher; + + GrpcResumableSession( + RetryingDependencies deps, + ResultRetryAlgorithm alg, + ClientStreamingCallable writeCallable, + UnaryCallable queryWriteStatusCallable, + ResumableWrite resumableWrite, + Hasher hasher) { + this.deps = deps; + this.alg = alg; + this.writeCallable = writeCallable; + this.queryWriteStatusCallable = queryWriteStatusCallable; + this.resumableWrite = resumableWrite; + this.hasher = hasher; + } + + ResumableOperationResult<@Nullable Object> query() { + QueryWriteStatusRequest.Builder b = + QueryWriteStatusRequest.newBuilder().setUploadId(resumableWrite.getRes().getUploadId()); + if (resumableWrite.getReq().hasCommonObjectRequestParams()) { + b.setCommonObjectRequestParams(resumableWrite.getReq().getCommonObjectRequestParams()); + } + QueryWriteStatusRequest req = b.build(); + try { + QueryWriteStatusResponse response = queryWriteStatusCallable.call(req); + if (response.hasResource()) { + return ResumableOperationResult.complete( + response.getResource(), response.getResource().getSize()); + } else { + return ResumableOperationResult.incremental(response.getPersistedSize()); + } + } catch (Exception e) { + throw StorageException.coalesce(e); + } + } + + ResumableOperationResult<@Nullable Object> put(RewindableContent content) { + AtomicBoolean dirty = new AtomicBoolean(false); + GrpcCallContext retryingCallContext = Retrying.newCallContext(); + BufferHandle handle = BufferHandle.allocate(ByteSizeConstants._2MiB); + + return Retrying.run( + deps, + alg, + () -> { + if (dirty.getAndSet(true)) { + ResumableOperationResult<@Nullable Object> query = query(); + if (query.getObject() != null) { + return query; + } else { + content.rewindTo(query.getPersistedSize()); + } + } + WritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(writeCallable.withDefaultCallContext(retryingCallContext)) + .setByteStringStrategy(ByteStringStrategy.copy()) + .setHasher(hasher) + .resumable() + .setFsyncEvery(false) + .buffered(handle) + .setStartAsync(ApiFutures.immediateFuture(resumableWrite)) + .build(); + + try (BufferedWritableByteChannel channel = session.open()) { + content.writeTo(channel); + } + + WriteObjectResponse response = session.getResult().get(); + if (response.hasResource()) { + return ResumableOperationResult.complete( + response.getResource(), response.getResource().getSize()); + } else { + return ResumableOperationResult.incremental(response.getPersistedSize()); + } + }, + Decoder.identity()); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 14b478990..d7d405919 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -28,6 +28,7 @@ import static java.util.Objects.requireNonNull; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.paging.AbstractPage; @@ -35,6 +36,7 @@ import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; +import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.NotFoundException; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; @@ -72,6 +74,7 @@ import com.google.common.collect.Streams; import com.google.common.io.BaseEncoding; import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.MoreExecutors; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.SetIamPolicyRequest; import com.google.iam.v1.TestIamPermissionsRequest; @@ -123,7 +126,6 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.OpenOption; @@ -138,6 +140,7 @@ import java.util.Spliterator; import java.util.Spliterators.AbstractSpliterator; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -285,55 +288,34 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - long size = Files.size(path); - if (size < bufferSize) { - // ignore the bufferSize argument if the file is smaller than it - GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> { - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) - .setHasher(Hasher.enabled()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .direct() - .buffered(Buffers.allocate(size)) - .setRequest(req) - .build(); - - try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS); - BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - throw StorageException.coalesce(e); - } - return session.getResult(); - }, - this::getBlob); - } else { - ApiFuture start = startResumableWrite(grpcCallContext, req); - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .resumable() - .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) - .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) - .setStartAsync(start) - .build(); - try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS); - BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - throw StorageException.coalesce(e); + ClientStreamingCallable write = + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); + + ApiFuture start = startResumableWrite(grpcCallContext, req); + ApiFuture session2 = + ApiFutures.transform( + start, + rw -> + ResumableSession.grpc( + getOptions(), + retryAlgorithmManager.idempotent(), + write, + storageClient.queryWriteStatusCallable(), + rw, + Hasher.noop()), + MoreExecutors.directExecutor()); + try { + GrpcResumableSession got = session2.get(); + ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path)); + Object object = put.getObject(); + if (object == null) { + // if by some odd chance the put didn't get the Object, query for it + ResumableOperationResult<@Nullable Object> query = got.query(); + object = query.getObject(); } - return getBlob(session.getResult()); + return codecs.blobInfo().decode(object).asBlob(this); + } catch (InterruptedException | ExecutionException e) { + throw StorageException.coalesce(e); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java index debd611b0..7cc2f74e8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.checkerframework.checker.nullness.qual.Nullable; -final class JsonResumableSession extends ResumableSession { +final class JsonResumableSession { static final String SPAN_NAME_WRITE = String.format("Sent.%s.write", HttpStorageRpc.class.getName()); @@ -53,14 +53,12 @@ final class JsonResumableSession extends ResumableSession { * Not automatically retried. Usually called from within another retrying context. We don't yet * have the concept of nested retry handling. */ - @Override ResumableOperationResult<@Nullable StorageObject> query() { return new JsonResumableSessionQueryTask(context, resumableWrite.getUploadId()).call(); } - @Override ResumableOperationResult<@Nullable StorageObject> put( - RewindableHttpContent content, HttpContentRange contentRange) { + RewindableContent content, HttpContentRange contentRange) { JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( context, resumableWrite.getUploadId(), content, contentRange); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index de905fb48..5abb43c16 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -39,7 +39,7 @@ final class JsonResumableSessionPutTask private final HttpClientContext context; private final String uploadId; - private final RewindableHttpContent content; + private final RewindableContent content; private final HttpContentRange originalContentRange; private HttpContentRange contentRange; @@ -48,7 +48,7 @@ final class JsonResumableSessionPutTask JsonResumableSessionPutTask( HttpClientContext httpClientContext, String uploadId, - RewindableHttpContent content, + RewindableContent content, HttpContentRange originalContentRange) { this.context = httpClientContext; this.uploadId = uploadId; @@ -64,9 +64,9 @@ public void rewindTo(long offset) { long originalBegin = range.beginOffset(); long contentOffset = offset - originalBegin; Preconditions.checkArgument( - 0 <= contentOffset && contentOffset < content.getLength(), + 0 <= contentOffset && contentOffset < range.length(), "Rewind offset is out of bounds. (%s <= %s < %s)", - range.beginOffset(), + originalBegin, offset, range.endOffset()); content.rewindTo(contentOffset); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java index 7b608c68a..5c308f7fb 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java @@ -17,17 +17,17 @@ package com.google.cloud.storage; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.storage.Retrying.RetryingDependencies; -import org.checkerframework.checker.nullness.qual.Nullable; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; -abstract class ResumableSession { +final class ResumableSession { - ResumableSession() {} - - abstract ResumableOperationResult<@Nullable T> put( - RewindableHttpContent content, HttpContentRange contentRange); - - abstract ResumableOperationResult<@Nullable T> query(); + private ResumableSession() {} static JsonResumableSession json( HttpClientContext context, @@ -36,4 +36,15 @@ static JsonResumableSession json( JsonResumableWrite resumableWrite) { return new JsonResumableSession(context, deps, alg, resumableWrite); } + + static GrpcResumableSession grpc( + RetryingDependencies deps, + ResultRetryAlgorithm alg, + ClientStreamingCallable writeCallable, + UnaryCallable queryWriteStatusCallable, + ResumableWrite resumableWrite, + Hasher hasher) { + return new GrpcResumableSession( + deps, alg, writeCallable, queryWriteStatusCallable, resumableWrite, hasher); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java similarity index 72% rename from google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java rename to google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java index 68bf4e22f..166ccbc97 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java @@ -25,6 +25,7 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; @@ -32,9 +33,9 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; -abstract class RewindableHttpContent extends AbstractHttpContent { +abstract class RewindableContent extends AbstractHttpContent { - private RewindableHttpContent() { + private RewindableContent() { super((HttpMediaType) null); } @@ -43,24 +44,28 @@ private RewindableHttpContent() { abstract void rewindTo(long offset); + abstract long writeTo(WritableByteChannel gbc) throws IOException; + + abstract long writeTo(GatheringByteChannel gbc) throws IOException; + @Override public final boolean retrySupported() { return false; } - static RewindableHttpContent empty() { + static RewindableContent empty() { return EmptyRewindableContent.INSTANCE; } - static RewindableHttpContent of(ByteBuffer... buffers) { - return new ByteBufferHttpContent(buffers); + static RewindableContent of(ByteBuffer... buffers) { + return new ByteBufferContent(buffers); } - static RewindableHttpContent of(Path path) throws IOException { - return new PathRewindableHttpContent(path); + static RewindableContent of(Path path) throws IOException { + return new PathRewindableContent(path); } - private static final class EmptyRewindableContent extends RewindableHttpContent { + private static final class EmptyRewindableContent extends RewindableContent { private static final EmptyRewindableContent INSTANCE = new EmptyRewindableContent(); @Override @@ -73,18 +78,28 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } + @Override + long writeTo(WritableByteChannel gbc) { + return 0; + } + + @Override + long writeTo(GatheringByteChannel gbc) { + return 0; + } + @Override protected void rewindTo(long offset) {} } - private static final class PathRewindableHttpContent extends RewindableHttpContent { + private static final class PathRewindableContent extends RewindableContent { private final Path path; private final long size; private long readOffset; - private PathRewindableHttpContent(Path path) throws IOException { + private PathRewindableContent(Path path) throws IOException { this.path = path; this.size = Files.size(path); this.readOffset = 0; @@ -110,9 +125,25 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } } + + @Override + long writeTo(WritableByteChannel gbc) throws IOException { + try (SeekableByteChannel in = Files.newByteChannel(path, StandardOpenOption.READ)) { + in.position(readOffset); + return ByteStreams.copy(in, gbc); + } + } + + @Override + long writeTo(GatheringByteChannel gbc) throws IOException { + try (SeekableByteChannel in = Files.newByteChannel(path, StandardOpenOption.READ)) { + in.position(readOffset); + return ByteStreams.copy(in, gbc); + } + } } - private static final class ByteBufferHttpContent extends RewindableHttpContent { + private static final class ByteBufferContent extends RewindableContent { private final ByteBuffer[] buffers; // keep an array of the positions in case we need to rewind them for retries @@ -126,7 +157,7 @@ private static final class ByteBufferHttpContent extends RewindableHttpContent { private long offset; - private ByteBufferHttpContent(ByteBuffer[] buffers) { + private ByteBufferContent(ByteBuffer[] buffers) { this.buffers = buffers; this.positions = Arrays.stream(buffers).mapToInt(Buffers::position).toArray(); this.totalLength = Arrays.stream(buffers).mapToLong(Buffer::remaining).sum(); @@ -148,6 +179,22 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } + @Override + long writeTo(WritableByteChannel gbc) throws IOException { + dirty = true; + int retVal = 0; + for (ByteBuffer buffer : buffers) { + retVal += gbc.write(buffer); + } + return retVal; + } + + @Override + long writeTo(GatheringByteChannel gbc) throws IOException { + dirty = true; + return gbc.write(buffers); + } + @Override void rewindTo(long offset) { Preconditions.checkArgument( diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index fc8601cb6..fd387717b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -255,7 +255,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size); ResumableOperationResult put = - session.put(RewindableHttpContent.of(path), contentRange); + session.put(RewindableContent.of(path), contentRange); // all exception translation is taken care of down in the JsonResumableSession StorageObject object = put.getObject(); if (object == null) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java index be739ab18..cff9dc469 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java @@ -36,6 +36,7 @@ import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; import io.grpc.netty.shaded.io.netty.handler.codec.http.FullHttpResponse; import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpHeaders; +import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpObjectAggregator; import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpRequest; import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpServerCodec; import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpServerExpectContinueHandler; @@ -84,6 +85,12 @@ static FakeHttpServer of(HttpRequestHandler server) { protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new HttpServerCodec()); + // Accept a request and content up to 100 MiB + // If we don't do this, sometimes the ordering on the wire will result in the server + // rejecting the request before the client has finished sending. + // While our client can handle this scenario and retry, it makes assertions more + // difficult due to the variability of request counts. + p.addLast(new HttpObjectAggregator(100 * 1024 * 1024)); p.addLast(new HttpServerExpectContinueHandler()); p.addLast(new Handler(server)); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java index ac192a1ff..12f1bd6b9 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java @@ -29,6 +29,8 @@ import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.Object; import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.StorageGrpc.StorageImplBase; @@ -93,7 +95,7 @@ public void create_inputStream() throws Exception { @Test public void createFrom_path_smallerThanBufferSize() throws Exception { - Direct.FakeService service = Direct.FakeService.create(); + Resumable.FakeService service = Resumable.FakeService.create(); try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); FakeServer server = FakeServer.of(service); @@ -287,8 +289,16 @@ public void startResumableWrite( } } + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(QueryWriteStatusResponse.newBuilder().setPersistedSize(0).build()); + responseObserver.onCompleted(); + } + // a bit of constructor lifecycle hackery to appease the compiler - // Even though the thing past to super() is a lazy function, the closing over of the outer + // Even though the thing passed to super() is a lazy function, the closing over of the outer // fields happens earlier than they are available. To side step this fact, we provide the // AtomicBoolean as a constructor argument which can be closed over without issue, and then // bind it to the class field after super(). diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java index 7ae68773d..b7d9d6c74 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java @@ -17,6 +17,7 @@ package com.google.cloud.storage; import static com.google.cloud.storage.ByteSizeConstants._128KiBL; +import static com.google.cloud.storage.ByteSizeConstants._256KiB; import static com.google.cloud.storage.ByteSizeConstants._256KiBL; import static com.google.cloud.storage.ByteSizeConstants._512KiBL; import static com.google.cloud.storage.ByteSizeConstants._768KiBL; @@ -45,6 +46,7 @@ import io.grpc.netty.shaded.io.netty.handler.codec.http.HttpResponseStatus; import java.math.BigInteger; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -98,7 +100,7 @@ public void emptyObjectHappyPath() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 0L), 0)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); @@ -144,7 +146,7 @@ public void scenario9() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 10L))); StorageException se = assertThrows(StorageException.class, task::call); @@ -191,7 +193,7 @@ public void scenario7() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 10L))); StorageException se = assertThrows(StorageException.class, task::call); @@ -272,7 +274,7 @@ public void scenario1() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.of(tmpFile.getPath()), + RewindableContent.of(tmpFile.getPath()), HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL))); StorageException se = assertThrows(StorageException.class, task::call); @@ -341,7 +343,7 @@ public void scenario2() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_256KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -410,7 +412,7 @@ public void scenario3() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_512KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -488,7 +490,7 @@ public void scenario4() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_256KiBL)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); @@ -570,7 +572,7 @@ public void scenario4_1() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_512KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -650,7 +652,7 @@ public void scenario4_2() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_128KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -728,7 +730,7 @@ public void scenario5() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.of(tmpFile.getPath()), + RewindableContent.of(tmpFile.getPath()), HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, _768KiBL))); StorageException se = assertThrows(StorageException.class, task::call); @@ -768,7 +770,7 @@ public void jsonParseFailure() throws Exception { JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( - httpClientContext, uploadUrl, RewindableHttpContent.empty(), HttpContentRange.of(0)); + httpClientContext, uploadUrl, RewindableContent.empty(), HttpContentRange.of(0)); StorageException se = assertThrows(StorageException.class, task::call); // the parse error happens while trying to read the success object, make sure we raise it as @@ -803,7 +805,7 @@ public void jsonDeserializationOnlyAttemptedWhenContentPresent() throws Exceptio JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( - httpClientContext, uploadUrl, RewindableHttpContent.empty(), HttpContentRange.of(0)); + httpClientContext, uploadUrl, RewindableContent.empty(), HttpContentRange.of(0)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); StorageObject call = operationResult.getObject(); @@ -814,7 +816,7 @@ public void jsonDeserializationOnlyAttemptedWhenContentPresent() throws Exceptio @Test public void attemptToRewindOutOfBoundsThrows_lower() { - RewindableHttpContent content = RewindableHttpContent.of(); + RewindableContent content = RewindableContent.of(); JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( null, null, content, HttpContentRange.of(ByteRangeSpec.relativeLength(10L, 10L))); @@ -826,7 +828,7 @@ public void attemptToRewindOutOfBoundsThrows_lower() { @Test public void attemptToRewindOutOfBoundsThrows_upper() { - RewindableHttpContent content = RewindableHttpContent.of(); + RewindableContent content = RewindableContent.of(); JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( null, null, content, HttpContentRange.of(ByteRangeSpec.relativeLength(10L, 10L))); @@ -835,4 +837,38 @@ public void attemptToRewindOutOfBoundsThrows_upper() { assertThrows(IllegalArgumentException.class, () -> task.rewindTo(20)); assertThat(iae).hasMessageThat().isEqualTo("Rewind offset is out of bounds. (10 <= 20 < 20)"); } + + @Test + public void repeatedRewindsToTheSameLocationWork() { + ByteBuffer buf1 = DataGenerator.base64Characters().genByteBuffer(_256KiB); + ByteBuffer buf2 = DataGenerator.base64Characters().genByteBuffer(_256KiB); + RewindableContent content = RewindableContent.of(buf1, buf2); + JsonResumableSessionPutTask task = + new JsonResumableSessionPutTask( + null, null, content, HttpContentRange.of(ByteRangeSpec.relativeLength(0L, _512KiBL))); + + task.rewindTo(0); + assertThat(buf1.position()).isEqualTo(0); + assertThat(buf2.position()).isEqualTo(0); + + int last = buf1.capacity(); + buf1.position(last); + buf2.position(last); + + task.rewindTo(_256KiBL); + assertThat(buf1.remaining()).isEqualTo(0); + assertThat(buf2.position()).isEqualTo(0); + + task.rewindTo(_256KiBL); + assertThat(buf1.remaining()).isEqualTo(0); + assertThat(buf2.position()).isEqualTo(0); + + task.rewindTo(_256KiBL + 13); + assertThat(buf1.remaining()).isEqualTo(0); + assertThat(buf2.position()).isEqualTo(13); + + task.rewindTo(_256KiBL + 13); + assertThat(buf1.remaining()).isEqualTo(0); + assertThat(buf2.position()).isEqualTo(13); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java index 1cc917e34..108e6d6de 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java @@ -119,7 +119,7 @@ public void rewindWillQueryStatusOnlyWhenDirty() throws Exception { httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); ResumableOperationResult<@Nullable StorageObject> operationResult = - session.put(RewindableHttpContent.of(tmpFile.getPath()), range1); + session.put(RewindableContent.of(tmpFile.getPath()), range1); StorageObject call = operationResult.getObject(); assertThat(call).isNull(); assertThat(operationResult.getPersistedSize()).isEqualTo(_512KiBL); @@ -173,13 +173,13 @@ public void retryAttemptWillReturnQueryResultIfPersistedSizeMatchesSpecifiedEndO httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); ResumableOperationResult<@Nullable StorageObject> operationResult1 = - session.put(RewindableHttpContent.of(buf1), range1); + session.put(RewindableContent.of(buf1), range1); StorageObject call1 = operationResult1.getObject(); assertThat(call1).isNull(); assertThat(operationResult1.getPersistedSize()).isEqualTo(_512KiBL); ResumableOperationResult<@Nullable StorageObject> operationResult2 = - session.put(RewindableHttpContent.of(buf2), range3); + session.put(RewindableContent.of(buf2), range3); StorageObject call2 = operationResult2.getObject(); assertThat(call2).isNull(); assertThat(operationResult2.getPersistedSize()).isEqualTo(_768KiBL); @@ -240,13 +240,13 @@ public void rewindOfContentIsRelativeToItsBeginOffsetOfTheOverallObject() throws httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); ResumableOperationResult<@Nullable StorageObject> operationResult1 = - session.put(RewindableHttpContent.of(buf1), range1); + session.put(RewindableContent.of(buf1), range1); StorageObject call1 = operationResult1.getObject(); assertThat(call1).isNull(); assertThat(operationResult1.getPersistedSize()).isEqualTo(_512KiBL); ResumableOperationResult<@Nullable StorageObject> operationResult2 = - session.put(RewindableHttpContent.of(buf2), range2); + session.put(RewindableContent.of(buf2), range2); StorageObject call2 = operationResult2.getObject(); assertThat(call2).isNull(); assertThat(operationResult2.getPersistedSize()).isEqualTo(_768KiBL); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java index 9d65a00dc..fe867e6f2 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java @@ -21,7 +21,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; -import com.google.cloud.storage.RewindableHttpContentPropertyTest.ErroringOutputStream; +import com.google.cloud.storage.RewindableContentPropertyTest.ErroringOutputStream; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -61,7 +61,7 @@ public void setUp() throws Exception { @Test public void getLength() { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); assertThat(content.getLength()).isEqualTo(total); } @@ -69,7 +69,7 @@ public void getLength() { @Test public void writeTo() throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); ByteArrayOutputStream baos = new ByteArrayOutputStream(); content.writeTo(baos); @@ -81,7 +81,7 @@ public void writeTo() throws IOException { @Test public void rewind() throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); assertThrows( IOException.class, @@ -100,7 +100,7 @@ public void rewind() throws IOException { @Test public void rewindTo() throws Exception { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); ByteString reduce = Arrays.stream(buffers) @@ -135,7 +135,7 @@ public void rewind_dirtyAware() throws IOException { int position = buf.position(); int limit = buf.limit(); - RewindableHttpContent content = RewindableHttpContent.of(buf); + RewindableContent content = RewindableContent.of(buf); int hackPosition = 2; // after content has initialized, mutate the position underneath it. We're doing this to detect // if rewind is actually modifying things. It shouldn't until the content is dirtied by calling diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java similarity index 98% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java index f8e18f286..48d29bc8c 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java @@ -41,12 +41,12 @@ import net.jqwik.api.RandomDistribution; import org.checkerframework.checker.nullness.qual.NonNull; -final class RewindableHttpContentPropertyTest { +final class RewindableContentPropertyTest { @Property void path(@ForAll("PathScenario") PathScenario pathScenario) throws Exception { try (PathScenario s = pathScenario) { - RewindableHttpContent content = RewindableHttpContent.of(s.getPath()); + RewindableContent content = RewindableContent.of(s.getPath()); assertThrows( IOException.class, () -> { @@ -68,7 +68,7 @@ void path(@ForAll("PathScenario") PathScenario pathScenario) throws Exception { @Property void byteBuffers(@ForAll("ByteBuffersScenario") ByteBuffersScenario s) throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(s.getBuffers()); + RewindableContent content = RewindableContent.of(s.getBuffers()); assertThat(content.getLength()).isEqualTo(s.getFullLength()); assertThrows( IOException.class, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java index d8e026cbc..12bbf3df5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java @@ -54,7 +54,7 @@ @RunWith(StorageITRunner.class) @CrossRun( - transports = {Transport.HTTP /*, Transport.GRPC*/}, + transports = {Transport.HTTP, Transport.GRPC}, backends = Backend.PROD) @Parameterized(ChecksummedTestContentProvider.class) public final class ITObjectChecksumSupportTest {