Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyihu6 committed Nov 24, 2023
1 parent 77f9bd3 commit 93d3582
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ func (s *kafkaSink) Dial() error {
return err
}

if client != nil {
if err := client.RefreshMetadata(); err != nil {
// Now that we do not fetch metadata for all topics by default, we try
// RefreshMetadata manually to check for any connection error.
// TODO(wenyihu6): check if this is what we want
_ = client.Close()
return err
}
}

producer, err := s.newAsyncProducer(client)
if err != nil {
return err
Expand Down

0 comments on commit 93d3582

Please sign in to comment.