diff --git a/mockresponses_test.go b/mockresponses_test.go index 4f09278a9..655d9fb3b 100644 --- a/mockresponses_test.go +++ b/mockresponses_test.go @@ -324,6 +324,52 @@ func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int3 return kerror } +// mockProduceResponse is a `ProduceResponse` builder. +type mockProduceResponse struct { + errors map[string]map[int32]KError + t *testing.T +} + +func newMockProduceResponse(t *testing.T) *mockProduceResponse { + return &mockProduceResponse{t: t} +} + +func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse { + if mr.errors == nil { + mr.errors = make(map[string]map[int32]KError) + } + partitions := mr.errors[topic] + if partitions == nil { + partitions = make(map[int32]KError) + mr.errors[topic] = partitions + } + partitions[partition] = kerror + return mr +} + +func (mr *mockProduceResponse) For(reqBody decoder) encoder { + req := reqBody.(*ProduceRequest) + res := &ProduceResponse{} + for topic, partitions := range req.msgSets { + for partition := range partitions { + res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) + } + } + return res +} + +func (mr *mockProduceResponse) getError(topic string, partition int32) KError { + partitions := mr.errors[topic] + if partitions == nil { + return ErrNoError + } + kerror, ok := partitions[partition] + if !ok { + return ErrNoError + } + return kerror +} + // mockOffsetFetchResponse is a `OffsetFetchResponse` builder. type mockOffsetFetchResponse struct { offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock diff --git a/produce_request.go b/produce_request.go index a712e352c..f21956137 100644 --- a/produce_request.go +++ b/produce_request.go @@ -93,8 +93,12 @@ func (p *ProduceRequest) decode(pd packetDecoder) error { if messageSetSize == 0 { continue } + msgSetDecoder, err := pd.getSubset(int(messageSetSize)) + if err != nil { + return err + } msgSet := &MessageSet{} - err = msgSet.decode(pd) + err = msgSet.decode(msgSetDecoder) if err != nil { return err }