Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: port DefaultBlobWriteSessionConfig to work with HttpStorageOptions #2472

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts);
}

/**
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#HTTP}
*
* <p>We could evaluate the annotations, but the code for that is more complicated and probably
* not worth the effort.
*/
interface HttpCompatible {}

/**
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* full or close. Buffer size is configurable via
* {@link DefaultBlobWriteSessionConfig#withChunkSize(int)}
* </td>
* <td>gRPC</td>
* <td>gRPC, HTTP</td>
* <td>The network will only be used for the following operations:
* <ol>
* <li>Creating the Resumable Upload Session</li>
Expand Down Expand Up @@ -241,7 +241,7 @@ private BlobWriteSessionConfigs() {}
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static DefaultBlobWriteSessionConfig getDefault() {
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.nio.channels.WritableByteChannel;
import java.time.Clock;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.concurrent.Immutable;

/**
Expand All @@ -55,9 +59,9 @@
*/
@Immutable
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = -6873740918589930633L;

private final int chunkSize;
Expand Down Expand Up @@ -146,6 +150,39 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
.build();
})),
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
} else if (s instanceof StorageImpl) {
StorageImpl json = (StorageImpl) s;

return new DecoratedWritableByteChannelSession<>(
new LazySession<>(
new LazyWriteChannel<>(
() -> {
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to set Md5 and CRC32C to null here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is replicated to follow the pattern of all the other write methods, for example

BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);

The reasoning why this is done, is due to the fact if the checksum is present in the object GCS will enforce it, and we only want to enforce it if the option crc32cMatch() or md5Match() are provided. At which point the files will be set back to non-null in the following line with opts.blobInfoMapper().apply(builder).build().

The usage pattern this enables is the read-modify-write of an object, i.e.

// assume storage.createFrom(info, Path.get("/file/on/disk/1")); created the current generation
Blob gen1 = storage.get(BlobId.of("bucket", "object"));
Blob gen2 = storage.createFrom(gen1, Paths.get("/file/on/disk/2"));

BlobInfo updated = opts.blobInfoMapper().apply(builder).build();

StorageObject encode = Conversions.json().blobInfo().encode(updated);
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
json.getOptions(),
updated,
optionsMap,
json.retryAlgorithmManager.getForResumableUploadSessionCreate(
optionsMap));
ApiFuture<JsonResumableWrite> startAsync =
ApiFutures.immediateFuture(
JsonResumableWrite.of(
encode, optionsMap, uploadIdSupplier.get(), 0L));

return ResumableMedia.http()
.write()
.byteChannel(HttpClientContext.from(json.storageRpc))
.resumable()
.buffered(BufferHandle.allocate(chunkSize))
.setStartAsync(startAsync)
.build();
})),
Conversions.json().blobInfo());
} else {
throw new IllegalStateException(
"Unknown Storage implementation: " + s.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiClock;
Expand All @@ -30,6 +31,7 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.spi.ServiceRpcFactory;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
Expand All @@ -39,7 +41,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.Clock;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.NonNull;

/** @since 2.14.0 This new api is in preview and is subject to breaking changes. */
@BetaApi
Expand All @@ -55,6 +59,7 @@ public class HttpStorageOptions extends StorageOptions {

private final HttpRetryAlgorithmManager retryAlgorithmManager;
private transient RetryDependenciesAdapter retryDepsAdapter;
private final BlobWriteSessionConfig blobWriteSessionConfig;

private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
super(builder, serviceDefaults);
Expand All @@ -63,6 +68,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
MoreObjects.firstNonNull(
builder.storageRetryStrategy, defaults().getStorageRetryStrategy()));
retryDepsAdapter = new RetryDependenciesAdapter();
blobWriteSessionConfig = builder.blobWriteSessionConfig;
}

@Override
Expand Down Expand Up @@ -120,6 +126,8 @@ RetryingDependencies asRetryDependencies() {
public static class Builder extends StorageOptions.Builder {

private StorageRetryStrategy storageRetryStrategy;
private BlobWriteSessionConfig blobWriteSessionConfig =
HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig();

Builder() {}

Expand Down Expand Up @@ -218,6 +226,24 @@ public HttpStorageOptions.Builder setQuotaProjectId(String quotaProjectId) {
return this;
}

/**
* @see BlobWriteSessionConfig
* @see BlobWriteSessionConfigs
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see HttpStorageDefaults#getDefaultStorageWriterConfig()
* @since 2.29.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public HttpStorageOptions.Builder setBlobWriteSessionConfig(
@NonNull BlobWriteSessionConfig blobWriteSessionConfig) {
requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null");
checkArgument(
blobWriteSessionConfig instanceof BlobWriteSessionConfig.HttpCompatible,
"The provided instance of BlobWriteSessionConfig is not compatible with this HTTP transport.");
this.blobWriteSessionConfig = blobWriteSessionConfig;
return this;
}

@Override
public HttpStorageOptions build() {
return new HttpStorageOptions(this, defaults());
Expand Down Expand Up @@ -249,6 +275,12 @@ public HttpTransportOptions getDefaultTransportOptions() {
public StorageRetryStrategy getStorageRetryStrategy() {
return StorageRetryStrategy.getDefaultStorageRetryStrategy();
}

/** @since 2.29.0 This new api is in preview and is subject to breaking changes. */
@BetaApi
public BlobWriteSessionConfig getDefaultStorageWriterConfig() {
return BlobWriteSessionConfigs.getDefault();
}
}

/**
Expand Down Expand Up @@ -287,7 +319,14 @@ public HttpStorageFactory() {}
public Storage create(StorageOptions options) {
if (options instanceof HttpStorageOptions) {
HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options;
return new StorageImpl(httpStorageOptions);
Clock clock = Clock.systemUTC();
try {
return new StorageImpl(
httpStorageOptions, httpStorageOptions.blobWriteSessionConfig.createFactory(clock));
} catch (IOException e) {
throw new IllegalStateException(
"Unable to instantiate HTTP com.google.cloud.storage.Storage client.", e);
}
} else {
throw new IllegalArgumentException("Only HttpStorageOptions supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
Expand Down Expand Up @@ -92,7 +93,7 @@
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;

final class StorageImpl extends BaseService<StorageOptions> implements Storage {
final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {

private static final byte[] EMPTY_BYTE_ARRAY = {};
private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==";
Expand All @@ -115,11 +116,13 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {

final HttpRetryAlgorithmManager retryAlgorithmManager;
final StorageRpc storageRpc;
final WriterFactory writerFactory;

StorageImpl(HttpStorageOptions options) {
StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) {
super(options);
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
this.storageRpc = options.getStorageRpcV1();
this.writerFactory = writerFactory;
}

@Override
Expand Down Expand Up @@ -1635,4 +1638,13 @@ private Bucket internalBucketGet(String bucket, Map<StorageRpc.Option, ?> option
() -> storageRpc.get(bucketPb, optionsMap),
(b) -> Conversions.json().bucketInfo().decode(b).asBucket(this));
}

@Override
public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);

WritableByteChannelSession<?, BlobInfo> writableByteChannelSession =
writerFactory.writeSession(this, blobInfo, opts);
return BlobWriteSessions.of(writableByteChannelSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.GrpcStorageOptions;
import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
Expand All @@ -49,7 +50,7 @@

@RunWith(StorageITRunner.class)
@CrossRun(
transports = {Transport.GRPC},
transports = {Transport.HTTP, Transport.GRPC},
backends = {Backend.PROD})
public final class ITBlobWriteSessionTest {

Expand All @@ -68,6 +69,7 @@ public void allDefaults() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bufferToTempDirThenUpload() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand All @@ -94,6 +96,13 @@ public void overrideDefaultBufferSize() throws Exception {
.setBlobWriteSessionConfig(
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
.build();
} else if (transport == Transport.HTTP) {
options =
((HttpStorageOptions) storage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
.build();
}
assertWithMessage("unable to resolve options").that(options).isNotNull();
//noinspection DataFlowIssue
Expand All @@ -103,6 +112,7 @@ public void overrideDefaultBufferSize() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bidiTest() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand Down
Loading