Skip to content

Commit

Permalink
Merge pull request #157 from mxpv/empty_payload
Browse files Browse the repository at this point in the history
Fix streaming with empty payloads
  • Loading branch information
dmcgowan authored Feb 5, 2024
2 parents baadfd8 + 44ca009 commit 90d421e
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 15 deletions.
36 changes: 22 additions & 14 deletions integration/streaming/test.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration/streaming/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ service Streaming {
rpc DivideStream(Sum) returns (stream Part);
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
}

message EchoPayload {
Expand Down
55 changes: 55 additions & 0 deletions integration/streaming/test_ttrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions integration/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/containerd/ttrpc"
"github.com/containerd/ttrpc/integration/streaming"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/protobuf/types/known/emptypb"
)

func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
Expand Down Expand Up @@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
return sendErr
}

func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
return err
}

return streamer.Send(&streaming.EchoPayload{Seq: 2})
}

func TestStreamingService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) {
t.Run("DivideStream", divideStreamTest(ctx, client))
t.Run("EchoNull", echoNullTest(ctx, client))
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
}

func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
Expand Down Expand Up @@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
}
}

func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

stream, err := client.EmptyPayloadStream(ctx, nil)
if err != nil {
t.Fatal(err)
}

for i := uint32(1); i < 3; i++ {
first, err := stream.Recv()
if err != nil {
t.Fatal(err)
}

if first.Seq != i {
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
}
}

if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("Expected io.EOF, got %v", err)
}
}
}

func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
t.Helper()
if a.Msg != b.Msg {
Expand Down
6 changes: 5 additions & 1 deletion services.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true)
}()

if req.Payload != nil {
// Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}
Expand Down

0 comments on commit 90d421e

Please sign in to comment.