Skip to content

Commit

Permalink
feat: BlobWriteChannelV2 - same throughput less GC (#2110)
Browse files Browse the repository at this point in the history
Use stable buffer allocation with laziness.

Leverage new JsonResumableSession to provide more robustness and easier
separation of concerns compared to BlobWriteChannel

* rename blobWriteChannel.ser.properties to the correct blobReadChannel.ser.properties

### Runtime improvments

Throughput is on par with the existing v1 implementation, however GC
impact has been lightened with the new implementation.

Below is the summary of the GC improvement between v1 and v2.

These GC numbers were collected while uploading 4096 randomly sized
objects, from 128KiB..2GiB across 16 concurrent threads, using a default
chunkSize of 16MiB.

| metric                          | unit   |           v1 |           v2 | % decrease |
|---------------------------------|--------|-------------:|-------------:|-----------:|
| gc.alloc.rate                   | MB/sec |     2240.056 |     1457.731 |     34.924 |
| gc.alloc.rate.norm              | B/op   | 955796726217 | 638403730507 |     33.207 |
| gc.churn.G1_Eden_Space          | MB/sec |     1597.009 |     1454.304 |      8.936 |
| gc.churn.G1_Eden_Space.norm     | B/op   | 681418424320 | 636902965248 |      6.533 |
| gc.churn.G1_Old_Gen             | MB/sec |      691.877 |       11.316 |     98.364 |
| gc.churn.G1_Old_Gen.norm        | B/op   | 295213237398 |   4955944331 |     98.321 |
| gc.churn.G1_Survivor_Space      | MB/sec |        0.004 |        0.002 |     50.000 |
| gc.churn.G1_Survivor_Space.norm | B/op   |      1572864 |       786432 |     50.000 |
| gc.count                        | counts |         1670 |         1319 |     21.018 |
| gc.time                         | ms     |        15936 |         9527 |     40.217 |

Overall allocation rate is decreased, while Old_Gen use is almost entirely eliminated.

```
openjdk version "11.0.18" 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Debian-1deb11u1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Debian-1deb11u1, mixed
mode, sharing)

-Xms12g -Xmx12g
```

All other java parameters are defaults.
  • Loading branch information
BenWhitehead committed Jul 13, 2023
1 parent 29feeaf commit 1b52a10
Show file tree
Hide file tree
Showing 43 changed files with 1,802 additions and 1,923 deletions.
4 changes: 2 additions & 2 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<!-- Not breaking, internal only interface and the new methods have default implementations -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/UnbufferedReadableByteChannelSession$UnbufferedReadableByteChannel</className>
<method>* read(*)</method>
<className>com/google/cloud/storage/UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel</className>
<method>* write(*)</method>
</difference>
<!-- Allow accessing the underlying Apiary instance -->
<difference>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.function.LongConsumer;
import javax.annotation.ParametersAreNonnullByDefault;
import org.checkerframework.checker.nullness.qual.Nullable;

@ParametersAreNonnullByDefault
final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {

private final ResumableSession<StorageObject> session;

private final SettableApiFuture<StorageObject> result;
private final LongConsumer committedBytesCallback;

private boolean open = true;
private long cumulativeByteCount;
private boolean finished = false;

ApiaryUnbufferedWritableByteChannel(
HttpClientContext httpClientContext,
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
JsonResumableWrite resumableWrite,
SettableApiFuture<StorageObject> result,
LongConsumer committedBytesCallback) {
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
if (!open) {
throw new ClosedChannelException();
}
RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length));
long available = content.getLength();
long newFinalByteOffset = cumulativeByteCount + available;
final HttpContentRange header;
ByteRangeSpec rangeSpec = ByteRangeSpec.explicit(cumulativeByteCount, newFinalByteOffset);
if (available % ByteSizeConstants._256KiB == 0) {
header = HttpContentRange.of(rangeSpec);
} else {
header = HttpContentRange.of(rangeSpec, newFinalByteOffset);
finished = true;
}
try {
ResumableOperationResult<@Nullable StorageObject> operationResult =
session.put(content, header);
long persistedSize = operationResult.getPersistedSize();
committedBytesCallback.accept(persistedSize);
this.cumulativeByteCount = persistedSize;
if (finished) {
StorageObject storageObject = operationResult.getObject();
result.set(storageObject);
}
return available;
} catch (Exception e) {
result.setException(e);
throw StorageException.coalesce(e);
}
}

@Override
public boolean isOpen() {
return open;
}

@Override
public void close() throws IOException {
open = false;
if (!finished) {
try {
ResumableOperationResult<@Nullable StorageObject> operationResult =
session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount));
long persistedSize = operationResult.getPersistedSize();
committedBytesCallback.accept(persistedSize);
result.set(operationResult.getObject());
} catch (Exception e) {
result.setException(e);
throw StorageException.coalesce(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.ByteSizeConstants._16MiB;
import static com.google.cloud.storage.ByteSizeConstants._256KiB;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageWriteChannel<T> implements StorageWriteChannel {

private final Decoder<T, BlobInfo> objectDecoder;
private final SettableApiFuture<T> result;

private long position;
private boolean open;
private int chunkSize;
private LazyWriteChannel<T> lazyWriteChannel;
private BufferHandle bufferHandle;

/**
* This is tracked for compatibility with BlobWriteChannel, such that simply creating a writer
* will create an object.
*
* <p>In the future we should move away from this behavior, and only create an object if write is
* called.
*/
protected boolean writeCalledAtLeastOnce;

protected BaseStorageWriteChannel(Decoder<T, BlobInfo> objectDecoder) {
this.objectDecoder = objectDecoder;
this.result = SettableApiFuture.create();
this.open = true;
this.chunkSize = _16MiB;
this.writeCalledAtLeastOnce = false;
}

@Override
public final synchronized void setChunkSize(int chunkSize) {
Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0, received %d", chunkSize);
Preconditions.checkState(
bufferHandle == null || bufferHandle.position() == 0,
"unable to change chunk size with data buffered");
this.chunkSize = chunkSize;
}

@Override
public final synchronized boolean isOpen() {
return open;
}

@Override
public final synchronized void close() throws IOException {
try {
if (open && !writeCalledAtLeastOnce) {
this.write(ByteBuffer.allocate(0));
}
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
}
} finally {
open = false;
}
}

@Override
public final synchronized int write(ByteBuffer src) throws IOException {
if (!open) {
throw new ClosedChannelException();
}
writeCalledAtLeastOnce = true;
try {
BufferedWritableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return 0;
}
int write = tmp.write(src);
return write;
} catch (StorageException e) {
throw new IOException(e);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
}
}

@Override
public final ApiFuture<BlobInfo> getObject() {
return ApiFutures.transform(result, objectDecoder::decode, MoreExecutors.directExecutor());
}

protected final BufferHandle getBufferHandle() {
if (bufferHandle == null) {
bufferHandle = BufferHandle.allocate(Buffers.alignSize(getChunkSize(), _256KiB));
}
return bufferHandle;
}

protected final int getChunkSize() {
return chunkSize;
}

@Nullable
protected final T getResolvedObject() {
if (result.isDone()) {
return StorageException.wrapFutureGet(result);
} else {
return null;
}
}

protected final long getCommittedPosition() {
return position;
}

protected final void setCommittedPosition(long l) {
position = l;
}

protected final void setOpen(boolean isOpen) {
this.open = isOpen;
}

protected abstract LazyWriteChannel<T> newLazyWriteChannel();

private LazyWriteChannel<T> internalGetLazyChannel() {
if (lazyWriteChannel == null) {
LazyWriteChannel<T> tmp = newLazyWriteChannel();
ApiFuture<T> future = tmp.getSession().getResult();
ApiFutures.addCallback(
future,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
if (!result.isDone()) {
result.setException(t);
}
}

@Override
public void onSuccess(T t) {
if (!result.isDone()) {
result.set(t);
}
}
},
MoreExecutors.directExecutor());
lazyWriteChannel = tmp;
}
return lazyWriteChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,18 @@ public String toString() {
static final class BlobReadChannelContext {
private final HttpStorageOptions storageOptions;
private final HttpRetryAlgorithmManager retryAlgorithmManager;
private final HttpClientContext httpClientContext;
private final Storage apiaryClient;

private BlobReadChannelContext(
HttpStorageOptions storageOptions,
Storage apiaryClient,
HttpRetryAlgorithmManager retryAlgorithmManager) {
HttpRetryAlgorithmManager retryAlgorithmManager,
HttpClientContext httpClientContext,
Storage apiaryClient) {
this.storageOptions = storageOptions;
this.apiaryClient = apiaryClient;
this.retryAlgorithmManager = retryAlgorithmManager;
this.httpClientContext = httpClientContext;
this.apiaryClient = apiaryClient;
}

public HttpStorageOptions getStorageOptions() {
Expand All @@ -162,13 +165,20 @@ public HttpRetryAlgorithmManager getRetryAlgorithmManager() {
return retryAlgorithmManager;
}

public HttpClientContext getHttpClientContext() {
return httpClientContext;
}

public Storage getApiaryClient() {
return apiaryClient;
}

static BlobReadChannelContext from(HttpStorageOptions options) {
return new BlobReadChannelContext(
options, options.getStorageRpcV1().getStorage(), options.getRetryAlgorithmManager());
options,
options.getRetryAlgorithmManager(),
HttpClientContext.from(options.getStorageRpcV1()),
options.getStorageRpcV1().getStorage());
}

static BlobReadChannelContext from(com.google.cloud.storage.Storage s) {
Expand Down
Loading

0 comments on commit 1b52a10

Please sign in to comment.