Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/custom auth provider plugpoint #160

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,26 @@ A couple of manual configuration steps are required to run the code in IntelliJ:
- Run the `compileBuildConfig` task: eg: `./gradlew compileBuildConfig` or via Gradle > mongo-kafka > Tasks > other > compileBuildConfig
- Set `compileBuildConfig` to execute Before Build. via Gradle > Tasks > other > right click compileBuildConfig - click on "Execute Before Build"
- Delegate all build actions to Gradle: Settings > Build, Execution, Deployment > Build Tools > Gradle > Runner - tick "Delegate IDE build/run actions to gradle"

## Custom Auth Provider Interface

The `com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider` interface can be implemented to provide an object of type `com.mongodb.MongoCredential` which gets wrapped in the MongoClient that is constructed for the sink and source connector.
The following properties need to be set -

```
mongo.custom.auth.mechanism.enable - set to true.
mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
```
Additional properties and can be set as required within the implementation class.
The init and validate methods of the implementation class get called when the connector initializes.

### Example
When using MONGODB-AWS authentication mechanism for atlas, one can specify the following configuration -

```
"connection.uri": "mongodb+srv://<sever>/?authMechanism=MONGODB-AWS"
"mongo.custom.auth.mechanism.enable": true,
"mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider"
"mongodbaws.auth.mechanism.roleArn": "arn:aws:iam::<ACCOUNTID>:role/<ROLENAME>"
```
Here the `sample.AwsAssumeRoleCredentialProvider` must be available on the classpath. `mongodbaws.auth.mechanism.roleArn` is an example of custom properties that can be read by `sample.AwsAssumeRoleCredentialProvider`.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ repositories {
}

extra.apply {
set("mongodbDriverVersion", "[4.7,4.7.99)")
set("mongodbDriverVersion", "4.9.1")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @jagadishmdb ,

I made that change thinking that it is needed for MONGODB-AWS auth mechanism. We have been using 4.9.1 drivers so far.

After checking for compatibility at - https://www.mongodb.com/docs/drivers/java/sync/v4.7/fundamentals/auth/#std-label-mongodb-aws-auth-mechanism, I see that 4.7 supports it as well. I am testing with 4.7 to confirm. I think we can keep 'mongodbDriverVersion' to the same value as before, if that is desirable.

I will let you know how the tests go.

Thanks,
Nilay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests successful with the existing mongo driver versions. Please let me know if I should change the PR for restoring mongodbDriverVersion. I see that it has been already changed at #161.

Thanks,
Nilay.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for confirming Nilay! There are some additional tests we run which were getting different error codes than what we were expecting with 4.9.1 driver. I will take care of fixing those later. Thanks for the contribution!

set("kafkaVersion", "2.6.0")
set("avroVersion", "1.9.2")

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/mongodb/kafka/connect/sink/MongoSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static com.mongodb.kafka.connect.util.ServerApiConfig.addServerApiConfig;
import static com.mongodb.kafka.connect.util.SslConfigs.addSslConfigDef;
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.*;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand All @@ -50,6 +52,7 @@

import com.mongodb.kafka.connect.MongoSinkConnector;
import com.mongodb.kafka.connect.util.Validators;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public class MongoSinkConfig extends AbstractConfig {
private static final String EMPTY_STRING = "";
Expand Down Expand Up @@ -100,6 +103,7 @@ public class MongoSinkConfig extends AbstractConfig {
private final Optional<Pattern> topicsRegex;
private Map<String, MongoSinkTopicConfig> topicSinkConnectorConfigMap;
private ConnectionString connectionString;
private CustomCredentialProvider customCredentialProvider;

public MongoSinkConfig(final Map<String, String> originals) {
super(CONFIG, originals, false);
Expand Down Expand Up @@ -146,6 +150,10 @@ public MongoSinkConfig(final Map<String, String> originals) {
}
});
}
// Initialize CustomCredentialProvider if mongo.custom.auth.mechanism.enable is set to true
if (Boolean.parseBoolean(originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
customCredentialProvider = initializeCustomProvider(originals);
}
}

public static final ConfigDef CONFIG = createConfigDef();
Expand All @@ -157,6 +165,10 @@ static String createOverrideKey(final String topic, final String config) {
return format(TOPIC_OVERRIDE_CONFIG, topic, config);
}

public CustomCredentialProvider getCustomCredentialProvider() {
return customCredentialProvider;
}

public ConnectionString getConnectionString() {
return connectionString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ private static MongoClient createMongoClient(final MongoSinkConfig sinkConfig) {
MongoClientSettings.builder()
.applyConnectionString(sinkConfig.getConnectionString())
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sinkConfig));
if (sinkConfig.getCustomCredentialProvider() != null) {
builder.credential(
sinkConfig.getCustomCredentialProvider().getCustomCredential(sinkConfig.getOriginals()));
}
setServerApi(builder, sinkConfig);

return MongoClients.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
import static com.mongodb.kafka.connect.util.VisibleForTesting.AccessModifier.PACKAGE;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -78,6 +80,7 @@
import com.mongodb.kafka.connect.util.Validators;
import com.mongodb.kafka.connect.util.VisibleForTesting;
import com.mongodb.kafka.connect.util.config.BsonTimestampParser;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public class MongoSourceConfig extends AbstractConfig {

Expand Down Expand Up @@ -583,6 +586,11 @@ public class MongoSourceConfig extends AbstractConfig {
+ "connection details will be used.";

static final String PROVIDER_CONFIG = "provider";
private CustomCredentialProvider customCredentialProvider;

public CustomCredentialProvider getCustomCredentialProvider() {
return customCredentialProvider;
}

public static final ConfigDef CONFIG = createConfigDef();
private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
Expand Down Expand Up @@ -745,6 +753,9 @@ public String value() {

public MongoSourceConfig(final Map<?, ?> originals) {
this(originals, true);
if (Boolean.parseBoolean((String) originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
customCredentialProvider = initializeCustomProvider(originals);
}
}

private MongoSourceConfig(final Map<?, ?> originals, final boolean validateAll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public void commandFailed(final CommandFailedEvent event) {
.applyConnectionString(sourceConfig.getConnectionString())
.addCommandListener(statisticsCommandListener)
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sourceConfig));

if (sourceConfig.getCustomCredentialProvider() != null) {
builder.credential(
sourceConfig
.getCustomCredentialProvider()
.getCustomCredential(sourceConfig.originals()));
}

setServerApi(builder, sourceConfig);

mongoClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;

import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public final class ConnectionValidator {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionValidator.class);
Expand Down Expand Up @@ -77,6 +81,18 @@ public static Optional<MongoClient> validateCanConnect(
new ConnectionString(((Password) configValue.value()).value());
MongoClientSettings.Builder mongoClientSettingsBuilder =
MongoClientSettings.builder().applyConnectionString(connectionString);
CustomCredentialProvider customCredentialProvider = null;
if (connectorProperties instanceof MongoSinkConfig) {
customCredentialProvider =
((MongoSinkConfig) connectorProperties).getCustomCredentialProvider();
} else if (connectorProperties instanceof MongoSourceConfig) {
customCredentialProvider =
((MongoSourceConfig) connectorProperties).getCustomCredentialProvider();
}
if (customCredentialProvider != null) {
mongoClientSettingsBuilder.credential(
customCredentialProvider.getCustomCredential(connectorProperties.originals()));
}
setServerApi(mongoClientSettingsBuilder, config);

MongoClientSettings mongoClientSettings =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.mongodb.kafka.connect.util.custom.credentials;

import java.util.Map;

import com.mongodb.MongoCredential;

public interface CustomCredentialProvider {
MongoCredential getCustomCredential(Map<?, ?> configs);

void validate(Map<?, ?> configs);

void init(Map<?, ?> configs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.mongodb.kafka.connect.util.custom.credentials;

public final class CustomCredentialProviderConstants {
private CustomCredentialProviderConstants() {}

public static final String CUSTOM_AUTH_ENABLE_CONFIG = "mongo.custom.auth.mechanism.enable";

public static final String CUSTOM_AUTH_PROVIDER_CLASS =
"mongo.custom.auth.mechanism.providerClass";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.mongodb.kafka.connect.util.custom.credentials;

import java.lang.reflect.InvocationTargetException;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomCredentialProviderGenericInitializer {

static final Logger LOGGER =
LoggerFactory.getLogger(CustomCredentialProviderGenericInitializer.class);

public static CustomCredentialProvider initializeCustomProvider(Map<?, ?> originals)
throws ConfigException {
// Validate if CUSTOM_AUTH_ENABLE_CONFIG is set to true
String customAuthMechanismEnabled =
String.valueOf(originals.get(CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG));
if (customAuthMechanismEnabled == null
|| customAuthMechanismEnabled.equals("null")
|| customAuthMechanismEnabled.isEmpty()) {
throw new ConfigException(
CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG
+ " is not set to true. "
+ "CustomCredentialProvider should not be used.");
}
// Validate if CUSTOM_AUTH_PROVIDER_CLASS is provided
String qualifiedAuthProviderClassName =
String.valueOf(originals.get(CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS));
if (qualifiedAuthProviderClassName == null
|| qualifiedAuthProviderClassName.equals("null")
|| qualifiedAuthProviderClassName.isEmpty()) {
throw new ConfigException(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS
+ " is required when "
+ CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG
+ " is set to true.");
}
try {
// Validate if qualifiedAuthProviderClassName is on the class path.
Class<?> authProviderClass =
Class.forName(
qualifiedAuthProviderClassName,
false,
CustomCredentialProviderGenericInitializer.class.getClassLoader());
// Validate if qualifiedAuthProviderClassName implements CustomCredentialProvider interface.
if (!CustomCredentialProvider.class.isAssignableFrom(authProviderClass)) {
throw new ConfigException(
"Provided Class does not implement CustomCredentialProvider interface.");
}
CustomCredentialProvider customCredentialProvider =
initializeCustomProvider(authProviderClass);
// Perform config validations specific to CustomCredentialProvider impl provided
customCredentialProvider.validate(originals);
// Initialize custom variables required by implementation of CustomCredentialProvider
customCredentialProvider.init(originals);
return customCredentialProvider;
} catch (ClassNotFoundException e) {
throw new ConfigException(
"Unable to find " + qualifiedAuthProviderClassName + " on the classpath.");
}
}

private static CustomCredentialProvider initializeCustomProvider(Class<?> authProviderClass) {
try {
return (CustomCredentialProvider) authProviderClass.getDeclaredConstructor().newInstance();
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException e) {
LOGGER.error("Error while instantiating " + authProviderClass + " class");
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.mongodb.kafka.connect.util.custom.credentials;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

public class CustomCredentialProviderGenericInitializerTest {

@Test
@DisplayName("Test Exception scenarios")
void testExceptions() {
Map<String, Object> props = new HashMap<>();
ConfigException configException =
assertThrows(
ConfigException.class,
() -> CustomCredentialProviderGenericInitializer.initializeCustomProvider(props),
"Expected initializeCustomProvider() to throw, but it didn't");
assertEquals(
CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG
+ " is not set to true. "
+ "CustomCredentialProvider should not be used.",
configException.getMessage());
props.put(CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG, true);
configException =
assertThrows(
ConfigException.class,
() -> CustomCredentialProviderGenericInitializer.initializeCustomProvider(props),
"Expected initializeCustomProvider() to throw, but it didn't");
assertEquals(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS
+ " is required when "
+ CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG
+ " is set to true.",
configException.getMessage());
String qualifiedAuthProviderClassName = "com.nonexistant.package.Test";
props.put(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS,
qualifiedAuthProviderClassName);
configException =
assertThrows(
ConfigException.class,
() -> CustomCredentialProviderGenericInitializer.initializeCustomProvider(props),
"Expected initializeCustomProvider() to throw, but it didn't");
assertEquals(
"Unable to find " + qualifiedAuthProviderClassName + " on the classpath.",
configException.getMessage());
qualifiedAuthProviderClassName =
"com.mongodb.kafka.connect.util.custom.credentials.TestInvalidCustomCredentialProvider";
props.put(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS,
qualifiedAuthProviderClassName);
configException =
assertThrows(
ConfigException.class,
() -> CustomCredentialProviderGenericInitializer.initializeCustomProvider(props),
"Expected initializeCustomProvider() to throw, but it didn't");
assertEquals(
"Provided Class does not implement CustomCredentialProvider interface.",
configException.getMessage());
}

@Test
@DisplayName("Test CustomCredentialProvider initialization")
void testInitializeCustomCredentialProvider() {
Map<String, Object> props = new HashMap<>();
props.put(CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG, true);
props.put(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS,
"com.mongodb.kafka.connect.util.custom.credentials.TestCustomCredentialProvider");
props.put("customProperty", "customValue");
CustomCredentialProvider customCredentialProvider =
CustomCredentialProviderGenericInitializer.initializeCustomProvider(props);
assertEquals(TestCustomCredentialProvider.class, customCredentialProvider.getClass());
}

@Test
@DisplayName("Test CustomCredentialProvider custom props initialization")
void testCustomPropsInit() {
Map<String, Object> props = new HashMap<>();
props.put(CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG, true);
props.put(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS,
"com.mongodb.kafka.connect.util.custom.credentials.TestCustomCredentialProvider");
props.put("customProperty", "customValue");
TestCustomCredentialProvider customCredentialProvider =
(TestCustomCredentialProvider)
CustomCredentialProviderGenericInitializer.initializeCustomProvider(props);
assertEquals("customValue", customCredentialProvider.getCustomProperty());
}

@Test
@DisplayName("Test CustomCredentialProvider custom props validation")
void testCustomPropsValidate() {
Map<String, Object> props = new HashMap<>();
props.put(CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG, true);
props.put(
CustomCredentialProviderConstants.CUSTOM_AUTH_PROVIDER_CLASS,
"com.mongodb.kafka.connect.util.custom.credentials.TestCustomCredentialProvider");
props.put("customProperty", "invalidValue");
ConfigException configException =
assertThrows(
ConfigException.class,
() -> CustomCredentialProviderGenericInitializer.initializeCustomProvider(props),
"Expected initializeCustomProvider() to throw, but it didn't");
assertEquals("Invalid value set for customProperty", configException.getMessage());
}
}
Loading