Skip to content

Commit

Permalink
Remove inSnapshot logic (dgraph-io#2480)
Browse files Browse the repository at this point in the history
- Remove local Oracle map commit and abort tracking. Oracle only gets to know about it after the commits or aborts have already been applied to the posting lists.
- Simplify OracleDelta by using TxnStatus, instead of a map. This allows transaction statuses to be sorted by CommitTs, which adds determinism to our codebase.
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent a2a5131 commit 70214c6
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 543 deletions.
45 changes: 21 additions & 24 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/base64"
"errors"
"math/rand"
"sort"
"time"

"github.com/dgraph-io/dgo/protos/api"
Expand Down Expand Up @@ -120,16 +121,22 @@ func (o *Oracle) aborted(startTs uint64) bool {
return ok
}

func sortTxns(delta *intern.OracleDelta) {
sort.Slice(delta.Txns, func(i, j int) bool {
return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
})
}

func (o *Oracle) currentState() *intern.OracleDelta {
o.AssertRLock()
resp := &intern.OracleDelta{
Commits: make(map[uint64]uint64, len(o.commits)),
}
resp := &intern.OracleDelta{}
for start, commit := range o.commits {
resp.Commits[start] = commit
resp.Txns = append(resp.Txns,
&intern.TxnStatus{StartTs: start, CommitTs: commit})
}
for abort := range o.aborts {
resp.Aborts = append(resp.Aborts, abort)
resp.Txns = append(resp.Txns,
&intern.TxnStatus{StartTs: abort, CommitTs: 0})
}
resp.MaxAssigned = o.maxAssigned
return resp
Expand Down Expand Up @@ -158,26 +165,16 @@ func (o *Oracle) removeSubscriber(id int) {
}

func (o *Oracle) sendDeltasToSubscribers() {
delta := &intern.OracleDelta{
Commits: make(map[uint64]uint64),
}
delta := &intern.OracleDelta{}
for {
update, open := <-o.updates
if !open {
return
}
slurp_loop:
for {
// Consume tctx.
if update.MaxAssigned > delta.MaxAssigned {
delta.MaxAssigned = update.MaxAssigned
}
for _, startTs := range update.Aborts {
delta.Aborts = append(delta.Aborts, startTs)
}
for startTs, commitTs := range update.Commits {
delta.Commits[startTs] = commitTs
}
delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
delta.Txns = append(delta.Txns, update.Txns...)
select {
case update, open = <-o.updates:
if !open {
Expand All @@ -187,6 +184,7 @@ func (o *Oracle) sendDeltasToSubscribers() {
break slurp_loop
}
}
sortTxns(delta) // Sort them in increasing order of CommitTs.
o.Lock()
for id, ch := range o.subscribers {
select {
Expand All @@ -197,9 +195,7 @@ func (o *Oracle) sendDeltasToSubscribers() {
}
}
o.Unlock()
delta = &intern.OracleDelta{
Commits: make(map[uint64]uint64),
}
delta = &intern.OracleDelta{}
}
}

Expand All @@ -225,10 +221,11 @@ func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
if o.updateCommitStatusHelper(index, src) {
delta := new(intern.OracleDelta)
if src.Aborted {
delta.Aborts = append(delta.Aborts, src.StartTs)
delta.Txns = append(delta.Txns,
&intern.TxnStatus{StartTs: src.StartTs, CommitTs: 0})
} else {
delta.Commits = make(map[uint64]uint64)
delta.Commits[src.StartTs] = src.CommitTs
delta.Txns = append(delta.Txns,
&intern.TxnStatus{StartTs: src.StartTs, CommitTs: src.CommitTs})
}
o.updates <- delta
}
Expand Down
45 changes: 14 additions & 31 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,14 +400,13 @@ func (l *List) commitMutation(ctx context.Context, startTs, commitTs uint64) err
// It was already committed, might be happening due to replay.
return nil
} else {
// We want to be able to access this, irrespective of whether we have the right locks or
// not.
atomic.StoreUint64(&plist.Commit, commitTs)
plist.Commit = commitTs
for _, mpost := range plist.Postings {
atomic.StoreUint64(&mpost.CommitTs, commitTs)
mpost.CommitTs = commitTs
}
l.numCommits += len(plist.Postings)
}

if commitTs > l.commitTs {
// This is for rolling up the posting list.
l.commitTs = commitTs
Expand Down Expand Up @@ -472,24 +471,8 @@ func (l *List) pickPostings(readTs uint64) (*intern.PostingList, []*intern.Posti
var deleteBelow uint64
var posts []*intern.Posting
for startTs, plist := range l.mutationMap {
pcommit := atomic.LoadUint64(&plist.Commit)
if pcommit == 0 {
commitTs := Oracle().CommitTs(startTs)
if commitTs > 0 {
// TODO: We should propose the txn status before applying them in local Oracle. In
// fact, we might not even need to have the local Oracle storage, if we propose them
// in the order we receive them.
// If everything is proposed upfront, this print should NOT happen.
// x.Printf("ORACLE: Found commit ts only via Oracle. Start: %d. Commit: %d\n", startTs, commitTs)
atomic.StoreUint64(&plist.Commit, commitTs)
for _, mpost := range plist.Postings {
atomic.StoreUint64(&mpost.CommitTs, commitTs)
}
pcommit = commitTs
}
}
// Pick up the transactions which are either committed, or the one which is ME.
effectiveTs := effective(startTs, pcommit)
effectiveTs := effective(startTs, plist.Commit)
if effectiveTs > deleteBelow {
// We're above the deleteBelow marker. We wouldn't reach here if effectiveTs is zero.
for _, mpost := range plist.Postings {
Expand All @@ -509,7 +492,7 @@ func (l *List) pickPostings(readTs uint64) (*intern.PostingList, []*intern.Posti
result := posts[:0]
// Trim the posts.
for _, post := range posts {
effectiveTs := effective(post.StartTs, atomic.LoadUint64(&post.CommitTs))
effectiveTs := effective(post.StartTs, post.CommitTs)
if effectiveTs < deleteBelow { // Do pick the posts at effectiveTs == deleteBelow.
continue
}
Expand All @@ -523,8 +506,8 @@ func (l *List) pickPostings(readTs uint64) (*intern.PostingList, []*intern.Posti
pi := posts[i]
pj := posts[j]
if pi.Uid == pj.Uid {
ei := effective(pi.StartTs, atomic.LoadUint64(&pi.CommitTs))
ej := effective(pj.StartTs, atomic.LoadUint64(&pj.CommitTs))
ei := effective(pi.StartTs, pi.CommitTs)
ej := effective(pj.StartTs, pj.CommitTs)
return ei > ej // Pick the higher, so we can discard older commits for the same UID.
}
return pi.Uid < pj.Uid
Expand Down Expand Up @@ -635,12 +618,6 @@ func doAsyncWrite(commitTs uint64, key []byte, data []byte, meta byte, f func(er
}
}

func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
l.Lock()
defer l.Unlock()
return l.syncIfDirty(delFromCache)
}

func (l *List) MarshalToKv() (*intern.KV, error) {
l.Lock()
defer l.Unlock()
Expand Down Expand Up @@ -716,7 +693,7 @@ func (l *List) rollup() error {
// Keep all uncommited Entries or postings with commitTs > l.commitTs
// in mutation map. Discard all else.
for startTs, plist := range l.mutationMap {
cl := atomic.LoadUint64(&plist.Commit)
cl := plist.Commit
if cl == 0 || cl > l.commitTs {
// Keep this.
} else {
Expand All @@ -731,6 +708,12 @@ func (l *List) rollup() error {
return nil
}

func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
l.Lock()
defer l.Unlock()
return l.syncIfDirty(delFromCache)
}

// Merge mutation layer and immutable layer.
func (l *List) syncIfDirty(delFromCache bool) (committed bool, err error) {
// We no longer set posting list to empty.
Expand Down
5 changes: 1 addition & 4 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,7 @@ func TestAddMutation_DelRead(t *testing.T) {
require.EqualValues(t, 0, ol.Length(3, 0))

// Commit sp* only in oracle, don't apply to pl yet
Oracle().commits[3] = 5
defer func() {
delete(Oracle().commits, 3)
}()
ol.CommitMutation(context.Background(), 3, 5)

// This read should ignore sp*, since readts is 4 and it was committed at 5
require.EqualValues(t, 1, ol.Length(4, 0))
Expand Down
4 changes: 4 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (t *Txn) Fill(ctx *api.TxnContext) {
}

// Don't call this for schema mutations. Directly commit them.
// TODO: Simplify this function. All it should be doing is to store the deltas, and not try to
// generate state. The state should only be generated by rollup, which in turn should look at the
// last Snapshot Ts, to determine how much of the PL to rollup. We only want to roll up the deltas,
// with commit ts <= snapshot ts, and not above.
func (tx *Txn) CommitMutations(ctx context.Context, commitTs uint64) error {
tx.Lock()
defer tx.Unlock()
Expand Down
59 changes: 3 additions & 56 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ type Txn struct {
type oracle struct {
x.SafeMutex

// TODO: Remove commits and aborts map from here. We don't need this, if we're doing transaction
// tracking correctly and applying the txn status back to posting lists correctly.
commits map[uint64]uint64 // startTs => commitTs map
aborts map[uint64]struct{} // key is startTs

// max start ts given out by Zero.
maxAssigned uint64

Expand All @@ -76,25 +71,10 @@ type oracle struct {
}

func (o *oracle) init() {
o.commits = make(map[uint64]uint64)
o.aborts = make(map[uint64]struct{})
o.waiters = make(map[uint64][]chan struct{})
o.pendingTxns = make(map[uint64]*Txn)
}

func (o *oracle) CommitTs(startTs uint64) uint64 {
o.RLock()
defer o.RUnlock()
return o.commits[startTs]
}

func (o *oracle) Aborted(startTs uint64) bool {
o.RLock()
defer o.RUnlock()
_, ok := o.aborts[startTs]
return ok
}

func (o *oracle) RegisterStartTs(ts uint64) *Txn {
o.Lock()
defer o.Unlock()
Expand Down Expand Up @@ -160,26 +140,6 @@ func (o *oracle) MaxAssigned() uint64 {
return o.maxAssigned
}

func (o *oracle) SetMaxPending(maxPending uint64) {
o.Lock()
defer o.Unlock()
o.maxAssigned = maxPending
}

func (o *oracle) CurrentState() *intern.OracleDelta {
od := new(intern.OracleDelta)
od.Commits = make(map[uint64]uint64)
o.RLock()
defer o.RUnlock()
for startTs := range o.aborts {
od.Aborts = append(od.Aborts, startTs)
}
for startTs, commitTs := range o.commits {
od.Commits[startTs] = commitTs
}
return od
}

func (o *oracle) WaitForTs(ctx context.Context, startTs uint64) error {
ch, ok := o.addToWaiters(startTs)
if !ok {
Expand All @@ -193,28 +153,15 @@ func (o *oracle) WaitForTs(ctx context.Context, startTs uint64) error {
}
}

func (o *oracle) done(startTs uint64) {
delete(o.commits, startTs)
delete(o.aborts, startTs)
delete(o.pendingTxns, startTs)
}

func (o *oracle) ProcessDelta(delta *intern.OracleDelta) {
o.Lock()
defer o.Unlock()
for startTs := range delta.Commits {
o.done(startTs)
}
for _, startTs := range delta.Aborts {
o.done(startTs)
for _, txn := range delta.Txns {
delete(o.pendingTxns, txn.StartTs)
}
// We should always be moving forward with Zero and with Raft logs. A move
// back should not be possible, unless there's a bigger issue in
// understanding or the codebase.
if delta.MaxAssigned == 0 {
if delta.MaxAssigned < o.maxAssigned {
return
}
x.AssertTrue(delta.MaxAssigned >= o.maxAssigned)

// Notify the waiting cattle.
for startTs, toNotify := range o.waiters {
Expand Down
Loading

0 comments on commit 70214c6

Please sign in to comment.