Possible data race in internalFlush #1258

Closed
Gilthoniel opened this issue Jul 26, 2024 · 2 comments · Fixed by #1261

Comments

@Gilthoniel
Contributor

Gilthoniel commented Jul 26, 2024

Expected behavior

When calling Flush or FlushWithCtx, all enqueued messages should be sent.

Actual behavior

When flushing, switching to a new channel can lead to message loss:

	if len(p.dataChan) != 0 {
		oldDataChan := p.dataChan
		p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
		for len(oldDataChan) != 0 {
			pendingData := <-oldDataChan
			p.internalSend(pendingData)
		}
	}

If internalSendAsync sends a request to the channel at the same time as the switch, the drain loop can read a length of zero and exit just before the request lands in the old channel. Since nothing reads oldDataChan after the flush, the message is stuck there.
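The interleaving described above can be replayed deterministically in a stripped-down model. This is a sketch, not the library code: `producer`, `sendRequest` and `flushBySwap` are hypothetical stand-ins, and the concurrent `internalSendAsync` is simulated as a fixed sequence (capture the channel, let the flush run to completion, then send).

```go
package main

import "fmt"

// sendRequest is a hypothetical stand-in for the producer's request type.
type sendRequest struct{ msg string }

// producer is a hypothetical, stripped-down model holding only the data channel.
type producer struct {
	dataChan chan *sendRequest
}

// flushBySwap models the excerpt above: if the channel is non-empty, swap in
// a fresh channel and drain the old one until its length reads zero.
// It returns the messages it would have sent via internalSend.
func (p *producer) flushBySwap() []string {
	var sent []string
	if len(p.dataChan) != 0 {
		oldDataChan := p.dataChan
		p.dataChan = make(chan *sendRequest, cap(oldDataChan))
		for len(oldDataChan) != 0 {
			pendingData := <-oldDataChan
			sent = append(sent, pendingData.msg) // stands in for internalSend
		}
	}
	return sent
}

func main() {
	p := &producer{dataChan: make(chan *sendRequest, 4)}
	p.dataChan <- &sendRequest{msg: "m1"}

	// Step 1: a concurrent sender has already read p.dataChan (the old
	// channel) but has not performed the send yet.
	captured := p.dataChan

	// Step 2: the flush swaps channels and drains the old one to zero.
	sent := p.flushBySwap()

	// Step 3: the delayed send lands in the old channel, which nobody reads.
	captured <- &sendRequest{msg: "m2"}

	fmt.Println("sent by flush:", sent)
	fmt.Println("stranded in old channel:", len(captured))
	fmt.Println("pending in current channel:", len(p.dataChan))
}
```

Running it shows `m1` flushed, one request stranded in the old channel, and an empty current channel.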

Steps to reproduce

I have never observed this in practice; I noticed it while reading the code for a different issue. I'm confident it can happen, but I'd like your opinion.

System configuration

Pulsar version: v3.0.5

@Gilthoniel
Contributor Author

Gilthoniel commented Jul 26, 2024

A different approach would be something like this:

// flushDataChan first empties the data channel as much as possible, then sends
// the pending requests.
func (p *partitionProducer) flushDataChan() {
	var reqs []*sendRequest

	for {
		select {
		case pendingData := <-p.dataChan:
			reqs = append(reqs, pendingData)
		default:
			// The channel is empty: send everything collected so far.
			for _, req := range reqs {
				p.internalSend(req)
			}
			return
		}
	}
}

It would also avoid allocating a new channel on every flush under high load.
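A self-contained sketch of the proposed approach, using a hypothetical model: `producer`, `sendRequest` and a recording `internalSend` are stand-ins, not the pulsar-client-go types. Because the channel is never replaced, a send that races with one flush still lands in the live channel and is picked up by the next flush, assuming the producer keeps reading `p.dataChan` afterwards.

```go
package main

import "fmt"

// sendRequest is a hypothetical stand-in for the producer's request type.
type sendRequest struct{ msg string }

// producer is a hypothetical model; sent records what internalSend would send.
type producer struct {
	dataChan chan *sendRequest
	sent     []string
}

// internalSend is a hypothetical stand-in that just records the message.
func (p *producer) internalSend(req *sendRequest) {
	p.sent = append(p.sent, req.msg)
}

// flushDataChan drains the channel with non-blocking receives until it is
// empty, then sends everything it collected. No new channel is allocated.
func (p *producer) flushDataChan() {
	var reqs []*sendRequest
	for {
		select {
		case pendingData := <-p.dataChan:
			reqs = append(reqs, pendingData)
		default:
			for _, req := range reqs {
				p.internalSend(req)
			}
			return
		}
	}
}

func main() {
	p := &producer{dataChan: make(chan *sendRequest, 4)}
	p.dataChan <- &sendRequest{msg: "m1"}

	late := p.dataChan // a concurrent sender's reference, taken before the flush
	p.flushDataChan()

	late <- &sendRequest{msg: "m2"} // lands in the same, still-live channel
	p.flushDataChan()               // so the next flush picks it up

	fmt.Println(p.sent)
}
```

This prints `[m1 m2]`. Note that it does not make a racing send part of the flush already in progress; it only ensures the request is never stranded in an orphaned channel.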

Gilthoniel added a commit to Gilthoniel/pulsar-client-go that referenced this issue Jul 26, 2024
@gunli
Contributor

gunli commented Jul 29, 2024

Hmm, internalSendAsync() is called by Send() or SendAsync(). IMO, you should not call Flush() and Send()/SendAsync() concurrently; I think that should be a convention.

RobertIndie pushed a commit that referenced this issue Jul 30, 2024
Fixes #1258 

### Motivation

While flushing, the data channel is switched to a newly allocated one, which can cause messages to be lost: the drain loop stops as soon as the length reads zero, while a new message can be sent to the old channel at the same moment.

### Modifications

Instead of allocating a new channel, the flush empties the existing one, up to the length of its buffer, before proceeding.
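The bounded drain described in the modification can be sketched as follows. This is an illustration, not the merged code: `drainBuffered` and `sendRequest` are hypothetical, and it assumes "up to the length of the buffer" means the number of requests buffered when the flush starts (bounding by `cap(ch)` would also work). The bound keeps the flush from looping indefinitely if senders keep the channel non-empty.

```go
package main

import "fmt"

// sendRequest is a hypothetical stand-in for the producer's request type.
type sendRequest struct{ msg string }

// drainBuffered receives, without blocking, at most as many requests as were
// buffered when it started. Requests sent afterwards stay in the same
// channel, which is never replaced, for the next reader.
func drainBuffered(ch chan *sendRequest) []*sendRequest {
	n := len(ch) // snapshot of the buffered count at flush start
	reqs := make([]*sendRequest, 0, n)
	for i := 0; i < n; i++ {
		select {
		case r := <-ch:
			reqs = append(reqs, r)
		default:
			// Another reader emptied the channel first; stop early.
			return reqs
		}
	}
	return reqs
}

func main() {
	ch := make(chan *sendRequest, 4)
	for _, m := range []string{"m1", "m2", "m3"} {
		ch <- &sendRequest{msg: m}
	}
	reqs := drainBuffered(ch)
	ch <- &sendRequest{msg: "m4"} // arrives after the drain: still in ch
	fmt.Println(len(reqs), len(ch))
}
```

This prints `3 1`: the three buffered requests are drained, and the late one waits in the live channel instead of being lost.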
RobertIndie pushed a commit that referenced this issue Jul 31, 2024
(cherry picked from commit 8dd4ed1)