
Producer can enter unresponsive retry loop on disconnect (ignoring send context) #461

Open
flowchartsman opened this issue Feb 4, 2021 · 1 comment


flowchartsman commented Feb 4, 2021

I have a producer set up like the following:

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "tn/ns/topic",
	})
	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	ticker := time.NewTicker(300 * time.Millisecond)
	defer ticker.Stop()

	ctx, cFunc := context.WithCancel(context.Background())
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, os.Kill)
	go func() {
		<-sig
		cFunc()
	}()

MainLoop:
	for {
		select {
		case <-ctx.Done():
			break MainLoop
		case <-ticker.C:
			// continue on
		}
		a := Article{
			//fields here
		}
		b, _ := json.Marshal(a)
		_, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: b,
		})
		if err != nil {
			log.Println("error producing message:", err)
		}
	}

If the broker goes away or I spin down the (standalone local) cluster, the producer enters a loop like the following and becomes unresponsive to SIGINT. It must be killed:

INFO[29795] [Connection closed]                           remote_addr="pulsar://localhost:6650"
INFO[29803] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
WARN[29803] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[29803] [Connection closed]                           remote_addr="pulsar://localhost:6650"
INFO[29819] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
WARN[29819] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[29819] [Connection closed]                           remote_addr="pulsar://localhost:6650"
WARN[29819] [Failed to lookup topic]                      error="connection error" producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
INFO[29819] [Reconnecting to broker in  1m0s]             producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
INFO[29820] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
WARN[29820] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"

This means the producer gets into a state where Send ignores its context. This may be happening at the marked location in producer_partition.go below. I haven't had time to research it yet, but it looks like a likely candidate.

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         callback,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
	}
	p.options.Interceptors.BeforeSend(p, msg)

	if p.options.DisableBlockIfQueueFull {
		if !p.publishSemaphore.TryAcquire() {
			if callback != nil {
				callback(nil, msg, errSendQueueIsFull)
			}
			return
		}
	} else {
		p.publishSemaphore.Acquire()
	}

	p.metrics.MessagesPending.Inc()
	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

	p.eventsChan <- sr  // <- Here? Select clause with ctx.Done?
}

wuYin commented Feb 4, 2021

Issue

Is the problem that the infinite reconnection loop prevents the send-timeout check from ever running? If so, it is a known bug.

Background

The context passed to Send is currently not used; send-timeout checking is performed before the batch flush, which is driven by the flush ticker.
When the broker connection breaks, the partition producer blocks reconnecting until it succeeds, which in turn blocks its event-loop goroutine, so there is never a chance to check the send timeout.

This behavior was introduced in #394 (my mistake), and I attempted a fix in #460.

Hope this explanation helps.
