Skip to content

Commit

Permalink
This closes #18
Browse files Browse the repository at this point in the history
  • Loading branch information
lukecwik committed Mar 4, 2016
2 parents 4da935b + d91bc09 commit 419b6f4
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -176,7 +176,7 @@ public static Write write() {
* @see BigtableIO
*/
@Experimental
public static class Read extends PTransform<PInput, PCollection<Row>> {
public static class Read extends PTransform<PBegin, PCollection<Row>> {
/**
* Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
* indicated by the given options, and using any other specified customizations.
Expand Down Expand Up @@ -241,14 +241,14 @@ public String getTableId() {
}

@Override
public PCollection<Row> apply(PInput input) {
public PCollection<Row> apply(PBegin input) {
BigtableSource source =
new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
}

@Override
public void validate(PInput input) {
public void validate(PBegin input) {
checkArgument(options != null, "BigtableOptions not specified");
checkArgument(!tableId.isEmpty(), "Table ID not specified");
try {
Expand Down

0 comments on commit 419b6f4

Please sign in to comment.