Skip to content

Commit e4f499d

Browse files
committed
Working connection test
1 parent 052454e commit e4f499d

File tree

5 files changed

+115
-28
lines changed

5 files changed

+115
-28
lines changed

dial.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ func Dial(ctx context.Context, u string, opts ...DialOption) (_ *Conn, _ *http.R
108108
r := io.LimitReader(resp.Body, 1024)
109109
b, _ := ioutil.ReadAll(r)
110110
resp.Body = ioutil.NopCloser(bytes.NewReader(b))
111+
respBody.Close()
111112
}
112-
respBody.Close()
113113
}()
114114

115115
if resp.StatusCode != http.StatusSwitchingProtocols {

header_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,18 @@ func TestHeader(t *testing.T) {
3636
rand.Read(h.maskKey[:])
3737
}
3838

39-
t.Logf("header: %#v", h)
40-
4139
b := marshalHeader(h)
42-
t.Logf("bytes: %b", b)
43-
4440
r := bytes.NewReader(b)
4541
h2, err := readHeader(r)
4642
if err != nil {
43+
t.Logf("header: %#v", h)
44+
t.Logf("bytes: %b", b)
4745
t.Fatalf("failed to read header: %v", err)
4846
}
4947

5048
if !cmp.Equal(h, h2, cmp.AllowUnexported(header{})) {
49+
t.Logf("header: %#v", h)
50+
t.Logf("bytes: %b", b)
5151
t.Fatalf("parsed and read header differ: %v", cmp.Diff(h, h2, cmp.AllowUnexported(header{})))
5252
}
5353
}

json.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,43 @@ package websocket
22

33
import (
44
"context"
5+
"encoding/json"
6+
7+
"golang.org/x/net/websocket"
8+
"golang.org/x/xerrors"
59
)
610

711
// ReadJSON reads a json message from c into v.
812
func ReadJSON(ctx context.Context, c *Conn, v interface{}) error {
9-
panic("TODO")
13+
typ, r, err := c.ReadMessage(ctx)
14+
if err != nil {
15+
return xerrors.Errorf("failed to read json: %v", err)
16+
}
17+
18+
if typ != websocket.TextFrame {
19+
return xerrors.Errorf("unexpected frame type for json (expected TextFrame): %v", typ)
20+
}
21+
22+
d := json.NewDecoder(r)
23+
err = d.Decode(v)
24+
if err != nil {
25+
return xerrors.Errorf("failed to read json: %v", err)
26+
}
27+
return nil
1028
}
1129

1230
// WriteJSON writes the json message v into c.
1331
func WriteJSON(ctx context.Context, c *Conn, v interface{}) error {
14-
panic("TODO")
32+
w := c.MessageWriter(websocket.TextFrame)
33+
w.SetContext(ctx)
34+
e := json.NewEncoder(w)
35+
err := e.Encode(v)
36+
if err != nil {
37+
return xerrors.Errorf("failed to write json: %v", err)
38+
}
39+
err = w.Close()
40+
if err != nil {
41+
return xerrors.Errorf("failed to write json: %v", err)
42+
}
43+
return nil
1544
}

websocket.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"log"
89
"sync"
910
"time"
1011

@@ -19,6 +20,7 @@ type controlFrame struct {
1920
// Conn represents a WebSocket connection.
2021
// Pings will always be automatically responded to with pongs, you do not
2122
// have to do anything special.
23+
// TODO set finalizer
2224
type Conn struct {
2325
subprotocol string
2426
br *bufio.Reader
@@ -80,6 +82,9 @@ func (c *Conn) init() {
8082
c.write = make(chan opcode)
8183
c.read = make(chan opcode)
8284
c.readBytes = make(chan []byte)
85+
86+
go c.writeLoop()
87+
go c.readLoop()
8388
}
8489

8590
func (c *Conn) writeLoop() {
@@ -197,29 +202,49 @@ func (c *Conn) readLoop() {
197202
}
198203

199204
c.readDone = make(chan struct{})
200-
201-
var maskPos int
202-
left := h.payloadLength
205+
c.read <- h.opcode
203206
for {
204-
select {
205-
case <-c.closed:
206-
return
207-
case b := <-c.readBytes:
208-
if int64(len(b)) > left {
209-
b = b[left:]
210-
}
211-
_, err = io.ReadFull(c.br, b)
212-
if err != nil {
213-
c.close(xerrors.Errorf("failed to read from connection: %v", err))
207+
var maskPos int
208+
left := h.payloadLength
209+
for left > 0 {
210+
select {
211+
case <-c.closed:
214212
return
215-
}
213+
case b := <-c.readBytes:
214+
log.Println("readbytes", left)
215+
216+
if int64(len(b)) > left {
217+
b = b[:left]
218+
}
219+
220+
_, err = io.ReadFull(c.br, b)
221+
if err != nil {
222+
c.close(xerrors.Errorf("failed to read from connection: %v", err))
223+
return
224+
}
225+
left -= int64(len(b))
216226

217-
if h.masked {
218-
maskPos = mask(h.maskKey, maskPos, b)
227+
if h.masked {
228+
maskPos = mask(h.maskKey, maskPos, b)
229+
}
230+
231+
select {
232+
case <-c.closed:
233+
return
234+
case c.readDone <- struct{}{}:
235+
}
219236
}
237+
}
220238

221-
c.readDone <- struct{}{}
239+
if h.fin {
240+
break
241+
}
242+
h, err = readHeader(c.br)
243+
if err != nil {
244+
c.close(xerrors.Errorf("failed to read header: %v", err))
245+
return
222246
}
247+
// TODO check opcode.
223248
}
224249
close(c.readDone)
225250
}

websocket_test.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ var httpclient = &http.Client{
1616
Timeout: time.Second * 15,
1717
}
1818

19-
func TestHandshake(t *testing.T) {
19+
func TestConnection(t *testing.T) {
2020
t.Parallel()
2121

22+
obj := make(chan interface{})
2223
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2324
c, err := websocket.Accept(w, r,
2425
websocket.AcceptSubprotocols("myproto"),
@@ -27,19 +28,34 @@ func TestHandshake(t *testing.T) {
2728
t.Errorf("failed to accept connection: %v", err)
2829
return
2930
}
30-
_ = c
31+
defer c.Close(websocket.StatusInternalError, "")
32+
33+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
34+
defer cancel()
35+
36+
var v interface{}
37+
err = websocket.ReadJSON(ctx, c, &v)
38+
if err != nil {
39+
t.Error(err)
40+
return
41+
}
42+
43+
t.Log("success", v)
44+
obj <- v
45+
46+
c.Close(websocket.StatusNormalClosure, "")
3147
}))
3248
defer s.Close()
3349

34-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
50+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
3551
defer cancel()
3652
c, resp, err := websocket.Dial(ctx, s.URL,
3753
websocket.DialSubprotocols("myproto"),
3854
)
3955
if err != nil {
4056
t.Fatalf("failed to do handshake request: %v", err)
4157
}
42-
_ = c
58+
defer c.Close(websocket.StatusInternalError, "")
4359

4460
checkHeader := func(h, exp string) {
4561
t.Helper()
@@ -53,4 +69,21 @@ func TestHandshake(t *testing.T) {
5369
checkHeader("Upgrade", "websocket")
5470
checkHeader("Sec-WebSocket-Accept", "ICX+Yqv66kxgM0FcWaLWlFLwTAI=")
5571
checkHeader("Sec-WebSocket-Protocol", "myproto")
72+
73+
v := map[string]interface{}{
74+
"anmol": "wowow",
75+
}
76+
err = websocket.WriteJSON(ctx, c, v)
77+
if err != nil {
78+
t.Fatal(err)
79+
}
80+
81+
select {
82+
case v2 := <-obj:
83+
if !cmp.Equal(v, v2) {
84+
t.Fatalf("unexpected value read: %v", cmp.Diff(v, v2))
85+
}
86+
case <-time.After(time.Second * 10):
87+
t.Fatalf("test timed out")
88+
}
5689
}

0 commit comments

Comments
 (0)