Skip to content

Commit

Permalink
[feat][txn]Implement transactionCoordinatorClient (#953)
Browse files Browse the repository at this point in the history
* [feat][txn]Implement transactionCoordinatorClient
Master Issue:#932
### Motivation
Implement transaction coordinator client.
### Modifications
1. Implement transaction coordinator
2. implement transaction API
3. Add metric and test

* Fix checkstyle and CI

* fix some comments

* fix some comments

* fix some comments

* comment style consistent

* comment style consistent

* fix some comments
  • Loading branch information
liangyepianzhou committed Feb 27, 2023
1 parent 7d257b0 commit e2ea255
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

IMAGE_NAME = pulsar-client-go-test:latest
PULSAR_VERSION ?= 2.8.3
PULSAR_VERSION ?= 2.10.3
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
GO_VERSION ?= 1.18
GOLANG_IMAGE = golang:$(GO_VERSION)
Expand Down
27 changes: 23 additions & 4 deletions integration-tests/conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ anonymousUserRole=anonymous

# Enable authentication
authenticationEnabled=true

# Autentication provider name list, which is comma separated list of class names
# Authentication provider name list, which is comma separated list of class names
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls,org.apache.pulsar.broker.authentication.AuthenticationProviderToken,org.apache.pulsar.broker.authentication.AuthenticationProviderBasic

# Enforce authorization
Expand All @@ -111,8 +110,10 @@ superUserRoles=localhost,superUser,admin

# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTlsEnabled=true
brokerClientTrustCertsFilePath=/pulsar/certs/cacert.pem
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
brokerClientAuthenticationParameters={"tlsCertFile":"/pulsar/certs/client-cert.pem","tlsKeyFile":"/pulsar/certs/client-key.pem"}

### --- BookKeeper Client --- ###

Expand Down Expand Up @@ -294,3 +295,21 @@ globalZookeeperServers=
brokerServicePurgeInactiveFrequencyInSeconds=60

acknowledgmentAtBatchIndexLevelEnabled=true
### --- Transaction config variables --- ###
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=true
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider

# Transaction buffer take snapshot transaction count
transactionBufferSnapshotMaxTransactionCount=1000

# Transaction buffer take snapshot interval time
# Unit : millisecond
transactionBufferSnapshotMinTimeInMillis=5000

# Enable or disable system topic
systemTopicEnabled=true

# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
2 changes: 2 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ type ClientOptions struct {
// Specify metric registerer used to register metrics.
// Default prometheus.DefaultRegisterer
MetricsRegisterer prometheus.Registerer

EnableTransaction bool
}

// Client represents a pulsar client
Expand Down
9 changes: 9 additions & 0 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type client struct {
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient

log log.Logger
}
Expand Down Expand Up @@ -162,6 +163,14 @@ func newClient(options ClientOptions) (Client, error) {

c.handlers = internal.NewClientHandlers()

if options.EnableTransaction {
c.tcClient = newTransactionCoordinatorClientImpl(c)
err = c.tcClient.start()
if err != nil {
return nil, err
}
}

return c, nil
}

Expand Down
37 changes: 31 additions & 6 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -425,7 +426,7 @@ func TestNamespaceTopics(t *testing.T) {
t.Fatal(err)
}
topic2 := fmt.Sprintf("%s/topic-2", namespace)
if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
if err := httpPut("admin/v2/persistent/"+topic2, nil); err != nil {
t.Fatal(err)
}
defer func() {
Expand All @@ -446,7 +447,13 @@ func TestNamespaceTopics(t *testing.T) {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
topicCount := 0
for _, value := range topics {
if !strings.Contains(value, "__transaction_buffer_snapshot") {
topicCount++
}
}
assert.Equal(t, 2, topicCount)

// add a non-persistent topic
topicName := fmt.Sprintf("non-persistent://%s/testNonPersistentTopic", namespace)
Expand All @@ -467,7 +474,13 @@ func TestNamespaceTopics(t *testing.T) {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
topicCount = 0
for _, value := range topics {
if !strings.Contains(value, "__transaction_buffer_snapshot") {
topicCount++
}
}
assert.Equal(t, 2, topicCount)
}

func TestNamespaceTopicsWebURL(t *testing.T) {
Expand All @@ -488,7 +501,7 @@ func TestNamespaceTopicsWebURL(t *testing.T) {
t.Fatal(err)
}
topic2 := fmt.Sprintf("%s/topic-2", namespace)
if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
if err := httpPut("admin/v2/persistent/"+topic2, nil); err != nil {
t.Fatal(err)
}
defer func() {
Expand All @@ -509,7 +522,13 @@ func TestNamespaceTopicsWebURL(t *testing.T) {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
topicCount := 0
for _, value := range topics {
if !strings.Contains(value, "__transaction_buffer_snapshot") {
topicCount++
}
}
assert.Equal(t, 2, topicCount)

// add a non-persistent topic
topicName := fmt.Sprintf("non-persistent://%s/testNonPersistentTopic", namespace)
Expand All @@ -530,7 +549,13 @@ func TestNamespaceTopicsWebURL(t *testing.T) {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
topicCount = 0
for _, value := range topics {
if !strings.Contains(value, "__transaction_buffer_snapshot") {
topicCount++
}
}
assert.Equal(t, 2, topicCount)
}

func anonymousNamespacePolicy() map[string]interface{} {
Expand Down
7 changes: 7 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ const (
ProducerClosed
// SchemaFailure means the payload could not be encoded using the Schema
SchemaFailure

// ReachMaxPendingOps means the pending operations in transaction_impl coordinator reach the maximum.
ReachMaxPendingOps
// InvalidStatus means the component status is not as expected.
InvalidStatus
// TransactionError means this is a transaction related error
TransactionError
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
7 changes: 7 additions & 0 deletions pulsar/helper_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ func topicStats(topic string) (map[string]interface{}, error) {
return metadata, err
}

func transactionStats(id *TxnID) (map[string]interface{}, error) {
var metadata map[string]interface{}
path := fmt.Sprintf("admin/v3/transactions/transactionMetadata/%d/%d", id.mostSigBits, id.leastSigBits)
err := httpGet(path, &metadata)
return metadata, err
}

func topicPath(topic string) string {
tn, _ := internal.ParseTopicName(topic)
idx := strings.LastIndex(tn.Name, "/")
Expand Down
11 changes: 11 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,17 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
cmd.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)
case pb.BaseCommand_GET_SCHEMA:
cmd.GetSchema = msg.(*pb.CommandGetSchema)
case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
cmd.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
case pb.BaseCommand_NEW_TXN:
cmd.NewTxn = msg.(*pb.CommandNewTxn)
case pb.BaseCommand_ADD_PARTITION_TO_TXN:
cmd.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
cmd.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
case pb.BaseCommand_END_TXN:
cmd.EndTxn = msg.(*pb.CommandEndTxn)

default:
panic(fmt.Sprintf("Missing command type: %v", cmdType))
}
Expand Down
11 changes: 10 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,16 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
c.handlePing()
case pb.BaseCommand_PONG:
c.handlePong()

case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
case pb.BaseCommand_NEW_TXN_RESPONSE:
c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_END_TXN_RESPONSE:
c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
c.handleActiveConsumerChange(cmd.GetActiveConsumerChange())

Expand Down
23 changes: 23 additions & 0 deletions pulsar/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type TxnID struct {
mostSigBits uint64
leastSigBits uint64
}
Loading

0 comments on commit e2ea255

Please sign in to comment.