Fix HDFS auth #124
andrewsmartin committed May 17, 2016
1 parent 73aa36a commit 275df98
Showing 4 changed files with 140 additions and 91 deletions.

This file was deleted.

@@ -2,7 +2,6 @@

import com.google.cloud.dataflow.contrib.hadoop.AvroHadoopFileSource;
import com.google.cloud.dataflow.contrib.hadoop.HadoopFileSource;
-import com.google.cloud.dataflow.contrib.hadoop.HadoopUserUtils;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.common.base.Function;
@@ -20,7 +19,7 @@
*/
public class SimpleAuthAvroHadoopFileSource<T> extends AvroHadoopFileSource<T>{
// keep this field to pass Hadoop user between workers
-private final HadoopUserUtils user = new HadoopUserUtils();
+private final String username;

/**
* Create a {@code SimpleAuthAvroHadoopFileSource} based on a file or a file pattern specification,
@@ -30,7 +29,7 @@ public SimpleAuthAvroHadoopFileSource(String filepattern,
AvroCoder<T> avroCoder,
String username) {
super(filepattern, avroCoder);
-user.setSimpleAuthUser(username);
+this.username = username;
}

/**
@@ -42,7 +41,7 @@ public SimpleAuthAvroHadoopFileSource(String filepattern,
HadoopFileSource.SerializableSplit serializableSplit,
String username) {
super(filepattern, avroCoder, serializableSplit);
-user.setSimpleAuthUser(username);
+this.username = username;
}

@Override
@@ -57,7 +56,7 @@ public AvroHadoopFileSource<T> apply(@Nullable InputSplit inputSplit) {
return new SimpleAuthAvroHadoopFileSource<>(filepattern,
avroCoder,
new HadoopFileSource.SerializableSplit(inputSplit),
-user.getSimpleAuthUser());
+username);
}
});
} else {
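For orientation, here is a minimal read-side sketch (not part of this commit) of how the Avro source above might be wired into a Dataflow SDK pipeline. It assumes the class lives in the same com.google.cloud.dataflow.contrib.hadoop.simpleauth package as the other classes in this commit; the record type, HDFS path, and username are hypothetical.

import com.google.cloud.dataflow.contrib.hadoop.simpleauth.SimpleAuthAvroHadoopFileSource;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class SimpleAuthReadSketch {
  // A small POJO readable via Avro reflection; stands in for a real record class.
  public static class Event {
    public String name;
    public long timestamp;
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The three-argument constructor from the diff: file pattern, coder, simple-auth username.
    SimpleAuthAvroHadoopFileSource<Event> source =
        new SimpleAuthAvroHadoopFileSource<>(
            "hdfs://namenode:8020/data/events/*.avro", // hypothetical path
            AvroCoder.of(Event.class),
            "etl-user");                               // hypothetical user

    // The element type follows the underlying HadoopFileSource's key/value layout.
    p.apply(Read.from(source));

    p.run();
  }
}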
@@ -18,31 +18,154 @@
package com.google.cloud.dataflow.contrib.hadoop.simpleauth;

import com.google.cloud.dataflow.contrib.hadoop.HadoopFileSink;
-import com.google.cloud.dataflow.contrib.hadoop.HadoopUserUtils;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;

import java.lang.invoke.MethodHandle;
import java.security.PrivilegedExceptionAction;

/**
* Sink for Hadoop/HDFS with Simple Authentication.
*
* Allows to set arbitrary username as HDFS user, which is used for writing to HDFS.
*/
public class SimpleAuthHadoopFileSink<K, V> extends HadoopFileSink<K, V> {
// keep this field to pass Hadoop user between workers
-private final HadoopUserUtils user = new HadoopUserUtils();
+private final String username;

public SimpleAuthHadoopFileSink(String path,
Class<? extends FileOutputFormat<K, V>> formatClass,
Configuration conf,
String username) {
-super(path, formatClass);
-user.setSimpleAuthUser(username);
+super(path, formatClass, conf);
+this.username = username;
}

-public SimpleAuthHadoopFileSink(String path,
@Override
public WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
return new SimpleAuthHadoopWriteOperation<>(this, path, formatClass, username);
}

public static class SimpleAuthHadoopWriteOperation<K, V> extends HadoopWriteOperation<K, V> {
private final String username;

SimpleAuthHadoopWriteOperation(Sink<KV<K, V>> sink,
String path,
Class<? extends FileOutputFormat<K, V>> formatClass,
String username) {
super(sink, path, formatClass);
this.username = username;
}

@Override
public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
final Iterable<String> results = writerResults;
final PipelineOptions opts = options;

UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
_finalize(results, opts);
return null;
}
});
}

private void _finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
super.finalize(writerResults, options);
}

@Override
public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
return new SimpleAuthHadoopWriter<>(this, path, formatClass, username);
}
}

public static class SimpleAuthHadoopWriter<K, V> extends HadoopWriter<K, V> {
private final UserGroupInformation ugi;

public SimpleAuthHadoopWriter(SimpleAuthHadoopWriteOperation<K, V> writeOperation,
String path,
Class<? extends FileOutputFormat<K, V>> formatClass,
-Configuration conf,
String username) {
-super(path, formatClass, conf);
-user.setSimpleAuthUser(username);
+super(writeOperation, path, formatClass);
+ugi = UserGroupInformation.createRemoteUser(username);
}

@Override
public void open(String uId) throws Exception {
final String uid = uId;
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
_open(uid);
return null;
}
});
}

private void _open(String uId) throws Exception {
super.open(uId);
}

@Override
public String close() throws Exception {
return ugi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
return _close();
}
});
}

private String _close() throws Exception {
return super.close();
}

// @Override
// public void open(String uId) throws Exception {
// ugi.doAs(new SimpleAuthHadoopAction<Void>(
// MethodHandles.lookup().findSpecial(HadoopWriter.class, "open", MethodType.methodType(Void.class, String.class).unwrap(), this.getClass()),
// uId));
// }
//
// @Override
// public String close() throws Exception {
// return ugi.doAs(new SimpleAuthHadoopAction<String>(
// MethodHandles.lookup().findSpecial(HadoopWriter.class, "close", MethodType.methodType(Void.class).unwrap(), this.getClass()),
// null));
// }

}

public static class SimpleAuthHadoopAction<T> implements PrivilegedExceptionAction<T> {
private final MethodHandle method;
private final Object[] args;

public SimpleAuthHadoopAction(MethodHandle method, Object... args) {
this.method = method.asType(method.type().wrap());
this.args = args;
}

@Override
public T run() throws Exception {
try {
if (method.type().returnType().equals(Void.class)) {
method.invoke(args);
return null;
}
else {
return (T) method.invoke(args);
}

}
catch (Throwable throwable) {
return null;
}
}
}

}
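A corresponding write-side sketch (again not from the commit): build the sink with the new four-argument constructor above and apply it with the SDK's Write transform. The output format, paths, and username are placeholders, and the unchecked cast only works around the raw class literal.

import com.google.cloud.dataflow.contrib.hadoop.simpleauth.SimpleAuthHadoopFileSink;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SimpleAuthWriteSketch {
  public static void writeCounts(PCollection<KV<Text, LongWritable>> counts) {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020"); // hypothetical cluster

    @SuppressWarnings("unchecked")
    Class<? extends FileOutputFormat<Text, LongWritable>> format =
        (Class<? extends FileOutputFormat<Text, LongWritable>>) (Class<?>) SequenceFileOutputFormat.class;

    SimpleAuthHadoopFileSink<Text, LongWritable> sink =
        new SimpleAuthHadoopFileSink<>("/user/etl-user/output", format, conf, "etl-user");

    // open() and close() on each writer, and finalize() on the write operation,
    // run inside UserGroupInformation.doAs for the given username (see wrappers above).
    counts.apply(Write.to(sink));
  }
}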
@@ -18,7 +18,6 @@
package com.google.cloud.dataflow.contrib.hadoop.simpleauth;

import com.google.cloud.dataflow.contrib.hadoop.HadoopFileSource;
-import com.google.cloud.dataflow.contrib.hadoop.HadoopUserUtils;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -37,9 +36,7 @@
* Allows to set arbitrary username as HDFS user, which is used for reading from HDFS.
*/
public class SimpleAuthHadoopFileSource<K, V> extends HadoopFileSource<K, V> {
-// keep this field to pass Hadoop user between workers
-private final HadoopUserUtils user = new HadoopUserUtils();
-
+private final String username;
/**
* Create a {@code SimpleAuthHadoopFileSource} based on a single Hadoop input split, which won't be
* split up further, {@param username} is used for Simple Authentication with Hadoop.
@@ -51,7 +48,7 @@ protected SimpleAuthHadoopFileSource(String filepattern,
HadoopFileSource.SerializableSplit serializableSplit,
String username) {
super(filepattern, formatClass, keyClass, valueClass, serializableSplit);
-user.setSimpleAuthUser(username);
+this.username = username;
}

/**
@@ -64,7 +61,7 @@ protected SimpleAuthHadoopFileSource(String filepattern,
Class<V> valueClass,
String username) {
super(filepattern, formatClass, keyClass, valueClass);
-user.setSimpleAuthUser(username);
+this.username = username;
}

/**
@@ -112,7 +109,7 @@ public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBund
public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
return new SimpleAuthHadoopFileSource<>(filepattern, formatClass, keyClass,
valueClass, new HadoopFileSource.SerializableSplit(inputSplit),
-user.getSimpleAuthUser());
+username);
}
});
} else {
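The read source above, like the sink, now just stores a plain username and forwards it to every split; the HadoopUserUtils import is gone from all three classes. The impersonation itself happens wherever a worker wraps its Hadoop calls in UserGroupInformation.doAs, as the sink's finalize/open/close wrappers do. A standalone sketch of that pattern, with a hypothetical user and cluster address:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class DoAsSketch {
  public static void main(String[] args) throws Exception {
    // Simple authentication: the remote user name is taken on trust, no Kerberos credentials.
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("etl-user");

    FileStatus[] listing = ugi.doAs(new PrivilegedExceptionAction<FileStatus[]>() {
      @Override
      public FileStatus[] run() throws Exception {
        // Everything inside run() executes as "etl-user".
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // hypothetical namenode
        FileSystem fs = FileSystem.get(conf);
        return fs.listStatus(new Path("/user/etl-user"));
      }
    });

    for (FileStatus status : listing) {
      System.out.println(status.getPath());
    }
  }
}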
