Skip to content

Commit

Permalink
Modify test to use one pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
lgajowy committed Nov 2, 2017
1 parent a651a3e commit 6d04d78
Showing 1 changed file with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
Expand All @@ -23,24 +24,24 @@ public class TextIOIT {

private static final String FILE_BASENAME = "textioit";
private static final long LINES_OF_TEXT_COUNT = 10000L;
private static final String EXPECTED_HASHCODE = "ccae48ff685c1822e9f4d510363bf018";

@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();

@Rule public TestPipeline pipeline = TestPipeline.create();

@Test
public void testWriteThenRead() {
runWrite();
runRead();
}

private void runWrite() {
pipelineWrite
.apply(GenerateSequence.from(0).to(LINES_OF_TEXT_COUNT))
.apply(MapElements.into(TypeDescriptors.strings()).via(produceTextLine()))
.apply(TextIO.write().to(FILE_BASENAME));
PCollection<String> consolidatedContentHashcode = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(LINES_OF_TEXT_COUNT))
.apply("Produce text", MapElements.into(TypeDescriptors.strings()).via(produceTextLine()))
.apply("Write content to files", TextIO.write().to(FILE_BASENAME).withOutputFilenames())
.getPerDestinationOutputFilenames()
.apply("Read all files", Values.create()).apply(TextIO.readAll())
.apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());

assertHashcodeOk(consolidatedContentHashcode);

pipelineWrite.run().waitUntilFinish();
pipeline.run().waitUntilFinish();
}

private SerializableFunction<Long, String> produceTextLine() {
Expand All @@ -49,23 +50,14 @@ private SerializableFunction<Long, String> produceTextLine() {
Hashing.murmur3_128().hashString(seed.toString(), StandardCharsets.UTF_8).toString();
}

private void runRead() {
PCollection<String> files =
pipelineRead.apply(TextIO.read().from(String.format("%s*", FILE_BASENAME)));

PCollection<String> consolidatedHashcode =
files.apply(Combine.globally(new HashingFn()).withoutDefaults());

PAssert.that(consolidatedHashcode).containsInAnyOrder("ccae48ff685c1822e9f4d510363bf018");

pipelineRead.run().waitUntilFinish();
private void assertHashcodeOk(PCollection<String> consolidatedContentHashcode) {
PAssert.that(consolidatedContentHashcode).containsInAnyOrder(EXPECTED_HASHCODE);
}
}

/*
TODO:
Next steps:
- use one pipeline instead of the two (investigation)
- Should we pre calculate them or calculate them during test runs?
- test setup & cleanup
- Better files destination (filesystem? path?)
- parametrize this test (data amount, filesystem, path)
Expand Down

0 comments on commit 6d04d78

Please sign in to comment.