Skip to content

Commit

Permalink
Merge pull request #506 from Shopify/better-producer-tests
Browse files Browse the repository at this point in the history
Improve async_producer tests and prep for more
  • Loading branch information
eapache committed Aug 13, 2015
2 parents ad7f1f7 + 762d9bf commit 5b18996
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
46 changes: 46 additions & 0 deletions mockresponses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,52 @@ func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int3
return kerror
}

// mockProduceResponse is a `ProduceResponse` builder.
type mockProduceResponse struct {
errors map[string]map[int32]KError
t *testing.T
}

func newMockProduceResponse(t *testing.T) *mockProduceResponse {
return &mockProduceResponse{t: t}
}

func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse {
if mr.errors == nil {
mr.errors = make(map[string]map[int32]KError)
}
partitions := mr.errors[topic]
if partitions == nil {
partitions = make(map[int32]KError)
mr.errors[topic] = partitions
}
partitions[partition] = kerror
return mr
}

func (mr *mockProduceResponse) For(reqBody decoder) encoder {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{}
for topic, partitions := range req.msgSets {
for partition := range partitions {
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
}
}
return res
}

func (mr *mockProduceResponse) getError(topic string, partition int32) KError {
partitions := mr.errors[topic]
if partitions == nil {
return ErrNoError
}
kerror, ok := partitions[partition]
if !ok {
return ErrNoError
}
return kerror
}

// mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type mockOffsetFetchResponse struct {
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
Expand Down
6 changes: 5 additions & 1 deletion produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,12 @@ func (p *ProduceRequest) decode(pd packetDecoder) error {
if messageSetSize == 0 {
continue
}
msgSetDecoder, err := pd.getSubset(int(messageSetSize))
if err != nil {
return err
}
msgSet := &MessageSet{}
err = msgSet.decode(pd)
err = msgSet.decode(msgSetDecoder)
if err != nil {
return err
}
Expand Down

0 comments on commit 5b18996

Please sign in to comment.