Skip to content

Commit

Permalink
revert some error wrap changes to reduce PR noise
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes committed Mar 25, 2022
1 parent 94fa5e7 commit b9b23fb
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 14 deletions.
2 changes: 1 addition & 1 deletion addoffsetstotxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Client) AddOffsetsToTxn(
GroupID: req.GroupID,
})
if err != nil {
return nil, fmt.Errorf("failed to add offsets to transaction: %w", err)
return nil, fmt.Errorf("kafka.(*Client).AddOffsetsToTxn: %w", err)
}

r := m.(*addoffsetstotxn.Response)
Expand Down
2 changes: 1 addition & 1 deletion addpartitionstotxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *Client) AddPartitionsToTxn(

m, err := c.roundTrip(ctx, req.Addr, protoReq)
if err != nil {
return nil, fmt.Errorf("failed to add partitions to transaction: %w", err)
return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err)
}

r := m.(*addpartitionstotxn.Response)
Expand Down
2 changes: 1 addition & 1 deletion alterconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*A
})

if err != nil {
return nil, fmt.Errorf("failed to alter configs: %w", err)
return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
}

res := m.(*alterconfigs.Response)
Expand Down
3 changes: 1 addition & 2 deletions apiversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
"context"
"fmt"
"net"

"github.com/segmentio/kafka-go/protocol"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (c *Client) ApiVersions(
apiReq,
)
if err != nil {
return nil, fmt.Errorf("failed to determine supported API versions from broker: %w", err)
return nil, err
}
apiResp := protoResp.(*apiversions.Response)

Expand Down
3 changes: 1 addition & 2 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka

import (
"fmt"
"hash"
"hash/crc32"
"hash/fnv"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (h *Hash) Balance(msg Message, partitions ...int) int {

hasher.Reset()
if _, err := hasher.Write(msg.Key); err != nil {
panic(fmt.Errorf("HashBalancer failed to write to hasher: %w", err))
panic(err)
}

// uses same algorithm that Sarama's hashPartitioner uses
Expand Down
10 changes: 3 additions & 7 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafka
import (
"bufio"
"errors"
"fmt"
"io"
"sync"
"time"
Expand Down Expand Up @@ -71,10 +70,7 @@ func (batch *Batch) Close() error {
batch.mutex.Lock()
err := batch.close()
batch.mutex.Unlock()
if err != nil {
fmt.Errorf("failed to close batch: %w", err)
}
return nil
return err
}

func (batch *Batch) close() (err error) {
Expand All @@ -87,8 +83,8 @@ func (batch *Batch) close() (err error) {
batch.msgs.discard()
}

if err = batch.err; errors.Is(err, io.EOF) {
err = nil
if errors.Is(batch.err, io.EOF) {
err = batch.err
}

if conn != nil {
Expand Down

0 comments on commit b9b23fb

Please sign in to comment.