-
Notifications
You must be signed in to change notification settings - Fork 779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SASL: Support AWS_MSK_IAM mechanism #661
Comments
For potential implementation reference, I've added support for AWS_MSK_IAM in my franz-go client. |
Hello @jgiles, thanks for opening the discussion. We've historically refrained from bringing in more SASL mechanisms in kafka-go due to the external dependencies that they tend to require, and the often unfortunate consequences of forcing external dependencies on programs that depend on kafka-go (too many times we have had builds broken by external dependencies disappearing or not respecting versioning rules, etc...). We do recognize that there is a need to integrate kafka-go with these SASL mechanisms, so we are thinking of introducing them as a separate modules within the repository, probably with a structure like this:
The Let me know what you think about this. @twmb thanks for sharing the work you did, this will very likely be a useful reference to support adding AWS_MSK_IAM to kafka-go 👍 |
@achille-roussel I like the plan to use Go modules within this repo for supported SASL "plugins" with dependencies that should be isolated from the rest of the repo. I have seen this strategy used effectively in other open-source Go projects with similar needs. |
Great! @jgiles would you be able to submit a pull request to add the AWS_MSK_IAM SASL mechanism then? |
I or someone on my team might be able to, but the timing would be uncertain based on some ongoing internal design+prioritization discussions. @achille-roussel we will post on this issue if we pick this up for development (or establish a clear timeline for doing so). For now, a couple questions:
Just to check that we wouldn't be duplicating work or heading off on the wrong direction :-) |
This is correct, we use assignees to state who is the point of contact in the maintainers team.
I believe the could would have to work with the existing SASL interfaces we have in kafka-go https://github.com/segmentio/kafka-go/blob/master/sasl/sasl.go#L1-L44 No other concerns otherwise. I'll be happy to discuss further if there are other design decisions you'd want feedback on. |
I've started on an implementation of this, and, in testing, I've run into another issue that does not seem to be tied to the SASL mechanism. It's deep in the internals of the sasl handshake, before the actual sasl mechanism is called, and I was wondering if the maintainers would have some insights. Using this example (with real values, of course), and this sasl mechanism, I'm hitting and error in the first part of the sasl handshake. When Edit: The above issue was due to my failure to configure TLS. The error was coming back as a simple EOF, which wasn't very helpful, but that's a separate issue |
This should be supported as of v0.4.25! |
I ran into a connection issue when I was trying to implement this new msk iam feature, took me hours to figure it out so put my solution in case helpful:
|
@ddvkid Are you by chance using the I've tested the role itself w/ ec2Metadata using a test kafka-cli inside a pod/container in our EKS cluster, so I believe the role and eks serviceAccount are working. Error:
Example Code I'm using: package main
import (
"context"
"crypto/tls"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
"os"
"strings"
"time"
)
func main() {
sess, err := session.NewSession()
if err != nil {
log.Logger.Fatal().Err(err)
}
awsCredentials := credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.EnvProvider{},
&ec2rolecreds.EC2RoleProvider{
Client: ec2metadata.New(sess),
},
})
kafkaDialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: &aws_msk_iam.Mechanism{
Signer: sigv4.NewSigner(awsCredentials),
Region: os.Getenv("AWS_DEFAULT_REGION"),
},
TLS: &tls.Config{},
}
brokers := strings.Split(os.Getenv("KAFKA_SERVERS"), ",")
topic := "quickstart-events"
batchSize := int(10e6) // 10MB
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
Dialer: kafkaDialer,
Partition: 0,
MinBytes: batchSize,
MaxBytes: batchSize,
})
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Logger.Err(err).Msg("error while reading batch") // <-- Error log from above
break
}
log.Logger.Info().Msgf("Got Kafka Event %q", string(m.Value))
}
} |
I created a separate issue in the event this is a bug and wrong usage of the sdk: #832 |
Describe the solution you'd like
Supporting documentation
The text was updated successfully, but these errors were encountered: