
Fix transaction coordinator client can't reconnect to the broker #1237

Merged: 13 commits into apache:master, Jul 11, 2024

Conversation

@RobertIndie (Member) commented Jul 5, 2024

Fixes #1227

Motivation

There are some issues with the transactionCoordinatorClient in the Go client. When using transactions, if a reconnection happens during a transaction operation, the connection to the transaction coordinator is never re-established. This causes all subsequent operations to fail with a connection closed error.

Modifications

  • Introduced transactionHandler to manage reconnections and handle requests (see the sketch after this list for the general shape of such a handler).
  • Fixed a crash in the tc client when the broker does not have transactions enabled.
  • Improved error handling in the tc client.
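
For context on the first item, here is a minimal sketch of what an event-loop-based handler can look like. All names in it (txnRequestOp, transactionHandler, reconnectToBroker) are illustrative only and are not the actual types added by this PR:

type txnRequestOp struct {
	do       func() error // the RPC to perform against the current connection
	callback func(error)  // invoked by the event loop when the op completes
}

type transactionHandler struct {
	ops           chan *txnRequestOp
	connectClosed chan struct{} // signalled when the coordinator connection drops
	closeCh       chan struct{}
}

// runEventsLoop serializes all requests and reconnects through one goroutine,
// so callers never race on re-establishing the coordinator connection.
func (h *transactionHandler) runEventsLoop() {
	for {
		select {
		case op := <-h.ops:
			op.callback(op.do()) // perform the RPC and report the result
		case <-h.connectClosed:
			h.reconnectToBroker() // hypothetical helper: grabConn with backoff and timeout
		case <-h.closeCh:
			return
		}
	}
}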

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@RobertIndie self-assigned this Jul 5, 2024
@RobertIndie marked this pull request as ready for review July 8, 2024 10:33
@RobertIndie marked this pull request as draft July 8, 2024 10:38
@RobertIndie marked this pull request as ready for review July 9, 2024 10:19
@RobertIndie added this to the v0.13.0 milestone Jul 10, 2024
@BewareMyPower (Contributor) left a comment

This PR might be over-complicated.

IIUC, the root cause is that when RequestOnCnx returns an error indicating the connection is closed, the error is simply propagated to the caller. Here is a simple fix (without any backoff mechanism or timeout):

func (tc *transactionCoordinatorClient) sendRequest(cnxIndex uint64, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) error {
	for {
		res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[cnxIndex], requestID, cmdType, message)
		if err != nil {
			// Propagate any error other than a closed connection.
			if err.Error() != "connection closed" {
				return err
			}
			// The connection was closed: re-establish it and retry the request.
			if err = tc.grabConn(cnxIndex); err != nil {
				return err
			}
			continue
		}
		if res.Response.EndTxnResponse.Error != nil {
			return getErrorFromServerError(res.Response.EndTxnResponse.Error)
		}
		return nil
	}
}

Then replace the RequestOnCnx call like this:

        err := tc.sendRequest(id.MostSigBits, requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
        tc.semaphore.Release()
        if err != nil {
                return err
        }
        return nil

The fix above requires far fewer code changes and keeps the code simpler.

pulsar/error.go: review comment (outdated, resolved)
@RobertIndie (Member, Author)

The fix above requires far fewer code changes and keeps the code simpler.

This implementation misses some key details. For example, it lacks a backoff mechanism and connection timeout logic, as you said. Also, what happens if many goroutines call grabConn at the same time? We could end up with multiple goroutines trying to handle reconnections simultaneously, which doesn't seem like a good approach. Although we could use locks or other mechanisms to manage the concurrent calls, issues remain: what if there's a connection timeout and we need to notify the user? And how do we tell the other goroutines to stop attempting reconnections in that case? It seems we would need complex and less intuitive logic to address these issues.
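
To make the concurrency concern concrete, here is a rough sketch (hypothetical names, standard library sync only) of the kind of extra guarding the simple retry loop would need so that only one goroutine actually reconnects while the others wait. Note that the waiters still don't learn whether the reconnect succeeded or timed out, which is exactly the wrinkle described above:

// Hypothetical guard: serialize reconnection attempts on one coordinator
// connection so that concurrent sendRequest callers don't all call grabConn.
type reconnectGuard struct {
	mu           sync.Mutex
	reconnecting bool
	done         chan struct{} // closed when the in-flight reconnect finishes
}

func (g *reconnectGuard) reconnectOnce(grab func() error) error {
	g.mu.Lock()
	if g.reconnecting {
		waitCh := g.done
		g.mu.Unlock()
		<-waitCh   // another goroutine is already reconnecting; wait for it
		return nil // but we never learn whether that reconnect succeeded
	}
	g.reconnecting = true
	g.done = make(chan struct{})
	done := g.done
	g.mu.Unlock()

	err := grab() // e.g. tc.grabConn(cnxIndex)

	g.mu.Lock()
	g.reconnecting = false
	close(done)
	g.mu.Unlock()
	return err
}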

IIUC, the main simplification in this implementation involves dropping the XXOp and the event loop logic, right? The key difference appears to be the use of the event-loop model. However, reducing this complexity doesn't seem very significant to me. Both producers and consumers already use an event loop for handling requests:

func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	pc.log.Debug("get into runEventsLoop")
	// ...

The Java implementation of the TransactionMetaStoreHandler also uses something like an event loop, similar to the Go client:
TransactionMetaStoreHandler Java code

Moreover, without the event loop, it seems challenging to implement asynchronous methods like XXXAsync, for example Producer.SendAsync. Although we don't have this feature yet, it would be beneficial to make these operations truly asynchronous in the future. The Java client already supports this feature:
Async support in Java client
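
As a rough illustration of that point (reusing the hypothetical txnRequestOp/transactionHandler sketch from the Modifications section above; the Go client offers no such API today), once requests flow through an event loop, the synchronous call is just the asynchronous one plus a wait:

// Hypothetical sketch: the async variant only differs in how completion is
// delivered; the event loop invokes the callback when it finishes the op.
func (h *transactionHandler) submitAsync(do func() error, callback func(error)) {
	h.ops <- &txnRequestOp{do: do, callback: callback}
}

func (h *transactionHandler) submit(do func() error) error {
	done := make(chan error, 1)
	h.submitAsync(do, func(err error) { done <- err })
	return <-done // block until the event loop reports the result
}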

This PR mainly adopts the event-loop pattern of the existing producer and consumer. Maybe it's better to start a separate discussion for this.

@BewareMyPower (Contributor)

I see. Currently the Context object is also not used, so we need an event loop to handle it.

@BewareMyPower merged commit 50dce7e into apache:master Jul 11, 2024
7 checks passed
Development

Successfully merging this pull request may close these issues.

[BUG] calling NewTransaction always fails after reconnection happens