Buffered writer for exactly once sink (#156)
Create the writer used by the exactly-once sink, following the two-phase commit
protocol. This writer uses BigQuery's buffered write stream for appending data to
the table.
jayehwhyehentee authored Sep 20, 2024
1 parent c23ec8b commit 6a40c09
Showing 16 changed files with 1,547 additions and 173 deletions.
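For context, the "buffered stream" flow that the new writer builds on maps two-phase commit onto BigQuery's Storage Write API: rows appended to a BUFFERED write stream stay invisible to readers until a FlushRows call makes them visible. The following is a minimal sketch using the raw com.google.cloud.bigquery.storage.v1 client rather than this connector's wrappers; the table path, schema, rows, and offsets are assumed inputs, and error handling is omitted.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Int64Value;

/** Sketch only: illustrates the buffered-stream two-phase flow, not this connector's code. */
public class BufferedStreamSketch {

    /** Phase 1: create a BUFFERED write stream and append rows; they are not yet readable. */
    static String appendPhase(
            BigQueryWriteClient client, String tablePath, ProtoSchema schema, ProtoRows rows)
            throws Exception {
        WriteStream stream =
                client.createWriteStream(
                        CreateWriteStreamRequest.newBuilder()
                                .setParent(tablePath)
                                .setWriteStream(
                                        WriteStream.newBuilder()
                                                .setType(WriteStream.Type.BUFFERED)
                                                .build())
                                .build());
        try (StreamWriter writer =
                StreamWriter.newBuilder(stream.getName(), client).setWriterSchema(schema).build()) {
            // First append into a buffered stream starts at offset 0.
            writer.append(rows, 0L).get();
        }
        return stream.getName();
    }

    /** Phase 2: on commit (e.g. a Flink checkpoint), flush rows up to and including the offset. */
    static void commitPhase(BigQueryWriteClient client, String streamName, long lastRowOffset) {
        FlushRowsResponse flushed =
                client.flushRows(
                        FlushRowsRequest.newBuilder()
                                .setWriteStream(streamName)
                                .setOffset(Int64Value.of(lastRowOffset))
                                .build());
        // Rows up to flushed.getOffset() are now visible in the destination table.
    }
}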
@@ -38,6 +38,6 @@ class BigQueryDefaultSink extends BigQueryBaseSink {
public SinkWriter createWriter(InitContext context) {
checkParallelism(context.getNumberOfParallelSubtasks());
return new BigQueryDefaultWriter(
context.getSubtaskId(), connectOptions, schemaProvider, serializer, tablePath);
context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer);
}
}
@@ -31,7 +31,7 @@
* <p>Depending on the checkpointing mode, this writer offers the following consistency guarantees:
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency.
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
* <li>Checkpointing disabled: no consistency guarantee.
* <li>Checkpointing disabled (NOT RECOMMENDED!): no consistency guarantee.
*
* @param <IN> Type of records written to BigQuery
*/
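For reference, the checkpointing mode that selects between these guarantees is configured on the Flink job, not on the sink itself. A minimal sketch, assuming a standard DataStream job (the interval value is arbitrary):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 60 seconds; use AT_LEAST_ONCE for at-least-once writes.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
    }
}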
@@ -0,0 +1,61 @@
/*
* Copyright 2024 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.throttle;

import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* Throttler implementation for BigQuery write stream creation.
*
* <p>Each {@link BigQueryBufferedWriter} will invoke BigQuery's CreateWriteStream API before its
* initial write to a BigQuery table. This API, however, should be called at a low QPS (~3) for
* best performance in steady state, since write stream creation is an expensive operation for the
* BigQuery storage backend. Hence, this throttler is responsible for distributing writers into
* buckets, each corresponding to a specific "wait" duration before calling the CreateWriteStream
* API.
*
* <p>Note that the actual separation between CreateWriteStream invocations across all writers will
* not guarantee an exact QPS of 3, because writers are neither initialized at the same instant,
* nor do they all identify the need to create a write stream after some uniform fixed duration.
* Given these uncontrollable factors, this throttler aims to achieve 3 QPS on a best-effort basis.
*/
public class WriteStreamCreationThrottler implements Throttler {

// MAX_SINK_PARALLELISM is set to 128.
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;
private static final Logger LOG = LoggerFactory.getLogger(WriteStreamCreationThrottler.class);
private final int writerId;

public WriteStreamCreationThrottler(int writerId) {
this.writerId = writerId;
}

public void throttle() {
int waitSeconds = writerId % MAX_BUCKETS;
LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds);
try {
// Sleep does nothing if input is 0 or less.
TimeUnit.SECONDS.sleep(waitSeconds);
} catch (InterruptedException e) {
LOG.warn("Throttle attempt interrupted in subtask {}", writerId);
Thread.currentThread().interrupt();
}
}
}
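A hypothetical call site for this throttler, from an exactly-once writer's first append path. The surrounding writer code is not part of this file; the method below is illustrative only and uses the createWriteStream and createStreamWriter helpers added to BaseWriter in this commit.

// Hypothetical: an exactly-once writer creating its write stream lazily on first append.
void ensureWriteStreamExists(int subtaskId) {
    if (streamName == null || streamName.isEmpty()) {
        // Spread CreateWriteStream calls across writers: sleeps (subtaskId % MAX_BUCKETS) seconds.
        new WriteStreamCreationThrottler(subtaskId).throttle();
        createWriteStream(WriteStream.Type.BUFFERED); // BaseWriter helper added in this commit
        createStreamWriter(false); // connection pooling assumed off here
    }
}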
@@ -25,6 +25,7 @@
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
@@ -70,23 +71,34 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {

// Number of bytes to be sent in the next append request.
private long appendRequestSizeBytes;
private BigQueryServices.StorageWriteClient writeClient;
protected final int subtaskId;
private final String tablePath;
private final BigQueryConnectOptions connectOptions;
private final ProtoSchema protoSchema;
private final BigQueryProtoSerializer serializer;
private final Queue<ApiFuture> appendResponseFuturesQueue;
private final ProtoRows.Builder protoRowsBuilder;

final Queue<AppendInfo> appendResponseFuturesQueue;
// Initialization of writeClient is deferred to the first append call. BigQuery best
// practices suggest that client connections should be opened only when needed.
BigQueryServices.StorageWriteClient writeClient;
StreamWriter streamWriter;
String streamName;
long totalRecordsSeen;
// In exactly-once mode, "totalRecordsWritten" actually represents records appended to a
// write stream by this writer. Only at a checkpoint, when the sink's commit is invoked, do
// the records in a stream get committed to the table. Hence, the number of records written
// to the BigQuery table equals "totalRecordsWritten" only upon checkpoint completion.
long totalRecordsWritten;

BaseWriter(
int subtaskId,
String tablePath,
BigQueryConnectOptions connectOptions,
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer) {
this.subtaskId = subtaskId;
this.tablePath = tablePath;
this.connectOptions = connectOptions;
this.protoSchema = getProtoSchema(schemaProvider);
this.serializer = serializer;
@@ -102,7 +114,7 @@ public void flush(boolean endOfInput) {
if (appendRequestSizeBytes > 0) {
append();
}
logger.debug("Validating all pending append responses in subtask {}", subtaskId);
logger.info("Validating all pending append responses in subtask {}", subtaskId);
validateAppendResponses(true);
}

@@ -125,10 +137,10 @@ public void close() {
}

/** Invoke BigQuery storage API for appending data to a table. */
abstract ApiFuture sendAppendRequest(ProtoRows protoRows);
abstract void sendAppendRequest(ProtoRows protoRows);

/** Checks append response for errors. */
abstract void validateAppendResponse(ApiFuture<AppendRowsResponse> appendResponseFuture);
abstract void validateAppendResponse(AppendInfo appendInfo);

/** Add serialized record to append request. */
void addToAppendRequest(ByteString protoRow) {
@@ -138,21 +150,45 @@ void addToAppendRequest(ByteString protoRow) {

/** Send append request to BigQuery storage and prepare for next append request. */
void append() {
ApiFuture responseFuture = sendAppendRequest(protoRowsBuilder.build());
appendResponseFuturesQueue.add(responseFuture);
sendAppendRequest(protoRowsBuilder.build());
protoRowsBuilder.clear();
appendRequestSizeBytes = 0L;
}

/** Creates a StreamWriter for appending to BigQuery table. */
StreamWriter createStreamWriter(boolean enableConnectionPool) {
logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId);
void createStreamWriter(boolean enableConnectionPool) {
try {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
return writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
if (writeClient == null) {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
}
logger.info(
"Creating BigQuery StreamWriter for write stream {} in subtask {}",
streamName,
subtaskId);
streamWriter =
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
} catch (IOException e) {
logger.error(
String.format(
"Unable to create StreamWriter for stream %s in subtask %d",
streamName, subtaskId),
e);
throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
}
}

/** Creates a write stream for appending to BigQuery table. */
void createWriteStream(WriteStream.Type streamType) {
try {
if (writeClient == null) {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
}
logger.info("Creating BigQuery write stream in subtask {}", subtaskId);
streamName = writeClient.createWriteStream(tablePath, streamType).getName();
} catch (IOException e) {
logger.error("Unable to create StreamWriter for stream {}", streamName);
throw new BigQueryConnectorException("Unable to create StreamWriter", e);
logger.error(
String.format("Unable to create write stream in subtask %d", subtaskId), e);
throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
}
}

@@ -196,17 +232,55 @@ private int getProtoRowBytes(ByteString protoRow) {
* order, we proceed to check the next response only after the previous one has arrived.
*/
void validateAppendResponses(boolean waitForResponse) {
ApiFuture<AppendRowsResponse> appendResponseFuture;
while ((appendResponseFuture = appendResponseFuturesQueue.peek()) != null) {
if (waitForResponse || appendResponseFuture.isDone()) {
while (!appendResponseFuturesQueue.isEmpty()) {
AppendInfo appendInfo = appendResponseFuturesQueue.peek();
if (waitForResponse || appendInfo.getFuture().isDone()) {
appendResponseFuturesQueue.poll();
validateAppendResponse(appendResponseFuture);
validateAppendResponse(appendInfo);
} else {
break;
}
}
}

void logAndThrowFatalException(Throwable error) {
logger.error(String.format("AppendRows request failed in subtask %d", subtaskId), error);
throw new BigQueryConnectorException("Error while writing to BigQuery", error);
}

void logAndThrowFatalException(String errorMessage) {
logger.error(
String.format(
"AppendRows request failed in subtask %d\n%s", subtaskId, errorMessage));
throw new BigQueryConnectorException(
String.format("Error while writing to BigQuery\n%s", errorMessage));
}

static class AppendInfo {
private final ApiFuture<AppendRowsResponse> future;
private final long expectedOffset;
private final long recordsAppended;

AppendInfo(
ApiFuture<AppendRowsResponse> future, long expectedOffset, long recordsAppended) {
this.future = future;
this.expectedOffset = expectedOffset;
this.recordsAppended = recordsAppended;
}

public ApiFuture<AppendRowsResponse> getFuture() {
return future;
}

public long getExpectedOffset() {
return expectedOffset;
}

public long getRecordsAppended() {
return recordsAppended;
}
}

/**
* Following "getters" expose some internal fields required for testing.
*
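The buffered writer that consumes this new AppendInfo plumbing is not included in the loaded portion of the diff, so the following is only a hypothetical sketch of how an exactly-once subclass might implement the two abstract hooks. The writeStreamOffset field and the exact offset check are assumptions, not the connector's actual code.

// Hypothetical overrides in an exactly-once BaseWriter subclass (illustrative only).
// Assumes a field "long writeStreamOffset" tracking the next offset in the buffered stream,
// plus imports for ApiFuture, AppendRowsResponse and java.util.concurrent.ExecutionException.
@Override
void sendAppendRequest(ProtoRows protoRows) {
    long rowCount = protoRows.getSerializedRowsCount();
    ApiFuture<AppendRowsResponse> future = streamWriter.append(protoRows, writeStreamOffset);
    appendResponseFuturesQueue.add(new AppendInfo(future, writeStreamOffset, rowCount));
    writeStreamOffset += rowCount;
}

@Override
void validateAppendResponse(AppendInfo appendInfo) {
    AppendRowsResponse response;
    try {
        response = appendInfo.getFuture().get();
    } catch (ExecutionException | InterruptedException e) {
        logAndThrowFatalException(e);
        return;
    }
    if (response.hasError()) {
        logAndThrowFatalException(response.getError().getMessage());
        return;
    }
    // Compare the offset echoed by BigQuery against the offset we expected for this append.
    long offset = response.getAppendResult().getOffset().getValue();
    if (offset != appendInfo.getExpectedOffset()) {
        logAndThrowFatalException(
                String.format(
                        "Expected offset %d in AppendRowsResponse, found %d",
                        appendInfo.getExpectedOffset(), offset));
        return;
    }
    totalRecordsWritten += appendInfo.getRecordsAppended();
}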
