Skip to content

Commit

Permalink
Port changes from enterprise lease fix
Browse files Browse the repository at this point in the history
  • Loading branch information
briankassouf committed Sep 22, 2020
1 parent 703b204 commit f192878
Show file tree
Hide file tree
Showing 23 changed files with 188 additions and 70 deletions.
2 changes: 1 addition & 1 deletion helper/forwarding/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion helper/identity/mfa/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion helper/identity/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion helper/storagepacker/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion physical/raft/chunking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestRaft_Chunking_Lifecycle(t *testing.T) {

t.Log("tearing down cluster")
require.NoError(b.TeardownCluster(nil))
require.NoError(b.fsm.db.Close())
require.NoError(b.fsm.getDB().Close())
require.NoError(b.stableStore.(*raftboltdb.BoltStore).Close())

t.Log("starting new backend")
Expand Down Expand Up @@ -195,6 +195,15 @@ func TestRaft_Chunking_AppliedIndex(t *testing.T) {
t.Fatal(err)
}

// Write a value to fastforward the index
err = raft.Put(context.Background(), &physical.Entry{
Key: "key",
Value: []byte("test"),
})
if err != nil {
t.Fatal(err)
}

currentIndex := raft.AppliedIndex()
// Write some data
for i := 0; i < 10; i++ {
Expand Down
25 changes: 25 additions & 0 deletions physical/raft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type FSM struct {
logger log.Logger
noopRestore bool

// applyDelay is used to simulate a slow apply in tests
applyDelay time.Duration

db *bolt.DB

// retoreCb is called after we've restored a snapshot
Expand Down Expand Up @@ -118,6 +121,21 @@ func NewFSM(path string, logger log.Logger) (*FSM, error) {
return f, nil
}

func (f *FSM) getDB() *bolt.DB {
f.l.RLock()
defer f.l.RUnlock()

return f.db
}

// SetFSMDelay adds a delay to the FSM apply. This is used in tests to simulate
// a slow apply.
func (r *RaftBackend) SetFSMDelay(delay time.Duration) {
r.fsm.l.Lock()
r.fsm.applyDelay = delay
r.fsm.l.Unlock()
}

func (f *FSM) openDBFile(dbPath string) error {
if len(dbPath) == 0 {
return errors.New("can not open empty filename")
Expand Down Expand Up @@ -222,6 +240,9 @@ func writeSnapshotMetaToDB(metadata *raft.SnapshotMeta, db *bolt.DB) error {
}

func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
f.l.RLock()
defer f.l.RUnlock()

err := writeSnapshotMetaToDB(metadata, f.db)
if err != nil {
return err
Expand Down Expand Up @@ -448,6 +469,10 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
f.l.RLock()
defer f.l.RUnlock()

if f.applyDelay > 0 {
time.Sleep(f.applyDelay)
}

err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
for _, commandRaw := range commands {
Expand Down
7 changes: 5 additions & 2 deletions physical/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,11 +768,14 @@ func (b *RaftBackend) AppliedIndex() uint64 {
b.l.RLock()
defer b.l.RUnlock()

if b.raft == nil {
if b.fsm == nil {
return 0
}

return b.raft.AppliedIndex()
// We use the latest index that the FSM has seen here, which may be behind
// raft.AppliedIndex() due to the async nature of the raft library.
indexState, _ := b.fsm.LatestState()
return indexState.Index
}

// RemovePeer removes the given peer ID from the raft cluster. If the node is
Expand Down
4 changes: 2 additions & 2 deletions physical/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
}

for {
if backend.AppliedIndex() >= 2 {
if backend.raft.AppliedIndex() >= 2 {
break
}
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
return fmt.Errorf("configs did not match: %+v != %+v", config1, config2)
}

return compareDBs(t, fsm1.db, fsm2.db, false)
return compareDBs(t, fsm1.getDB(), fsm2.getDB(), false)
}

func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB, dataOnly bool) error {
Expand Down
6 changes: 3 additions & 3 deletions physical/raft/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestRaft_Snapshot_Peers(t *testing.T) {
ensureCommitApplied(t, commitIdx, raft2)

// Make sure the snapshot was applied correctly on the follower
if err := compareDBs(t, raft1.fsm.db, raft2.fsm.db, false); err != nil {
if err := compareDBs(t, raft1.fsm.getDB(), raft2.fsm.getDB(), false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -758,13 +758,13 @@ func TestBoltSnapshotStore_CreateInstallSnapshot(t *testing.T) {
t.Fatal(err)
}

err = compareDBs(t, fsm.db, newFSM.db, true)
err = compareDBs(t, fsm.getDB(), newFSM.getDB(), true)
if err != nil {
t.Fatal(err)
}

// Make sure config data is different
err = compareDBs(t, fsm.db, newFSM.db, false)
err = compareDBs(t, fsm.getDB(), newFSM.getDB(), false)
if err == nil {
t.Fatal("expected error")
}
Expand Down
2 changes: 1 addition & 1 deletion physical/raft/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/database/dbplugin/database.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/logical/identity.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/logical/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/plugin/pb/backend.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 30 additions & 33 deletions vault/activity/activity_log.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 31 additions & 1 deletion vault/cluster/inmem_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type InmemLayer struct {
stopCh chan struct{}

connectionCh chan *ConnectionInfo
readerDelay time.Duration
}

// NewInmemLayer returns a new in-memory layer configured to listen on the
Expand All @@ -52,6 +53,26 @@ func (l *InmemLayer) SetConnectionCh(ch chan *ConnectionInfo) {
l.l.Unlock()
}

func (l *InmemLayer) SetReaderDelay(delay time.Duration) {
l.l.Lock()
defer l.l.Unlock()

l.readerDelay = delay

// Update the existing server and client connections
for _, servConns := range l.servConns {
for _, c := range servConns {
c.(*delayedConn).SetDelay(delay)
}
}

for _, clientConns := range l.clientConns {
for _, c := range clientConns {
c.(*delayedConn).SetDelay(delay)
}
}
}

// Addrs implements NetworkLayer.
func (l *InmemLayer) Addrs() []net.Addr {
l.l.Lock()
Expand Down Expand Up @@ -127,7 +148,7 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con

tlsConn := tls.Client(conn, tlsConfig)

l.clientConns[addr] = append(l.clientConns[addr], tlsConn)
l.clientConns[addr] = append(l.clientConns[addr], conn)

return tlsConn, nil
}
Expand All @@ -149,6 +170,9 @@ func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {

retConn, servConn := net.Pipe()

retConn = newDelayedConn(retConn, l.readerDelay)
servConn = newDelayedConn(servConn, l.readerDelay)

l.servConns[addr] = append(l.servConns[addr], servConn)

if l.logger.IsDebug() {
Expand Down Expand Up @@ -372,6 +396,12 @@ func (ic *InmemLayerCluster) SetConnectionCh(ch chan *ConnectionInfo) {
}
}

func (ic *InmemLayerCluster) SetReaderDelay(delay time.Duration) {
for _, node := range ic.layers {
node.SetReaderDelay(delay)
}
}

type ConnectionInfo struct {
Node string
Remote string
Expand Down
Loading

0 comments on commit f192878

Please sign in to comment.