Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: BlobWriteChannelV2 - same throughput less GC #2110

Merged
merged 3 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<!-- Not breaking, internal only interface and the new methods have default implementations -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/UnbufferedReadableByteChannelSession$UnbufferedReadableByteChannel</className>
<method>* read(*)</method>
<className>com/google/cloud/storage/UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel</className>
<method>* write(*)</method>
</difference>
<!-- Allow accessing the underlying Apiary instance -->
<difference>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.function.LongConsumer;
import javax.annotation.ParametersAreNonnullByDefault;
import org.checkerframework.checker.nullness.qual.Nullable;

@ParametersAreNonnullByDefault
final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved

private final ResumableSession<StorageObject> session;

private final SettableApiFuture<StorageObject> result;
private final LongConsumer committedBytesCallback;

private boolean open = true;
private long cumulativeByteCount;
private boolean finished = false;

ApiaryUnbufferedWritableByteChannel(
HttpClientContext httpClientContext,
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
JsonResumableWrite resumableWrite,
SettableApiFuture<StorageObject> result,
LongConsumer committedBytesCallback) {
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
if (!open) {
throw new ClosedChannelException();
}
RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length));
long available = content.getLength();
long newFinalByteOffset = cumulativeByteCount + available;
final HttpContentRange header;
ByteRangeSpec rangeSpec = ByteRangeSpec.explicit(cumulativeByteCount, newFinalByteOffset);
if (available % ByteSizeConstants._256KiB == 0) {
header = HttpContentRange.of(rangeSpec);
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this cause issues for customers that write data as it comes in (might be less than 256KiB) so buffer would be filled over multiple writes to perform a final flush on falling out of writer scope or explicitly flushing or close()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially in standalone yet, however customers are not interacting with this class directly themselves.

We have the following pipeline to usage of this class (bash pipeline shorthand):

BlobWriteChannelV2 | BaseStorageWriteChannel | DefaultBufferedWritableByteChannel | ApiaryUnbufferedWritableByteChannel

                     ^-- buffer allocation and alignment happens here

header = HttpContentRange.of(rangeSpec, newFinalByteOffset);
finished = true;
}
try {
ResumableOperationResult<@Nullable StorageObject> operationResult =
session.put(content, header);
long persistedSize = operationResult.getPersistedSize();
committedBytesCallback.accept(persistedSize);
this.cumulativeByteCount = persistedSize;
if (finished) {
StorageObject storageObject = operationResult.getObject();
result.set(storageObject);
}
return available;
} catch (Exception e) {
result.setException(e);
throw StorageException.coalesce(e);
}
}

@Override
public boolean isOpen() {
return open;
}

@Override
public void close() throws IOException {
open = false;
if (!finished) {
try {
ResumableOperationResult<@Nullable StorageObject> operationResult =
session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount));
long persistedSize = operationResult.getPersistedSize();
committedBytesCallback.accept(persistedSize);
result.set(operationResult.getObject());
} catch (Exception e) {
result.setException(e);
throw StorageException.coalesce(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.ByteSizeConstants._16MiB;
import static com.google.cloud.storage.ByteSizeConstants._256KiB;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageWriteChannel<T> implements StorageWriteChannel {

private final Decoder<T, BlobInfo> objectDecoder;
private final SettableApiFuture<T> result;

private long position;
private boolean open;
private int chunkSize;
private LazyWriteChannel<T> lazyWriteChannel;
private BufferHandle bufferHandle;

/**
* This is tracked for compatibility with BlobWriteChannel, such that simply creating a writer
* will create an object.
*
* <p>In the future we should move away from this behavior, and only create an object if write is
* called.
*/
protected boolean writeCalledAtLeastOnce;

protected BaseStorageWriteChannel(Decoder<T, BlobInfo> objectDecoder) {
this.objectDecoder = objectDecoder;
this.result = SettableApiFuture.create();
this.open = true;
this.chunkSize = _16MiB;
this.writeCalledAtLeastOnce = false;
}

@Override
public final synchronized void setChunkSize(int chunkSize) {
Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0, received %d", chunkSize);
Preconditions.checkState(
bufferHandle == null || bufferHandle.position() == 0,
"unable to change chunk size with data buffered");
this.chunkSize = chunkSize;
}

@Override
public final synchronized boolean isOpen() {
return open;
}

@Override
public final synchronized void close() throws IOException {
try {
if (open && !writeCalledAtLeastOnce) {
this.write(ByteBuffer.allocate(0));
}
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
}
} finally {
open = false;
}
}

@Override
public final synchronized int write(ByteBuffer src) throws IOException {
if (!open) {
throw new ClosedChannelException();
}
writeCalledAtLeastOnce = true;
try {
BufferedWritableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return 0;
}
int write = tmp.write(src);
return write;
} catch (StorageException e) {
throw new IOException(e);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
}
}

@Override
public final ApiFuture<BlobInfo> getObject() {
return ApiFutures.transform(result, objectDecoder::decode, MoreExecutors.directExecutor());
}

protected final BufferHandle getBufferHandle() {
if (bufferHandle == null) {
bufferHandle = BufferHandle.allocate(Buffers.alignSize(getChunkSize(), _256KiB));
}
return bufferHandle;
}

protected final int getChunkSize() {
return chunkSize;
}

@Nullable
protected final T getResolvedObject() {
if (result.isDone()) {
return StorageException.wrapFutureGet(result);
} else {
return null;
}
}

protected final long getCommittedPosition() {
return position;
}

protected final void setCommittedPosition(long l) {
position = l;
}

protected final void setOpen(boolean isOpen) {
this.open = isOpen;
}

protected abstract LazyWriteChannel<T> newLazyWriteChannel();

private LazyWriteChannel<T> internalGetLazyChannel() {
if (lazyWriteChannel == null) {
LazyWriteChannel<T> tmp = newLazyWriteChannel();
ApiFuture<T> future = tmp.getSession().getResult();
ApiFutures.addCallback(
future,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
if (!result.isDone()) {
result.setException(t);
}
}

@Override
public void onSuccess(T t) {
if (!result.isDone()) {
result.set(t);
}
}
},
MoreExecutors.directExecutor());
lazyWriteChannel = tmp;
}
return lazyWriteChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,18 @@ public String toString() {
static final class BlobReadChannelContext {
private final HttpStorageOptions storageOptions;
private final HttpRetryAlgorithmManager retryAlgorithmManager;
private final HttpClientContext httpClientContext;
private final Storage apiaryClient;

private BlobReadChannelContext(
HttpStorageOptions storageOptions,
Storage apiaryClient,
HttpRetryAlgorithmManager retryAlgorithmManager) {
HttpRetryAlgorithmManager retryAlgorithmManager,
HttpClientContext httpClientContext,
Storage apiaryClient) {
this.storageOptions = storageOptions;
this.apiaryClient = apiaryClient;
this.retryAlgorithmManager = retryAlgorithmManager;
this.httpClientContext = httpClientContext;
this.apiaryClient = apiaryClient;
}

public HttpStorageOptions getStorageOptions() {
Expand All @@ -162,13 +165,20 @@ public HttpRetryAlgorithmManager getRetryAlgorithmManager() {
return retryAlgorithmManager;
}

public HttpClientContext getHttpClientContext() {
return httpClientContext;
}

public Storage getApiaryClient() {
return apiaryClient;
}

static BlobReadChannelContext from(HttpStorageOptions options) {
return new BlobReadChannelContext(
options, options.getStorageRpcV1().getStorage(), options.getRetryAlgorithmManager());
options,
options.getRetryAlgorithmManager(),
HttpClientContext.from(options.getStorageRpcV1()),
options.getStorageRpcV1().getStorage());
}

static BlobReadChannelContext from(com.google.cloud.storage.Storage s) {
Expand Down
Loading