Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Add tests for existence of BQ job
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Wojdyla committed Jul 6, 2016
1 parent 78690ee commit bd7556f
Showing 1 changed file with 103 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

import com.google.api.client.util.Data;
import com.google.api.client.util.Strings;
Expand Down Expand Up @@ -210,17 +211,21 @@ private static class FakeJobService implements JobService, Serializable {

private Object[] startJobReturns;
private Object[] pollJobReturns;
private Object[] existsJobReturns;
private String executingProject;
// Both counts will be reset back to zeros after serialization.
// This is a work around for DoFn's verifyUnmodified check.
private transient int startJobCallsCount;
private transient int pollJobStatusCallsCount;
private transient int existsJobStatusCallsCount;

public FakeJobService() {
this.startJobReturns = new Object[0];
this.pollJobReturns = new Object[0];
this.existsJobReturns = new Object[0];
this.startJobCallsCount = 0;
this.pollJobStatusCallsCount = 0;
this.existsJobStatusCallsCount = 0;
}

/**
Expand All @@ -234,6 +239,16 @@ public FakeJobService startJobReturns(Object... startJobReturns) {
return this;
}

/**
* Sets the return values to mock {@link JobService#exists}.
*
* <p>Throws if the {@link Object} is a {@link InterruptedException}, returns otherwise.
*/
public FakeJobService existsJobReturns(Object... pollJobReturns) {
this.existsJobReturns = pollJobReturns;
return this;
}

/**
* Sets the return values to mock {@link JobService#pollJob}.
*
Expand Down Expand Up @@ -328,7 +343,28 @@ public JobStatistics dryRunQuery(String projectId, String query)

@Override
public Boolean exists(JobReference jobRef, int maxAttempts) throws InterruptedException {
throw new UnsupportedOperationException();
if (!Strings.isNullOrEmpty(executingProject)) {
checkArgument(
jobRef.getProjectId().equals(executingProject),
"Project id: %s is not equal to executing project: %s",
jobRef.getProjectId(), executingProject);
}

if (existsJobStatusCallsCount < existsJobReturns.length) {
Object ret = existsJobReturns[existsJobStatusCallsCount++];
if (ret == null) {
return null;
}else if (ret instanceof Boolean) {
return (Boolean) ret;
} else if (ret instanceof InterruptedException) {
throw (InterruptedException) ret;
} else {
throw new RuntimeException("Unexpected return type: " + ret.getClass());
}
} else {
throw new RuntimeException(
"Exceeded expected number of calls: " + existsJobReturns.length);
}
}
}

Expand Down Expand Up @@ -512,6 +548,7 @@ public void testReadFromTable() {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService()
.startJobReturns("done", "done")
.existsJobReturns(Boolean.TRUE)
.pollJobReturns(Status.UNKNOWN)
.verifyExecutingProject(bqOptions.getProject()))
.readerReturns(
Expand Down Expand Up @@ -570,6 +607,71 @@ public boolean accept(File pathname) {
}}).length);
}

@Test
public void testExportJobDoesNotExist() throws Exception {
FakeJobService fakeJobService = new FakeJobService()
.existsJobReturns(Boolean.FALSE)
// no poll returns defined - poll method should not be called given that exists should
// return false
.verifyExecutingProject(bqOptions.getProject());

FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(fakeJobService)
.readerReturns(
toJsonString(new TableRow().set("name", "a").set("number", 1)),
toJsonString(new TableRow().set("name", "b").set("number", 2)),
toJsonString(new TableRow().set("name", "c").set("number", 3)));

Pipeline p = TestPipeline.create(bqOptions);
PCollection<String> output = p
.apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
.apply(ParDo.of(new DoFn<TableRow, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output((String) c.element().get("name"));
}
}));

DataflowAssert.that(output)
.containsInAnyOrder(ImmutableList.of("a", "b", "c"));

p.run();
}

@Test
public void testExportJobExists() throws Exception {
JobService fakeJobService = new FakeJobService()
.existsJobReturns(Boolean.TRUE)
.pollJobReturns(Status.UNKNOWN)
.verifyExecutingProject(bqOptions.getProject());

FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(fakeJobService)
.readerReturns(
toJsonString(new TableRow().set("name", "a").set("number", 1)),
toJsonString(new TableRow().set("name", "b").set("number", 2)),
toJsonString(new TableRow().set("name", "c").set("number", 3)));

Pipeline p = TestPipeline.create(bqOptions);
PCollection<String> output = p
.apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
.apply(ParDo.of(new DoFn<TableRow, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output((String) c.element().get("name"));
}
}));

DataflowAssert.that(output)
.containsInAnyOrder(ImmutableList.of("a", "b", "c"));

p.run();
}

@Test
public void testCustomSinkUnknown() throws Exception {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
Expand Down

0 comments on commit bd7556f

Please sign in to comment.