Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add whenDone method and CompletionCallback to Job and Operation #1016

Merged
merged 7 commits into from
Jun 9, 2016
17 changes: 4 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ if (table == null) {
}
System.out.println("Loading data into table " + tableId);
Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
while (!loadJob.isDone()) {
Thread.sleep(1000L);
}
loadJob = loadJob.waitFor();
if (loadJob.status().error() != null) {
System.out.println("Job completed with errors");
} else {
Expand Down Expand Up @@ -203,7 +201,6 @@ import com.google.cloud.compute.Compute;
import com.google.cloud.compute.ComputeOptions;
import com.google.cloud.compute.Disk;
import com.google.cloud.compute.DiskId;
import com.google.cloud.compute.Operation;
import com.google.cloud.compute.Snapshot;

Compute compute = ComputeOptions.defaultInstance().service();
Expand All @@ -212,12 +209,10 @@ Disk disk = compute.getDisk(diskId, Compute.DiskOption.fields());
if (disk != null) {
String snapshotName = "disk-name-snapshot";
Operation operation = disk.createSnapshot(snapshotName);
while (!operation.isDone()) {
Thread.sleep(1000L);
}
operation = operation.waitFor();
if (operation.errors() == null) {
// use snapshot
Snapshot snapshot = compute.getSnapshot("disk-name-snapshot");
Snapshot snapshot = compute.getSnapshot(snapshotName);
}
}
```
Expand All @@ -234,8 +229,6 @@ import com.google.cloud.compute.InstanceId;
import com.google.cloud.compute.InstanceInfo;
import com.google.cloud.compute.MachineTypeId;
import com.google.cloud.compute.NetworkId;
import com.google.cloud.compute.NetworkInterface;
import com.google.cloud.compute.Operation;

Compute compute = ComputeOptions.defaultInstance().service();
ImageId imageId = ImageId.of("debian-cloud", "debian-8-jessie-v20160329");
Expand All @@ -246,9 +239,7 @@ InstanceId instanceId = InstanceId.of("us-central1-a", "instance-name");
MachineTypeId machineTypeId = MachineTypeId.of("us-central1-a", "n1-standard-1");
Operation operation =
compute.create(InstanceInfo.of(instanceId, machineTypeId, attachedDisk, networkInterface));
while (!operation.isDone()) {
Thread.sleep(1000L);
}
operation = operation.waitFor();
if (operation.errors() == null) {
// use instance
Instance instance = compute.getInstance(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@

package com.google.cloud.bigquery;

import static com.google.cloud.WaitForOption.Option.CHECKING_PERIOD;
import static com.google.cloud.WaitForOption.Option.TIMEOUT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.Clock;
import com.google.cloud.WaitForOption;
import com.google.cloud.WaitForOption.CheckingPeriod;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A Google BigQuery Job.
Expand Down Expand Up @@ -143,6 +153,60 @@ public boolean isDone() {
return job == null || job.status().state() == JobStatus.State.DONE;
}

/**
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
* {@code null}. By default, the job status is checked every 500 milliseconds, to configure this
* value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
*
* <p>Example usage of {@code waitFor()}:
* <pre> {@code
* Job completedJob = job.waitFor();
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* <p>Example usage of {@code waitFor()} with checking period and timeout:
* <pre> {@code
* Job completedJob = job.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
* WaitForOption.timeout(60, TimeUnit.SECONDS));
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* @param waitOptions options to configure checking period and timeout
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
* @throws TimeoutException if the timeout provided with
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
* this exception is never thrown.
*/
public Job waitFor(WaitForOption... waitOptions) throws InterruptedException, TimeoutException {
Map<WaitForOption.Option, ?> optionMap = WaitForOption.asMap(waitOptions);
CheckingPeriod checkingPeriod = firstNonNull(CHECKING_PERIOD.getCheckingPeriod(optionMap),
CheckingPeriod.defaultInstance());
long timeout = firstNonNull(TIMEOUT.getLong(optionMap), -1L);
Clock clock = options.clock();
long startTime = clock.millis();
while (!isDone()) {
if (timeout != -1 && (clock.millis() - startTime) >= timeout) {
throw new TimeoutException();
}
checkingPeriod.sleep();
}
return reload();
}

/**
* Fetches current job's latest information. Returns {@code null} if the job does not exist.
*
Expand All @@ -151,7 +215,7 @@ public boolean isDone() {
* @throws BigQueryException upon failure
*/
public Job reload(BigQuery.JobOption... options) {
return bigquery.getJob(jobId().job(), options);
return bigquery.getJob(jobId(), options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
* }
* System.out.println("Loading data into table " + tableId);
* Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
* while (!loadJob.isDone()) {
* Thread.sleep(1000L);
* }
* loadJob = loadJob.waitFor();
* if (loadJob.status().error() != null) {
* System.out.println("Job completed with errors");
* } else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import com.google.cloud.Clock;
import com.google.cloud.WaitForOption;
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobTest {

Expand Down Expand Up @@ -66,6 +74,9 @@ public class JobTest {
private Job expectedJob;
private Job job;

@Rule
public final ExpectedException thrown = ExpectedException.none();

private void initializeExpectedJob(int optionsCalls) {
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
replay(serviceMockReturnsOptions);
Expand Down Expand Up @@ -177,13 +188,113 @@ public void testIsDone_NotExists() throws Exception {
assertTrue(job.isDone());
}

@Test
public void testWaitFor() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor());
verify(status, mockOptions);
}

@Test
public void testWaitFor_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(1);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, mockOptions);
initializeJob();
assertNull(job.waitFor());
verify(mockOptions);
}

@Test
public void testWaitForWithCheckingPeriod() throws InterruptedException, TimeoutException {
initializeExpectedJob(3);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(status).build();
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery, timeUnit, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(status, timeUnit, mockOptions);
}

@Test
public void testWaitForWithCheckingPeriod_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, timeUnit, mockOptions);
initializeJob();
assertNull(job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(bigquery, timeUnit, mockOptions);
}

@Test
public void testWaitForWithTimeout() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(1);
EasyMock.expectLastCall();
Clock clock = createStrictMock(Clock.class);
expect(clock.millis()).andReturn(0L);
expect(clock.millis()).andReturn(1L);
expect(clock.millis()).andReturn(3L);
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(clock);
Job runningJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
replay(status, bigquery, timeUnit, clock, mockOptions);
initializeJob();
thrown.expect(TimeoutException.class);
job.waitFor(WaitForOption.checkEvery(1, timeUnit),
WaitForOption.timeout(3, TimeUnit.MILLISECONDS));
verify(status, timeUnit, clock, mockOptions);
}

@Test
public void testReload() throws Exception {
initializeExpectedJob(4);
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(expectedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(expectedJob);
replay(bigquery);
initializeJob();
Job updatedJob = job.reload();
Expand All @@ -194,7 +305,7 @@ public void testReload() throws Exception {
public void testReloadNull() throws Exception {
initializeExpectedJob(1);
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery);
initializeJob();
assertNull(job.reload());
Expand All @@ -206,7 +317,7 @@ public void testReloadWithOptions() throws Exception {
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job(), BigQuery.JobOption.fields()))
expect(bigquery.getJob(JOB_INFO.jobId(), BigQuery.JobOption.fields()))
.andReturn(expectedJob);
replay(bigquery);
initializeJob();
Expand Down
Loading