Skip to content

Commit

Permalink
Decouple transport flow control from application read. (#1265)
Browse files Browse the repository at this point in the history
* Decouple transport flow control from application read.

* post-review update

* Added comment in http2_server as well.

* Added another test

* Fixed typos in comments.
  • Loading branch information
MakMukhi committed Jun 1, 2017
1 parent a113590 commit 1e47334
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 81 deletions.
8 changes: 0 additions & 8 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,3 @@ func (f *inFlow) onRead(n uint32) uint32 {
}
return 0
}

func (f *inFlow) resetPendingData() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
n := f.pendingData
f.pendingData = 0
return n
}
29 changes: 11 additions & 18 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
}
}
if s.state == streamDone {
s.mu.Unlock()
return
Expand Down Expand Up @@ -831,9 +826,6 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand All @@ -845,22 +837,26 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
Expand All @@ -872,9 +868,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand Down
29 changes: 11 additions & 18 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,6 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand All @@ -477,22 +474,26 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.Close()
return
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
Expand All @@ -502,9 +503,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand Down Expand Up @@ -1066,11 +1064,6 @@ func (t *http2Server) closeStream(s *Stream) {
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if w := t.fc.onRead(q); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
if s.state == streamDone {
s.mu.Unlock()
return
Expand Down
Loading

0 comments on commit 1e47334

Please sign in to comment.