From 6ab6edbd4550c042b0d69edf1b3b987da11ccce9 Mon Sep 17 00:00:00 2001 From: Eugene Yakubovich Date: Wed, 7 Oct 2015 16:58:29 -0700 Subject: [PATCH] Ability to revoke lease This adds an ability to revoke a lease by deleting it from etcd. A corresponding API for server case is also added. Once the lease is revoked, flannel will just acquire a new one. This has limited use today but is part of #280 to add reservations. With reservations, there will be a mode to not acquire a lease if a reservation was not found. --- network/manager.go | 4 + network/network.go | 93 +++++++++++++++++----- remote/client.go | 60 +++++++++++---- remote/remote_test.go | 167 +++++++++++++++++++++++++--------------- remote/server.go | 64 +++++++++++++++ subnet/local_manager.go | 52 ++++++++++++- subnet/mock_registry.go | 112 ++++++++++++++++++++++----- subnet/registry.go | 89 ++++++++++++++------- subnet/renew.go | 48 ------------ subnet/subnet.go | 22 +++++- subnet/subnet_test.go | 34 +++++++- subnet/watch.go | 35 +++++++++ 12 files changed, 579 insertions(+), 201 deletions(-) delete mode 100644 subnet/renew.go diff --git a/network/manager.go b/network/manager.go index 9cbac8046..5a0147f4b 100644 --- a/network/manager.go +++ b/network/manager.go @@ -235,12 +235,16 @@ func (m *Manager) forEachNetwork(f func(n *Network)) { func (m *Manager) runNetwork(n *Network) { n.Run(m.extIface, func(bn backend.Network) { if m.isMultiNetwork() { + log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet) + path := filepath.Join(opts.subnetDir, n.Name) + ".env" if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil { log.Warningf("%v failed to write subnet file: %s", n.Name, err) return } } else { + log.Infof("Lease acquired: %v", bn.Lease().Subnet) + if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil { log.Warningf("%v failed to write subnet file: %s", n.Name, err) return diff --git a/network/network.go b/network/network.go index ee7b89ed2..74f675043 100644 --- a/network/network.go +++ b/network/network.go @@ -15,12 +15,14 @@ package network import ( + "errors" "fmt" "sync" "time" log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/flannel/backend" "github.com/coreos/flannel/subnet" ) @@ -29,7 +31,10 @@ const ( renewMargin = time.Hour ) -var backends map[string]backend.Backend +var ( + errInterrupted = errors.New("interrupted") + errCanceled = errors.New("canceled") +) type Network struct { Name string @@ -91,38 +96,45 @@ func (n *Network) init() error { return nil } -func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) { - wg := sync.WaitGroup{} - -For: +func (n *Network) retryInit() error { for { err := n.init() - switch err { - case nil: - break For - case context.Canceled: - return - default: - log.Error(err) - select { - case <-n.ctx.Done(): - return - case <-time.After(time.Second): - } + if err == nil || err == context.Canceled { + return err + } + + log.Error(err) + + select { + case <-n.ctx.Done(): + return n.ctx.Err() + case <-time.After(time.Second): } } +} + +func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn backend.Network)) error { + if err := n.retryInit(); err != nil { + return errCanceled + } inited(n.bn) + ctx, interruptFunc := context.WithCancel(n.ctx) + + wg := sync.WaitGroup{} + wg.Add(1) go func() { - n.bn.Run(n.ctx) + n.bn.Run(ctx) wg.Done() }() + evts := make(chan subnet.Event) + wg.Add(1) go func() { - subnet.LeaseRenewer(n.ctx, n.sm, n.Name, n.bn.Lease()) + subnet.WatchLease(ctx, n.sm, n.Name, n.bn.Lease().Subnet, evts) wg.Done() }() @@ -134,7 +146,48 @@ For: } }() - wg.Wait() + defer wg.Wait() + + dur := n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin + + for { + select { + case <-time.After(dur): + err := n.sm.RenewLease(n.ctx, n.Name, n.bn.Lease()) + if err != nil { + log.Error("Error renewing lease (trying again in 1 min): ", err) + dur = time.Minute + continue + } + + log.Info("Lease renewed, new expiration: ", n.bn.Lease().Expiration) + dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin + + case e := <-evts: + if e.Type == subnet.EventRemoved { + log.Warning("Lease has been revoked") + interruptFunc() + return errInterrupted + } + dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin + + case <-n.ctx.Done(): + return errCanceled + } + } +} + +func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) { + for { + switch n.runOnce(extIface, inited) { + case errInterrupted: + + case errCanceled: + return + default: + panic("unexpected error returned") + } + } } func (n *Network) Cancel() { diff --git a/remote/client.go b/remote/client.go index 2381548cb..b6c7697d7 100644 --- a/remote/client.go +++ b/remote/client.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -27,6 +28,7 @@ import ( "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/flannel/pkg/ip" "github.com/coreos/flannel/subnet" ) @@ -93,7 +95,7 @@ func (m *RemoteManager) mkurl(network string, parts ...string) string { func (m *RemoteManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) { url := m.mkurl(network, "config") - resp, err := m.httpGet(ctx, url) + resp, err := m.httpVerb(ctx, "GET", url, "", nil) if err != nil { return nil, err } @@ -124,7 +126,7 @@ func (m *RemoteManager) AcquireLease(ctx context.Context, network string, attrs return nil, err } - resp, err := m.httpPutPost(ctx, "POST", url, "application/json", body) + resp, err := m.httpVerb(ctx, "POST", url, "application/json", body) if err != nil { return nil, err } @@ -150,7 +152,7 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s return err } - resp, err := m.httpPutPost(ctx, "PUT", url, "application/json", body) + resp, err := m.httpVerb(ctx, "PUT", url, "application/json", body) if err != nil { return err } @@ -169,6 +171,22 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s return nil } +func (m *RemoteManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error { + url := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn)) + + resp, err := m.httpVerb(ctx, "DELETE", url, "", nil) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return httpError(resp) + } + + return nil +} + func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{}, wr interface{}) error { if cursor != nil { c, ok := cursor.(string) @@ -179,7 +197,7 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{ url = fmt.Sprintf("%v?next=%v", url, c) } - resp, err := m.httpGet(ctx, url) + resp, err := m.httpVerb(ctx, "GET", url, "", nil) if err != nil { return err } @@ -196,6 +214,21 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{ return nil } +func (m *RemoteManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) { + url := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn)) + + wr := subnet.LeaseWatchResult{} + err := m.watch(ctx, url, cursor, &wr) + if err != nil { + return subnet.LeaseWatchResult{}, err + } + if _, ok := wr.Cursor.(string); !ok { + return subnet.LeaseWatchResult{}, fmt.Errorf("watch returned non-string cursor") + } + + return wr, nil +} + func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) { url := m.mkurl(network, "leases") @@ -258,20 +291,19 @@ func (m *RemoteManager) httpDo(ctx context.Context, req *http.Request) (*http.Re } } -func (m *RemoteManager) httpGet(ctx context.Context, url string) (*http.Response, error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err +func (m *RemoteManager) httpVerb(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) { + var r io.Reader + if body != nil { + r = bytes.NewBuffer(body) } - return m.httpDo(ctx, req) -} - -func (m *RemoteManager) httpPutPost(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) { - req, err := http.NewRequest(method, url, bytes.NewBuffer(body)) + req, err := http.NewRequest(method, url, r) if err != nil { return nil, err } - req.Header.Set("Content-Type", contentType) + + if contentType != "" { + req.Header.Set("Content-Type", contentType) + } return m.httpDo(ctx, req) } diff --git a/remote/remote_test.go b/remote/remote_test.go index e7326cf4d..4365ed2b3 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -32,25 +32,62 @@ import ( const expectedNetwork = "10.1.0.0/16" -func TestRemote(t *testing.T) { +type fixture struct { + ctx context.Context + cancel context.CancelFunc + srvAddr string + registry *subnet.MockSubnetRegistry + sm subnet.Manager + wg sync.WaitGroup +} + +func newFixture(t *testing.T) *fixture { + f := &fixture{} + config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork) - serverRegistry := subnet.NewMockRegistry("", config, nil) - sm := subnet.NewMockManager(serverRegistry) + f.registry = subnet.NewMockRegistry("", config, nil) + sm := subnet.NewMockManager(f.registry) - addr := "127.0.0.1:9999" + f.srvAddr = "127.0.0.1:9999" - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) + f.ctx, f.cancel = context.WithCancel(context.Background()) + f.wg.Add(1) go func() { - RunServer(ctx, sm, addr, "", "", "") - wg.Done() + RunServer(f.ctx, sm, f.srvAddr, "", "", "") + f.wg.Done() }() - doTestRemote(ctx, t, addr, serverRegistry) + var err error + f.sm, err = NewRemoteManager(f.srvAddr, "", "", "") + if err != nil { + panic(fmt.Sprintf("Failed to create remote mananager: %v", err)) + } + + for i := 0; ; i++ { + _, err := f.sm.GetNetworkConfig(f.ctx, "_") + if err == nil { + break + } + + if isConnRefused(err) { + if i == 100 { + t.Fatalf("Out of connection retries") + } + + fmt.Println("Connection refused, retrying...") + time.Sleep(300 * time.Millisecond) + continue + } + + t.Fatalf("GetNetworkConfig failed: %v", err) + } - cancel() - wg.Wait() + return f +} + +func (f *fixture) Close() { + f.cancel() + f.wg.Wait() } func mustParseIP4(s string) ip.IP4 { @@ -81,38 +118,29 @@ func isConnRefused(err error) bool { return false } -func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string, serverRegistry *subnet.MockSubnetRegistry) { - sm, err := NewRemoteManager(remoteAddr, "", "", "") +func TestGetConfig(t *testing.T) { + f := newFixture(t) + defer f.Close() + + cfg, err := f.sm.GetNetworkConfig(f.ctx, "_") if err != nil { - t.Fatalf("Failed to create remote mananager: %v", err) + t.Fatalf("GetNetworkConfig failed: %v", err) } - for i := 0; ; i++ { - cfg, err := sm.GetNetworkConfig(ctx, "_") - if err != nil { - if isConnRefused(err) { - if i == 100 { - t.Fatalf("Out of connection retries") - } - - fmt.Println("Connection refused, retrying...") - time.Sleep(300 * time.Millisecond) - continue - } - - t.Fatalf("GetNetworkConfig failed: %v", err) - } - - if cfg.Network.String() != expectedNetwork { - t.Errorf("GetNetworkConfig returned bad network: %v vs %v", cfg.Network, expectedNetwork) - } - break + if cfg.Network.String() != expectedNetwork { + t.Errorf("GetNetworkConfig returned bad network: %v vs %v", cfg.Network, expectedNetwork) } +} + +func TestAcquireRenewLease(t *testing.T) { + f := newFixture(t) + defer f.Close() attrs := &subnet.LeaseAttrs{ PublicIP: mustParseIP4("1.1.1.1"), } - l, err := sm.AcquireLease(ctx, "_", attrs) + + l, err := f.sm.AcquireLease(f.ctx, "_", attrs) if err != nil { t.Fatalf("AcquireLease failed: %v", err) } @@ -121,28 +149,20 @@ func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string, serverRe t.Errorf("AcquireLease returned subnet not in network: %v (in %v)", l.Subnet, expectedNetwork) } - if err = sm.RenewLease(ctx, "_", l); err != nil { + if err = f.sm.RenewLease(f.ctx, "_", l); err != nil { t.Errorf("RenewLease failed: %v", err) } - - doTestWatchLeases(t, sm) - - doTestWatchNetworks(t, sm, serverRegistry) } -func doTestWatchLeases(t *testing.T, sm subnet.Manager) { - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - defer func() { - cancel() - wg.Wait() - }() +func TestWatchLeases(t *testing.T) { + f := newFixture(t) + defer f.Close() events := make(chan []subnet.Event) + f.wg.Add(1) go func() { - subnet.WatchLeases(ctx, sm, "_", nil, events) - wg.Done() + subnet.WatchLeases(f.ctx, f.sm, "_", nil, events) + f.wg.Done() }() // skip over the initial snapshot @@ -151,7 +171,7 @@ func doTestWatchLeases(t *testing.T, sm subnet.Manager) { attrs := &subnet.LeaseAttrs{ PublicIP: mustParseIP4("1.1.1.2"), } - l, err := sm.AcquireLease(ctx, "_", attrs) + l, err := f.sm.AcquireLease(f.ctx, "_", attrs) if err != nil { t.Errorf("AcquireLease failed: %v", err) return @@ -176,19 +196,38 @@ func doTestWatchLeases(t *testing.T, sm subnet.Manager) { } } -func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet.MockSubnetRegistry) { - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - defer func() { - cancel() - wg.Wait() - }() +func TestRevokeLease(t *testing.T) { + f := newFixture(t) + defer f.Close() + + attrs := &subnet.LeaseAttrs{ + PublicIP: mustParseIP4("1.1.1.1"), + } + + l, err := f.sm.AcquireLease(f.ctx, "_", attrs) + if err != nil { + t.Fatalf("AcquireLease failed: %v", err) + } + + if err := f.sm.RevokeLease(f.ctx, "_", l.Subnet); err != nil { + t.Fatalf("RevokeLease failed: %v", err) + } + + _, err = f.sm.WatchLease(f.ctx, "_", l.Subnet, nil) + if err == nil { + t.Fatalf("Revoked lease found") + } +} + +func TestWatchNetworks(t *testing.T) { + f := newFixture(t) + defer f.Close() events := make(chan []subnet.Event) + f.wg.Add(1) go func() { - subnet.WatchNetworks(ctx, sm, events) - wg.Done() + subnet.WatchNetworks(f.ctx, f.sm, events) + f.wg.Done() }() // skip over the initial snapshot @@ -196,7 +235,7 @@ func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet expectedNetname := "foobar" config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork) - err := serverRegistry.CreateNetwork(ctx, expectedNetname, config) + err := f.registry.CreateNetwork(f.ctx, expectedNetname, config) if err != nil { t.Errorf("create network failed: %v", err) } @@ -216,7 +255,7 @@ func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet t.Errorf("WatchNetwork create produced wrong network: expected %s, got %s", expectedNetname, evt.Network) } - err = serverRegistry.DeleteNetwork(ctx, expectedNetname) + err = f.registry.DeleteNetwork(f.ctx, expectedNetname) if err != nil { t.Errorf("delete network failed: %v", err) } diff --git a/remote/server.go b/remote/server.go index 788cb48e3..f73bf1870 100644 --- a/remote/server.go +++ b/remote/server.go @@ -114,6 +114,30 @@ func handleRenewLease(ctx context.Context, sm subnet.Manager, w http.ResponseWri jsonResponse(w, http.StatusOK, lease) } +func handleRevokeLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + network := mux.Vars(r)["network"] + if network == "_" { + network = "" + } + + sn := subnet.ParseSubnetKey(mux.Vars(r)["subnet"]) + if sn == nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "failed to parse subnet") + return + } + + if err := sm.RevokeLease(ctx, network, *sn); err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, err) + return + } + + w.WriteHeader(http.StatusOK) +} + func getCursor(u *url.URL) interface{} { vals, ok := u.Query()["next"] if !ok { @@ -122,6 +146,44 @@ func getCursor(u *url.URL) interface{} { return vals[0] } +// GET /{network}/leases/subnet?next=cursor +func handleWatchLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + network := mux.Vars(r)["network"] + if network == "_" { + network = "" + } + + sn := subnet.ParseSubnetKey(mux.Vars(r)["subnet"]) + if sn == nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "bad subnet") + return + } + + cursor := getCursor(r.URL) + + wr, err := sm.WatchLease(ctx, network, *sn, cursor) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, err) + return + } + + switch wr.Cursor.(type) { + case string: + case fmt.Stringer: + wr.Cursor = wr.Cursor.(fmt.Stringer).String() + default: + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, fmt.Errorf("internal error: watch cursor is of unknown type")) + return + } + + jsonResponse(w, http.StatusOK, wr) +} + // GET /{network}/leases?next=cursor func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -262,7 +324,9 @@ func RunServer(ctx context.Context, sm subnet.Manager, listenAddr, cafile, certf r := mux.NewRouter() r.HandleFunc("/v1/{network}/config", bindHandler(handleGetNetworkConfig, ctx, sm)).Methods("GET") r.HandleFunc("/v1/{network}/leases", bindHandler(handleAcquireLease, ctx, sm)).Methods("POST") + r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleWatchLease, ctx, sm)).Methods("GET") r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRenewLease, ctx, sm)).Methods("PUT") + r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRevokeLease, ctx, sm)).Methods("DELETE") r.HandleFunc("/v1/{network}/leases", bindHandler(handleWatchLeases, ctx, sm)).Methods("GET") r.HandleFunc("/v1/", bindHandler(handleNetworks, ctx, sm)).Methods("GET") diff --git a/subnet/local_manager.go b/subnet/local_manager.go index 91148b27f..9bb3f7a09 100644 --- a/subnet/local_manager.go +++ b/subnet/local_manager.go @@ -170,6 +170,10 @@ OuterLoop: } } +func (m *LocalManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error { + return m.registry.deleteSubnet(ctx, network, sn) +} + func (m *LocalManager) RenewLease(ctx context.Context, network string, lease *Lease) error { // TODO(eyakubovich): propogate ctx into registry exp, err := m.registry.updateSubnet(ctx, network, lease.Subnet, &lease.Attrs, subnetTTL, 0) @@ -199,9 +203,49 @@ func getNextIndex(cursor interface{}) (uint64, error) { return nextIndex, nil } +func (m *LocalManager) leaseWatchReset(ctx context.Context, network string, sn ip.IP4Net) (LeaseWatchResult, error) { + l, index, err := m.registry.getSubnet(ctx, network, sn) + if err != nil { + return LeaseWatchResult{}, err + } + + return LeaseWatchResult{ + Snapshot: []Lease{*l}, + Cursor: watchCursor{index}, + }, nil +} + +func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) { + if cursor == nil { + return m.leaseWatchReset(ctx, network, sn) + } + + nextIndex, err := getNextIndex(cursor) + if err != nil { + return LeaseWatchResult{}, err + } + + evt, index, err := m.registry.watchSubnet(ctx, network, nextIndex, sn) + + switch { + case err == nil: + return LeaseWatchResult{ + Events: []Event{evt}, + Cursor: watchCursor{index}, + }, nil + + case isIndexTooSmall(err): + log.Warning("Watch of subnet leases failed because etcd index outside history window") + return m.leaseWatchReset(ctx, network, sn) + + default: + return LeaseWatchResult{}, err + } +} + func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) { if cursor == nil { - return m.leaseWatchReset(ctx, network) + return m.leasesWatchReset(ctx, network) } nextIndex, err := getNextIndex(cursor) @@ -220,7 +264,7 @@ func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor i case isIndexTooSmall(err): log.Warning("Watch of subnet leases failed because etcd index outside history window") - return m.leaseWatchReset(ctx, network) + return m.leasesWatchReset(ctx, network) default: return LeaseWatchResult{}, err @@ -265,8 +309,8 @@ func isIndexTooSmall(err error) bool { return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared } -// leaseWatchReset is called when incremental lease watch failed and we need to grab a snapshot -func (m *LocalManager) leaseWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) { +// leasesWatchReset is called when incremental lease watch failed and we need to grab a snapshot +func (m *LocalManager) leasesWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) { wr := LeaseWatchResult{} leases, index, err := m.registry.getSubnets(ctx, network) diff --git a/subnet/mock_registry.go b/subnet/mock_registry.go index 2c74361c6..3f15ce811 100644 --- a/subnet/mock_registry.go +++ b/subnet/mock_registry.go @@ -17,6 +17,7 @@ package subnet import ( "fmt" "strings" + "sync" "time" etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client" @@ -26,9 +27,36 @@ import ( ) type netwk struct { - config string - subnets []Lease - subnetEvents chan event + config string + subnets []Lease + subnetsEvents chan event + + mux sync.Mutex + subnetEvents map[ip.IP4Net]chan event +} + +func (n *netwk) sendSubnetEvent(sn ip.IP4Net, e event) { + n.subnetsEvents <- e + + n.mux.Lock() + c, ok := n.subnetEvents[sn] + if !ok { + c = make(chan event, 10) + n.subnetEvents[sn] = c + } + n.mux.Unlock() + c <- e +} + +func (n *netwk) subnetEventsChan(sn ip.IP4Net) chan event { + n.mux.Lock() + c, ok := n.subnetEvents[sn] + if !ok { + c = make(chan event, 10) + n.subnetEvents[sn] = c + } + n.mux.Unlock() + return c } type event struct { @@ -50,9 +78,10 @@ func NewMockRegistry(network, config string, initialSubnets []Lease) *MockSubnet } msr.networks[network] = &netwk{ - config: config, - subnets: initialSubnets, - subnetEvents: make(chan event, 1000), + config: config, + subnets: initialSubnets, + subnetsEvents: make(chan event, 1000), + subnetEvents: make(map[ip.IP4Net]chan event), } return msr } @@ -82,6 +111,19 @@ func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) ( return n.subnets, msr.index, nil } +func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) { + n, ok := msr.networks[network] + if !ok { + return nil, 0, fmt.Errorf("Network %s not found", network) + } + for _, l := range n.subnets { + if l.Subnet.Equal(sn) { + return &l, msr.index, nil + } + } + return nil, msr.index, fmt.Errorf("subnet %s not found", sn) +} + func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) { n, ok := msr.networks[network] if !ok { @@ -113,7 +155,8 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, Lease: l, Network: network, } - n.subnetEvents <- event{evt, msr.index} + + n.sendSubnetEvent(sn, event{evt, msr.index}) return exp, nil } @@ -128,20 +171,22 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string, exp := clock.Now().Add(ttl) - sub, _, err := n.findSubnet(sn) + sub, i, err := n.findSubnet(sn) if err != nil { return time.Time{}, err } sub.Attrs = *attrs - sub.Expiration = exp sub.asof = msr.index - n.subnetEvents <- event{ + sub.Expiration = exp + n.subnets[i] = sub + n.sendSubnetEvent(sn, event{ Event{ Type: EventAdded, Lease: sub, Network: network, - }, msr.index} + }, msr.index, + }) return sub.Expiration, nil } @@ -162,12 +207,13 @@ func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string, n.subnets[i] = n.subnets[len(n.subnets)-1] n.subnets = n.subnets[:len(n.subnets)-1] sub.asof = msr.index - n.subnetEvents <- event{ + n.sendSubnetEvent(sn, event{ Event{ Type: EventRemoved, Lease: sub, Network: network, - }, msr.index} + }, msr.index, + }) return nil } @@ -192,7 +238,36 @@ func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string, case <-ctx.Done(): return Event{}, msr.index, ctx.Err() - case e := <-n.subnetEvents: + case e := <-n.subnetsEvents: + if e.index <= since { + continue + } + return e.evt, msr.index, nil + } + } +} + +func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) { + n, ok := msr.networks[network] + if !ok { + return Event{}, msr.index, fmt.Errorf("Network %s not found", network) + } + + for { + if since < msr.index { + return Event{}, msr.index, etcd.Error{ + Code: etcd.ErrorCodeEventIndexCleared, + Cause: "out of date", + Message: "cursor is out of date", + Index: msr.index, + } + } + + select { + case <-ctx.Done(): + return Event{}, msr.index, ctx.Err() + + case e := <-n.subnetEventsChan(sn): if e.index <= since { continue } @@ -212,12 +287,12 @@ func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) { n.subnets[i] = n.subnets[len(n.subnets)-1] n.subnets = n.subnets[:len(n.subnets)-1] sub.asof = msr.index - n.subnetEvents <- event{ + n.sendSubnetEvent(sn, event{ Event{ Type: EventRemoved, Lease: sub, }, msr.index, - } + }) } } @@ -280,8 +355,9 @@ func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, confi msr.index += 1 n := &netwk{ - config: network, - subnetEvents: make(chan event, 1000), + config: network, + subnetsEvents: make(chan event, 1000), + subnetEvents: make(map[ip.IP4Net]chan event), } msr.networks[network] = n diff --git a/subnet/registry.go b/subnet/registry.go index 22f957d60..79cc88c90 100644 --- a/subnet/registry.go +++ b/subnet/registry.go @@ -18,10 +18,8 @@ import ( "encoding/json" "errors" "fmt" - "net" "path" "regexp" - "strconv" "sync" "time" @@ -41,10 +39,12 @@ var ( type Registry interface { getNetworkConfig(ctx context.Context, network string) (string, error) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) + getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) + watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) getNetworks(ctx context.Context) ([]string, uint64, error) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) } @@ -127,29 +127,31 @@ func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ( leases := []Lease{} for _, node := range resp.Node.Nodes { - if sn := parseSubnetKey(node.Key); sn != nil { - attrs := &LeaseAttrs{} - if err = json.Unmarshal([]byte(node.Value), attrs); err == nil { - exp := time.Time{} - if node.Expiration != nil { - exp = *node.Expiration - } - - lease := Lease{ - Subnet: *sn, - Attrs: *attrs, - Expiration: exp, - } - leases = append(leases, lease) - } + l, err := nodeToLease(node) + if err != nil { + log.Warningf("Ignoring bad subnet node: %v", err) + continue } + + leases = append(leases, *l) } return leases, resp.Index, nil } +func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) { + key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn)) + resp, err := esr.client().Get(ctx, key, nil) + if err != nil { + return nil, 0, err + } + + l, err := nodeToLease(resp.Node) + return l, resp.Index, err +} + func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) { - key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) + key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn)) value, err := json.Marshal(attrs) if err != nil { return time.Time{}, err @@ -170,7 +172,7 @@ func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, } func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) { - key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) + key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn)) value, err := json.Marshal(attrs) if err != nil { return time.Time{}, err @@ -189,7 +191,7 @@ func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, } func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error { - key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) + key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn)) _, err := esr.client().Delete(ctx, key, nil) return err } @@ -209,6 +211,21 @@ func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, return evt, e.Node.ModifiedIndex, err } +func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) { + key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn)) + opts := &etcd.WatcherOptions{ + AfterIndex: since, + } + + e, err := esr.client().Watcher(key, opts).Next(ctx) + if err != nil { + return Event{}, 0, err + } + + evt, err := parseSubnetWatchResponse(e) + return evt, e.Index, err +} + // getNetworks queries etcd to get a list of network names. It returns the // networks along with the 'as-of' etcd-index that can be used as the starting // point for etcd watch. @@ -280,7 +297,7 @@ func ensureExpiration(resp *etcd.Response, ttl time.Duration) { } func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) { - sn := parseSubnetKey(resp.Node.Key) + sn := ParseSubnetKey(resp.Node.Key) if sn == nil { return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key) } @@ -366,14 +383,28 @@ func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) { return "", false } -func parseSubnetKey(s string) *ip.IP4Net { - if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 { - snIp := net.ParseIP(parts[1]).To4() - prefixLen, err := strconv.ParseUint(parts[2], 10, 5) - if snIp != nil && err == nil { - return &ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)} - } +func nodeToLease(node *etcd.Node) (*Lease, error) { + sn := ParseSubnetKey(node.Key) + if sn == nil { + return nil, fmt.Errorf("failed to parse subnet key %q", *sn) + } + + attrs := &LeaseAttrs{} + if err := json.Unmarshal([]byte(node.Value), attrs); err != nil { + return nil, err + } + + exp := time.Time{} + if node.Expiration != nil { + exp = *node.Expiration + } + + lease := Lease{ + Subnet: *sn, + Attrs: *attrs, + Expiration: exp, + asof: node.ModifiedIndex, } - return nil + return &lease, nil } diff --git a/subnet/renew.go b/subnet/renew.go deleted file mode 100644 index 1370a74fa..000000000 --- a/subnet/renew.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2015 flannel authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package subnet - -import ( - "time" - - log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" - "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" -) - -const ( - renewMargin = time.Hour -) - -func LeaseRenewer(ctx context.Context, m Manager, network string, lease *Lease) { - dur := lease.Expiration.Sub(clock.Now()) - renewMargin - - for { - select { - case <-time.After(dur): - err := m.RenewLease(ctx, network, lease) - if err != nil { - log.Error("Error renewing lease (trying again in 1 min): ", err) - dur = time.Minute - continue - } - - log.Info("Lease renewed, new expiration: ", lease.Expiration) - dur = lease.Expiration.Sub(clock.Now()) - renewMargin - - case <-ctx.Done(): - return - } - } -} diff --git a/subnet/subnet.go b/subnet/subnet.go index 625d20999..ff14f88ef 100644 --- a/subnet/subnet.go +++ b/subnet/subnet.go @@ -18,6 +18,8 @@ import ( "encoding/json" "errors" "fmt" + "net" + "strconv" "time" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" @@ -39,7 +41,7 @@ type Lease struct { } func (l *Lease) Key() string { - return l.Subnet.StringSep(".", "-") + return MakeSubnetKey(l.Subnet) } type ( @@ -103,10 +105,28 @@ func (et *EventType) UnmarshalJSON(data []byte) error { return nil } +func ParseSubnetKey(s string) *ip.IP4Net { + if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 { + snIp := net.ParseIP(parts[1]).To4() + prefixLen, err := strconv.ParseUint(parts[2], 10, 5) + if snIp != nil && err == nil { + return &ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)} + } + } + + return nil +} + +func MakeSubnetKey(sn ip.IP4Net) string { + return sn.StringSep(".", "-") +} + type Manager interface { GetNetworkConfig(ctx context.Context, network string) (*Config, error) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) RenewLease(ctx context.Context, network string, lease *Lease) error + RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error + WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) } diff --git a/subnet/subnet_test.go b/subnet/subnet_test.go index 15069ecc4..a851e5af8 100644 --- a/subnet/subnet_test.go +++ b/subnet/subnet_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client" "github.com/coreos/flannel/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" @@ -248,10 +249,14 @@ func TestRenewLease(t *testing.T) { t.Fatal("AcquireLease failed: ", err) } - go LeaseRenewer(ctx, sm, "_", l) + now = now.Add(subnetTTL) fakeClock.Advance(24 * time.Hour) + if err := sm.RenewLease(ctx, "_", l); err != nil { + t.Fatal("RenewLease failed: ", err) + } + // check that it's still good n, err := msr.getNetwork(ctx, "_") if err != nil { @@ -259,8 +264,9 @@ func TestRenewLease(t *testing.T) { } for _, sn := range n.subnets { if sn.Subnet.Equal(l.Subnet) { - if !sn.Expiration.Equal(now.Add(subnetTTL)) { - t.Errorf("Failed to renew lease: bad expiration; expected %v, got %v", now.Add(subnetTTL), sn.Expiration) + expected := now.Add(subnetTTL) + if !sn.Expiration.Equal(expected) { + t.Errorf("Failed to renew lease: bad expiration; expected %v, got %v", expected, sn.Expiration) } if !reflect.DeepEqual(sn.Attrs, attrs) { t.Errorf("LeaseAttrs changed: was %#v, now %#v", attrs, sn.Attrs) @@ -272,6 +278,28 @@ func TestRenewLease(t *testing.T) { t.Fatalf("Failed to find acquired lease") } +func TestLeaseRevoked(t *testing.T) { + msr := newDummyRegistry() + sm := NewMockManager(msr) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + l := acquireLease(ctx, t, sm) + + if err := sm.RevokeLease(ctx, "_", l.Subnet); err != nil { + t.Fatalf("RevokeLease failed: %v", err) + } + + _, _, err := msr.getSubnet(ctx, "_", l.Subnet) + if err == nil { + t.Fatalf("Revoked lease still exists") + } + if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code != etcd.ErrorCodeKeyNotFound { + t.Fatalf("getSubnets after revoked lease returned unexpected error: %v", err) + } +} + func TestWatchGetNetworks(t *testing.T) { msr := newDummyRegistry() sm := NewMockManager(msr) diff --git a/subnet/watch.go b/subnet/watch.go index bb21c43be..56f0cfb88 100644 --- a/subnet/watch.go +++ b/subnet/watch.go @@ -19,6 +19,8 @@ import ( log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" + + "github.com/coreos/flannel/pkg/ip" ) // WatchLeases performs a long term watch of the given network's subnet leases @@ -42,6 +44,7 @@ func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Leas time.Sleep(time.Second) continue } + cursor = res.Cursor batch := []Event{} @@ -245,3 +248,35 @@ func (nw *netWatcher) remove(network string) Event { return Event{EventRemoved, Lease{}, network} } + +// WatchLease performs a long term watch of the given network's subnet lease +// and communicates addition/deletion events on receiver channel. It takes care +// of handling "fall-behind" logic where the history window has advanced too far +// and it needs to diff the latest snapshot with its saved state and generate events +func WatchLease(ctx context.Context, sm Manager, network string, sn ip.IP4Net, receiver chan Event) { + var cursor interface{} + + for { + wr, err := sm.WatchLease(ctx, network, sn, cursor) + if err != nil { + if err == context.Canceled || err == context.DeadlineExceeded { + return + } + + log.Errorf("Subnet watch failed: %v", err) + time.Sleep(time.Second) + continue + } + + if len(wr.Snapshot) > 0 { + receiver <- Event{ + Type: EventAdded, + Lease: wr.Snapshot[0], + } + } else { + receiver <- wr.Events[0] + } + + cursor = wr.Cursor + } +}