diff --git a/README.md b/README.md index ae625b5..40db157 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# redis-go [![CircleCI](https://circleci.com/gh/segmentio/redis-go.svg?style=shield)](https://circleci.com/gh/segmentio/redis-go) [![Go Report Card](https://goreportcard.com/badge/github.com/segmentio/redis-go)](https://goreportcard.com/report/github.com/segmentio/redis-go) [![GoDoc](https://godoc.org/github.com/segmentio/redis-go?status.svg)](https://godoc.org/github.com/segmentio/redis-go) +# redis-go [![CircleCI](https://circleci.com/gh/JoseFeng/redis-go.svg?style=shield)](https://circleci.com/gh/JoseFeng/redis-go) [![Go Report Card](https://goreportcard.com/badge/github.com/JoseFeng/redis-go)](https://goreportcard.com/report/github.com/JoseFeng/redis-go) [![GoDoc](https://godoc.org/github.com/JoseFeng/redis-go?status.svg)](https://godoc.org/github.com/JoseFeng/redis-go) Go package providing tools for building redis clients, servers and middleware. @@ -27,7 +27,7 @@ import ( "fmt" "time" - "github.com/segmentio/redis-go" + "github.com/JoseFeng/redis-go" ) func main() { @@ -61,7 +61,7 @@ func main() { package main import ( - "github.com/segmentio/redis-go" + "github.com/JoseFeng/redis-go" ) func main() { diff --git a/args_test.go b/args_test.go index 48ee807..7e16acc 100644 --- a/args_test.go +++ b/args_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) func TestList(t *testing.T) { diff --git a/client_test.go b/client_test.go index e334c15..8f94666 100644 --- a/client_test.go +++ b/client_test.go @@ -4,8 +4,8 @@ import ( "context" "testing" - redis "github.com/segmentio/redis-go" - "github.com/segmentio/redis-go/redistest" + redis "github.com/JoseFeng/redis-go" + "github.com/JoseFeng/redis-go/redistest" ) func TestClient(t *testing.T) { diff --git a/cmd/red/proxy.go b/cmd/red/proxy.go index efb3a54..6efe897 100644 --- a/cmd/red/proxy.go +++ b/cmd/red/proxy.go @@ -16,7 +16,7 @@ import ( consul "github.com/segmentio/consul-go" "github.com/segmentio/events" eventslog "github.com/segmentio/events/log" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" "github.com/segmentio/stats" "github.com/segmentio/stats/datadog" "github.com/segmentio/stats/redisstats" @@ -170,7 +170,7 @@ func makeConsulRegistry(u *url.URL) *consulRegistry { cluster: v.Get("cluster"), client: &consul.Client{ Address: u.Host, - UserAgent: fmt.Sprintf("RED (github.com/segmentio/redis-go, version %s)", version), + UserAgent: fmt.Sprintf("RED (github.com/JoseFeng/redis-go, version %s)", version), Datacenter: v.Get("dc"), }, resolver: &consul.Resolver{ diff --git a/cmd/red/test.go b/cmd/red/test.go index e94454f..47a7740 100644 --- a/cmd/red/test.go +++ b/cmd/red/test.go @@ -5,8 +5,8 @@ import ( "github.com/segmentio/conf" "github.com/segmentio/events" eventslog "github.com/segmentio/events/log" - redis "github.com/segmentio/redis-go" - "github.com/segmentio/redis-go/redistest" + redis "github.com/JoseFeng/redis-go" + "github.com/JoseFeng/redis-go/redistest" "github.com/segmentio/stats" "github.com/segmentio/stats/datadog" "github.com/segmentio/stats/redisstats" diff --git a/conn.go b/conn.go index 558fc0e..976f395 100644 --- a/conn.go +++ b/conn.go @@ -22,6 +22,7 @@ var ( type Conn struct { conn net.Conn + serv *Server rmutex sync.Mutex rbuffer bufio.Reader decoder objconv.StreamDecoder @@ -67,9 +68,10 @@ func NewClientConn(conn net.Conn) *Conn { // NewServerConn creates a new redis connection from an already open server // connections. -func NewServerConn(conn net.Conn) *Conn { +func NewServerConn(conn net.Conn, s *Server) *Conn { c := &Conn{ conn: conn, + serv: s, rbuffer: *bufio.NewReader(conn), wbuffer: *bufio.NewWriter(conn), } @@ -513,6 +515,32 @@ func (c *Conn) setWriteTimeout(timeout time.Duration) { } } + +func (c *Conn) setState(state http.ConnState) { + if state > 0xff || state < 0 { + panic("internal error") + } + + srv := c.serv + switch state { + case http.StateNew: + srv.trackConnection(c) + case http.StateHijacked, http.StateClosed: + srv.untrackConnection(c) + } + + packedState := uint64(time.Now().Unix()<<8) | uint64(state) + atomic.StoreUint64(&c.curState.atomic, packedState) + if hook := srv.ConnState; hook != nil { + hook(c, state) + } +} + +func (c *Conn) getState() (state http.ConnState, unixSec int64) { + packedState := atomic.LoadUint64(&c.curState.atomic) + return http.ConnState(packedState & 0xff), int64(packedState >> 8) +} + type connArgs struct { mutex sync.Mutex decoder objconv.StreamDecoder diff --git a/conn_test.go b/conn_test.go index 733090c..7fb20ea 100644 --- a/conn_test.go +++ b/conn_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/segmentio/objconv/resp" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) func TestConn(t *testing.T) { diff --git a/examples/client/main.go b/examples/client/main.go index c0b09ff..feceeca 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/segmentio/redis-go" + "github.com/JoseFeng/redis-go" ) func main() { diff --git a/examples/server/main.go b/examples/server/main.go index 2fcbab8..830139a 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -1,6 +1,6 @@ package main -import "github.com/segmentio/redis-go" +import "github.com/JoseFeng/redis-go" func main() { // Starts a new server speaking the redis protocol, the server automatically diff --git a/proxy_test.go b/proxy_test.go index 17dfa1f..e6d9591 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -1,8 +1,8 @@ package redis_test import ( - redis "github.com/segmentio/redis-go" - "github.com/segmentio/redis-go/redistest" + redis "github.com/JoseFeng/redis-go" + "github.com/JoseFeng/redis-go/redistest" "github.com/stretchr/testify/assert" "log" "net/url" diff --git a/redistest/client.go b/redistest/client.go index 57575ed..25e66e3 100644 --- a/redistest/client.go +++ b/redistest/client.go @@ -9,7 +9,7 @@ import ( "testing" "time" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) // Client is an interface that must be implemented by types that represent redis diff --git a/redistest/registry.go b/redistest/registry.go index 7f39ec7..bb36516 100644 --- a/redistest/registry.go +++ b/redistest/registry.go @@ -6,7 +6,7 @@ import ( "testing" "time" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) // MakeServerRegistry is the type of factory functions that the diff --git a/registry_test.go b/registry_test.go index ab740bf..9975181 100644 --- a/registry_test.go +++ b/registry_test.go @@ -3,8 +3,8 @@ package redis_test import ( "testing" - redis "github.com/segmentio/redis-go" - "github.com/segmentio/redis-go/redistest" + redis "github.com/JoseFeng/redis-go" + "github.com/JoseFeng/redis-go/redistest" ) func TestServerEndpoint(t *testing.T) { diff --git a/server.go b/server.go index 44441ec..e134fca 100644 --- a/server.go +++ b/server.go @@ -92,6 +92,12 @@ type Server struct { // Handler invoked to handle Redis requests, must not be nil. Handler Handler + // AcceptCallBack when the client establishes the connection + AcceptCallBack func(conn *Conn) bool + + // ClosedCallBack when the client closes the connection + ClosedCallBack func(conn *Conn, err error) + // ReadTimeout is the maximum duration for reading the entire request, // including the reading the argument list. ReadTimeout time.Duration @@ -244,6 +250,11 @@ func (s *Server) Serve(l net.Listener) error { } } + if s.AcceptCallBack != nil && !s.AcceptCallBack(conn) { + conn.Close() + continue + } + attempt = 0 c := NewServerConn(conn) s.trackConnection(c) @@ -258,14 +269,21 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf defer s.untrackConnection(c) var addr = c.RemoteAddr().String() + var err error + + if s.ClosedCallBack != nil { + defer s.ClosedCallBack(c, err) + } + for { select { default: case <-ctx.Done(): + err = ctx.Err() return } - if c.waitReadyRead(config.idleTimeout) != nil { + if err = c.waitReadyRead(config.idleTimeout); err != nil { return } @@ -276,7 +294,8 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf cmds = append(cmds, Command{}) if !cmdReader.Read(&cmds[0]) { - s.log(cmdReader.Close()) + err = cmdReader.Close() + s.log(err) return } @@ -307,7 +326,7 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf if cmds[lastIndex].Cmd == "DISCARD" { cmds[lastIndex].Args.Close() - if err := c.WriteArgs(List("OK")); err != nil { + if err = c.WriteArgs(List("OK")); err != nil { return } @@ -317,12 +336,12 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf cmds = cmds[1:lastIndex] } - if err := s.serveCommands(c, addr, cmds, config); err != nil { + if err = s.serveCommands(c, addr, cmds, config); err != nil { s.log(err) return } - if err := cmdReader.Close(); err != nil { + if err = cmdReader.Close(); err != nil { s.log(err) return } diff --git a/server_test.go b/server_test.go index c854b88..db7fab3 100644 --- a/server_test.go +++ b/server_test.go @@ -11,7 +11,7 @@ import ( "time" "github.com/segmentio/objconv/resp" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) func TestServer(t *testing.T) { diff --git a/transport_test.go b/transport_test.go index 841915e..5d0df4c 100644 --- a/transport_test.go +++ b/transport_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - redis "github.com/segmentio/redis-go" + redis "github.com/JoseFeng/redis-go" ) func TestTransport(t *testing.T) { diff --git a/vendor/vendor.json b/vendor/vendor.json index 9b36538..af60b23 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -159,5 +159,5 @@ "revisionTime": "2017-07-13T20:15:20Z" } ], - "rootPath": "github.com/segmentio/redis-go" + "rootPath": "github.com/JoseFeng/redis-go" }