Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channelz: ensure SubChannel is always registered with parent #7094

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions channelz/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ func (s) TestGetChannel(t *testing.T) {
},
})

subChan := channelz.RegisterSubChannel(cids[0].ID, refNames[2])
subChan, err := channelz.RegisterSubChannel(cids[0], refNames[2])
if err != nil {
t.Fatal(err)
}
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Expand Down Expand Up @@ -425,7 +428,10 @@ func (s) TestGetSubChannel(t *testing.T) {
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
subChan := channelz.RegisterSubChannel(chann.ID, refNames[1])
subChan, err := channelz.RegisterSubChannel(chann, refNames[1])
if err != nil {
t.Fatal(err)
}
defer channelz.RemoveEntry(subChan.ID)
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subchanCreated,
Expand Down
6 changes: 5 additions & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,12 +838,16 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz.ID, ""),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)

var err error
ac.channelz, err = channelz.RegisterSubChannel(cc.channelz, "")
if err != nil {
return nil, err
}
channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel created",
Severity: channelz.CtInfo,
Expand Down
19 changes: 14 additions & 5 deletions internal/channelz/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package channelz

import (
"errors"
"sync/atomic"
"time"

Expand Down Expand Up @@ -143,21 +144,29 @@ func RegisterChannel(parent *Channel, target string) *Channel {
// Returns the SubChannel entry, whose ID field holds the unique channelz identifier assigned to it.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(pid int64, ref string) *SubChannel {
func RegisterSubChannel(pid *Channel, ref string) (*SubChannel, error) {
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
Comment on lines +148 to +150
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not return an error here since it complicates the code so much, and it should just be an invariant of the code like any other dereference. Just panic and we'll make sure RegisterSubChannel is always called properly.

Actually, this change is pretty important to get done quickly, so I created #7101 and we will be doing a patch release with it ASAP. Thank you for the fix!


id := IDGen.genID()
if !IsOn() {
return &SubChannel{ID: id}
return &SubChannel{
RefName: ref,
parent: pid,
ID: id,
}, nil
}

sc := &SubChannel{
RefName: ref,
ID: id,
sockets: make(map[int64]string),
parent: db.getChannel(pid),
parent: pid,
trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
}
db.addSubChannel(id, sc, pid)
return sc
db.addSubChannel(id, sc, pid.ID)
return sc, nil
}

// RegisterServer registers the given server s in channelz database. It returns
Expand Down
31 changes: 24 additions & 7 deletions internal/transport/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
case err := <-errCh:
if err != io.EOF {
t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)

}
case <-ctx.Done():
t.Fatalf("Test timed out before server closed the connection.")
Expand Down Expand Up @@ -256,15 +255,21 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
// logic is running even without any active streams.
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
pid := channelz.RegisterChannel(nil, "test channel parent")
defer channelz.RemoveEntry(pid.ID)
subChan, err := channelz.RegisterSubChannel(pid, "test subchan")
if err != nil {
t.Fatal(err)
}
defer channelz.RemoveEntry(subChan.ID)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: subChan,
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
PermitWithoutStream: true,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -287,14 +292,20 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
// active streams, and therefore the transport stays open.
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
pid := channelz.RegisterChannel(nil, "test channel parent")
defer channelz.RemoveEntry(pid.ID)
subChan, err := channelz.RegisterSubChannel(pid, "test subchan")
if err != nil {
t.Fatal(err)
}
defer channelz.RemoveEntry(subChan.ID)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: subChan,
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -318,14 +329,20 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
// transport even when there is an active stream.
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
pid := channelz.RegisterChannel(nil, "test channel parent")
defer channelz.RemoveEntry(pid.ID)
subChan, err := channelz.RegisterSubChannel(pid, "test subchan")
if err != nil {
t.Fatal(err)
}
defer channelz.RemoveEntry(subChan.ID)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: subChan,
KeepaliveParams: keepalive.ClientParameters{
Time: 500 * time.Millisecond,
Timeout: 500 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
// TODO(i/6099): Setup a server which can ping and no-ping based on a flag to
// reduce the flakiness in this test.
client, cancel := setUpWithNoPingServer(t, copts, connCh)
Expand Down
43 changes: 35 additions & 8 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,13 @@ func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) {
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, sc, ht)
addr := resolver.Address{Addr: "localhost:" + server.port}
copts.ChannelzParent = channelz.RegisterSubChannel(-1, "test channel")
pid := channelz.RegisterChannel(nil, "test channel parent")
t.Cleanup(func() { channelz.RemoveEntry(pid.ID) })
var err error
copts.ChannelzParent, err = channelz.RegisterSubChannel(pid, "test channel")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { channelz.RemoveEntry(copts.ChannelzParent.ID) })

connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
Expand Down Expand Up @@ -1186,7 +1192,6 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) {
if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("_.Read(_) = %v, want io.EOF", err)
}

}

func (s) TestServerWithMisbehavedClient(t *testing.T) {
Expand Down Expand Up @@ -1321,7 +1326,12 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
time.AfterFunc(100*time.Millisecond, cancel)

parent := channelz.RegisterSubChannel(-1, "test channel")
pid := channelz.RegisterChannel(nil, "test channel parent")
t.Cleanup(func() { channelz.RemoveEntry(pid.ID) })
parent, err := channelz.RegisterSubChannel(pid, "test channel")
if err != nil {
t.Fatal(err)
}
copts := ConnectOptions{ChannelzParent: parent}
defer channelz.RemoveEntry(parent.ID)
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
Expand Down Expand Up @@ -1414,7 +1424,12 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()

parent := channelz.RegisterSubChannel(-1, "test channel")
pid := channelz.RegisterChannel(nil, "test channel parent")
t.Cleanup(func() { channelz.RemoveEntry(pid.ID) })
parent, err := channelz.RegisterSubChannel(pid, "test channel")
if err != nil {
t.Fatal(err)
}
defer channelz.RemoveEntry(parent.ID)
copts := ConnectOptions{ChannelzParent: parent}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
Expand Down Expand Up @@ -2423,11 +2438,17 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
defer cancel()
creds := &attrTransportCreds{}

pid := channelz.RegisterChannel(nil, "test channel parent")
t.Cleanup(func() { channelz.RemoveEntry(pid.ID) })
subChan, err := channelz.RegisterSubChannel(pid, "test subchannel")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { channelz.RemoveEntry(subChan.ID) })
copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: subChan,
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down Expand Up @@ -2465,11 +2486,17 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}

pid := channelz.RegisterChannel(nil, "test channel parent")
t.Cleanup(func() { channelz.RemoveEntry(pid.ID) })
subChan, err := channelz.RegisterSubChannel(pid, "test subchannel")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { channelz.RemoveEntry(subChan.ID) })
copts := ConnectOptions{
Dialer: dialer,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: subChan,
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down
10 changes: 8 additions & 2 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,14 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
// Socket1 Socket2

topChan := channelz.RegisterChannel(nil, "")
subChan1 := channelz.RegisterSubChannel(topChan.ID, "")
subChan2 := channelz.RegisterSubChannel(topChan.ID, "")
subChan1, err := channelz.RegisterSubChannel(topChan, "")
if err != nil {
t.Fatal(err)
}
subChan2, err := channelz.RegisterSubChannel(topChan, "")
if err != nil {
t.Fatal(err)
}
skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})

Expand Down
Loading