Skip to content

Commit

Permalink
Add support for resumable uploads
Browse files Browse the repository at this point in the history
- Add LoadConfiguration class for data needed by load jobs and resumable uploads
- Update LoadJobInfo and related classes to use LoadConfiguration instead
- Add insertAll(LoadConfiguration, byte[]) method
- Add insertAll(LoadConfiguration. SeekableByteChannel) method
- Add Table.insert methods that use resumable upload
- Add unit and integration tests
  • Loading branch information
mziccard committed Jan 8, 2016
1 parent 824c46c commit a430e73
Show file tree
Hide file tree
Showing 14 changed files with 1,071 additions and 395 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.gcloud.Service;
import com.google.gcloud.spi.BigQueryRpc;

import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -590,6 +591,33 @@ Page<BaseTableInfo> listTables(DatasetId datasetId, TableListOption... options)
*/
InsertAllResponse insertAll(InsertAllRequest request) throws BigQueryException;

/**
* Sends a resumable insert request given a seekable channel containing the rows to be inserted.
* This method does not close the channel so you should take care of closing it.
*
* <p>Example usage of inserting data from a local file:
* <pre> {@code
* LoadConfiguration config = LoadConfiguration.of(TableId.of("my_dataset_id", "my_table_id"));
* try(FileChannel channel = FileChannel.open(Paths.get("/path/to/file"))) {
* bigquery.insertAll(config, channel);
* }}</pre>
*
* @throws BigQueryException upon failure
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
* Upload</a>
*/
void insertAll(LoadConfiguration configuration, SeekableByteChannel channel)
throws BigQueryException;

/**
* Sends a resumable insert request given a byte array containing the rows to be inserted.
*
* @throws BigQueryException upon failure
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
* Upload</a>
*/
void insertAll(LoadConfiguration configuration, byte[] content) throws BigQueryException;

/**
* Lists the table's rows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.gcloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
Expand All @@ -43,9 +44,18 @@
import com.google.gcloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.gcloud.spi.BigQueryRpc;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

final class BigQueryImpl extends BaseService<BigQueryOptions> implements BigQuery {

Expand Down Expand Up @@ -422,6 +432,74 @@ public Rows apply(RowToInsert rowToInsert) {
bigQueryRpc.insertAll(tableId.dataset(), tableId.table(), requestPb));
}

@Override
public void insertAll(LoadConfiguration configuration, final byte[] content) {
Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
@Override
public InputStream apply(Long startPos) {
return new ByteArrayInputStream(content, startPos.intValue(), content.length);
}
};
insertAll(configuration, nextStream);
}

@Override
public void insertAll(LoadConfiguration configuration, final SeekableByteChannel channel) {
Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
@Override
public InputStream apply(Long startPos) {
try {
channel.position(startPos);
return Channels.newInputStream(channel);
} catch (IOException e) {
BigQueryException exception = new BigQueryException(0, e.getMessage(), false);
exception.initCause(e);
throw exception;
}
}
};
insertAll(configuration, nextStream);
}

private void insertAll(LoadConfiguration configuration,
final Function<Long, InputStream> nextStream) throws BigQueryException {
try {
final String uploadId = open(configuration);
final AtomicLong startPos = new AtomicLong(0L);
runWithRetries(callable(new Runnable() {
@Override
public void run() {
try {
bigQueryRpc.write(uploadId, nextStream.apply(startPos.get()), startPos.get());
} catch (BigQueryException ex) {
BigQueryRpc.Tuple<Boolean, Long> uploadStatus = runWithRetries(
new Callable<BigQueryRpc.Tuple<Boolean, Long>>() {
@Override
public BigQueryRpc.Tuple<Boolean, Long> call() {
return bigQueryRpc.status(uploadId);
}
}, options().retryParams(), EXCEPTION_HANDLER);
if (!uploadStatus.x()) {
startPos.set(uploadStatus.y() != null ? uploadStatus.y() + 1 : 0);
throw new BigQueryException(0, "Resume Incomplete", true);
}
}
}
}), options().retryParams(), EXCEPTION_HANDLER);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}

private String open(final LoadConfiguration configuration) {
return runWithRetries(new Callable<String>() {
@Override
public String call() {
return bigQueryRpc.open(setProjectId(configuration).toPb());
}
}, options().retryParams(), EXCEPTION_HANDLER);
}

@Override
public Page<List<FieldValue>> listTableData(String datasetId, String tableId,
TableDataListOption... options) throws BigQueryException {
Expand Down Expand Up @@ -698,8 +776,7 @@ public TableId apply(TableId tableId) {
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
loadBuilder.destinationTable(setProjectId(loadJob.destinationTable()));
return loadBuilder.build();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
}
Expand All @@ -711,4 +788,10 @@ private QueryRequest setProjectId(QueryRequest request) {
}
return builder.build();
}

private LoadConfiguration setProjectId(LoadConfiguration configuration) {
LoadConfiguration.Builder builder = configuration.toBuilder();
builder.destinationTable(setProjectId(configuration.destinationTable()));
return builder.build();
}
}
Loading

0 comments on commit a430e73

Please sign in to comment.