From e67991022f7d988c8fba4d0ca70f8b3a9d59c69a Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 27 Oct 2023 14:40:39 -0400 Subject: [PATCH 1/2] feat: port DefaultBlobWriteSessionConfig to work with HttpStorageOptions --- .../cloud/storage/BlobWriteSessionConfig.java | 9 ++++ .../storage/BlobWriteSessionConfigs.java | 4 +- .../DefaultBlobWriteSessionConfig.java | 41 ++++++++++++++++++- .../cloud/storage/HttpStorageOptions.java | 41 ++++++++++++++++++- .../com/google/cloud/storage/StorageImpl.java | 16 +++++++- .../storage/it/ITBlobWriteSessionTest.java | 11 ++++- 6 files changed, 114 insertions(+), 8 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java index b52e9bd4f..441f78ac7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java @@ -53,6 +53,15 @@ WritableByteChannelSession writeSession( StorageInternal s, BlobInfo info, Opts opts); } + /** + * Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is + * compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#HTTP} + * + *

We could evaluate the annotations, but the code for that is more complicated and probably + * not worth the effort. + */ + interface HttpCompatible {} + /** * Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is * compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index 43b68d8a0..781411c6d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -55,7 +55,7 @@ * full or close. Buffer size is configurable via * {@link DefaultBlobWriteSessionConfig#withChunkSize(int)} * - * gRPC + * gRPC, HTTP * The network will only be used for the following operations: *

    *
  1. Creating the Resumable Upload Session
  2. @@ -241,7 +241,7 @@ private BlobWriteSessionConfigs() {} * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - @TransportCompatibility({Transport.GRPC}) + @TransportCompatibility({Transport.GRPC, Transport.HTTP}) public static DefaultBlobWriteSessionConfig getDefault() { return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 3a4dd2be2..1d45c9e34 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -21,17 +21,21 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; import java.nio.channels.WritableByteChannel; import java.time.Clock; +import java.util.Map; +import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; /** @@ -55,9 +59,9 @@ */ @Immutable @BetaApi -@TransportCompatibility({Transport.GRPC}) +@TransportCompatibility({Transport.GRPC, Transport.HTTP}) public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig - implements BlobWriteSessionConfig.GrpcCompatible { + implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible { private static final long serialVersionUID = -6873740918589930633L; private final int chunkSize; @@ -146,6 +150,39 @@ public WritableByteChannelSession writeSession( .build(); })), WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER); + } else if (s instanceof StorageImpl) { + StorageImpl json = (StorageImpl) s; + + return new DecoratedWritableByteChannelSession<>( + new LazySession<>( + new LazyWriteChannel<>( + () -> { + final Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + + StorageObject encode = Conversions.json().blobInfo().encode(updated); + Supplier uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + json.getOptions(), + updated, + optionsMap, + json.retryAlgorithmManager.getForResumableUploadSessionCreate( + optionsMap)); + ApiFuture startAsync = + ApiFutures.immediateFuture( + JsonResumableWrite.of( + encode, optionsMap, uploadIdSupplier.get(), 0L)); + + return ResumableMedia.http() + .write() + .byteChannel(HttpClientContext.from(json.storageRpc)) + .resumable() + .buffered(BufferHandle.allocate(chunkSize)) + .setStartAsync(startAsync) + .build(); + })), + Conversions.json().blobInfo()); } else { throw new IllegalStateException( "Unknown Storage implementation: " + s.getClass().getName()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java index fd2a57311..684f3f15b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java @@ -16,6 +16,7 @@ package com.google.cloud.storage; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import com.google.api.core.ApiClock; @@ -30,6 +31,7 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.spi.ServiceRpcFactory; import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.spi.StorageRpcFactory; import com.google.cloud.storage.spi.v1.HttpStorageRpc; @@ -39,7 +41,9 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; +import java.time.Clock; import java.util.Set; +import org.checkerframework.checker.nullness.qual.NonNull; /** @since 2.14.0 This new api is in preview and is subject to breaking changes. */ @BetaApi @@ -55,6 +59,7 @@ public class HttpStorageOptions extends StorageOptions { private final HttpRetryAlgorithmManager retryAlgorithmManager; private transient RetryDependenciesAdapter retryDepsAdapter; + private final BlobWriteSessionConfig blobWriteSessionConfig; private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { super(builder, serviceDefaults); @@ -63,6 +68,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { MoreObjects.firstNonNull( builder.storageRetryStrategy, defaults().getStorageRetryStrategy())); retryDepsAdapter = new RetryDependenciesAdapter(); + blobWriteSessionConfig = builder.blobWriteSessionConfig; } @Override @@ -120,6 +126,8 @@ RetryingDependencies asRetryDependencies() { public static class Builder extends StorageOptions.Builder { private StorageRetryStrategy storageRetryStrategy; + private BlobWriteSessionConfig blobWriteSessionConfig = + HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); Builder() {} @@ -218,6 +226,24 @@ public HttpStorageOptions.Builder setQuotaProjectId(String quotaProjectId) { return this; } + /** + * @see BlobWriteSessionConfig + * @see BlobWriteSessionConfigs + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @see HttpStorageDefaults#getDefaultStorageWriterConfig() + * @since 2.29.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public HttpStorageOptions.Builder setBlobWriteSessionConfig( + @NonNull BlobWriteSessionConfig blobWriteSessionConfig) { + requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null"); + checkArgument( + blobWriteSessionConfig instanceof BlobWriteSessionConfig.HttpCompatible, + "The provided instance of BlobWriteSessionConfig is not compatible with this HTTP transport."); + this.blobWriteSessionConfig = blobWriteSessionConfig; + return this; + } + @Override public HttpStorageOptions build() { return new HttpStorageOptions(this, defaults()); @@ -249,6 +275,12 @@ public HttpTransportOptions getDefaultTransportOptions() { public StorageRetryStrategy getStorageRetryStrategy() { return StorageRetryStrategy.getDefaultStorageRetryStrategy(); } + + /** @since 2.29.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi + public BlobWriteSessionConfig getDefaultStorageWriterConfig() { + return BlobWriteSessionConfigs.getDefault(); + } } /** @@ -287,7 +319,14 @@ public HttpStorageFactory() {} public Storage create(StorageOptions options) { if (options instanceof HttpStorageOptions) { HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options; - return new StorageImpl(httpStorageOptions); + Clock clock = Clock.systemUTC(); + try { + return new StorageImpl( + httpStorageOptions, httpStorageOptions.blobWriteSessionConfig.createFactory(clock)); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to instantiate HTTP com.google.cloud.storage.Storage client.", e); + } } else { throw new IllegalArgumentException("Only HttpStorageOptions supported"); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index a5df68b83..1a9bd8d03 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -38,6 +38,7 @@ import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Entity; import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext; +import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory; import com.google.cloud.storage.HmacKey.HmacKeyMetadata; import com.google.cloud.storage.PostPolicyV4.ConditionV4Type; import com.google.cloud.storage.PostPolicyV4.PostConditionsV4; @@ -92,7 +93,7 @@ import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.Nullable; -final class StorageImpl extends BaseService implements Storage { +final class StorageImpl extends BaseService implements Storage, StorageInternal { private static final byte[] EMPTY_BYTE_ARRAY = {}; private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg=="; @@ -115,11 +116,13 @@ final class StorageImpl extends BaseService implements Storage { final HttpRetryAlgorithmManager retryAlgorithmManager; final StorageRpc storageRpc; + final WriterFactory writerFactory; - StorageImpl(HttpStorageOptions options) { + StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) { super(options); this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.storageRpc = options.getStorageRpcV1(); + this.writerFactory = writerFactory; } @Override @@ -1635,4 +1638,13 @@ private Bucket internalBucketGet(String bucket, Map option () -> storageRpc.get(bucketPb, optionsMap), (b) -> Conversions.json().bucketInfo().decode(b).asBucket(this)); } + + @Override + public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) { + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); + + WritableByteChannelSession writableByteChannelSession = + writerFactory.writeSession(this, blobInfo, opts); + return BlobWriteSessions.of(writableByteChannelSession); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java index 763346b1f..51f8e55b3 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -26,6 +26,7 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.DataGenerator; import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.HttpStorageOptions; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; @@ -49,7 +50,7 @@ @RunWith(StorageITRunner.class) @CrossRun( - transports = {Transport.GRPC}, + transports = {Transport.HTTP, Transport.GRPC}, backends = {Backend.PROD}) public final class ITBlobWriteSessionTest { @@ -68,6 +69,7 @@ public void allDefaults() throws Exception { } @Test + @CrossRun.Exclude(transports = Transport.HTTP) public void bufferToTempDirThenUpload() throws Exception { StorageOptions options = null; if (transport == Transport.GRPC) { @@ -94,6 +96,13 @@ public void overrideDefaultBufferSize() throws Exception { .setBlobWriteSessionConfig( BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024)) .build(); + } else if (transport == Transport.HTTP) { + options = + ((HttpStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig( + BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024)) + .build(); } assertWithMessage("unable to resolve options").that(options).isNotNull(); //noinspection DataFlowIssue From 28205a9662d4c3db2f296b3f9522cae9e3a68951 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 27 Mar 2024 12:38:32 -0400 Subject: [PATCH 2/2] test: exclude bidi for http --- .../java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java index 51f8e55b3..b8f89726e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -112,6 +112,7 @@ public void overrideDefaultBufferSize() throws Exception { } @Test + @CrossRun.Exclude(transports = Transport.HTTP) public void bidiTest() throws Exception { StorageOptions options = null; if (transport == Transport.GRPC) {