From 6d716464e8a3b1bf41b7d938d1784a60957859a1 Mon Sep 17 00:00:00 2001 From: Ben Burkert Date: Fri, 11 Dec 2015 11:47:50 -0800 Subject: [PATCH 1/2] Add a buffer around the broker's net.Conn for reads. Reduces the number of read syscalls caused by small, sequential reads on the broker's net.Conn. --- broker.go | 1 + utils.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/broker.go b/broker.go index 46f06a0f3..ac9ae34e5 100644 --- a/broker.go +++ b/broker.go @@ -85,6 +85,7 @@ func (b *Broker) Open(conf *Config) error { Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) return } + b.conn = NewBufConn(b.conn) b.conf = conf b.done = make(chan bool) diff --git a/utils.go b/utils.go index fef7c7381..4b10aaaa1 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,10 @@ package sarama -import "sort" +import ( + "bufio" + "net" + "sort" +) type none struct{} @@ -87,3 +91,21 @@ func (b ByteEncoder) Encode() ([]byte, error) { func (b ByteEncoder) Length() int { return len(b) } + +// bufConn wraps a net.Conn with a buffer for reads to reduce the number of +// reads that trigger syscalls. +type bufConn struct { + net.Conn + buf *bufio.Reader +} + +func NewBufConn(conn net.Conn) *bufConn { + return &bufConn{ + Conn: conn, + buf: bufio.NewReader(conn), + } +} + +func (bc *bufConn) Read(b []byte) (n int, err error) { + return bc.buf.Read(b) +} From 797cb29ada7d84b939a417ad9960de88a1a60f65 Mon Sep 17 00:00:00 2001 From: Ben Burkert Date: Mon, 14 Dec 2015 19:03:38 -0800 Subject: [PATCH 2/2] un-export newBufConn --- broker.go | 2 +- utils.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index ac9ae34e5..aea1075b9 100644 --- a/broker.go +++ b/broker.go @@ -85,7 +85,7 @@ func (b *Broker) Open(conf *Config) error { Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) return } - b.conn = NewBufConn(b.conn) + b.conn = newBufConn(b.conn) b.conf = conf b.done = make(chan bool) diff --git a/utils.go b/utils.go index 4b10aaaa1..04ca88750 100644 --- a/utils.go +++ b/utils.go @@ -99,7 +99,7 @@ type bufConn struct { buf *bufio.Reader } -func NewBufConn(conn net.Conn) *bufConn { +func newBufConn(conn net.Conn) *bufConn { return &bufConn{ Conn: conn, buf: bufio.NewReader(conn),