Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate user provided topics before using them #426

Merged
merged 1 commit into from
Jan 4, 2021

Conversation

wuYin
Copy link
Contributor

@wuYin wuYin commented Dec 21, 2020

Issue

If user provided retry topic is same with consumer topic

  • Currently underlayer will create 2 same topic consumers to implement retry, but it's unnecessary, client-java only create 1 consumer to do this, see ConsumerBuilderImpl.java#L139
  • When user invoke consumer.Close(), only the 2th consumer will be closed directly, consumer 1st is still active, there are potential memory leak.

Reproducation

func main() {
	client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	defer client.Close()

	topic := "topic-01"
	consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
		RetryEnable:      true,
		DLQ: &pulsar.DLQPolicy{
			RetryLetterTopic: topic,
			MaxDeliveries:    2,
		},
	})
	consumer.Close()
}
  • issue: consumer 1 created but not close directly, it will be closed before client closing.

  • log

    INFO[0000] [Connected consumer]  consumerID=1 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    INFO[0000] [Created consumer]    consumerID=1 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    INFO[0000] [Connected consumer]  consumerID=2 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    INFO[0000] [Created consumer]    consumerID=2 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    INFO[0000] Closing consumer=[2]  consumerID=2 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    INFO[0000] [Closed consumer]     consumerID=2 name=pzxfg subscription=my-sub topic="persistent://public/default/topic-01"
    ---end
    

Cause

MultiTopicConsumer is using map[topic]consumer to manage consumers, but in the above case, only 2th consumer will be
corresponded with the topic, the 1st dosen't since map overwriting, it means 1st isn't managed by multiTopicConsumer , see consumer_multitopic.go#L66

Motivation

Modifications

  • add distinct utility function to deduplicate user provided topics

Verifying this change

  • Make sure that the change passes the CI checks.

@wuYin wuYin changed the title deduplicate user topics before using them deduplicate user provided topics before using them Dec 21, 2020
@wuYin wuYin changed the title deduplicate user provided topics before using them Deduplicate user provided topics before using them Dec 21, 2020
@wolfstudy wolfstudy added this to the 0.4.0 milestone Jan 4, 2021
Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

@wolfstudy wolfstudy merged commit fb10d4f into apache:master Jan 4, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants