How to handle a slow consumer? #204

Closed
liudanking opened this issue Jan 9, 2017 · 6 comments

@liudanking

I have a very slow consumer handler for an nsq topic:

func consumerHandler(msg *nsq.Message) error {
	msg.Finish()
	...
	// costs about 10 min
	...
	log.Printf("handler %s done", msg.Body)
	return nil
}

After a while, I got some errors from nsq:

2017/01/09 15:24:01 ERR    2 [test-topic/test-topic-channel] (127.0.0.1:4150) IO error - EOF
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) beginning close
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) readLoop exiting
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) breaking out of writeLoop
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) writeLoop exiting
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) finished draining, cleanup exiting
2017/01/09 15:24:01 INF    2 [test-topic/test-topic-channel] (127.0.0.1:4150) clean close complete
2017/01/09 15:24:01 WRN    2 [test-topic/test-topic-channel] there are 0 connections left alive

What is the best practice for a slow consumer handler?

@ploxiln
Member

ploxiln commented Jan 9, 2017

Best practice is to use Message.Touch() periodically, perhaps roughly every 10 seconds, if the message is still being processed (successfully so far). The other thing you'll probably want to do is set the consumer Config setting "msg_timeout" which will tell the server how long to wait for a message sent to this consumer to finish before retrying it. It's not a good idea to set this too high because it makes failure/crash detection take a lot longer. By default this value is 60 seconds, and by default the server accepts a maximum of 15 minutes for this value.
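A minimal sketch of that periodic Touch pattern, assuming a hypothetical handler type slowHandler and a doSlowWork helper standing in for the long-running job:

import (
	"time"

	"github.com/nsqio/go-nsq"
)

type slowHandler struct{}

func (h slowHandler) HandleMessage(msg *nsq.Message) error {
	done := make(chan struct{})
	defer close(done)

	// Keep the message alive on the server while the slow work runs.
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				msg.Touch() // resets the server-side msg_timeout clock
			case <-done:
				return
			}
		}
	}()

	// doSlowWork is a hypothetical stand-in for the ~10 minute job.
	return doSlowWork(msg) // nil return => go-nsq sends FIN automatically
}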

nsq is optimized more for a very high rate and volume of relatively quick-to-process messages, so it might not be perfect for your use case.

@judwhite
Contributor

@liudanking We use NSQ for long-running messages (10+ minutes) and we use TOUCH to let nsqd know we're still working on it. In the nsqd arguments you can set -msg-timeout to increase the default timeout, or you can set this in the client; we do this in the client and keep the nsqd default.

If you expect to go over the 15m value mentioned by @ploxiln, you'll also need to set -max-msg-timeout on nsqd. This value cannot be overridden on the client, and it doesn't matter if a message was recently TOUCH'd: nsqd will still requeue the message once this threshold is hit (it's a hard limit for misbehaving clients). In our use case we increased -max-msg-timeout for some workloads which can't be fanned out.
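For reference, a minimal sketch of raising the timeout on the client side via go-nsq's Config (the topic, channel, and 10 minute value are illustrative only):

cfg := nsq.NewConfig()
cfg.MsgTimeout = 10 * time.Minute // must stay under nsqd's -max-msg-timeout

consumer, err := nsq.NewConsumer("test-topic", "test-topic-channel", cfg)
if err != nil {
	log.Fatal(err)
}
consumer.AddHandler(slowHandler{}) // e.g. the Touch-ing handler sketched above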

@twmb
Contributor

twmb commented Jan 18, 2017

I'm guessing that consumerHandler is called by an interface stub that implements Handler's HandleMessage, i.e.

func (s stub) HandleMessage(msg *nsq.Message) error {
	return consumerHandler(msg)
}

Nothing about this problem is related to the message timing out, as the first thing that happens inside consumerHandler is msg.Finish(). After this point, there is no more message to time out. Additionally, the heartbeat between nsqd and its consumers should prevent connection timeout.

The likely cause of the problem here is that consumerHandler is tying up the goroutines used for reading the connection:

go-nsq/conn.go, line 519 in a53d495:

c.delegate.OnMessage(c, msg)

If the delegate does not return, readLoop does not process heartbeats.

The better solution here would be to send the message to a concurrent goroutine that does the processing. If this is done, it's important to not have every message start a goroutine and eat memory (unless there are not many messages at a time). Lastly, the concurrent goroutine cannot block the handler, otherwise you will have the same issue that currently exists.
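A rough sketch of that hand-off pattern, using a fixed pool of worker goroutines so memory stays bounded (workerHandler, numWorkers, and process are illustrative names, not part of go-nsq); note that the bounded hand-off still blocks once every worker is busy:

type workerHandler struct {
	work chan *nsq.Message
}

func newWorkerHandler(numWorkers int) *workerHandler {
	h := &workerHandler{work: make(chan *nsq.Message)}
	for i := 0; i < numWorkers; i++ {
		go func() {
			for msg := range h.work {
				if err := process(msg); err != nil { // process: hypothetical ~10 minute job
					msg.Requeue(-1) // -1 lets go-nsq pick the requeue delay
					continue
				}
				msg.Finish() // respond only once the work actually succeeded
			}
		}()
	}
	return h
}

func (h *workerHandler) HandleMessage(msg *nsq.Message) error {
	msg.DisableAutoResponse() // we Finish/Requeue from the worker instead
	h.work <- msg             // blocks only when all workers are busy
	return nil
}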

@mreiferson
Member

Good catch, I completely missed that the body of the handler was calling msg.Finish() first.

Making it concurrent effectively creates an internal queue, which would still need to be bounded, and thus would ultimately block the read loop anyway. It might offer some optimization because you would be reading while processing though, shrug.

@judwhite
Contributor

judwhite commented Jan 22, 2017

If the delegate does not return, readLoop does not process heartbeats.

@twmb Are you sure? Maybe I'm missing something

go-nsq/consumer.go, lines 636 to 641 in a53d495:

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
	atomic.AddInt64(&r.totalRdyCount, -1)
	atomic.AddUint64(&r.messagesReceived, 1)
	r.incomingMessages <- msg
	r.maybeUpdateRDY(c)
}

Edit: I got you, if sending to the incomingMessages channel is blocked because you don't have any concurrent handlers left then you block readLoop.

Even if msg.Finish() wasn't called I wonder how the case of a long running handler that doesn't know it's been timed out affects readLoop. The RDY count wouldn't be updated so... it wouldn't receive more messages, and everything would be okay? I always wondered why MaxRDYCount and concurrent handlers were different concepts. Probably I should take this 'offline' to IRC. Sorry for the distraction.

@judwhite
Contributor

judwhite commented Jan 22, 2017

@liudanking Try removing msg.Finish() from your handler; this is called automatically for you if your handler returns a nil error. I personally only use msg.Finish() if I know I've reached a permanent error state (such as message failed to deserialize, message validation failed, etc) and don't want the message retried.
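In other words, something along these lines (process is a hypothetical stand-in for the ~10 minute job):

func consumerHandler(msg *nsq.Message) error {
	if err := process(msg); err != nil {
		return err // non-nil => go-nsq requeues the message
	}
	log.Printf("handler %s done", msg.Body)
	return nil // nil => go-nsq sends FIN for us
}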
