Skip to content

Commit

Permalink
Add vararg options (WaitForOption) to waitFor method
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed May 30, 2016
1 parent bfffd31 commit 2c5f323
Show file tree
Hide file tree
Showing 13 changed files with 535 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@

package com.google.cloud.bigquery;

import static com.google.cloud.WaitForOption.Option.CHECKING_PERIOD;
import static com.google.cloud.WaitForOption.Option.TIMEOUT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.ServiceOptions;
import com.google.cloud.WaitForOption;
import com.google.cloud.WaitForOption.CheckingPeriod;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A Google BigQuery Job.
Expand Down Expand Up @@ -145,9 +154,13 @@ public boolean isDone() {
}

/**
* Blocks until this job completes its execution, either failing or succeeding. The job status is
* checked every 500 milliseconds. This method returns current job's latest information. If the
* job no longer exists, this method returns {@code null}.
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
* {@code null}. By default, the job status is checked every 500 milliseconds, to configure this
* value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
*
* <p>Example usage of {@code waitFor()}:
* <pre> {@code
* Job completedJob = job.waitFor();
* if (completedJob == null) {
Expand All @@ -158,21 +171,10 @@ public boolean isDone() {
* // job completed successfully
* }}</pre>
*
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public Job waitFor() throws InterruptedException {
return waitFor(500, TimeUnit.MILLISECONDS);
}

/**
* Blocks until this job completes its execution, either failing or succeeding. The
* {@code checkEvery} and {@code unit} parameters determine how often the job's status is checked.
* This method returns current job's latest information. If the job no longer exists, this method
* returns {@code null}.
* <p>Example usage of {@code waitFor()} with checking period and timeout:
* <pre> {@code
* Job completedJob = job.waitFor(1, TimeUnit.SECONDS);
* Job completedJob = job.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
* WaitForOption.timeout(60, TimeUnit.SECONDS));
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
Expand All @@ -181,13 +183,26 @@ public Job waitFor() throws InterruptedException {
* // job completed successfully
* }}</pre>
*
* @param waitOptions options to configure checking period and timeout
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
* @throws TimeoutException if the timeout provided with
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
* this exception is never thrown.
*/
public Job waitFor(int checkEvery, TimeUnit unit) throws InterruptedException {
public Job waitFor(WaitForOption... waitOptions) throws InterruptedException, TimeoutException {
Map<WaitForOption.Option, ?> optionMap = WaitForOption.asMap(waitOptions);
CheckingPeriod checkingPeriod = firstNonNull(CHECKING_PERIOD.getCheckingPeriod(optionMap),
CheckingPeriod.defaultInstance());
long timeout = firstNonNull(TIMEOUT.getLong(optionMap), -1L);
ServiceOptions.Clock clock = options.clock();
long startTime = clock.millis();
while (!isDone()) {
unit.sleep(checkEvery);
if (timeout != -1 && (clock.millis() - startTime) >= timeout) {
throw new TimeoutException();
}
checkingPeriod.sleep();
}
return reload();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import com.google.cloud.ServiceOptions.Clock;
import com.google.cloud.WaitForOption;
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobTest {

Expand Down Expand Up @@ -69,6 +74,9 @@ public class JobTest {
private Job expectedJob;
private Job job;

@Rule
public final ExpectedException thrown = ExpectedException.none();

private void initializeExpectedJob(int optionsCalls) {
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
replay(serviceMockReturnsOptions);
Expand Down Expand Up @@ -181,35 +189,38 @@ public void testIsDone_NotExists() throws Exception {
}

@Test
public void testWaitFor() throws InterruptedException {
public void testWaitFor() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery);
replay(status, bigquery, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor());
verify(status);
verify(status, mockOptions);
}

@Test
public void testWaitFor_Null() throws InterruptedException {
public void testWaitFor_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(1);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery);
replay(bigquery, mockOptions);
initializeJob();
assertNull(job.waitFor());
verify(mockOptions);
}

@Test
public void testWaitForWithTimeUnit() throws InterruptedException {
public void testWaitForWithCheckingPeriod() throws InterruptedException, TimeoutException {
initializeExpectedJob(3);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
Expand All @@ -219,33 +230,62 @@ public void testWaitForWithTimeUnit() throws InterruptedException {
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(status).build();
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery, timeUnit);
replay(status, bigquery, timeUnit, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor(42, timeUnit));
verify(status, timeUnit);
assertSame(completedJob, job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(status, timeUnit, mockOptions);
}

@Test
public void testWaitForWithTimeUnit_Null() throws InterruptedException {
public void testWaitForWithCheckingPeriod_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, timeUnit);
replay(bigquery, timeUnit, mockOptions);
initializeJob();
assertNull(job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(bigquery, timeUnit, mockOptions);
}

@Test
public void testWaitForWithTimeout() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(1);
EasyMock.expectLastCall();
Clock clock = createStrictMock(Clock.class);
EasyMock.expect(clock.millis()).andReturn(0L);
EasyMock.expect(clock.millis()).andReturn(1L);
EasyMock.expect(clock.millis()).andReturn(3L);
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(clock);
Job runningJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
replay(status, bigquery, timeUnit, clock, mockOptions);
initializeJob();
assertNull(job.waitFor(42, timeUnit));
verify(bigquery, timeUnit);
thrown.expect(TimeoutException.class);
job.waitFor(WaitForOption.checkEvery(1, timeUnit),
WaitForOption.timeout(3, TimeUnit.MILLISECONDS));
verify(status, timeUnit, clock, mockOptions);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -169,7 +170,7 @@ public class ITBigQueryTest {
public Timeout globalTimeout = Timeout.seconds(300);

@BeforeClass
public static void beforeClass() throws InterruptedException {
public static void beforeClass() throws InterruptedException, TimeoutException {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
RemoteStorageHelper storageHelper = RemoteStorageHelper.create();
bigquery = bigqueryHelper.options().service();
Expand Down Expand Up @@ -783,7 +784,7 @@ public void testCreateAndGetJobWithSelectedFields() {
}

@Test
public void testCopyJob() throws InterruptedException {
public void testCopyJob() throws InterruptedException, TimeoutException {
String sourceTableName = "test_copy_job_source_table";
String destinationTableName = "test_copy_job_destination_table";
TableId sourceTable = TableId.of(DATASET, sourceTableName);
Expand All @@ -808,7 +809,7 @@ public void testCopyJob() throws InterruptedException {
}

@Test
public void testQueryJob() throws InterruptedException {
public void testQueryJob() throws InterruptedException, TimeoutException {
String tableName = "test_query_job_table";
String query = new StringBuilder()
.append("SELECT TimestampField, StringField, BooleanField FROM ")
Expand Down Expand Up @@ -851,7 +852,7 @@ public void testQueryJob() throws InterruptedException {
}

@Test
public void testExtractJob() throws InterruptedException {
public void testExtractJob() throws InterruptedException, TimeoutException {
String tableName = "test_export_job_table";
TableId destinationTable = TableId.of(DATASET, tableName);
LoadJobConfiguration configuration =
Expand All @@ -875,7 +876,7 @@ public void testExtractJob() throws InterruptedException {
}

@Test
public void testCancelJob() throws InterruptedException {
public void testCancelJob() throws InterruptedException, TimeoutException {
String destinationTableName = "test_cancel_query_job_table";
String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.table();
TableId destinationTable = TableId.of(DATASET, destinationTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

package com.google.cloud.compute;

import static com.google.cloud.WaitForOption.Option.CHECKING_PERIOD;
import static com.google.cloud.WaitForOption.Option.TIMEOUT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.ServiceOptions;
import com.google.cloud.WaitForOption;
import com.google.cloud.WaitForOption.CheckingPeriod;
import com.google.cloud.compute.Compute.OperationOption;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand All @@ -37,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Google Compute Engine operations. Operation identity can be obtained via {@link #operationId()}.
Expand Down Expand Up @@ -660,9 +667,13 @@ public boolean isDone() {
}

/**
* Blocks until this operation completes its execution, either failing or succeeding. The
* operation status is checked every 500 milliseconds. This method returns current operation's
* latest information. If the operation no longer exists, this method returns {@code null}.
* Blocks until this operation completes its execution, either failing or succeeding. This method
* returns current operation's latest information. If the operation no longer exists, this method
* returns {@code null}. By default, the operation status is checked every 500 milliseconds, to
* configure this value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
*
* <p>Example usage of {@code waitFor()}:
* <pre> {@code
* Operation completedOperation = operation.waitFor();
* if (completedOperation == null) {
Expand All @@ -673,21 +684,11 @@ public boolean isDone() {
* // operation completed successfully
* }}</pre>
*
* @throws ComputeException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the
* operation to complete
*/
public Operation waitFor() throws InterruptedException {
return waitFor(500, TimeUnit.MILLISECONDS);
}

/**
* Blocks until this operation completes its execution, either failing or succeeding. The
* {@code checkEvery} and {@code unit} parameters determine how often the operation status is
* checked. This method returns current operation's latest information. If the operation no longer
* exists, this method returns {@code null}.
* <p>Example usage of {@code waitFor()} with checking period and timeout:
* <pre> {@code
* Operation completedOperation = operation.waitFor(1, TimeUnit.SECONDS);
* Operation completedOperation =
* operation.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
* WaitForOption.timeout(60, TimeUnit.SECONDS));
* if (completedOperation == null) {
* // operation no longer exists
* } else if (completedOperation.errors() != null) {
Expand All @@ -696,13 +697,27 @@ public Operation waitFor() throws InterruptedException {
* // operation completed successfully
* }}</pre>
*
* @param waitOptions options to configure checking period and timeout
* @throws ComputeException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the
* operation to complete
* @throws TimeoutException if the timeout provided with
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
* this exception is never thrown.
*/
public Operation waitFor(int checkEvery, TimeUnit unit) throws InterruptedException {
public Operation waitFor(WaitForOption... waitOptions)
throws InterruptedException, TimeoutException {
Map<WaitForOption.Option, ?> optionMap = WaitForOption.asMap(waitOptions);
CheckingPeriod checkingPeriod = firstNonNull(CHECKING_PERIOD.getCheckingPeriod(optionMap),
CheckingPeriod.defaultInstance());
long timeout = firstNonNull(TIMEOUT.getLong(optionMap), -1L);
ServiceOptions.Clock clock = options.clock();
long startTime = clock.millis();
while (!isDone()) {
unit.sleep(checkEvery);
if (timeout != -1 && (clock.millis() - startTime) >= timeout) {
throw new TimeoutException();
}
checkingPeriod.sleep();
}
return reload();
}
Expand Down
Loading

0 comments on commit 2c5f323

Please sign in to comment.