From 52b74e6ea741f5d8fce98dd20e0b309699b21853 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Mon, 4 Sep 2023 15:01:04 +0200 Subject: [PATCH 1/5] Avoid closing the stream if server still open --- grpc/grpc.go | 2 +- grpc/stream.go | 33 +++++++++++++++++---------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/grpc/grpc.go b/grpc/grpc.go index b83a485..9e35354 100644 --- a/grpc/grpc.go +++ b/grpc/grpc.go @@ -131,7 +131,7 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object { instanceMetrics: mi.metrics, builtinMetrics: mi.vu.State().BuiltinMetrics, done: make(chan struct{}), - state: opened, + writing: opened, writeQueueCh: make(chan message), diff --git a/grpc/stream.go b/grpc/stream.go index e89eaf4..478e9be 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -28,7 +28,6 @@ type message struct { const ( opened = iota + 1 - closing closed ) @@ -51,8 +50,8 @@ type stream struct { obj *goja.Object // the object that is given to js to interact with the stream - state int8 - done chan struct{} + writing int8 + done chan struct{} writeQueueCh chan message @@ -185,17 +184,19 @@ func (s *stream) readData(wg *sync.WaitGroup) { return } - if len(msg) == 0 && isRegularClosing(err) { + if len(msg) > 0 { + s.queueMessage(msg) + } + + if isRegularClosing(err) { s.logger.WithError(err).Debug("stream is cancelled/finished") s.tq.Queue(func() error { - return s.closeWithError(nil) + return s.closeWithError(err) }) return } - - s.queueMessage(msg) } } @@ -299,7 +300,7 @@ func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) // write writes a message to the stream func (s *stream) write(input goja.Value) { - if s.state != opened { + if s.writing != opened { return } @@ -320,11 +321,13 @@ func (s *stream) write(input goja.Value) { // end closes client the stream func (s *stream) end() { - if s.state == closed || s.state == closing { + if s.writing == closed { return } - s.state = closing + s.logger.Debug("stream is closing") + + s.writing = closed s.writeQueueCh <- message{isClosing: true} } @@ -334,15 +337,13 @@ func (s *stream) closeWithError(err error) error { return s.callErrorListeners(err) } -// close changes the stream state to closed and triggers the end event listeners +// close closes the stream and call end event listeners +// Note: in the regular closing the io.EOF could come func (s *stream) close(err error) { - if s.state == closed { + if err == nil { return } - s.logger.WithError(err).Debug("stream is closing") - - s.state = closed close(s.done) s.tq.Queue(func() error { return s.callEventListeners(eventEnd) @@ -354,7 +355,7 @@ func (s *stream) close(err error) { } func (s *stream) callErrorListeners(e error) error { - if e == nil { + if e == nil || errors.Is(e, io.EOF) { return nil } From e97227e5541a3109a29cfb62d5231773d8aef9ef Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Tue, 5 Sep 2023 10:48:06 +0200 Subject: [PATCH 2/5] Apply suggestions from code review --- grpc/grpc.go | 2 +- grpc/stream.go | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/grpc/grpc.go b/grpc/grpc.go index 9e35354..fdf2a8f 100644 --- a/grpc/grpc.go +++ b/grpc/grpc.go @@ -131,7 +131,7 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object { instanceMetrics: mi.metrics, builtinMetrics: mi.vu.State().BuiltinMetrics, done: make(chan struct{}), - writing: opened, + writingState: opened, writeQueueCh: make(chan message), diff --git a/grpc/stream.go b/grpc/stream.go index 478e9be..d58a64b 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -50,8 +50,8 @@ type stream struct { obj *goja.Object // the object that is given to js to interact with the stream - writing int8 - done chan struct{} + writingState int8 + done chan struct{} writeQueueCh chan message @@ -300,7 +300,7 @@ func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) // write writes a message to the stream func (s *stream) write(input goja.Value) { - if s.writing != opened { + if s.writingState != opened { return } @@ -321,13 +321,13 @@ func (s *stream) write(input goja.Value) { // end closes client the stream func (s *stream) end() { - if s.writing == closed { + if s.writingState == closed { return } s.logger.Debug("stream is closing") - s.writing = closed + s.writingState = closed s.writeQueueCh <- message{isClosing: true} } @@ -344,7 +344,15 @@ func (s *stream) close(err error) { return } + select { + case <-s.done: + s.logger.Debugf("stream %v is already closed", s.method) + return + default: + } + close(s.done) + s.tq.Queue(func() error { return s.callEventListeners(eventEnd) }) From 596304437f13d6abeff728cb3c2e3c933654f953 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Tue, 5 Sep 2023 13:34:47 +0200 Subject: [PATCH 3/5] Remove sleep --- examples/grpc_client_streaming.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/grpc_client_streaming.js b/examples/grpc_client_streaming.js index 0199527..8ebdbcc 100644 --- a/examples/grpc_client_streaming.js +++ b/examples/grpc_client_streaming.js @@ -97,8 +97,6 @@ export default () => { // close the client stream stream.end(); - - sleep(1); }; const pointSender = (stream, point) => { From 4ab59622d0c959202b7bc6e0023c640613bc682c Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Tue, 5 Sep 2023 14:10:27 +0200 Subject: [PATCH 4/5] a better logging --- grpc/stream.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grpc/stream.go b/grpc/stream.go index d58a64b..5b43fc2 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -325,7 +325,7 @@ func (s *stream) end() { return } - s.logger.Debug("stream is closing") + s.logger.Debugf("finishing stream %s writing", s.method) s.writingState = closed s.writeQueueCh <- message{isClosing: true} @@ -351,6 +351,7 @@ func (s *stream) close(err error) { default: } + s.logger.Debugf("stream %s is closing", s.method) close(s.done) s.tq.Queue(func() error { From afff3150e790eaeaeb68e69dc3e15e603e3c5aa7 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Mon, 11 Sep 2023 09:20:40 +0200 Subject: [PATCH 5/5] Dedicated unit test This adds a dedicated unit test for checking that all responses from the server were received even if client declared that sending is finished (client.end) called. --- grpc/stream_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/grpc/stream_test.go b/grpc/stream_test.go index fb8ba69..7b0e928 100644 --- a/grpc/stream_test.go +++ b/grpc/stream_test.go @@ -3,6 +3,7 @@ package grpc import ( "context" "testing" + "time" "github.com/dop251/goja" "github.com/grafana/xk6-grpc/grpc/testutils/grpcservice" @@ -188,6 +189,101 @@ func TestStream_ErrorHandling(t *testing.T) { ) } +// this test case is checking that everything that server sends +// after the client finished (client.end called) is delivered to the client +// and the end event is called +func TestStream_ReceiveAllServerResponsesAfterEnd(t *testing.T) { + t.Parallel() + + ts := newTestState(t) + + stub := &FeatureExplorerStub{} + + savedFeatures := []*grpcservice.Feature{ + { + Name: "foo", + Location: &grpcservice.Point{ + Latitude: 1, + Longitude: 2, + }, + }, + { + Name: "bar", + Location: &grpcservice.Point{ + Latitude: 3, + Longitude: 4, + }, + }, + } + + stub.listFeatures = func(rect *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error { + for _, feature := range savedFeatures { + // adding a delay to make server response "slower" + time.Sleep(200 * time.Millisecond) + + if err := stream.Send(feature); err != nil { + return err + } + } + + return nil + } + + grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub) + + replace := func(code string) (goja.Value, error) { + return ts.VU.Runtime().RunString(ts.httpBin.Replacer.Replace(code)) + } + + initString := codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../grpc/testutils/grpcservice/route_guide.proto");`, + } + vuString := codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures") + stream.on('data', function (data) { + call('Feature:' + data.name); + }); + stream.on('end', function () { + call('End called'); + }); + + stream.write({ + lo: { + latitude: 1, + longitude: 2, + }, + hi: { + latitude: 1, + longitude: 2, + }, + }); + stream.end(); + `, + } + + val, err := replace(initString.code) + assertResponse(t, initString, err, val, ts) + + ts.ToVUContext() + + val, err = replace(vuString.code) + + ts.EventLoop.WaitOnRegistered() + + assertResponse(t, vuString, err, val, ts) + + assert.Equal(t, ts.callRecorder.Recorded(), []string{ + "Feature:foo", + "Feature:bar", + "End called", + }, + ) +} + // FeatureExplorerStub is a stub for FeatureExplorerServer // it has ability to override methods type FeatureExplorerStub struct {