Skip to content

Commit

Permalink
Fix empty SCollection writes to HDFS Sink #158
Browse files: browse the repository at this point in the history
  • Loading branch information
andrewsmartin committed May 28, 2016
1 parent 692b429 commit 71ac38e
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -125,15 +125,22 @@ public void initialize(PipelineOptions options) throws Exception {

@Override
public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
// job successful
Job job = ((HadoopFileSink<K, V>) getSink()).jobInstance();
FileSystem fs = FileSystem.get(job.getConfiguration());

// If there are 0 output shards, just create output folder.
if (!writerResults.iterator().hasNext()) {
fs.mkdirs(new Path(path));
return;
}

// job successful
JobContext context = new JobContextImpl(job.getConfiguration(), jobID());
FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
outputCommitter.commitJob(context);

// get actual output shards
Set<String> actual = Sets.newHashSet();
FileSystem fs = FileSystem.get(job.getConfiguration());
FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
@Override
public boolean accept(Path path) {
Expand Down

0 comments on commit 71ac38e

Please sign in to comment.