Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connector is not working as expected using AWS IAM AUTH #142

Closed
ns-se-ops opened this issue Nov 15, 2023 · 3 comments
Closed

Connector is not working as expected using AWS IAM AUTH #142

ns-se-ops opened this issue Nov 15, 2023 · 3 comments

Comments

@ns-se-ops
Copy link

ns-se-ops commented Nov 15, 2023

I have two jars in my plugin zip file :

https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/2.1.0
https://github.com/aws/aws-msk-iam-auth/releases

I created connector using below configuration and selected source AWS MSK with IAM Authentication :

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.topic2table.map=snowflake-topic:kafka_table
tasks.max=1
topics=snowflake-topic
buffer.flush.time=60
snowflake.url.name=<removed>.snowflakecomputing.com:443
snowflake.database.name=kafka_db
snowflake.schema.name=kafka_schema
buffer.count.records=1
snowflake.user.name=kafka_connector_user
snowflake.private.key=<removed>
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
buffer.size.bytes=5000000

The Connector was running in the console but in backend I got below errors :

1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,626] INFO [snowflake-connector|task-0] [SF_KAFKA_CONNECTOR] task started, execution time: 0 milliseconds (com.snowflake.kafka.connector.SnowflakeSinkTask:46)"
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,626] INFO [snowflake-connector|task-0] WorkerSinkTask{id=snowflake-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)"
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,629] INFO [snowflake-connector|task-0] [Consumer clientId=connector-consumer-snowflake-connector-0, groupId=connect-snowflake-connector] Failed to create channel due to  (org.apache.kafka.common.network.SaslChannelBuilder:227)"
1700071730000,[Worker-0f5c0100d321988e8] org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,630] WARN [snowflake-connector|task-0] [Consumer clientId=connector-consumer-snowflake-connector-0, groupId=connect-snowflake-connector] Error connecting to node b-1.msk.xxxxx.xx.kafka.us-east-1.amazonaws.com:9098 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1011)"
1700071730000,[Worker-0f5c0100d321988e8] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]

1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8] 	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
1700071730000,[Worker-0f5c0100d321988e8] 	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
1700071730000,[Worker-0f5c0100d321988e8] 	... 24 more
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM

At first, I thought this could be an issue with the SnowflakeSinkConnector so I decided to create another connector with S3Sink : https://www.confluent.io/hub/confluentinc/kafka-connect-s3

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1
schema.compatibility=NONE
topics=s3sinktopic
tasks.max=1
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<removed>

What I see, the same issue the connector is running in the console, but in backend I got these errors :

1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,001] INFO [s3sinkiamauth|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)"
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,001] INFO [s3sinkiamauth|task-0] WorkerSinkTask{id=s3sinkiamauth-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)"
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,005] INFO [s3sinkiamauth|task-0] [Consumer clientId=connector-consumer-s3sinkiamauth-0, groupId=connect-s3sinkiamauth] Failed to create channel due to  (org.apache.kafka.common.network.SaslChannelBuilder:227)"
1700108453000,[Worker-0c13cfeafdaec1993] org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,006] WARN [s3sinkiamauth|task-0] [Consumer clientId=connector-consumer-s3sinkiamauth-0, groupId=connect-s3sinkiamauth] Error connecting to node b-1.msk.xxxxx.xx.kafka.us-east-1.amazonaws.com:9098 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1011)"
1700108453000,[Worker-0c13cfeafdaec1993] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1006)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:313)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
1700108453000,[Worker-0c13cfeafdaec1993] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
1700108453000,[Worker-0c13cfeafdaec1993] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
1700108453000,[Worker-0c13cfeafdaec1993] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
1700108453000,[Worker-0c13cfeafdaec1993] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
1700108453000,[Worker-0c13cfeafdaec1993] 	at java.base/java.lang.Thread.run(Thread.java:829)
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
1700108453000,[Worker-0c13cfeafdaec1993] 	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
1700108453000,[Worker-0c13cfeafdaec1993] 	... 24 more
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM

I am able to produce and consume using kafka-console-producer and kafka-console-consumer with below client.properties :

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

If you are planning to tell me add the same details in the connector configuration, I would tell you not to do so. Because when we select IAM Auth for connection, these are added automatically.

Could you please let me know, what is wrong in it ?

Thanks,

@ns-se-ops ns-se-ops closed this as not planned Won't fix, can't repro, duplicate, stale Nov 15, 2023
Copy link

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

@ns-se-ops ns-se-ops changed the title Unable to download jar file from the repo Connector is not working as expected using AWS IAM AUTH Nov 16, 2023
@ns-se-ops ns-se-ops reopened this Nov 16, 2023
@ns-se-ops
Copy link
Author

Resolution Steps for Creating AWS MSK Connect :

  • Make sure broker type >= m5.large (Due to connection limitations as mentioned here)
  • Do not include IAM JAR in the plugin zip (The service adds it automatically when IAM Auth is selected)
  • Do not include IAM SASL properties (The service adds it automatically when IAM Auth is selected)

Copy link

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant