Skip to content

Commit

Permalink
Ability to revoke lease
Browse files Browse the repository at this point in the history
This adds an ability to revoke a lease by deleting it
from etcd. A corresponding API for server case is also added.
Once the lease is revoked, flannel will just acquire a new
one. This has limited use today but is part of #280 to
add reservations. With reservations, there will be a mode
to not acquire a lease if a reservation was not found.
  • Loading branch information
Eugene Yakubovich committed Nov 11, 2015
1 parent 73863e6 commit 6ab6edb
Show file tree
Hide file tree
Showing 12 changed files with 579 additions and 201 deletions.
4 changes: 4 additions & 0 deletions network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,16 @@ func (m *Manager) forEachNetwork(f func(n *Network)) {
func (m *Manager) runNetwork(n *Network) {
n.Run(m.extIface, func(bn backend.Network) {
if m.isMultiNetwork() {
log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)

path := filepath.Join(opts.subnetDir, n.Name) + ".env"
if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
log.Warningf("%v failed to write subnet file: %s", n.Name, err)
return
}
} else {
log.Infof("Lease acquired: %v", bn.Lease().Subnet)

if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
log.Warningf("%v failed to write subnet file: %s", n.Name, err)
return
Expand Down
93 changes: 73 additions & 20 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package network

import (
"errors"
"fmt"
"sync"
"time"

log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"

"github.com/coreos/flannel/backend"
"github.com/coreos/flannel/subnet"
)
Expand All @@ -29,7 +31,10 @@ const (
renewMargin = time.Hour
)

var backends map[string]backend.Backend
var (
errInterrupted = errors.New("interrupted")
errCanceled = errors.New("canceled")
)

type Network struct {
Name string
Expand Down Expand Up @@ -91,38 +96,45 @@ func (n *Network) init() error {
return nil
}

func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) {
wg := sync.WaitGroup{}

For:
func (n *Network) retryInit() error {
for {
err := n.init()
switch err {
case nil:
break For
case context.Canceled:
return
default:
log.Error(err)
select {
case <-n.ctx.Done():
return
case <-time.After(time.Second):
}
if err == nil || err == context.Canceled {
return err
}

log.Error(err)

select {
case <-n.ctx.Done():
return n.ctx.Err()
case <-time.After(time.Second):
}
}
}

func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn backend.Network)) error {
if err := n.retryInit(); err != nil {
return errCanceled
}

inited(n.bn)

ctx, interruptFunc := context.WithCancel(n.ctx)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
n.bn.Run(n.ctx)
n.bn.Run(ctx)
wg.Done()
}()

evts := make(chan subnet.Event)

wg.Add(1)
go func() {
subnet.LeaseRenewer(n.ctx, n.sm, n.Name, n.bn.Lease())
subnet.WatchLease(ctx, n.sm, n.Name, n.bn.Lease().Subnet, evts)
wg.Done()
}()

Expand All @@ -134,7 +146,48 @@ For:
}
}()

wg.Wait()
defer wg.Wait()

dur := n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin

for {
select {
case <-time.After(dur):
err := n.sm.RenewLease(n.ctx, n.Name, n.bn.Lease())
if err != nil {
log.Error("Error renewing lease (trying again in 1 min): ", err)
dur = time.Minute
continue
}

log.Info("Lease renewed, new expiration: ", n.bn.Lease().Expiration)
dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin

case e := <-evts:
if e.Type == subnet.EventRemoved {
log.Warning("Lease has been revoked")
interruptFunc()
return errInterrupted
}
dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin

case <-n.ctx.Done():
return errCanceled
}
}
}

func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) {
for {
switch n.runOnce(extIface, inited) {
case errInterrupted:

case errCanceled:
return
default:
panic("unexpected error returned")
}
}
}

func (n *Network) Cancel() {
Expand Down
60 changes: 46 additions & 14 deletions remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"

"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/subnet"
)

Expand Down Expand Up @@ -93,7 +95,7 @@ func (m *RemoteManager) mkurl(network string, parts ...string) string {
func (m *RemoteManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
url := m.mkurl(network, "config")

resp, err := m.httpGet(ctx, url)
resp, err := m.httpVerb(ctx, "GET", url, "", nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,7 +126,7 @@ func (m *RemoteManager) AcquireLease(ctx context.Context, network string, attrs
return nil, err
}

resp, err := m.httpPutPost(ctx, "POST", url, "application/json", body)
resp, err := m.httpVerb(ctx, "POST", url, "application/json", body)
if err != nil {
return nil, err
}
Expand All @@ -150,7 +152,7 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s
return err
}

resp, err := m.httpPutPost(ctx, "PUT", url, "application/json", body)
resp, err := m.httpVerb(ctx, "PUT", url, "application/json", body)
if err != nil {
return err
}
Expand All @@ -169,6 +171,22 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s
return nil
}

func (m *RemoteManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
url := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn))

resp, err := m.httpVerb(ctx, "DELETE", url, "", nil)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return httpError(resp)
}

return nil
}

func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{}, wr interface{}) error {
if cursor != nil {
c, ok := cursor.(string)
Expand All @@ -179,7 +197,7 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{
url = fmt.Sprintf("%v?next=%v", url, c)
}

resp, err := m.httpGet(ctx, url)
resp, err := m.httpVerb(ctx, "GET", url, "", nil)
if err != nil {
return err
}
Expand All @@ -196,6 +214,21 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{
return nil
}

func (m *RemoteManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
url := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn))

wr := subnet.LeaseWatchResult{}
err := m.watch(ctx, url, cursor, &wr)
if err != nil {
return subnet.LeaseWatchResult{}, err
}
if _, ok := wr.Cursor.(string); !ok {
return subnet.LeaseWatchResult{}, fmt.Errorf("watch returned non-string cursor")
}

return wr, nil
}

func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
url := m.mkurl(network, "leases")

Expand Down Expand Up @@ -258,20 +291,19 @@ func (m *RemoteManager) httpDo(ctx context.Context, req *http.Request) (*http.Re
}
}

func (m *RemoteManager) httpGet(ctx context.Context, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
func (m *RemoteManager) httpVerb(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) {
var r io.Reader
if body != nil {
r = bytes.NewBuffer(body)
}

return m.httpDo(ctx, req)
}

func (m *RemoteManager) httpPutPost(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
req, err := http.NewRequest(method, url, r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)

if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
return m.httpDo(ctx, req)
}
Loading

0 comments on commit 6ab6edb

Please sign in to comment.