diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index 76296e9b7..e07043e53 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -18,6 +18,8 @@ import com.google.api.core.BetaApi; import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy; +import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -108,6 +110,10 @@ * retryable error query the offset of the Resumable Upload Session, then open the recovery * file from the offset and transmit the bytes to Cloud Storage. * + * + * Opening the stream for upload will be retried up to the limitations specified in {@link StorageOptions#getRetrySettings()} + * All bytes are buffered to disk and allow for recovery from any arbitrary offset. + * * gRPC * Resumable Upload * @@ -128,6 +134,90 @@ * * * + * + * Parallel Composite Upload + * {@link #parallelCompositeUpload()} + * + * Break the stream of bytes into smaller part objects uploading each part in parallel. Then + * composing the parts together to make the ultimate object. + * + * + * Automatic retires will be applied for the following: + *
    + *
  1. Creation of each individual part
  2. + *
  3. Performing an intermediary compose
  4. + *
  5. Performing a delete to cleanup each part and intermediary compose object
  6. + *
+ * + * Retrying the creation of the final object is contingent upon if an appropriate precondition + * is supplied when calling {@link Storage#blobWriteSession(BlobInfo, BlobWriteOption...)}. + * Either {@link BlobTargetOption#doesNotExist()} or {@link Storage.BlobTargetOption#generationMatch(long)} + * should be specified in order to make the final request idempotent. + *

Each operation will be retried up to the limitations specified in {@link StorageOptions#getRetrySettings()} + * + * gRPC + * + *

+ * + * + *
    + *
  1. + * Performing parallel composite uploads costs more money. + * Class A + * operations are performed to create each part and to perform each compose. If a storage + * tier other than + * STANDARD + * is used, early deletion fees apply to deletion of the parts. + *

    An illustrative example. Upload a 5GiB object using 64MiB as the max size per part. + *

      + *
    1. 80 Parts will be created (Class A)
    2. + *
    3. 3 compose calls will be performed (Class A)
    4. + *
    5. Delete 80 Parts along with 2 intermediary Compose objects (Free tier as long as {@code STANDARD} class)
    6. + *
    + * + * Once the parts and intermediary compose objects are deleted, there will be no storage charges related to those temporary objects. + *
  2. + *
  3. + * The service account/credentials used to perform the parallel composite upload require + * {@code storage.objects.delete} + * in order to cleanup the temporary part and intermediary compose objects. + *

    To handle handle part and intermediary compose object deletion out of band + * passing {@link PartCleanupStrategy#never()} to {@link ParallelCompositeUploadBlobWriteSessionConfig#withPartCleanupStrategy(PartCleanupStrategy)} + * will prevent automatic cleanup. + *

  4. + *
  5. + * Please see the + * Parallel composite uploads documentation for a more in depth explanation of the + * limitations of Parallel composite uploads. + *
  6. + *
  7. + * A failed upload can leave part and intermediary compose objects behind which will count + * as storage usage, and you will be billed for it. + *

    By default if an upload fails, an attempt to cleanup the part and intermediary compose + * will be made. However if the program were to crash there is no means for the client to + * perform the cleanup. + *

    Every part and intermediary compose object will be created with a name which ends in + * {@code .part}. An Object Lifecycle Management rule can be setup on your bucket to automatically + * cleanup objects with the suffix after some period of time. See + * Object Lifecycle Management + * for full details and a guide on how to setup a Delete + * rule with a suffix match condition. + *

  8. + *
  9. + * Using parallel composite uploads are not a one size fits all solution. They have very + * real overhead until uploading a large enough object. The inflection point is dependent + * upon many factors, and there is no one size fits all value. You will need to experiment + * with your deployment and workload to determine if parallel composite uploads are useful + * to you. + *
  10. + *
+ * + * * * * @see BlobWriteSessionConfig @@ -219,4 +309,19 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection pat public static JournalingBlobWriteSessionConfig journaling(Collection paths) { return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false); } + + /** + * Create a new {@link BlobWriteSessionConfig} which will perform a Parallel Composite + * Upload by breaking the stream into parts and composing the parts together to make the + * ultimate object. + * + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() { + return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults(); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index e8d6130fa..16963e4d1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -1815,7 +1815,8 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts); Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest)); - GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes)); return ResumableMedia.gapic() .read() .byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext)) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java index d2ed5b433..aa86de36b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java @@ -19,14 +19,23 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.MetadataField.PartRange; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; @@ -41,16 +50,70 @@ import org.checkerframework.checker.nullness.qual.NonNull; /** - * Immutable config builder for a Parallel Composite Upload + * Immutable config builder to configure BlobWriteSession instances to perform Parallel Composite + * Uploads. * - * @see https://cloud.google.com/storage/docs/composing-objects + *

Parallel Composite Uploads can yield higher throughput when uploading large objects. However, + * there are some things which must be kept in mind when choosing to use this strategy. + * + *

    + *
  1. Performing parallel composite uploads costs more money. Class A operations + * are performed to create each part and to perform each compose. If a storage tier other than + * STANDARD + * is used, early deletion fees apply to deletion of the parts. + *

    An illustrative example. Upload a 5GiB object using 64MiB as the max size per part.
    + *

      + *
    1. 80 Parts will be created (Class A) + *
    2. 3 compose calls will be performed (Class A) + *
    3. Delete 80 Parts along with 2 intermediary Compose objects (Free tier as long as + * {@code STANDARD} class) + *
    + * Once the parts and intermediary compose objects are deleted, there will be no storage + * charges related to those temporary objects. + *
  2. The service account/credentials used to perform the parallel composite upload require {@code + * storage.objects.delete} in order to cleanup the temporary part and intermediary compose + * objects.
    + * To handle handle part and intermediary compose object deletion out of band passing + * {@link PartCleanupStrategy#never()} to {@link + * ParallelCompositeUploadBlobWriteSessionConfig#withPartCleanupStrategy(PartCleanupStrategy)} + * will prevent automatic cleanup. + *
  3. Please see the + * Parallel composite uploads documentation for a more in depth explanation of the + * limitations of Parallel composite uploads. + *
  4. A failed upload can leave part and intermediary compose objects behind which will count as + * storage usage, and you will be billed for it.
    + * By default if an upload fails, an attempt to cleanup the part and intermediary compose will + * be made. However if the program were to crash there is no means for the client to perform + * the cleanup.
    + * Every part and intermediary compose object will be created with a name which ends in {@code + * .part}. An Object Lifecycle Management rule can be setup on your bucket to automatically + * cleanup objects with the suffix after some period of time. See Object Lifecycle Management for + * full details and a guide on how to setup a Delete rule with a suffix + * match condition. + *
  5. Using parallel composite uploads are not a a one size fits all solution. They have very + * real overhead until uploading a large enough object. The inflection point is dependent upon + * many factors, and there is no one size fits all value. You will need to experiment with + * your deployment and workload to determine if parallel composite uploads are useful to you. + *
+ * + *

In general if you object sizes are smaller than several hundred megabytes it is unlikely + * parallel composite uploads will be beneficial to overall throughput. + * + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @see BlobWriteSessionConfigs#parallelCompositeUpload() + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) * @see https://cloud.google.com/storage/docs/parallel-composite-uploads + * @since 2.28.0 This new api is in preview and is subject to breaking changes. */ @Immutable @BetaApi -final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig { +public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig { private static final int MAX_PARTS_PER_COMPOSE = 32; private final int maxPartsPerCompose; @@ -86,6 +149,14 @@ ParallelCompositeUploadBlobWriteSessionConfig withMaxPartsPerCompose(int maxPart partCleanupStrategy); } + /** + * Specify a specific executor supplier where work will be submitted when performing a parallel + * composite upload. + * + *

Default: {@link ExecutorSupplier#cachedPool()} + * + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public ParallelCompositeUploadBlobWriteSessionConfig withExecutorSupplier( ExecutorSupplier executorSupplier) { @@ -98,6 +169,14 @@ public ParallelCompositeUploadBlobWriteSessionConfig withExecutorSupplier( partCleanupStrategy); } + /** + * Specify a specific buffering strategy which will dictate how buffers are allocated and used + * when performing a parallel composite upload. + * + *

Default: {@link BufferStrategy#simple(int) BufferStrategy#simple(16MiB)} + * + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public ParallelCompositeUploadBlobWriteSessionConfig withBufferStrategy( BufferStrategy bufferStrategy) { @@ -110,6 +189,14 @@ public ParallelCompositeUploadBlobWriteSessionConfig withBufferStrategy( partCleanupStrategy); } + /** + * Specify a specific naming strategy which will dictate how individual part and intermediary + * compose objects will be named when performing a parallel composite upload. + * + *

Default: {@link PartNamingStrategy#noPrefix()} + * + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public ParallelCompositeUploadBlobWriteSessionConfig withPartNamingStrategy( PartNamingStrategy partNamingStrategy) { @@ -122,6 +209,14 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartNamingStrategy( partCleanupStrategy); } + /** + * Specify a specific cleanup strategy which will dictate what cleanup operations are performed + * automatically when performing a parallel composite upload. + * + *

Default: {@link PartCleanupStrategy#always()} + * + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public ParallelCompositeUploadBlobWriteSessionConfig withPartCleanupStrategy( PartCleanupStrategy partCleanupStrategy) { @@ -135,7 +230,7 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartCleanupStrategy( } @BetaApi - static ParallelCompositeUploadBlobWriteSessionConfig of() { + static ParallelCompositeUploadBlobWriteSessionConfig withDefaults() { return new ParallelCompositeUploadBlobWriteSessionConfig( MAX_PARTS_PER_COMPOSE, ExecutorSupplier.cachedPool(), @@ -149,20 +244,45 @@ static ParallelCompositeUploadBlobWriteSessionConfig of() { WriterFactory createFactory(Clock clock) throws IOException { Executor executor = executorSupplier.get(); BufferHandlePool bufferHandlePool = bufferStrategy.get(); - throw new IllegalStateException("Not yet implemented"); + return new ParallelCompositeUploadWriterFactory(clock, executor, bufferHandlePool); } + /** + * A strategy which dictates how buffers are to be used for individual parts. The chosen strategy + * will apply to all instances of {@link BlobWriteSession} created from a single instance of + * {@link Storage}. + * + * @see #withBufferStrategy(BufferStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi @Immutable public abstract static class BufferStrategy extends Factory { private BufferStrategy() {} + /** + * Create a buffer strategy which will rely upon standard garbage collection. Each buffer will + * be used once and then garbage collected. + * + * @param capacity the number of bytes each buffer should be + * @see #withBufferStrategy(BufferStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static BufferStrategy simple(int capacity) { return new SimpleBufferStrategy(capacity); } + /** + * Create a buffer strategy which will have a fixed size pool of buffers. Each buffer will be + * lazily allocated. + * + * @param bufferCount the number of buffers the pool will be + * @param bufferCapacity the number of bytes each buffer should be + * @see #withBufferStrategy(BufferStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static BufferStrategy fixedPool(int bufferCount, int bufferCapacity) { return new FixedBufferStrategy(bufferCount, bufferCapacity); @@ -199,6 +319,13 @@ BufferHandlePool get() { } } + /** + * Class which will be used to supply an Executor where work will be submitted when performing a + * parallel composite upload. + * + * @see #withExecutorSupplier(ExecutorSupplier) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi @Immutable public abstract static class ExecutorSupplier extends Factory { @@ -206,6 +333,12 @@ public abstract static class ExecutorSupplier extends Factory { private ExecutorSupplier() {} + /** + * Create a cached thread pool for submitting work + * + * @see #withExecutorSupplier(ExecutorSupplier) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static ExecutorSupplier cachedPool() { return new ExecutorSupplier() { @@ -217,6 +350,13 @@ Executor get() { }; } + /** + * Create a fixed size thread pool for submitting work + * + * @param poolSize the number of threads in the pool + * @see #withExecutorSupplier(ExecutorSupplier) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static ExecutorSupplier fixedPool(int poolSize) { return new ExecutorSupplier() { @@ -228,6 +368,16 @@ Executor get() { }; } + /** + * Wrap an existing executor instance which will be used for submitting work + * + *

Choosing to use this supplier type will make your instance of {@link StorageOptions} + * unable to be serialized. + * + * @param executor the executor to use + * @see #withExecutorSupplier(ExecutorSupplier) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static ExecutorSupplier useExecutor(Executor executor) { return new SuppliedExecutorSupplier(executor); @@ -256,6 +406,13 @@ Executor get() { } } + /** + * A naming strategy which will be used to generate a name for a part or intermediary compose + * object. + * + * @see #withPartNamingStrategy(PartNamingStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi @Immutable public abstract static class PartNamingStrategy { @@ -285,12 +442,52 @@ String fmtName(String ultimateObjectName, PartRange partRange) { protected abstract String fmtFields(String randomKey, String nameDigest, String partRange); + /** + * Default strategy in which no stable prefix is defined. + * + *

General format is + * + *


+     *   {randomKeyDigest};{objectInfoDigest};{partIndex}.part
+     * 
+ * + *

{@code {objectInfoDigest}} will be fixed for an individual {@link BlobWriteSession}. + * + *

NOTE:The way in which both {@code randomKeyDigest} and {@code + * objectInfoDigest} are generated is undefined and subject to change at any time. + * + * @see #withPartNamingStrategy(PartNamingStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static PartNamingStrategy noPrefix() { SecureRandom rand = new SecureRandom(); return new NoPrefix(rand); } + /** + * Strategy in which an explicit stable prefix is present on each part and intermediary compose + * object. + * + *

General format is + * + *


+     *   {prefixPattern}/{randomKeyDigest};{objectInfoDigest};{partIndex}.part
+     * 
+ * + *

{@code {objectInfoDigest}} will be fixed for an individual {@link BlobWriteSession}. + * + *

NOTE:The way in which both {@code randomKeyDigest} and {@code + * objectInfoDigest} are generated is undefined and subject to change at any time. + * + *

Care must be taken when choosing to specify a stable prefix as this can create hotspots in + * the keyspace for object names. See Object Naming + * Convention Guidelines for more details. + * + * @see #withPartNamingStrategy(PartNamingStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static PartNamingStrategy prefix(String prefixPattern) { checkNotNull(prefixPattern, "prefixPattern must be non null"); @@ -342,6 +539,13 @@ protected String fmtFields(String randomKey, String nameDigest, String partRange } } + /** + * A cleanup strategy which will dictate what cleanup operations are performed automatically when + * performing a parallel composite upload. + * + * @see #withPartCleanupStrategy(PartCleanupStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi @Immutable public static class PartCleanupStrategy { @@ -364,17 +568,47 @@ boolean isDeleteOnError() { /** * If an unrecoverable error is encountered, define whether to attempt to delete any object * parts already uploaded. + * + *

Default: {@code true} + * + * @since 2.28.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - public PartCleanupStrategy withDeleteOnError(boolean deleteOnError) { + PartCleanupStrategy withDeleteOnError(boolean deleteOnError) { return new PartCleanupStrategy(deleteParts, deleteOnError); } + /** + * Cleanup strategy which will always attempt to clean up part and intermediary compose objects + * either on success or on error. + * + * @see #withPartCleanupStrategy(PartCleanupStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static PartCleanupStrategy always() { return new PartCleanupStrategy(true, true); } + /** + * Cleanup strategy which will only attempt to clean up parts and intermediary compose objects + * either on success. + * + * @see #withPartCleanupStrategy(PartCleanupStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public static PartCleanupStrategy onlyOnSuccess() { + return new PartCleanupStrategy(true, false); + } + + /** + * Cleanup strategy which will never attempt to clean up parts or intermediary compose objects + * either on success or on error. + * + * @see #withPartCleanupStrategy(PartCleanupStrategy) + * @since 2.28.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi public static PartCleanupStrategy never() { return new PartCleanupStrategy(false, false); @@ -384,4 +618,65 @@ public static PartCleanupStrategy never() { private abstract static class Factory { abstract T get(); } + + private class ParallelCompositeUploadWriterFactory implements WriterFactory { + + private final Clock clock; + private final Executor executor; + private final BufferHandlePool bufferHandlePool; + + private ParallelCompositeUploadWriterFactory( + Clock clock, Executor executor, BufferHandlePool bufferHandlePool) { + this.clock = clock; + this.executor = executor; + this.bufferHandlePool = bufferHandlePool; + } + + @Override + public WritableByteChannelSession writeSession( + StorageInternal s, + BlobInfo info, + Opts opts, + Decoder d) { + return new PCUSession(s, info, opts); + } + + private final class PCUSession + implements WritableByteChannelSession { + + private final SettableApiFuture result; + private final StorageInternal storageInternal; + private final BlobInfo info; + private final Opts opts; + + private PCUSession( + StorageInternal storageInternal, BlobInfo info, Opts opts) { + this.storageInternal = storageInternal; + this.info = info; + this.opts = opts; + result = SettableApiFuture.create(); + } + + @Override + public ApiFuture openAsync() { + ParallelCompositeUploadWritableByteChannel channel = + new ParallelCompositeUploadWritableByteChannel( + bufferHandlePool, + executor, + partNamingStrategy, + partCleanupStrategy, + maxPartsPerCompose, + result, + storageInternal, + info, + opts); + return ApiFutures.immediateFuture(channel); + } + + @Override + public ApiFuture getResult() { + return result; + } + } + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java new file mode 100644 index 000000000..72a07dd56 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java @@ -0,0 +1,248 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.rpc.ApiExceptions; +import com.google.cloud.kms.v1.CryptoKey; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.BlobWriteSessionConfigs; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferStrategy; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobTargetOption; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.BucketFixture; +import com.google.cloud.storage.it.runner.annotations.BucketType; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.annotations.StorageFixture; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.it.runner.registry.KmsFixture; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.WritableByteChannel; +import java.security.Key; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.PROD) +public final class ITParallelCompositeUploadBlobWriteSessionConfigTest { + + private static final int _1MiB = 1024 * 1024; + private static ExecutorService exec; + + @Inject public BucketInfo bucket; + + @Inject + @BucketFixture(BucketType.REQUESTER_PAYS) + public BucketInfo rpBucket; + + @Inject + @StorageFixture(Transport.GRPC) + public Storage injectedStorage; + + @Inject public Generator generator; + @Inject public KmsFixture kmsFixture; + + // configured Storage with the PCU config + private Storage storage; + private Random rand; + + @BeforeClass + public static void beforeClass() { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-pcuwbct-%d").build(); + exec = Executors.newCachedThreadPool(threadFactory); + } + + @Before + public void setUp() throws Exception { + GrpcStorageOptions storageOptions = + ((GrpcStorageOptions) injectedStorage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig( + BlobWriteSessionConfigs.parallelCompositeUpload() + .withExecutorSupplier(ExecutorSupplier.useExecutor(exec)) + // deinfe a max part size that is fairly small to aid in test speed + .withBufferStrategy(BufferStrategy.simple(_1MiB)) + .withPartNamingStrategy(PartNamingStrategy.prefix("prefix-a")) + // let our fixtures take care of cleaning things up + .withPartCleanupStrategy(PartCleanupStrategy.never())) + .build(); + storage = storageOptions.getService(); + rand = new Random(); + } + + @After + public void tearDown() throws Exception { + if (storage != null) { + storage.close(); + } + } + + @AfterClass + public static void afterClass() { + if (exec != null) { + exec.shutdownNow(); + } + } + + @Test + public void errorRaisedByMethodAndFutureResult() throws IOException { + + BlobInfo info = + BlobInfo.newBuilder(bucket.getName() + "x", generator.randomObjectName()).build(); + byte[] bytes = DataGenerator.rand(rand).genBytes(1); + + BlobWriteSession session = storage.blobWriteSession(info, BlobWriteOption.doesNotExist()); + try { + try (WritableByteChannel channel = session.open()) { + channel.write(ByteBuffer.wrap(bytes)); + } + // it is okay if the exception is raised during write itself or close, if it happens during + // close we should get an AsynchronousCloseException + } catch (AsynchronousCloseException ace) { + assertThat(ace).hasCauseThat().hasMessageThat().contains("NOT_FOUND"); + assertThat(((StorageException) ace.getCause()).getCode()).isEqualTo(404); + } catch (StorageException se) { + assertThat(se.getCode()).isEqualTo(404); + } + + // the result future should resolve to a failure specifying the failure kind + StorageException se = + assertThrows( + StorageException.class, + () -> ApiExceptions.callAndTranslateApiException(session.getResult())); + assertThat(se.getCode()).isEqualTo(404); + } + + @Test + public void uploadingAnObjectWorks() throws Exception { + doTest(bucket, 32 * _1MiB + 37, ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); + } + + @Test + public void uploadingAnObjectWorks_requesterPays() throws Exception { + String projectId = storage.getOptions().getProjectId(); + int _1MiB = 1024 * 1024; + doTest( + rpBucket, + 32 * _1MiB + 37, + ImmutableList.of(BlobTargetOption.userProject(projectId)), + ImmutableList.of(BlobWriteOption.userProject(projectId)), + ImmutableList.of(BlobSourceOption.userProject(projectId))); + } + + @Test + public void uploadingAnObjectWorks_customerSuppliedEncryptionKey() throws IOException { + CSEKSupport csek = CSEKSupport.create(); + Key key = csek.getKey(); + + doTest( + bucket, + 16 * _1MiB - 13, + ImmutableList.of(BlobTargetOption.encryptionKey(key)), + ImmutableList.of(BlobWriteOption.encryptionKey(key)), + ImmutableList.of(BlobSourceOption.decryptionKey(key))); + } + + @Test + public void uploadingAnObjectWorks_kms() throws IOException { + CryptoKey key1 = kmsFixture.getKey1(); + doTest( + bucket, + 16 * _1MiB - 13, + ImmutableList.of(BlobTargetOption.kmsKeyName(key1.getName())), + ImmutableList.of(BlobWriteOption.kmsKeyName(key1.getName())), + ImmutableList.of()); + } + + /** + * Create an empty object, then overwrite it using a Parallel Composite Upload, then read the full + * object and verify its contents match exactly with what was written. + */ + private void doTest( + BucketInfo bucket, + int objectSizeBytes, + ImmutableList overriddenCreateOptions, + ImmutableList overriddenOverwriteOptions, + ImmutableList overriddenReadOptions) + throws IOException { + + BlobTargetOption[] createOptions = + BlobTargetOption.dedupe( + ImmutableList.of(BlobTargetOption.doesNotExist()), + overriddenCreateOptions.toArray(new BlobTargetOption[0])); + BlobWriteOption[] overwriteOptions = + BlobWriteOption.dedupe( + ImmutableList.of(BlobWriteOption.generationMatch()), + overriddenOverwriteOptions.toArray(new BlobWriteOption[0])); + BlobSourceOption[] readOptions = + BlobSourceOption.dedupe( + ImmutableList.of(), overriddenReadOptions.toArray(new BlobSourceOption[0])); + + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + Blob gen1 = storage.create(info, createOptions); + + byte[] bytes = DataGenerator.rand(rand).genBytes(objectSizeBytes); + + BlobWriteSession session = storage.blobWriteSession(gen1, overwriteOptions); + + try (WritableByteChannel channel = session.open()) { + long written = channel.write(ByteBuffer.wrap(bytes)); + assertThat(written).isEqualTo(objectSizeBytes); + } + + BlobInfo result = ApiExceptions.callAndTranslateApiException(session.getResult()); + + assertThat(result.getCrc32c()).isNotNull(); + assertThat(result.getGeneration()).isNotNull(); + + byte[] actual = storage.readAllBytes(result.getBlobId(), readOptions); + + assertThat(actual).isEqualTo(bytes); + assertThat(xxd(actual)).isEqualTo(xxd(bytes)); + } +}