From 762d9bf9773e5f5fa106886bd11020e1b07225f6 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 11 Aug 2015 12:06:52 -0400 Subject: [PATCH] Prep improvements for async_producer tests - implement a mockProduceResponse - fix decoding of produceRequests (bless my foresight in making getSubset a method on the packetDecoder) Heavily influenced by Maxim's recent work on the consumer tests. --- mockresponses_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++ produce_request.go | 6 +++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/mockresponses_test.go b/mockresponses_test.go index 4f09278a9..655d9fb3b 100644 --- a/mockresponses_test.go +++ b/mockresponses_test.go @@ -324,6 +324,52 @@ func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int3 return kerror } +// mockProduceResponse is a `ProduceResponse` builder. +type mockProduceResponse struct { + errors map[string]map[int32]KError + t *testing.T +} + +func newMockProduceResponse(t *testing.T) *mockProduceResponse { + return &mockProduceResponse{t: t} +} + +func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse { + if mr.errors == nil { + mr.errors = make(map[string]map[int32]KError) + } + partitions := mr.errors[topic] + if partitions == nil { + partitions = make(map[int32]KError) + mr.errors[topic] = partitions + } + partitions[partition] = kerror + return mr +} + +func (mr *mockProduceResponse) For(reqBody decoder) encoder { + req := reqBody.(*ProduceRequest) + res := &ProduceResponse{} + for topic, partitions := range req.msgSets { + for partition := range partitions { + res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) + } + } + return res +} + +func (mr *mockProduceResponse) getError(topic string, partition int32) KError { + partitions := mr.errors[topic] + if partitions == nil { + return ErrNoError + } + kerror, ok := partitions[partition] + if !ok { + return ErrNoError + } + return kerror +} + // mockOffsetFetchResponse is a `OffsetFetchResponse` builder. type mockOffsetFetchResponse struct { offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock diff --git a/produce_request.go b/produce_request.go index a712e352c..f21956137 100644 --- a/produce_request.go +++ b/produce_request.go @@ -93,8 +93,12 @@ func (p *ProduceRequest) decode(pd packetDecoder) error { if messageSetSize == 0 { continue } + msgSetDecoder, err := pd.getSubset(int(messageSetSize)) + if err != nil { + return err + } msgSet := &MessageSet{} - err = msgSet.decode(pd) + err = msgSet.decode(msgSetDecoder) if err != nil { return err }