Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for streaming inserts with resumable upload #534

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.gcloud.Service;
import com.google.gcloud.spi.BigQueryRpc;

import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -590,6 +591,33 @@ Page<BaseTableInfo> listTables(DatasetId datasetId, TableListOption... options)
*/
InsertAllResponse insertAll(InsertAllRequest request) throws BigQueryException;

/**
* Sends a resumable insert request given a seekable channel containing the rows to be inserted.
* This method does not close the channel; the caller is responsible for closing it.
*
* <p>Example usage of inserting data from a local file:
* <pre> {@code
* LoadConfiguration config = LoadConfiguration.of(TableId.of("my_dataset_id", "my_table_id"));
* try(FileChannel channel = FileChannel.open(Paths.get("/path/to/file"))) {

This comment was marked as spam.

* bigquery.insertAll(config, channel);
* }}</pre>
*
* @throws BigQueryException upon failure
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
* Upload</a>
*/
void insertAll(LoadConfiguration configuration, SeekableByteChannel channel)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

throws BigQueryException;

/**
* Sends a resumable insert request given a byte array containing the rows to be inserted.
*
* @param configuration the load configuration for the insert request
* @param content the bytes containing the rows to be inserted
* @throws BigQueryException upon failure
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
* Upload</a>
*/
void insertAll(LoadConfiguration configuration, byte[] content) throws BigQueryException;

/**
* Lists the table's rows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.gcloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
Expand All @@ -43,9 +44,18 @@
import com.google.gcloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.gcloud.spi.BigQueryRpc;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

final class BigQueryImpl extends BaseService<BigQueryOptions> implements BigQuery {

Expand Down Expand Up @@ -422,6 +432,74 @@ public Rows apply(RowToInsert rowToInsert) {
bigQueryRpc.insertAll(tableId.dataset(), tableId.table(), requestPb));
}

// Uploads an in-memory byte array by delegating to the stream-based insertAll overload.
@Override
public void insertAll(LoadConfiguration configuration, final byte[] content) {
  insertAll(configuration, new Function<Long, InputStream>() {
    @Override
    public InputStream apply(Long offset) {
      int from = offset.intValue();
      // The third argument is a maximum length; ByteArrayInputStream clamps the readable
      // range to the end of the array, so the stream covers content[from..content.length).
      return new ByteArrayInputStream(content, from, content.length);
    }
  });
}

// Uploads the content of a seekable channel by delegating to the stream-based insertAll
// overload. Each requested stream is obtained by repositioning the channel at the given
// offset and wrapping it with Channels.newInputStream.
@Override
public void insertAll(LoadConfiguration configuration, final SeekableByteChannel channel) {
  insertAll(configuration, new Function<Long, InputStream>() {
    @Override
    public InputStream apply(Long offset) {
      try {
        channel.position(offset);
      } catch (IOException e) {
        // Surface the I/O failure as a non-retryable BigQueryException, keeping the cause.
        BigQueryException wrapped = new BigQueryException(0, e.getMessage(), false);
        wrapped.initCause(e);
        throw wrapped;
      }
      return Channels.newInputStream(channel);
    }
  });
}

/**
 * Opens an upload session for {@code configuration} and writes the data supplied by
 * {@code nextStream} to it, retrying according to {@code options().retryParams()}.
 *
 * <p>When a write attempt fails with a {@link BigQueryException}, the upload status is queried.
 * If the status reports the upload as not finished, the start position is updated from the
 * status and a retryable exception is thrown so that the retry helper runs the write again from
 * the new position. If the status reports the upload as finished, the write failure is
 * deliberately ignored.
 *
 * @param configuration the load configuration used to open the upload session
 * @param nextStream given a start position, returns a stream over the data from that position
 * @throws BigQueryException if the upload cannot be completed
 */
private void insertAll(LoadConfiguration configuration,
    final Function<Long, InputStream> nextStream) throws BigQueryException {
  try {
    final String uploadId = open(configuration);
    final AtomicLong startPos = new AtomicLong(0L);
    // Queries the service for the state of this upload session.
    final Callable<BigQueryRpc.Tuple<Boolean, Long>> statusCall =
        new Callable<BigQueryRpc.Tuple<Boolean, Long>>() {
          @Override
          public BigQueryRpc.Tuple<Boolean, Long> call() {
            return bigQueryRpc.status(uploadId);
          }
        };
    runWithRetries(callable(new Runnable() {
      @Override
      public void run() {
        try {
          bigQueryRpc.write(uploadId, nextStream.apply(startPos.get()), startPos.get());
        } catch (BigQueryException ex) {
          BigQueryRpc.Tuple<Boolean, Long> uploadStatus =
              runWithRetries(statusCall, options().retryParams(), EXCEPTION_HANDLER);
          // NOTE(review): x() presumably flags a completed upload and y() the last byte the
          // service received -- confirm against BigQueryRpc.status.
          if (!uploadStatus.x()) {
            startPos.set(uploadStatus.y() != null ? uploadStatus.y() + 1 : 0);
            // Retryable, so the enclosing runWithRetries re-runs the write from startPos.
            // The original write failure is attached as the cause so it is not lost.
            BigQueryException resume = new BigQueryException(0, "Resume Incomplete", true);
            resume.initCause(ex);
            throw resume;
          }
        }
      }
    }), options().retryParams(), EXCEPTION_HANDLER);
  } catch (RetryHelper.RetryHelperException e) {
    throw BigQueryException.translateAndThrow(e);
  }
}

// Opens an upload session for the given configuration (with the project id filled in by
// setProjectId) and returns the value produced by bigQueryRpc.open, retrying according to
// options().retryParams().
private String open(final LoadConfiguration configuration) {
  Callable<String> openCall = new Callable<String>() {
    @Override
    public String call() {
      return bigQueryRpc.open(setProjectId(configuration).toPb());
    }
  };
  return runWithRetries(openCall, options().retryParams(), EXCEPTION_HANDLER);
}

@Override
public Page<List<FieldValue>> listTableData(String datasetId, String tableId,
TableDataListOption... options) throws BigQueryException {
Expand Down Expand Up @@ -698,8 +776,7 @@ public TableId apply(TableId tableId) {
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
loadBuilder.destinationTable(setProjectId(loadJob.destinationTable()));
return loadBuilder.build();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
}
Expand All @@ -711,4 +788,10 @@ private QueryRequest setProjectId(QueryRequest request) {
}
return builder.build();
}

// Returns a copy of the configuration whose destination table has been passed through
// setProjectId.
private LoadConfiguration setProjectId(LoadConfiguration configuration) {
  LoadConfiguration.Builder withProject = configuration.toBuilder();
  withProject.destinationTable(
      setProjectId(configuration.destinationTable()));
  return withProject.build();
}
}
Loading