Skip to content

Commit

Permalink
dispatcher threading and context handling (#13551)
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed Jun 15, 2024
1 parent 9e733a0 commit f7e0362
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 45 deletions.
5 changes: 5 additions & 0 deletions .changeset/rare-carpets-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal capability dispatcher threading and context usage
83 changes: 67 additions & 16 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
sync "sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -24,7 +26,7 @@ type dispatcher struct {
peerID p2ptypes.PeerID
signer p2ptypes.Signer
registry core.CapabilitiesRegistry
receivers map[key]remotetypes.Receiver
receivers map[key]*receiver
mu sync.RWMutex
stopCh services.StopChan
wg sync.WaitGroup
Expand All @@ -45,7 +47,7 @@ func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, reg
peerWrapper: peerWrapper,
signer: signer,
registry: registry,
receivers: make(map[key]remotetypes.Receiver),
receivers: make(map[key]*receiver),
stopCh: make(services.StopChan),
lggr: lggr.Named("Dispatcher"),
}
Expand All @@ -58,29 +60,79 @@ func (d *dispatcher) Start(ctx context.Context) error {
return fmt.Errorf("peer is not initialized")
}
d.wg.Add(1)
go d.receive()
go func() {
defer d.wg.Done()
d.receive()
}()

d.lggr.Info("dispatcher started")
return nil
}

func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
func (d *dispatcher) Close() error {
close(d.stopCh)
d.wg.Wait()
d.lggr.Info("dispatcher closed")
return nil
}

var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "capability_receive_channel_usage",
Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.",
}, []string{"capabilityId", "donId"})

const receiverBufferSize = 10000

type receiver struct {
cancel context.CancelFunc
ch chan *remotetypes.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId string, rec remotetypes.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId)
}
d.receivers[k] = receiver

receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize)

ctx, cancelCtx := d.stopCh.NewCtx()
d.wg.Add(1)
go func() {
defer cancelCtx()
defer d.wg.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-receiverCh:
rec.Receive(ctx, msg)
}
}
}()

d.receivers[k] = &receiver{
cancel: cancelCtx,
ch: receiverCh,
}

d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.receivers, key{capabilityId, donId})
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)

receiverKey := key{capabilityId, donId}
if receiver, ok := d.receivers[receiverKey]; ok {
receiver.cancel()
delete(d.receivers, receiverKey)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
Expand All @@ -105,7 +157,6 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo
}

func (d *dispatcher) receive() {
defer d.wg.Done()
recvCh := d.peer.Receive()
for {
select {
Expand All @@ -128,7 +179,14 @@ func (d *dispatcher) receive() {
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}
receiver.Receive(body)

receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize
capReceiveChannelUsage.WithLabelValues(k.capId, k.donId).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId)
}
}
}
}
Expand All @@ -150,13 +208,6 @@ func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetyp
}
}

func (d *dispatcher) Close() error {
close(d.stopCh)
d.wg.Wait()
d.lggr.Info("dispatcher closed")
return nil
}

func (d *dispatcher) Ready() error {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remote_test

import (
"context"
"errors"
"testing"

Expand All @@ -26,7 +27,7 @@ func newReceiver() *testReceiver {
}
}

func (r *testReceiver) Receive(msg *remotetypes.MessageBody) {
func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody) {
r.ch <- msg
}

Expand Down
9 changes: 2 additions & 7 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

// TODO confirm reasons for below workaround and see if can be resolved
// The context passed in by the workflow engine is cancelled prior to the results being read from the response channel
// The wrapping of the context with 'WithoutCancel' is a workaround for that behaviour.
requestCtx := context.WithoutCancel(ctx)
req, err := request.NewClientRequest(requestCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
Expand All @@ -149,10 +145,9 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return req.ResponseChan(), nil
}

func (c *client) Receive(msg *types.MessageBody) {
func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()
ctx, _ := c.stopCh.NewCtx()

messageID := GetMessageID(msg)

Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
ID: "workflow-don",
}

broker := newTestAsyncMessageBroker(100)
broker := newTestAsyncMessageBroker(t, 100)

receivers := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
Expand Down Expand Up @@ -229,7 +229,7 @@ func newTestServer(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, wo
}
}

func (t *clientTestServer) Receive(msg *remotetypes.MessageBody) {
func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBody) {
t.mux.Lock()
defer t.mux.Unlock()

Expand Down Expand Up @@ -297,7 +297,7 @@ func NewTestDispatcher() *TestDispatcher {
}

func (t *TestDispatcher) SendToReceiver(msgBody *remotetypes.MessageBody) {
t.receiver.Receive(msgBody)
t.receiver.Receive(context.Background(), msgBody)
}

func (t *TestDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
Expand Down
10 changes: 7 additions & 3 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand Down Expand Up @@ -215,7 +216,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
F: workflowDonF,
}

broker := newTestAsyncMessageBroker(1000)
broker := newTestAsyncMessageBroker(t, 1000)

workflowDONs := map[string]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand Down Expand Up @@ -276,6 +277,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta

type testAsyncMessageBroker struct {
services.StateMachine
t *testing.T

nodes map[p2ptypes.PeerID]remotetypes.Receiver

sendCh chan *remotetypes.MessageBody
Expand All @@ -292,8 +295,9 @@ func (a *testAsyncMessageBroker) Name() string {
return "testAsyncMessageBroker"
}

func newTestAsyncMessageBroker(sendChBufferSize int) *testAsyncMessageBroker {
func newTestAsyncMessageBroker(t *testing.T, sendChBufferSize int) *testAsyncMessageBroker {
return &testAsyncMessageBroker{
t: t,
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
stopCh: make(services.StopChan),
sendCh: make(chan *remotetypes.MessageBody, sendChBufferSize),
Expand All @@ -318,7 +322,7 @@ func (a *testAsyncMessageBroker) Start(ctx context.Context) error {
panic("server not found for peer id")
}

receiver.Receive(msg)
receiver.Receive(tests.Context(a.t), msg)
}
}
}()
Expand Down
6 changes: 1 addition & 5 deletions core/capabilities/remote/target/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ func (r *server) expireRequests() {
}
}

// Receive handles incoming messages from remote nodes and dispatches them to the corresponding request.
func (r *server) Receive(msg *types.MessageBody) {
func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
r.receiveLock.Lock()
defer r.receiveLock.Unlock()

Expand Down Expand Up @@ -135,9 +134,6 @@ func (r *server) Receive(msg *types.MessageBody) {

req := r.requestIDToRequest[requestID]

// TODO context should be received from the dispatcher here - pending KS-296
ctx, cancel := r.stopCh.NewCtx()
defer cancel()
err := req.OnMessage(ctx, msg)
if err != nil {
r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err)
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T,
}

var srvcs []services.Service
broker := newTestAsyncMessageBroker(1000)
broker := newTestAsyncMessageBroker(t, 1000)
err := broker.Start(context.Background())
require.NoError(t, err)
srvcs = append(srvcs, broker)
Expand Down Expand Up @@ -183,7 +183,7 @@ type serverTestClient struct {
callerDonID string
}

func (r *serverTestClient) Receive(msg *remotetypes.MessageBody) {
func (r *serverTestClient) Receive(_ context.Context, msg *remotetypes.MessageBody) {
r.receivedMessages <- msg
}

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *triggerPublisher) Start(ctx context.Context) error {
return nil
}

func (p *triggerPublisher) Receive(msg *types.MessageBody) {
func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
sender := ToPeerID(msg.Sender)
if msg.Method == types.MethodRegisterTrigger {
req, err := pb.UnmarshalCapabilityRequest(msg.Payload)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
CallerDonId: workflowDonInfo.ID,
Payload: marshaled,
}
publisher.Receive(regEvent)
publisher.Receive(ctx, regEvent)
forwarded := <-underlying.registrationsCh
require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo
return nil
}

func (s *triggerSubscriber) Receive(msg *types.MessageBody) {
func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
sender := ToPeerID(msg.Sender)
if _, found := s.capDonMembers[sender]; !found {
s.lggr.Errorw("received message from unexpected node", "capabilityId", s.capInfo.ID, "sender", sender)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
},
Payload: marshaled,
}
subscriber.Receive(triggerEvent)
subscriber.Receive(ctx, triggerEvent)
response := <-triggerEventCallbackCh
require.Equal(t, response.Value, triggerEventValue)

Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/types/mocks/receiver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package types

import (
"context"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand All @@ -25,7 +27,7 @@ type Dispatcher interface {

//go:generate mockery --quiet --name Receiver --output ./mocks/ --case=underscore
type Receiver interface {
Receive(msg *MessageBody)
Receive(ctx context.Context, msg *MessageBody)
}

type Aggregator interface {
Expand Down

0 comments on commit f7e0362

Please sign in to comment.