Skip to content

Commit 6e43b1d

Browse files
KAFKA-404: Support for extending MongoClient to allow for users to add custom auth such as AWS IAM / Assume Role (#161)
* Custom Auth Provider Plug point changes * added display names to tests * static analysis fixes. * driver version changed back to what it was. * Changelog updated. * documentation changes, version change. * readme modified. --------- Co-authored-by: nilaysundarkar <[email protected]>
1 parent d20fd15 commit 6e43b1d

14 files changed

+543
-2
lines changed

CHANGELOG.md

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
## Changelog
44

5+
## 1.13.0
6+
7+
### Improvements
8+
- [KAFKA-404](https://jira.mongodb.org/browse/KAFKA-404) Support for extending MongoClient to allow for users to add custom auth such as AWS IAM / Assume Role.
9+
10+
## 1.12.0
11+
12+
### Improvements
13+
- [KAFKA-374](https://jira.mongodb.org/browse/KAFKA-374) Implement an error handler to address specific scenarios.
14+
15+
516
## 1.11.2
617

718
### Bug Fixes

README.md

+137
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,140 @@ A couple of manual configuration steps are required to run the code in IntelliJ:
7676
- Run the `compileBuildConfig` task: eg: `./gradlew compileBuildConfig` or via Gradle > mongo-kafka > Tasks > other > compileBuildConfig
7777
- Set `compileBuildConfig` to execute Before Build. via Gradle > Tasks > other > right click compileBuildConfig - click on "Execute Before Build"
7878
- Delegate all build actions to Gradle: Settings > Build, Execution, Deployment > Build Tools > Gradle > Runner - tick "Delegate IDE build/run actions to gradle"
79+
80+
## Custom Auth Provider Interface
81+
82+
The `com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider` interface can be implemented to provide an object of type `com.mongodb.MongoCredential` which gets wrapped in the MongoClient that is constructed for the sink and source connector.
83+
The following properties need to be set -
84+
85+
```
86+
mongo.custom.auth.mechanism.enable - set to true.
87+
mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
88+
```
89+
Additional properties and can be set as required within the implementation class.
90+
The init and validate methods of the implementation class get called when the connector initializes.
91+
92+
### Example
93+
When using MONGODB-AWS authentication mechanism for atlas, one can specify the following configuration -
94+
95+
```
96+
"connection.uri": "mongodb+srv://<sever>/?authMechanism=MONGODB-AWS"
97+
"mongo.custom.auth.mechanism.enable": true,
98+
"mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider"
99+
"mongodbaws.auth.mechanism.roleArn": "arn:aws:iam::<ACCOUNTID>:role/<ROLENAME>"
100+
```
101+
Here the `sample.AwsAssumeRoleCredentialProvider` must be available on the classpath. `mongodbaws.auth.mechanism.roleArn` is an example of custom properties that can be read by `sample.AwsAssumeRoleCredentialProvider`.
102+
103+
### Sample code for implementing Custom role provider
104+
Here is sample code that can work.
105+
106+
```java
107+
public class AwsAssumeRoleCredentialProvider implements CustomCredentialProvider {
108+
109+
public AwsAssumeRoleCredentialProvider() {}
110+
@Override
111+
public MongoCredential getCustomCredential(Map<?, ?> map) {
112+
AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
113+
Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
114+
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
115+
.withCredentials(provider)
116+
.withRegion("us-east-1")
117+
.build();
118+
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
119+
.withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))
120+
.withRoleSessionName("Test_Session");
121+
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
122+
Credentials creds = assumeRoleResult.getCredentials();
123+
// Add your code to fetch new credentials
124+
return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
125+
};
126+
return MongoCredential.createAwsCredential(null, null)
127+
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
128+
}
129+
130+
@Override
131+
public void validate(Map<?, ?> map) {
132+
String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
133+
if (StringUtils.isNullOrEmpty(roleArn)) {
134+
throw new RuntimeException("Invalid value set for customProperty");
135+
}
136+
}
137+
138+
@Override
139+
public void init(Map<?, ?> map) {
140+
141+
}
142+
}
143+
```
144+
### pom file to build the sample CustomRoleProvider into a jar
145+
Here is the pom.xml that can build the complete jar containing the AwsAssumeRoleCredentialProvider
146+
147+
```java
148+
<?xml version="1.0" encoding="UTF-8"?>
149+
<project xmlns="http://maven.apache.org/POM/4.0.0"
150+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
151+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
152+
<modelVersion>4.0.0</modelVersion>
153+
<groupId>sample</groupId>
154+
<artifactId>AwsAssumeRoleCredentialProvider</artifactId>
155+
<version>1.0-SNAPSHOT</version>
156+
<build>
157+
<plugins>
158+
<plugin>
159+
<groupId>org.apache.maven.plugins</groupId>
160+
<artifactId>maven-shade-plugin</artifactId>
161+
<version>3.5.3</version>
162+
<configuration>
163+
<!-- put your configurations here -->
164+
</configuration>
165+
<executions>
166+
<execution>
167+
<phase>package</phase>
168+
<goals>
169+
<goal>shade</goal>
170+
</goals>
171+
</execution>
172+
</executions>
173+
</plugin>
174+
</plugins>
175+
</build>
176+
177+
<dependencies>
178+
<!-- Java MongoDB Driver dependency -->
179+
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
180+
<dependency>
181+
<groupId>org.mongodb</groupId>
182+
<artifactId>mongodb-driver-sync</artifactId>
183+
<version>5.1.0</version>
184+
</dependency>
185+
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
186+
<dependency>
187+
<groupId>com.amazonaws</groupId>
188+
<artifactId>aws-java-sdk</artifactId>
189+
<version>1.12.723</version>
190+
</dependency>
191+
192+
<!-- slf4j logging dependency, required for logging output from the MongoDB Java Driver -->
193+
<dependency>
194+
<groupId>org.slf4j</groupId>
195+
<artifactId>slf4j-jdk14</artifactId>
196+
<version>1.7.28</version>
197+
</dependency>
198+
199+
<dependency>
200+
<groupId>kafka-connect</groupId>
201+
<artifactId>kafka-connect</artifactId>
202+
<scope>system</scope>
203+
<version>1.12.1-SNAPSHOT</version>
204+
<systemPath>/Users/jagadish.nallapaneni/mongo-kafka/build/libs/mongo-kafka-connect-1.12.1-SNAPSHOT-confluent.jar</systemPath>
205+
</dependency>
206+
</dependencies>
207+
208+
<properties>
209+
<maven.compiler.source>17</maven.compiler.source>
210+
<maven.compiler.target>17</maven.compiler.target>
211+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
212+
</properties>
213+
214+
</project>
215+
```

build.gradle.kts

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ plugins {
3939
}
4040

4141
group = "org.mongodb.kafka"
42-
version = "1.12.1-SNAPSHOT"
42+
version = "1.13.0"
4343
description = "The official MongoDB Apache Kafka Connect Connector."
4444

4545
repositories {
@@ -49,7 +49,7 @@ repositories {
4949
}
5050

5151
extra.apply {
52-
set("mongodbDriverVersion", "[4.7,4.7.99)")
52+
set("mongodbDriverVersion", "[4.7,4.7.99]")
5353
set("kafkaVersion", "2.6.0")
5454
set("avroVersion", "1.9.2")
5555

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkConfig.java

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import static com.mongodb.kafka.connect.util.ServerApiConfig.addServerApiConfig;
2626
import static com.mongodb.kafka.connect.util.SslConfigs.addSslConfigDef;
2727
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
28+
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG;
29+
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
2830
import static java.lang.String.format;
2931
import static java.util.Collections.emptyList;
3032
import static java.util.Collections.singletonList;
@@ -50,6 +52,7 @@
5052

5153
import com.mongodb.kafka.connect.MongoSinkConnector;
5254
import com.mongodb.kafka.connect.util.Validators;
55+
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;
5356

5457
public class MongoSinkConfig extends AbstractConfig {
5558
private static final String EMPTY_STRING = "";
@@ -100,6 +103,7 @@ public class MongoSinkConfig extends AbstractConfig {
100103
private final Optional<Pattern> topicsRegex;
101104
private Map<String, MongoSinkTopicConfig> topicSinkConnectorConfigMap;
102105
private ConnectionString connectionString;
106+
private CustomCredentialProvider customCredentialProvider;
103107

104108
public MongoSinkConfig(final Map<String, String> originals) {
105109
super(CONFIG, originals, false);
@@ -146,6 +150,10 @@ public MongoSinkConfig(final Map<String, String> originals) {
146150
}
147151
});
148152
}
153+
// Initialize CustomCredentialProvider if mongo.custom.auth.mechanism.enable is set to true
154+
if (Boolean.parseBoolean(originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
155+
customCredentialProvider = initializeCustomProvider(originals);
156+
}
149157
}
150158

151159
public static final ConfigDef CONFIG = createConfigDef();
@@ -157,6 +165,10 @@ static String createOverrideKey(final String topic, final String config) {
157165
return format(TOPIC_OVERRIDE_CONFIG, topic, config);
158166
}
159167

168+
public CustomCredentialProvider getCustomCredentialProvider() {
169+
return customCredentialProvider;
170+
}
171+
160172
public ConnectionString getConnectionString() {
161173
return connectionString;
162174
}

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTask.java

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ private static MongoClient createMongoClient(final MongoSinkConfig sinkConfig) {
155155
MongoClientSettings.builder()
156156
.applyConnectionString(sinkConfig.getConnectionString())
157157
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sinkConfig));
158+
if (sinkConfig.getCustomCredentialProvider() != null) {
159+
builder.credential(
160+
sinkConfig.getCustomCredentialProvider().getCustomCredential(sinkConfig.getOriginals()));
161+
}
158162
setServerApi(builder, sinkConfig);
159163

160164
return MongoClients.create(

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

+11
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
3636
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
3737
import static com.mongodb.kafka.connect.util.VisibleForTesting.AccessModifier.PACKAGE;
38+
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG;
39+
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
3840
import static java.lang.String.format;
3941
import static java.util.Arrays.asList;
4042
import static java.util.Collections.emptyList;
@@ -78,6 +80,7 @@
7880
import com.mongodb.kafka.connect.util.Validators;
7981
import com.mongodb.kafka.connect.util.VisibleForTesting;
8082
import com.mongodb.kafka.connect.util.config.BsonTimestampParser;
83+
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;
8184

8285
public class MongoSourceConfig extends AbstractConfig {
8386

@@ -583,6 +586,11 @@ public class MongoSourceConfig extends AbstractConfig {
583586
+ "connection details will be used.";
584587

585588
static final String PROVIDER_CONFIG = "provider";
589+
private CustomCredentialProvider customCredentialProvider;
590+
591+
public CustomCredentialProvider getCustomCredentialProvider() {
592+
return customCredentialProvider;
593+
}
586594

587595
public static final ConfigDef CONFIG = createConfigDef();
588596
private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
@@ -745,6 +753,9 @@ public String value() {
745753

746754
public MongoSourceConfig(final Map<?, ?> originals) {
747755
this(originals, true);
756+
if (Boolean.parseBoolean((String) originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
757+
customCredentialProvider = initializeCustomProvider(originals);
758+
}
748759
}
749760

750761
private MongoSourceConfig(final Map<?, ?> originals, final boolean validateAll) {

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

+8
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ public void commandFailed(final CommandFailedEvent event) {
133133
.applyConnectionString(sourceConfig.getConnectionString())
134134
.addCommandListener(statisticsCommandListener)
135135
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sourceConfig));
136+
137+
if (sourceConfig.getCustomCredentialProvider() != null) {
138+
builder.credential(
139+
sourceConfig
140+
.getCustomCredentialProvider()
141+
.getCustomCredential(sourceConfig.originals()));
142+
}
143+
136144
setServerApi(builder, sourceConfig);
137145

138146
mongoClient =

src/main/java/com/mongodb/kafka/connect/util/ConnectionValidator.java

+16
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@
4949
import com.mongodb.event.ClusterListener;
5050
import com.mongodb.event.ClusterOpeningEvent;
5151

52+
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
53+
import com.mongodb.kafka.connect.source.MongoSourceConfig;
54+
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;
55+
5256
public final class ConnectionValidator {
5357

5458
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionValidator.class);
@@ -77,6 +81,18 @@ public static Optional<MongoClient> validateCanConnect(
7781
new ConnectionString(((Password) configValue.value()).value());
7882
MongoClientSettings.Builder mongoClientSettingsBuilder =
7983
MongoClientSettings.builder().applyConnectionString(connectionString);
84+
CustomCredentialProvider customCredentialProvider = null;
85+
if (connectorProperties instanceof MongoSinkConfig) {
86+
customCredentialProvider =
87+
((MongoSinkConfig) connectorProperties).getCustomCredentialProvider();
88+
} else if (connectorProperties instanceof MongoSourceConfig) {
89+
customCredentialProvider =
90+
((MongoSourceConfig) connectorProperties).getCustomCredentialProvider();
91+
}
92+
if (customCredentialProvider != null) {
93+
mongoClientSettingsBuilder.credential(
94+
customCredentialProvider.getCustomCredential(connectorProperties.originals()));
95+
}
8096
setServerApi(mongoClientSettingsBuilder, config);
8197

8298
MongoClientSettings mongoClientSettings =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.util.custom.credentials;
17+
18+
import java.util.Map;
19+
20+
import com.mongodb.MongoCredential;
21+
22+
public interface CustomCredentialProvider {
23+
MongoCredential getCustomCredential(Map<?, ?> configs);
24+
25+
void validate(Map<?, ?> configs);
26+
27+
void init(Map<?, ?> configs);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.util.custom.credentials;
17+
18+
public final class CustomCredentialProviderConstants {
19+
private CustomCredentialProviderConstants() {}
20+
21+
public static final String CUSTOM_AUTH_ENABLE_CONFIG = "mongo.custom.auth.mechanism.enable";
22+
23+
public static final String CUSTOM_AUTH_PROVIDER_CLASS =
24+
"mongo.custom.auth.mechanism.providerClass";
25+
}

0 commit comments

Comments
 (0)