Skip to content

Commit

Permalink
Bring in Badger updates (dgraph-io#2697)
Browse files Browse the repository at this point in the history
* Brought in Badger
* Raftwal works with the new WriteBatch API.
* Fix up the build.
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 4fc83a7 commit cb26c87
Show file tree
Hide file tree
Showing 17 changed files with 589 additions and 344 deletions.
101 changes: 31 additions & 70 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,45 +32,6 @@ import (
"github.com/dgraph-io/dgraph/x"
)

// txnUnifier wraps a Badger read-write transaction together with the DB that
// created it, so a logically single batch of writes can transparently span
// multiple transactions when one grows too big (see run).
type txnUnifier struct {
	txn *badger.Txn // current read-write transaction; replaced by run when it fills up
	db  *badger.DB  // source database, used to open replacement transactions
}

// newUnifier returns a txnUnifier backed by a fresh read-write transaction
// on w's database. Callers must finish with Done (commit) or Cancel (discard).
func (w *DiskStorage) newUnifier() *txnUnifier {
	return &txnUnifier{txn: w.db.NewTransaction(true), db: w.db}
}

// run applies one write to the current transaction: a delete of key k when
// del is true, otherwise a set of k to v. If the transaction has grown too
// big (badger.ErrTxnTooBig), it is committed, a fresh transaction is opened,
// and the write is retried once on it. Any other error — including nil — is
// returned as-is.
//
// The parameter is named del (not delete) to avoid shadowing Go's builtin.
func (u *txnUnifier) run(k, v []byte, del bool) error {
	// apply performs the requested mutation against the current transaction.
	// Factored out so the first attempt and the retry cannot diverge.
	apply := func() error {
		if del {
			return u.txn.Delete(k)
		}
		return u.txn.Set(k, v)
	}

	err := apply()
	if err != badger.ErrTxnTooBig {
		// Error can be nil, and we can return here.
		return err
	}
	// Current transaction is full: flush it and retry on a fresh one.
	if err := u.txn.Commit(nil); err != nil {
		return err
	}
	u.txn = u.db.NewTransaction(true)
	return apply()
}

// Done commits the current transaction, flushing any writes buffered since
// the last commit performed internally by run.
func (u *txnUnifier) Done() error {
	return u.txn.Commit(nil)
}

// Cancel discards the current transaction, dropping any uncommitted writes.
// NOTE(review): writes already committed by run's ErrTxnTooBig handling are
// not rolled back — only the in-flight transaction is discarded.
func (u *txnUnifier) Cancel() {
	u.txn.Discard()
}

type localCache struct {
sync.RWMutex
firstIndex uint64
Expand Down Expand Up @@ -265,7 +226,7 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
if err == nil {
glog.V(2).Infof("Setting first index: %d", index+1)
w.cache.setFirst(index + 1)
} else {
} else if glog.V(2) {
glog.Errorf("While seekEntry. Error: %v", err)
}
return index + 1, err
Expand All @@ -280,7 +241,7 @@ func (w *DiskStorage) LastIndex() (uint64, error) {
// Keep the entry at the snapshot index, for simplification of logic.
// It is the application's responsibility to not attempt to deleteUntil an index
// greater than raftLog.applied.
func (w *DiskStorage) deleteUntil(u *txnUnifier, until uint64) error {
func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error {
var keys []string
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
Expand Down Expand Up @@ -311,7 +272,7 @@ func (w *DiskStorage) deleteUntil(u *txnUnifier, until uint64) error {
if err != nil {
return err
}
return w.deleteKeys(u, keys)
return w.deleteKeys(batch, keys)
}

// Snapshot returns the most recent snapshot.
Expand Down Expand Up @@ -340,15 +301,15 @@ func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
// setSnapshot would store the snapshot. We can delete all the entries up until the snapshot
// index. But, keep the raft entry at the snapshot index, to make it easier to build the logic; like
// the dummy entry in MemoryStorage.
func (w *DiskStorage) setSnapshot(u *txnUnifier, s pb.Snapshot) error {
func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error {
if raft.IsEmptySnap(s) {
return nil
}
data, err := s.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal snapshot")
}
if err := u.run(w.snapshotKey(), data, false); err != nil {
if err := batch.Set(w.snapshotKey(), data, 0); err != nil {
return err
}

Expand All @@ -357,7 +318,7 @@ func (w *DiskStorage) setSnapshot(u *txnUnifier, s pb.Snapshot) error {
if err != nil {
return err
}
if err := u.run(w.entryKey(e.Index), data, false); err != nil {
if err := batch.Set(w.entryKey(e.Index), data, 0); err != nil {
return err
}

Expand All @@ -367,24 +328,24 @@ func (w *DiskStorage) setSnapshot(u *txnUnifier, s pb.Snapshot) error {
}

// SetHardState saves the current HardState.
func (w *DiskStorage) setHardState(u *txnUnifier, st pb.HardState) error {
func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) error {
if raft.IsEmptyHardState(st) {
return nil
}
data, err := st.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal hardstate")
}
return u.run(w.hardStateKey(), data, false)
return batch.Set(w.hardStateKey(), data, 0)
}

// reset resets the entries. Used for testing.
func (w *DiskStorage) reset(es []pb.Entry) error {
// Clean out the state.
u := w.newUnifier()
defer u.Cancel()
batch := w.db.NewWriteBatch()
defer batch.Cancel()

if err := w.deleteFrom(u, 0); err != nil {
if err := w.deleteFrom(batch, 0); err != nil {
return err
}

Expand All @@ -394,28 +355,28 @@ func (w *DiskStorage) reset(es []pb.Entry) error {
return x.Wrapf(err, "wal.Store: While marshal entry")
}
k := w.entryKey(e.Index)
if err := u.run(k, data, false); err != nil {
if err := batch.Set(k, data, 0); err != nil {
return err
}
}
return u.Done()
return batch.Flush()
}

func (w *DiskStorage) deleteKeys(u *txnUnifier, keys []string) error {
func (w *DiskStorage) deleteKeys(batch *badger.WriteBatch, keys []string) error {
if len(keys) == 0 {
return nil
}

for _, k := range keys {
if err := u.run([]byte(k), nil, true); err != nil {
if err := batch.Delete([]byte(k)); err != nil {
return err
}
}
return nil
}

// Delete entries in the range of index [from, inf).
func (w *DiskStorage) deleteFrom(u *txnUnifier, from uint64) error {
func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
var keys []string
err := w.db.View(func(txn *badger.Txn) error {
start := w.entryKey(from)
Expand All @@ -434,7 +395,7 @@ func (w *DiskStorage) deleteFrom(u *txnUnifier, from uint64) error {
if err != nil {
return err
}
return w.deleteKeys(u, keys)
return w.deleteKeys(batch, keys)
}

func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
Expand Down Expand Up @@ -593,39 +554,39 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
snap.Metadata.ConfState = *cs
snap.Data = data

u := w.newUnifier()
defer u.Cancel()
if err := w.setSnapshot(u, snap); err != nil {
batch := w.db.NewWriteBatch()
defer batch.Cancel()
if err := w.setSnapshot(batch, snap); err != nil {
return err
}
if err := w.deleteUntil(u, snap.Metadata.Index); err != nil {
if err := w.deleteUntil(batch, snap.Metadata.Index); err != nil {
return err
}
return u.Done()
return batch.Flush()
}

// Save would write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries
// first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
// writes then all of them can be written together. Note that when writing an Entry with Index i,
// any previously-persisted entries with Index >= i must be discarded.
func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) error {
u := w.newUnifier()
defer u.Cancel()
batch := w.db.NewWriteBatch()
defer batch.Cancel()

if err := w.addEntries(u, es); err != nil {
if err := w.addEntries(batch, es); err != nil {
return err
}
if err := w.setHardState(u, h); err != nil {
if err := w.setHardState(batch, h); err != nil {
return err
}
if err := w.setSnapshot(u, snap); err != nil {
if err := w.setSnapshot(batch, snap); err != nil {
return err
}
return u.Done()
return batch.Flush()
}

// Append the new entries to storage.
func (w *DiskStorage) addEntries(u *txnUnifier, entries []pb.Entry) error {
func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) error {
if len(entries) == 0 {
return nil
}
Expand Down Expand Up @@ -656,13 +617,13 @@ func (w *DiskStorage) addEntries(u *txnUnifier, entries []pb.Entry) error {
if err != nil {
return x.Wrapf(err, "wal.Append: While marshal entry")
}
if err := u.run(k, data, false); err != nil {
if err := batch.Set(k, data, 0); err != nil {
return err
}
}
laste := entries[len(entries)-1].Index
if laste < last {
return w.deleteFrom(u, laste+1)
return w.deleteFrom(batch, laste+1)
}
return nil
}
18 changes: 9 additions & 9 deletions raftwal/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func TestStorageFirstIndex(t *testing.T) {
t.Errorf("first = %d, want %d", first, 4)
}

u := ds.newUnifier()
require.NoError(t, ds.deleteUntil(u, 4))
require.NoError(t, u.Done())
batch := db.NewWriteBatch()
require.NoError(t, ds.deleteUntil(batch, 4))
require.NoError(t, batch.Flush())
ds.cache.firstIndex = 0
first, err = ds.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -238,9 +238,9 @@ func TestStorageCompact(t *testing.T) {
}

for i, tt := range tests {
u := ds.newUnifier()
err := ds.deleteUntil(u, tt.i)
require.NoError(t, u.Done())
batch := db.NewWriteBatch()
err := ds.deleteUntil(batch, tt.i)
require.NoError(t, batch.Flush())
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
Expand Down Expand Up @@ -350,12 +350,12 @@ func TestStorageAppend(t *testing.T) {

for i, tt := range tests {
require.NoError(t, ds.reset(ents))
u := ds.newUnifier()
err := ds.addEntries(u, tt.entries)
batch := db.NewWriteBatch()
err := ds.addEntries(batch, tt.entries)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
require.NoError(t, u.Done())
require.NoError(t, batch.Flush())
all, err := ds.allEntries(0, math.MaxUint64, math.MaxUint64)
require.NoError(t, err)
if !reflect.DeepEqual(all, tt.wentries) {
Expand Down
5 changes: 5 additions & 0 deletions vendor/github.com/dgraph-io/badger/CODE_OF_CONDUCT.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 23 additions & 48 deletions vendor/github.com/dgraph-io/badger/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cb26c87

Please sign in to comment.