
changefeedccl: memory opt in sarama metadata fetching #114740

Merged 1 commit into master from sarama-fix on Dec 13, 2023

Conversation

wenyihu6
Contributor

@wenyihu6 wenyihu6 commented Nov 20, 2023

Prior to this commit, sarama clients fetched metadata for all topics periodically.
This consumes a substantial amount of memory when users create many topics.
This patch changes the sarama configuration so that we fetch metadata only for
the minimal set of topics that are needed.

Fixes: #113576
Release Note: none


blathers-crl bot commented Nov 20, 2023

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

@wenyihu6 wenyihu6 self-assigned this Nov 20, 2023
@wenyihu6 wenyihu6 force-pushed the sarama-fix branch 3 times, most recently from f6f7751 to 93d3582 Compare November 24, 2023 19:55
@wenyihu6 wenyihu6 marked this pull request as ready for review November 27, 2023 14:21
@wenyihu6 wenyihu6 requested review from a team as code owners November 27, 2023 14:21
@wenyihu6 wenyihu6 requested review from herkolategan, srosenberg, jayshrivastava and miretskiy and removed request for a team November 27, 2023 14:21
@wenyihu6
Contributor Author

pkg/ccl/changefeedccl/sink_kafka.go line 265 at r3 (raw file):

			// RefreshMetadata manually to check for any connection error.
			// TODO(wenyihu6): check if this is what we want
			_ = client.Close()

I added this part of the changes as a separate commit since I'm not sure if we want it. We can discuss it during our 1:1 today if it is complex.

Contributor

@miretskiy miretskiy left a comment


Blathers is wrong -- nice job on writing the roachtest.

I have mostly nits/suggestions on how to improve the test. Good job on this!

Reviewed 1 of 1 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @wenyihu6)


pkg/ccl/changefeedccl/sink_kafka.go line 265 at r3 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I added this part of the changes as a separate commit since I'm not sure if we want it. We can discuss it during our 1:1 today if it is complex.

I think we need this -- otherwise Dial() would actually be useless, since no network requests would be made and we would not learn about connectivity errors.

Let's return the error -- this is consistent with what was happening before.
NewClient code in sarama does:

```go
if conf.Metadata.Full {
	// do an initial fetch of all cluster metadata by specifying an empty list of topics
	err := client.RefreshMetadata()
	if err == nil {
	} else if errors.Is(err, ErrLeaderNotAvailable) || errors.Is(err, ErrReplicaNotAvailable) ||
		errors.Is(err, ErrTopicAuthorizationFailed) || errors.Is(err, ErrClusterAuthorizationFailed) {
		// indicates that maybe part of the cluster is down, but is not fatal to creating the client
		Logger.Println(err)
	} else {
		close(client.closed) // we haven't started the background updater yet, so we have to do this manually
		_ = client.Close()
		return nil, err
	}
}
```

That is, we call RefreshMetadata, and sometimes it returns an error that we treat as an error to return to the caller. Thus we should do the same.

You also do not need to ignore the client.Close() error here; instead you can do:

```go
if err := client.RefreshMetadata(); err != nil {
	return errors.CombineErrors(err, client.Close())
}
```

pkg/ccl/changefeedccl/sink_kafka.go line 1085 at r3 (raw file):

	config.Producer.Return.Successes = true
	config.Producer.Partitioner = newChangefeedPartitioner
	config.Metadata.Full = false

I know we don't have comments on the other params, but let's start to make things better.
Let's add a short comment, something like // do not fetch metadata for all topics, just the ones we need


pkg/cmd/roachtest/tests/cdc.go line 1484 at r3 (raw file):

		Cluster:          r.MakeClusterSpec(4, spec.Arch(vm.ArchAMD64)),
		Leases:           registry.MetamorphicLeases,
		CompatibleClouds: registry.AllExceptAWS,

why except AWS? I think this should work on all clouds?
I think this is a copy-paste issue. It's fine to leave it as is since this is consistent with
other tests, but I have filed #115093


pkg/cmd/roachtest/tests/cdc.go line 1504 at r3 (raw file):

			t.Status("creating kafka topics")
			for i := 0; i < 1000; i++ {
				if err := kafka.createTopic(ctx, "topicname"+fmt.Sprintf("%v", i)); err != nil {

use "%d" format; or if you prefer, strconv.Itoa(i)


pkg/cmd/roachtest/tests/cdc.go line 1521 at r3 (raw file):

			results, err := ct.cluster.RunWithDetails(ct.ctx, t.L(),
				ct.cluster.Range(1, c.Spec().NodeCount-1),
				"grep \"client/metadata fetching metadata for all topics from broker\" logs/cockroach.log")

I think we should also verify that the log contains the message we do expect to see.
I think you can do this by simplifying your grep pattern: Grep for
client/metadata fetching metadata for. This will match both the thing we expect
not to see -- client/metadata fetching metadata for all topics -- as well as the thing
we expect to see client/metadata fetching metadata for [topicname0 topicname1 ....].

So, after you got the grep output, do the error checking and then make sure that
Stdout does not contain the thing we don't want to see and contains the thing we do want
to see.


pkg/cmd/roachtest/tests/cdc.go line 1532 at r3 (raw file):

					t.Fatalf("expected empty stdout but got %s", res.Stdout)
				}
				if res.Stderr != "" {

Probably not needed -- grep prints on stdout; and if anything is printed on stderr, it should also exit with error code (and if it's some sort of an "error info" message -- I don't know, like "disk full" -- we don't care about that).

@miretskiy
Contributor

I think you should squash the commits together.

@wenyihu6
Contributor Author

pkg/ccl/changefeedccl/sink_kafka.go line 1085 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I know we don't have comments on the other params, but let's start to make things better.
Let's add a short comment, something like // do not fetch metadata for all topics, just the ones we need

Done.

@wenyihu6
Contributor Author

pkg/cmd/roachtest/tests/cdc.go line 1484 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

why except AWS? I think this should work on all clouds?
I think this is a copy-paste issue. It's fine to leave it as is since this is consistent with
other tests, but I have filed #115093

I will try looking into this issue after I finish my current milestones. From a brief look, it looks like this convention was introduced by 5eb4a53.

@wenyihu6
Contributor Author

pkg/cmd/roachtest/tests/cdc.go line 1504 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

use "%d" format; or if you prefer, strconv.Itoa(i)

Done.

@wenyihu6
Contributor Author

pkg/cmd/roachtest/tests/cdc.go line 1521 at r3 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

Just realized there is a chicken-and-egg situation here: calling Dial() here causes sarama to print client/metadata fetching metadata for all topics from broker again. We could try changing sarama.Logger inside Dial(), but it feels unnecessary and might make the code confusing.

Done.

@wenyihu6 wenyihu6 force-pushed the sarama-fix branch 2 times, most recently from 2f2d360 to 080e44b Compare November 28, 2023 18:04
@wenyihu6
Contributor Author

Do you happen to know why sarama outputs client/metadata fetching metadata for [‹×› ‹×› ‹×› ‹×› ‹×› ‹×› ‹×› ‹×› ‹×›] from broker ‹×› with the ‹×› placeholders rather than the actual strings? When I tried printing the string, the placeholders were replaced by the actual strings client/metadata fetching metadata for [‹customer› ‹history› ‹new_order› ‹district› ‹item› ‹order_line› ‹warehouse› ‹order› ‹stock›] from broker ‹10.142.1.135:9092› and the test passed. But if I do not print it, the test fails.

@wenyihu6
Contributor Author

wenyihu6 commented Nov 28, 2023

pkg/cmd/roachtest/tests/cdc.go line 1503 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

does it take a long time to create many topics?
nit: consider having const numTopics = ... and using a "creating %d kafka topics" status message above.

I cut it to 10 mainly because we are checking to make sure we are not fetching metadata for every topic here. I changed it to 100 now that we only check for the topic name prefix.

@wenyihu6
Contributor Author

wenyihu6 commented Nov 28, 2023

pkg/cmd/roachtest/tests/cdc.go line 1530 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

couldn't you just check to make sure that stdout contains
fmt.Sprintf("fetching metadata for %v topics", allTpccTargets)

(the %v should print the list, space separated, surrounded by [] -- like you see in the logs).

The order in which the elements of the slice are printed does not seem to be consistent, though. (Screenshot of the log output attached.)

@wenyihu6
Contributor Author

pkg/cmd/roachtest/tests/cdc.go line 1539 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I think checking for all topic names explicitly is probably overkill.
You can simply take "topicname" and make it a constant with an "obvious" value -- something like `const ignoreTopicPrefix = "ignore_topic_do_not_fetch"`. Simply check whether stdout contains a single instance of that topic prefix.

Agreed. I changed it back to creating 100 topics as well.

@wenyihu6
Contributor Author

pkg/cmd/roachtest/tests/cdc.go line 1532 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Probably not needed -- grep prints on stdout; and if anything is printed on stderr, it should also exit with error code (and if it's some sort of an "error info" message -- I don't know, like "disk full" -- we don't care about that).

Ack.

@wenyihu6
Contributor Author

Friendly ping on this review - hoping to merge it this week.

Contributor

@miretskiy miretskiy left a comment


Reviewed 1 of 1 files at r7.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @wenyihu6)


pkg/ccl/changefeedccl/sink_kafka.go line 260 at r6 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I think some unit tests failed with a nil pointer access when I excluded this check.

I suspect that's because we use OverrideClientConfig knob (https://github.com/cockroachdb/cockroach/blob/b7b0b433f8db34eaf59872dfb376187f02f588b6/pkg/ccl/changefeedccl/sink_kafka.go#L277-L277)...

```go
OverrideClientInit: func(config *sarama.Config) (kafkaClient, error) {
	return nil, nil
},
```

I think it would be better to change those tests to return a non-nil, &fakeKafkaClient{config} instead.

@wenyihu6 wenyihu6 force-pushed the sarama-fix branch 3 times, most recently from 67e5d86 to f4eed60 Compare December 13, 2023 02:20
Prior to this commit, sarama clients fetched metadata for all topics periodically.
This consumes a substantial amount of memory when users create many topics.
This patch changes the configuration for sarama clients so that we fetch
metadata only for the minimal set of topics that are needed.

Fixes: cockroachdb#113576
Release Note: none
@wenyihu6
Contributor Author

pkg/ccl/changefeedccl/sink_kafka.go line 260 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I suspect that's because we use OverrideClientConfig knob (https://github.com/cockroachdb/cockroach/blob/b7b0b433f8db34eaf59872dfb376187f02f588b6/pkg/ccl/changefeedccl/sink_kafka.go#L277-L277)...

```go
OverrideClientInit: func(config *sarama.Config) (kafkaClient, error) {
	return nil, nil
},
```

I think it would be better to change those tests to return a non-nil, &fakeKafkaClient{config} instead.

Done. Changed it to return fakeKafkaClient and nil error instead.

```go
OverrideClientInit: func(config *sarama.Config) (kafkaClient, error) {
	client := &fakeKafkaClient{config}
	return client, nil
},
```

@wenyihu6
Contributor Author

The last push was to fix the comment & rebase on master.

TFTR!

bors r=miretskiy

@craig
Contributor

craig bot commented Dec 13, 2023

Build failed (retrying...):

@craig
Contributor

craig bot commented Dec 13, 2023

Build succeeded:

@craig craig bot merged commit 803dee3 into cockroachdb:master Dec 13, 2023
9 checks passed
@wenyihu6 wenyihu6 deleted the sarama-fix branch December 13, 2023 14:23
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Dec 20, 2023
Patch (cockroachdb#114740) changed Dial() in pkg/ccl/changefeedccl/sink_kafka.go to refresh
metadata only for kafkaSink.topics() rather than for all topics. This triggered
a sarama error (an invalid replication factor reported by Kafka servers). It
seems hard to get to the bottom of it given the many unknowns on the sarama and
Kafka side (more in cockroachdb#116872). We decided to revert Dial() to its previous
behavior matching the sarama code and will continue investigating afterwards.

Part of: cockroachdb#116358, cockroachdb#116872
Release note: None
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Dec 21, 2023
Patch (cockroachdb#114740) changed Dial() in pkg/ccl/changefeedccl/sink_kafka.go to refresh
metadata only for kafkaSink.topics() rather than for all topics. This triggered
a sarama error (an invalid replication factor reported by Kafka servers). It
seems hard to get to the bottom of it given the many unknowns on the sarama and
Kafka side (more in cockroachdb#116872). We decided to revert Dial() to its previous
behavior matching the sarama code and will continue investigating afterwards.

Part of: cockroachdb#116872
Fixes: cockroachdb#116358
Release note: None
craig bot pushed a commit that referenced this pull request Dec 21, 2023
116414: changefeedccl: revert Dial() to fetch all metadata topics  r=jayshrivastava a=wenyihu6

Patch (#114740) changed Dial() in pkg/ccl/changefeedccl/sink_kafka.go to refresh
metadata only for kafkaSink.topics() rather than for all topics. This triggered
a sarama error (an invalid replication factor reported by Kafka servers). It
seems hard to get to the bottom of it given the many unknowns on the sarama and
Kafka side (more in #116872). We decided to revert Dial() to its previous
behavior matching the sarama code and will continue investigating afterwards.

Part of: #116872
Fixes: #116358
Release note: None

116894: storage: increase testing shard count r=jbowens a=itsbilal

We're starting to sporadically see failures when the test shard for pkg/storage times out as a whole. This change doubles the shard count to give each test more headroom before it times out.

Fixes #116692.

Epic: none

Release note: None

116910: testserver: disable tenant randomization under race in multi-node clusters r=yuzefovich a=yuzefovich

We now run `race` builds in the EngFlow environment which has either 1 CPU or 2 CPU executors. If we have a multi-node cluster and then start a default test tenant, then it's very likely for that environment to be overloaded and lead to unactionable test failures. This commit prevents this from happening by disabling the tenant randomization under race in multi-node clusters. As a result, it optimistically unskips a few tests that we skipped due to those unactionable failures.

Fixes: #115619.

Release note: None

116956: sql: skip Test{Experimental}RelocateVoters under duress r=yuzefovich a=yuzefovich

There is little value in running these tests in complex configs.

Fixes: #116939.

Release note: None

Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
Co-authored-by: Bilal Akhtar <bilal@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>

Successfully merging this pull request may close these issues.

changefeedccl: kafka topic metadata memory usage with large number of changefeeds