From 71ac38e80e0c23798af822938fd2ea78b0fbbaa1 Mon Sep 17 00:00:00 2001 From: Andrew Martin Date: Sat, 28 May 2016 14:00:42 -0400 Subject: [PATCH] Fix empty SCollection writes to HDFS Sink #158 --- .../cloud/dataflow/contrib/hadoop/HadoopFileSink.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java b/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java index 7f4be4ca84..e86d00b5fc 100644 --- a/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java +++ b/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java @@ -125,15 +125,22 @@ public void initialize(PipelineOptions options) throws Exception { @Override public void finalize(Iterable writerResults, PipelineOptions options) throws Exception { - // job successful Job job = ((HadoopFileSink) getSink()).jobInstance(); + FileSystem fs = FileSystem.get(job.getConfiguration()); + + // If there are 0 output shards, just create output folder. + if (!writerResults.iterator().hasNext()) { + fs.mkdirs(new Path(path)); + return; + } + + // job successful JobContext context = new JobContextImpl(job.getConfiguration(), jobID()); FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context); outputCommitter.commitJob(context); // get actual output shards Set actual = Sets.newHashSet(); - FileSystem fs = FileSystem.get(job.getConfiguration()); FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() { @Override public boolean accept(Path path) {