diff --git a/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java b/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java
index 7f4be4ca84..e86d00b5fc 100644
--- a/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java
+++ b/scio-hdfs/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSink.java
@@ -125,15 +125,22 @@ public void initialize(PipelineOptions options) throws Exception {
 
     @Override
     public void finalize(Iterable writerResults, PipelineOptions options) throws Exception {
-      // job successful
       Job job = ((HadoopFileSink) getSink()).jobInstance();
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+
+      // If there are 0 output shards, just create output folder.
+      if (!writerResults.iterator().hasNext()) {
+        fs.mkdirs(new Path(path));
+        return;
+      }
+
+      // job successful
       JobContext context = new JobContextImpl(job.getConfiguration(), jobID());
       FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
       outputCommitter.commitJob(context);
 
       // get actual output shards
       Set actual = Sets.newHashSet();
-      FileSystem fs = FileSystem.get(job.getConfiguration());
       FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
         @Override
         public boolean accept(Path path) {