Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update BlobWriteChannelV2 to properly carry forward offset after incremental flush #2125

Merged
merged 1 commit into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
private final SettableApiFuture<StorageObject> result;
private final LongConsumer committedBytesCallback;

private boolean open = true;
private boolean open;
private long cumulativeByteCount;
private boolean finished = false;
private boolean finished;

ApiaryUnbufferedWritableByteChannel(
HttpClientContext httpClientContext,
Expand All @@ -50,6 +50,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
this.open = true;
this.cumulativeByteCount = resumableWrite.getBeginOffset();
this.finished = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public WriteChannel restore() {
entity != null ? Conversions.apiary().blobInfo().encode(entity) : null;
return new BlobWriteChannelV2.BlobWriteChannelV2State(
(HttpStorageOptions) serviceOptions,
JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId),
JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId, position),
position,
isOpen,
chunkSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ static final class BlobWriteChannelV2State

@Override
public WriteChannel restore() {
JsonResumableWrite resumableWrite = this.resumableWrite;
if (position != null) {
resumableWrite = resumableWrite.withBeginOffset(position);
}
BlobWriteChannelV2 channel =
new BlobWriteChannelV2(BlobReadChannelContext.from(options), resumableWrite);
if (chunkSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void rewindTo(long offset) {
success = true;
//noinspection DataFlowIssue compareTo result will filter out actualSize == null
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
} else if (compare < 0) {
} else if (compare > 0) {
StorageException se =
JsonResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
uploadId, response, null, toString(storageObject));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.MoreObjects;
Expand All @@ -41,24 +43,40 @@ final class JsonResumableWrite implements Serializable {
@MonotonicNonNull private final String signedUrl;

@NonNull private final String uploadId;
private final long beginOffset;

private volatile String objectJson;

private JsonResumableWrite(
StorageObject object,
Map<StorageRpc.Option, ?> options,
String signedUrl,
@NonNull String uploadId) {
@NonNull String uploadId,
long beginOffset) {
this.object = object;
this.options = options;
this.signedUrl = signedUrl;
this.uploadId = uploadId;
this.beginOffset = beginOffset;
}

public @NonNull String getUploadId() {
return uploadId;
}

public long getBeginOffset() {
return beginOffset;
}

public JsonResumableWrite withBeginOffset(long newBeginOffset) {
checkArgument(
newBeginOffset >= beginOffset,
"New beginOffset must be >= existing beginOffset (%s >= %s)",
newBeginOffset,
beginOffset);
return new JsonResumableWrite(object, options, signedUrl, uploadId, newBeginOffset);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -68,15 +86,16 @@ public boolean equals(Object o) {
return false;
}
JsonResumableWrite that = (JsonResumableWrite) o;
return Objects.equals(object, that.object)
return beginOffset == that.beginOffset
&& Objects.equals(object, that.object)
&& Objects.equals(options, that.options)
&& Objects.equals(signedUrl, that.signedUrl)
&& uploadId.equals(that.uploadId);
&& Objects.equals(uploadId, that.uploadId);
}

@Override
public int hashCode() {
return Objects.hash(object, options, signedUrl, uploadId);
return Objects.hash(object, options, signedUrl, uploadId, beginOffset);
}

@Override
Expand All @@ -86,6 +105,7 @@ public String toString() {
.add("options", options)
.add("signedUrl", signedUrl)
.add("uploadId", uploadId)
.add("beginOffset", beginOffset)
.toString();
}

Expand All @@ -112,11 +132,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}

static JsonResumableWrite of(
StorageObject req, Map<StorageRpc.Option, ?> options, String uploadId) {
return new JsonResumableWrite(req, options, null, uploadId);
StorageObject req, Map<StorageRpc.Option, ?> options, String uploadId, long beginOffset) {
return new JsonResumableWrite(req, options, null, uploadId, beginOffset);
}

static JsonResumableWrite of(String signedUrl, String uploadId) {
return new JsonResumableWrite(null, null, signedUrl, uploadId);
static JsonResumableWrite of(String signedUrl, String uploadId, long beginOffset) {
return new JsonResumableWrite(null, null, signedUrl, uploadId, beginOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get());
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
Expand Down Expand Up @@ -671,7 +671,7 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options)
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get());
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
}

Expand All @@ -688,7 +688,7 @@ public StorageWriteChannel writer(URL signedURL) {
ResumableMedia.startUploadForSignedUrl(
getOptions(), signedURL, forResumableUploadSessionCreate);
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get());
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public void rewindWillQueryStatusOnlyWhenDirty() throws Exception {
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down Expand Up @@ -167,7 +168,8 @@ public void retryAttemptWillReturnQueryResultIfPersistedSizeMatchesSpecifiedEndO
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down Expand Up @@ -234,7 +236,8 @@ public void rewindOfContentIsRelativeToItsBeginOffsetOfTheOverallObject() throws
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ protected Restorable<?>[] restorableObjects() {
JsonResumableWrite.of(
Conversions.apiary().blobInfo().encode(BlobInfo.newBuilder("b", "n").build()),
ImmutableMap.of(),
"upload-id"));
"upload-id",
0));
return new Restorable<?>[] {readerV2, writer};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage.it;

import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
Expand All @@ -27,6 +28,7 @@
import com.google.api.client.json.JsonParser;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
Expand All @@ -53,6 +55,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;
import java.util.logging.Logger;
import org.junit.Test;
Expand Down Expand Up @@ -153,6 +156,39 @@ public void changeChunkSizeAfterWrite() throws IOException {
}
}

@Test
public void restoreProperlyPlumbsBeginOffset() throws IOException {
BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
int _256KiB = 256 * 1024;

byte[] bytes1 = DataGenerator.base64Characters().genBytes(_256KiB);
byte[] bytes2 = DataGenerator.base64Characters().genBytes(73);

int allLength = bytes1.length + bytes2.length;
byte[] expected = Arrays.copyOf(bytes1, allLength);
System.arraycopy(bytes2, 0, expected, bytes1.length, bytes2.length);
String xxdExpected = xxd(expected);

RestorableState<WriteChannel> capture;
{
WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist());
writer.setChunkSize(_256KiB);
writer.write(ByteBuffer.wrap(bytes1));
// explicitly do not close writer, it will finalize the session
capture = writer.capture();
}

assertThat(capture).isNotNull();
WriteChannel restored = capture.restore();
restored.write(ByteBuffer.wrap(bytes2));
restored.close();

byte[] readAllBytes = storage.readAllBytes(info.getBlobId());
assertThat(readAllBytes).hasLength(expected.length);
String xxdActual = xxd(readAllBytes);
assertThat(xxdActual).isEqualTo(xxdExpected);
}

private void doJsonUnexpectedEOFTest(int contentSize, int cappedByteCount) throws IOException {
String blobPath = String.format("%s/%s/blob", generator.randomObjectName(), NOW_STRING);

Expand Down