diff --git a/contrib/integration/increment/main.go b/contrib/integration/increment/main.go index 93f01d0ea38..9a521536da6 100644 --- a/contrib/integration/increment/main.go +++ b/contrib/integration/increment/main.go @@ -55,7 +55,7 @@ func queryCounter(txn *dgo.Txn) (Counter, error) { query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", *pred, *pred) resp, err := txn.Query(ctx, query) if err != nil { - return counter, err + return counter, fmt.Errorf("Query error: %v", err) } m := make(map[string][]Counter) if err := json.Unmarshal(resp.Json, &m); err != nil { @@ -92,13 +92,12 @@ func process(dg *dgo.Dgraph, readOnly bool) (Counter, error) { } mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^ .`, counter.Uid, *pred, counter.Val)) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _, err = txn.Mutate(ctx, &mu) + // Don't put any timeout for mutation. + _, err = txn.Mutate(context.Background(), &mu) if err != nil { return Counter{}, err } - return counter, txn.Commit(ctx) + return counter, txn.Commit(context.Background()) } func main() { diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index 0b7c8acb5a3..b5ecb70fe3f 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -26,7 +26,7 @@ import ( "github.com/dgraph-io/dgo" "github.com/dgraph-io/dgo/protos/api" - "github.com/dgraph-io/dgo/x" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -439,8 +439,10 @@ func TestReadIndexKeySameTxn(t *testing.T) { txn := s.dg.NewTxn() - mu := &api.Mutation{} - mu.SetJson = []byte(`{"name": "Manish"}`) + mu := &api.Mutation{ + CommitNow: true, + SetJson: []byte(`{"name": "Manish"}`), + } assigned, err := txn.Mutate(context.Background(), mu) if err != nil { log.Fatalf("Error while running mutation: %v\n", err) @@ -453,6 +455,8 @@ func 
TestReadIndexKeySameTxn(t *testing.T) { uid = u } + txn = s.dg.NewTxn() + defer txn.Discard(context.Background()) q := `{ me(func: le(name, "Manish")) { uid }}` resp, err := txn.Query(context.Background(), q) if err != nil { diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index fd1c3844129..a7ec2b4c657 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -91,7 +91,7 @@ they form a Raft group and provide synchronous replication. "actions (i.e., --whitelist 127.0.0.1:127.0.0.3,0.0.0.7:0.0.0.9)") flag.StringVar(&worker.Config.ExportPath, "export", "export", "Folder in which to store exports.") - flag.IntVar(&worker.Config.NumPendingProposals, "pending_proposals", 2000, + flag.IntVar(&worker.Config.NumPendingProposals, "pending_proposals", 256, "Number of pending mutation proposals. Useful for rate limiting.") flag.Float64Var(&worker.Config.Tracing, "trace", 0.0, "The ratio of queries to trace.") flag.StringVar(&worker.Config.MyAddr, "my", "", diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 35c4ea056ff..809d9a9628b 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "math/rand" "strings" "sync" "sync/atomic" @@ -139,6 +140,10 @@ func handleError(err error) { x.Fatalf(s.Message()) case strings.Contains(s.Message(), "x509"): x.Fatalf(s.Message()) + case strings.Contains(s.Message(), "Server unavailable."): + dur := time.Duration(1+rand.Intn(10)) * time.Minute + x.Printf("Server is unavailable. 
Will retry after %s.", dur.Round(time.Minute)) + time.Sleep(dur) case err != y.ErrAborted && err != y.ErrConflict: x.Printf("Error while mutating %v\n", s.Message()) } @@ -197,9 +202,9 @@ func (l *loader) printCounters() { for range l.ticker.C { counter := l.Counter() rate := float64(counter.Rdfs) / counter.Elapsed.Seconds() - elapsed := ((time.Since(start) / time.Second) * time.Second).String() - fmt.Printf("Total Txns done: %8d RDFs per second: %7.0f Time Elapsed: %v, Aborts: %d\n", - counter.TxnsDone, rate, elapsed, counter.Aborts) + elapsed := time.Since(start).Round(time.Second) + fmt.Printf("[%6s] Txns: %d RDFs: %d RDFs/sec: %5.0f Aborts: %d\n", + elapsed, counter.TxnsDone, counter.Rdfs, rate, counter.Aborts) } } diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 346775f27d1..931537aa79a 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -87,7 +87,7 @@ func init() { flag.StringP("schema", "s", "", "Location of schema file") flag.StringP("dgraph", "d", "127.0.0.1:9080", "Dgraph gRPC server address") flag.StringP("zero", "z", "127.0.0.1:5080", "Dgraphzero gRPC server address") - flag.IntP("conc", "c", 100, + flag.IntP("conc", "c", 10, "Number of concurrent requests to make to Dgraph") flag.IntP("batch", "b", 1000, "Number of RDF N-Quads to send as part of a mutation.") diff --git a/posting/index.go b/posting/index.go index b3a45e836ff..1dd0a6e0f13 100644 --- a/posting/index.go +++ b/posting/index.go @@ -791,7 +791,6 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error { } func DeleteAll() error { - btree.DeleteAll() lcache.clear(func([]byte) bool { return true }) return deleteEntries(nil, func(key []byte) bool { pk := x.Parse(key) diff --git a/posting/list.go b/posting/list.go index fb0caf8581a..6dfe052acec 100644 --- a/posting/list.go +++ b/posting/list.go @@ -80,7 +80,6 @@ type List struct { deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation. 
estimatedSize int32 numCommits int - onDisk bool } // calculateSize would give you the size estimate. This is expensive, so run it carefully. diff --git a/posting/lists.go b/posting/lists.go index 0f86256208f..3103b71c136 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,7 +18,6 @@ package posting import ( "crypto/md5" - "errors" "fmt" "io/ioutil" "math" @@ -34,7 +33,6 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/badger/y" - "github.com/golang/glog" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -211,7 +209,6 @@ func updateMemoryMetrics(lc *y.Closer) { var ( pstore *badger.DB lcache *listCache - btree *BTree closer *y.Closer ) @@ -219,88 +216,12 @@ var ( func Init(ps *badger.DB) { pstore = ps lcache = newListCache(math.MaxUint64) - btree = newBTree(2) x.LcacheCapacity.Set(math.MaxInt64) - closer = y.NewCloser(3) + closer = y.NewCloser(2) go periodicUpdateStats(closer) go updateMemoryMetrics(closer) - go clearBtree(closer) -} - -// clearBtree checks if the keys stored in the btree have reached their conclusion, and if so, -// removes them from the tree. Conclusion in this case would be that the key got written out to -// disk, or the txn which introduced the key got aborted. -func clearBtree(closer *y.Closer) { - defer closer.Done() - var removeKey = errors.New("Remove key from btree.") - - handleKey := func(txn *badger.Txn, k []byte) error { - _, err := txn.Get(k) - switch { - case err == badger.ErrKeyNotFound: - l := GetLru(k) // Retrieve from LRU cache, if it exists. - if l == nil { - // Posting list no longer in memory. So, it must have been either written to - // disk, or removed from memory after a txn abort. - return removeKey - } - l.RLock() - defer l.RUnlock() - if !l.hasPendingTxn() { - // This key's txn was aborted. So, we can remove it from btree. 
- return removeKey - } - return nil - case err != nil: - glog.Warningf("Error while checking key: %v\n", err) - return err - default: - // Key was found on disk. Remove from btree. - return removeKey - } - } - - removeKeysOnDisk := func() { - var keys []string - var count int - err := pstore.View(func(txn *badger.Txn) error { - var rerr error - btree.Ascend(func(k []byte) bool { - count++ - err := handleKey(txn, k) - if err == removeKey { - keys = append(keys, string(k)) - } else if err != nil { - rerr = err - return false - } - return true - }) - return rerr - }) - if glog.V(2) && count > 0 { - glog.Infof("Btree size: %d. Removing=%d. Error=%v\n", count, len(keys), err) - } - if err == nil { - for _, k := range keys { - btree.Delete([]byte(k)) - } - } - } - - ticker := time.NewTicker(15 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - removeKeysOnDisk() - case <-closer.HasBeenClosed(): - return - } - } } func Cleanup() { @@ -339,8 +260,6 @@ func Get(key []byte) (rlist *List, err error) { lp = lcache.PutIfMissing(string(key), l) if lp != l { x.CacheRace.Add(1) - } else if !lp.onDisk { - btree.Insert(lp.key) } return lp, nil } diff --git a/posting/mvcc.go b/posting/mvcc.go index 0ff06d6460b..212a9bd07c7 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -255,12 +255,10 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) { it.Seek(key) l, err = ReadPostingList(key, it) } - if err != nil { return l, err } - l.onDisk = true l.Lock() size := l.calculateSize() l.Unlock() @@ -268,137 +266,3 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) { atomic.StoreInt32(&l.estimatedSize, size) return l, nil } - -type BTreeIterator struct { - keys [][]byte - idx int - Reverse bool - Prefix []byte -} - -func (bi *BTreeIterator) Next() { - bi.idx++ -} - -func (bi *BTreeIterator) Key() []byte { - x.AssertTrue(bi.Valid()) - return bi.keys[bi.idx] -} - -func (bi *BTreeIterator) Valid() bool { - // No need to check HasPrefix here, 
because we are only picking those keys - // which have the right prefix in Seek. - return bi.idx < len(bi.keys) -} - -func (bi *BTreeIterator) Seek(key []byte) { - bi.keys = bi.keys[:0] - bi.idx = 0 - cont := func(key []byte) bool { - if !bytes.HasPrefix(key, bi.Prefix) { - return false - } - bi.keys = append(bi.keys, key) - return true - } - if !bi.Reverse { - btree.AscendGreaterOrEqual(key, cont) - } else { - btree.DescendLessOrEqual(key, cont) - } -} - -type TxnPrefixIterator struct { - btreeIter *BTreeIterator - badgerIter *badger.Iterator - prefix []byte - reverse bool - curKey []byte - userMeta byte // userMeta stored as part of badger item, used to skip empty PL in has query. -} - -func NewTxnPrefixIterator(txn *badger.Txn, - iterOpts badger.IteratorOptions, prefix, key []byte) *TxnPrefixIterator { - x.AssertTrue(iterOpts.PrefetchValues == false) - txnIt := new(TxnPrefixIterator) - txnIt.reverse = iterOpts.Reverse - txnIt.prefix = prefix - txnIt.btreeIter = &BTreeIterator{ - Reverse: iterOpts.Reverse, - Prefix: prefix, - } - txnIt.btreeIter.Seek(key) - // Create iterator only after copying the keys from btree, or else there could - // be race after creating iterator and before reading btree. Some keys might end up - // getting deleted and iterator won't be initialized with new memtbales. - txnIt.badgerIter = txn.NewIterator(iterOpts) - txnIt.badgerIter.Seek(key) - txnIt.Next() - return txnIt -} - -func (t *TxnPrefixIterator) Valid() bool { - return len(t.curKey) > 0 -} - -func (t *TxnPrefixIterator) compare(key1 []byte, key2 []byte) int { - if !t.reverse { - return bytes.Compare(key1, key2) - } - return bytes.Compare(key2, key1) -} - -func (t *TxnPrefixIterator) Next() { - if len(t.curKey) > 0 { - // Avoid duplicate keys during merging. 
- for t.btreeIter.Valid() && t.compare(t.btreeIter.Key(), t.curKey) <= 0 { - t.btreeIter.Next() - } - for t.badgerIter.ValidForPrefix(t.prefix) && - t.compare(t.badgerIter.Item().Key(), t.curKey) <= 0 { - t.badgerIter.Next() - } - } - - t.userMeta = 0 // reset it. - if !t.btreeIter.Valid() && !t.badgerIter.ValidForPrefix(t.prefix) { - t.curKey = nil - return - } else if !t.badgerIter.ValidForPrefix(t.prefix) { - t.storeKey(t.btreeIter.Key()) - t.btreeIter.Next() - } else if !t.btreeIter.Valid() { - t.userMeta = t.badgerIter.Item().UserMeta() - t.storeKey(t.badgerIter.Item().Key()) - t.badgerIter.Next() - } else { // Both are valid - if t.compare(t.btreeIter.Key(), t.badgerIter.Item().Key()) < 0 { - t.storeKey(t.btreeIter.Key()) - t.btreeIter.Next() - } else { - t.userMeta = t.badgerIter.Item().UserMeta() - t.storeKey(t.badgerIter.Item().Key()) - t.badgerIter.Next() - } - } -} - -func (t *TxnPrefixIterator) UserMeta() byte { - return t.userMeta -} - -func (t *TxnPrefixIterator) storeKey(key []byte) { - if cap(t.curKey) < len(key) { - t.curKey = make([]byte, 2*len(key)) - } - t.curKey = t.curKey[:len(key)] - copy(t.curKey, key) -} - -func (t *TxnPrefixIterator) Key() []byte { - return t.curKey -} - -func (t *TxnPrefixIterator) Close() { - t.badgerIter.Close() -} diff --git a/query/common_test.go b/query/common_test.go index 5cde60b8e0f..b6b55dcac2b 100644 --- a/query/common_test.go +++ b/query/common_test.go @@ -57,6 +57,7 @@ func taskValues(t *testing.T, v []*pb.ValueList) []string { } var index uint64 +var writer *x.TxnWriter func addEdge(t *testing.T, attr string, src uint64, edge *pb.DirectedEdge) { // Mutations don't go through normal flow, so default schema for predicate won't be present. @@ -75,6 +76,10 @@ func addEdge(t *testing.T, attr string, src uint64, edge *pb.DirectedEdge) { l.AddMutationWithIndex(context.Background(), edge, txn)) commit := timestamp() + // The following logic is based on node.commitOrAbort in worker/draft.go. 
+ // We need to commit to disk, so secondary indices, particularly the ones + // which iterate over Badger, would work correctly. + require.NoError(t, txn.CommitToDisk(writer, commit)) require.NoError(t, txn.CommitToMemory(commit)) delta := &pb.OracleDelta{MaxAssigned: commit} delta.Txns = append(delta.Txns, &pb.TxnStatus{StartTs: startTs, CommitTs: commit}) @@ -290,6 +295,9 @@ func addPassword(t *testing.T, uid uint64, attr, password string) { func populateGraph(t *testing.T) { x.AssertTrue(ps != nil) + // Initialize a TxnWriter, so CommitToDisk can use it to write to Badger. + writer = &x.TxnWriter{DB: ps} + defer func() { x.Check(writer.Flush()) }() // Flush when populateGraph returns; a bare defer would call Flush immediately. const schemaStr = ` name : string @index(term, exact, trigram) @count @lang . diff --git a/systest/mutations_test.go b/systest/mutations_test.go index b21a2b2cc0b..42302dd211c 100644 --- a/systest/mutations_test.go +++ b/systest/mutations_test.go @@ -562,6 +562,7 @@ func LangAndSortBugTest(t *testing.T, c *dgo.Dgraph) { txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ + CommitNow: true, SetNquads: []byte(` _:michael "Michael" . _:michael _:sang . @@ -570,6 +571,10 @@ func LangAndSortBugTest(t *testing.T, c *dgo.Dgraph) { _:sang "Sang Hyun"@en . `), }) + require.NoError(t, err) + + txn = c.NewTxn() + defer txn.Discard(ctx) resp, err := txn.Query(ctx, ` { q(func: eq(name, "Michael")) { @@ -1547,8 +1552,10 @@ func HasDeletedEdge(t *testing.T, c *dgo.Dgraph) { // Remove the last entry from ids. ids = ids[:len(ids)-1] - // This time we didn't commit the txn. + // We must commit mutations before we expect them to show up as results in + // queries, involving secondary indices. assigned, err = txn.Mutate(ctx, &api.Mutation{ + CommitNow: true, SetNquads: []byte(` _:d "" . 
`), @@ -1560,6 +1567,8 @@ func HasDeletedEdge(t *testing.T, c *dgo.Dgraph) { ids = append(ids, uid) } + txn = c.NewTxn() + defer txn.Discard(ctx) uids = getUids(txn) require.Equal(t, 3, len(uids)) for _, uid := range uids { diff --git a/systest/plugin_test.go b/systest/plugin_test.go index ca14f908a2f..e908cd3f59b 100644 --- a/systest/plugin_test.go +++ b/systest/plugin_test.go @@ -39,10 +39,10 @@ func TestPlugins(t *testing.T) { var soFiles []string for i, src := range []string{ - "./customtok/anagram/main.go", - "./customtok/cidr/main.go", - "./customtok/factor/main.go", - "./customtok/rune/main.go", + "./_customtok/anagram/main.go", + "./_customtok/cidr/main.go", + "./_customtok/factor/main.go", + "./_customtok/rune/main.go", } { so := strconv.Itoa(i) + ".so" t.Logf("compiling plugin: src=%q so=%q", src, so) diff --git a/worker/draft.go b/worker/draft.go index 93e2ff21bf4..34c60187ca1 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -114,114 +114,6 @@ func (h *header) Decode(in []byte) { h.msgId = binary.LittleEndian.Uint16(in[4:6]) } -var errInternalRetry = errors.New("Retry Raft proposal internally") - -// proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal -// to be applied(written to WAL) to all the nodes in the group. 
-func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) error { - if n.Raft() == nil { - return x.Errorf("Raft isn't initialized yet") - } - - // TODO: Should be based on number of edges (amount of work) - pendingProposals <- struct{}{} - x.PendingProposals.Add(1) - defer func() { <-pendingProposals; x.PendingProposals.Add(-1) }() - if ctx.Err() != nil { - return ctx.Err() - } - // Do a type check here if schema is present - // In very rare cases invalid entries might pass through raft, which would - // be persisted, we do best effort schema check while writing - if proposal.Mutations != nil { - for _, edge := range proposal.Mutations.Edges { - if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly { - return errPredicateMoving - } else if tablet.GroupId != groups().groupId() { - // Tablet can move by the time request reaches here. - return errUnservedTablet - } - - su, ok := schema.State().Get(edge.Attr) - if !ok { - continue - } else if err := ValidateAndConvert(edge, &su); err != nil { - return err - } - } - for _, schema := range proposal.Mutations.Schema { - if tablet := groups().Tablet(schema.Predicate); tablet != nil && tablet.ReadOnly { - return errPredicateMoving - } - if err := checkSchema(schema); err != nil { - return err - } - } - } - - propose := func(timeout time.Duration) error { - cctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - che := make(chan error, 1) - pctx := &conn.ProposalCtx{ - Ch: che, - Ctx: cctx, - } - key := uniqueKey() - x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%v]", key) - defer n.Proposals.Delete(key) // Ensure that it gets deleted on return. - proposal.Key = key - - if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Proposing data with key: %s. 
Timeout: %v", key, timeout) - } - - data, err := proposal.Marshal() - if err != nil { - return err - } - - if err = n.Raft().Propose(cctx, data); err != nil { - return x.Wrapf(err, "While proposing") - } - if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Waiting for the proposal.") - } - - select { - case err = <-che: - // We arrived here by a call to n.Proposals.Done(). - if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Done with error: %v", err) - } - return err - case <-ctx.Done(): - if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("External context timed out with error: %v.", ctx.Err()) - } - return ctx.Err() - case <-cctx.Done(): - if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Internal context timed out with error: %v. Retrying...", cctx.Err()) - } - return errInternalRetry - } - } - - // Some proposals can be stuck if leader change happens. For e.g. MsgProp message from follower - // to leader can be dropped/end up appearing with empty Data in CommittedEntries. - // Having a timeout here prevents the mutation being stuck forever in case they don't have a - // timeout. We should always try with a timeout and optionally retry. - err := errInternalRetry - timeout := 4 * time.Second - for err == errInternalRetry { - err = propose(timeout) - timeout *= 2 // Exponential backoff - } - return err -} - func (n *node) Ctx(key string) context.Context { ctx := context.Background() if pctx := n.Proposals.Get(key); pctx != nil { @@ -441,33 +333,26 @@ func (n *node) applyCommitted(proposal *pb.Proposal, index uint64) error { } func (n *node) processRollups() { - defer n.closer.Done() // CLOSER:1 + defer n.closer.Done() // CLOSER:1 + tick := time.NewTicker(5 * time.Minute) // Rolling up once every 5 minutes seems alright. 
+ defer tick.Stop() + + var readTs, last uint64 for { select { case <-n.closer.HasBeenClosed(): return - case readTs := <-n.rollupCh: - // Let's empty out the rollupCh, so we're working with the latest - // value of readTs. - inner: - for { - select { - case readTs = <-n.rollupCh: - default: - break inner - } + case readTs = <-n.rollupCh: + case <-tick.C: + if readTs <= last { + break // Break out of the select case. } - if readTs == 0 { - glog.Warningln("Found ZERO read Ts for rolling up.") - break // Breaks the select case. - } - - // If we encounter error here, we don't need to do anything about - // it. Just let the user know. - err := n.rollupLists(readTs) - if err != nil { + if err := n.rollupLists(readTs); err != nil { + // If we encounter error here, we don't need to do anything about + // it. Just let the user know. glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err) } else { + last = readTs // Update last only if we succeeded. glog.Infof("List rollup at Ts %d: OK.\n", readTs) } } @@ -476,7 +361,16 @@ func (n *node) processRollups() { func (n *node) processApplyCh() { defer n.closer.Done() // CLOSER:1 - for entries := range n.applyCh { + + type P struct { + err error + size int + seen time.Time + } + previous := make(map[string]*P) + + // This function must be run serially. + handle := func(entries []raftpb.Entry) { for _, e := range entries { switch { case e.Type == raftpb.EntryConfChange: @@ -489,15 +383,56 @@ func (n *node) processApplyCh() { if err := proposal.Unmarshal(e.Data); err != nil { x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data) } + // We use the size as a double check to ensure that we're + // working with the same proposal as before. + psz := proposal.Size() + + var perr error + p, ok := previous[proposal.Key] + if ok && p.err == nil && p.size == psz { + n.elog.Printf("Proposal with key: %s already applied. 
Skipping index: %d.\n", + proposal.Key, e.Index) + previous[proposal.Key].seen = time.Now() // Update the ts. + // Don't break here. We still need to call the Done below. + + } else { + perr = n.applyCommitted(proposal, e.Index) + if len(proposal.Key) > 0 { + p := &P{err: perr, size: psz, seen: time.Now()} + previous[proposal.Key] = p + } + n.elog.Printf("Applied proposal with key: %s, index: %d. Err: %v", + proposal.Key, e.Index, perr) + } - err := n.applyCommitted(proposal, e.Index) - n.elog.Printf("Applied proposal with key: %s, index: %d. Err: %v", - proposal.Key, e.Index, err) - n.Proposals.Done(proposal.Key, err) + n.Proposals.Done(proposal.Key, perr) n.Applied.Done(e.Index) } } } + + maxAge := 10 * time.Minute + tick := time.NewTicker(maxAge / 2) + defer tick.Stop() + + for { + select { + case entries, ok := <-n.applyCh: + if !ok { + return + } + handle(entries) + case <-tick.C: + // We use this ticker to clear out previous map. + now := time.Now() + for key, p := range previous { + if now.Sub(p.seen) > maxAge { + delete(previous, key) + } + } + n.elog.Printf("Size of previous map: %d", len(previous)) + } + } } func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { @@ -1072,7 +1007,7 @@ func (n *node) InitAndStartNode() { // Get snapshot before joining peers as it can take time to retrieve it and we dont // want the quorum to be inactive when it happens. - // TODO: This is an optimization, which adds complexity because it requires us to + // Note: This is an optimization, which adds complexity because it requires us to // understand the Raft state of the node. Let's instead have the node retrieve the // snapshot as needed after joining the group, instead of us forcing one upfront. 
// diff --git a/worker/groups.go b/worker/groups.go index 732203f7ccb..60f1e8a419e 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -36,6 +36,7 @@ import ( "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" ) type groupi struct { @@ -748,7 +749,7 @@ func (g *groupi) processOracleDeltaStream() { for { delta, err := stream.Recv() if err != nil || delta == nil { - x.Printf("Error in oracle delta stream. Error: %v", err) + glog.Errorf("Error in oracle delta stream. Error: %v", err) return } @@ -800,9 +801,15 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } - // Block forever trying to propose this. elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) - g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) + for { + // Block forever trying to propose this. + err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) + if err == nil { + break + } + glog.Errorf("While proposing delta: %v. Error=%v. Retrying...\n", delta, err) + } } } diff --git a/worker/proposal.go b/worker/proposal.go new file mode 100644 index 00000000000..48f5b8aecd4 --- /dev/null +++ b/worker/proposal.go @@ -0,0 +1,211 @@ +/* + * Copyright 2018 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package worker + +import ( + "errors" + "sync/atomic" + "time" + + "github.com/dgraph-io/dgraph/conn" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/schema" + "github.com/dgraph-io/dgraph/x" + "golang.org/x/net/context" + "golang.org/x/net/trace" +) + +const baseTimeout time.Duration = 4 * time.Second + +func newTimeout(retry int) time.Duration { + timeout := baseTimeout + for i := 0; i < retry; i++ { + timeout *= 2 + } + return timeout +} + +var limiter rateLimiter + +func init() { + go limiter.bleed() +} + +type rateLimiter struct { + iou int32 +} + +// Instead of using the time/rate package, we use this simple one, because that +// allows a certain number of ops per second, without taking any feedback into +// account. We however, limit solely based on feedback, allowing a certain +// number of ops to remain pending, and not anymore. +func (rl *rateLimiter) bleed() { + tick := time.NewTicker(time.Second) + defer tick.Stop() + + for range tick.C { + if atomic.AddInt32(&rl.iou, -1) >= 0 { + <-pendingProposals + x.PendingProposals.Add(-1) + } else { + atomic.AddInt32(&rl.iou, 1) + } + } +} + +func (rl *rateLimiter) incr(ctx context.Context, retry int) error { + // Let's not wait here via time.Sleep or similar. Let pendingProposals + // channel do its natural rate limiting. + weight := 1 << uint(retry) // Use an exponentially increasing weight. + for i := 0; i < weight; i++ { + select { + case pendingProposals <- struct{}{}: + x.PendingProposals.Add(1) + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} + +// Done would slowly bleed the retries out. +func (rl *rateLimiter) decr(retry int) { + if retry == 0 { + <-pendingProposals + x.PendingProposals.Add(-1) + return + } + weight := 1 << uint(retry) // Ensure that the weight calculation is a copy of incr. 
+ atomic.AddInt32(&rl.iou, int32(weight)) +} + +var errInternalRetry = errors.New("Retry Raft proposal internally") +var errUnableToServe = errors.New("Server unavailable. Please retry later") + +// proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal +// to be applied(written to WAL) to all the nodes in the group. +func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) error { + if n.Raft() == nil { + return x.Errorf("Raft isn't initialized yet") + } + if ctx.Err() != nil { + return ctx.Err() + } + + // Do a type check here if schema is present + // In very rare cases invalid entries might pass through raft, which would + // be persisted, we do best effort schema check while writing + if proposal.Mutations != nil { + for _, edge := range proposal.Mutations.Edges { + if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly { + return errPredicateMoving + } else if tablet == nil || tablet.GroupId != groups().groupId() { + // Tablet is unknown (nil, so it must not be dereferenced) or has moved by the time request reaches here. + return errUnservedTablet + } + + su, ok := schema.State().Get(edge.Attr) + if !ok { + continue + } else if err := ValidateAndConvert(edge, &su); err != nil { + return err + } + } + for _, schema := range proposal.Mutations.Schema { + if tablet := groups().Tablet(schema.Predicate); tablet != nil && tablet.ReadOnly { + return errPredicateMoving + } + if err := checkSchema(schema); err != nil { + return err + } + } + } + + // Let's keep the same key, so multiple retries of the same proposal would + // have this shared key. Thus, each server in the group can identify + // whether it has already done this work, and if so, skip it. 
+ key := uniqueKey() + proposal.Key = key + + propose := func(timeout time.Duration) error { + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + che := make(chan error, 1) + pctx := &conn.ProposalCtx{ + Ch: che, + Ctx: cctx, + } + x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%v]", key) + defer n.Proposals.Delete(key) // Ensure that it gets deleted on return. + + if tr, ok := trace.FromContext(ctx); ok { + tr.LazyPrintf("Proposing data with key: %s. Timeout: %v", key, timeout) + } + + data, err := proposal.Marshal() + if err != nil { + return err + } + + if err = n.Raft().Propose(cctx, data); err != nil { + return x.Wrapf(err, "While proposing") + } + if tr, ok := trace.FromContext(ctx); ok { + tr.LazyPrintf("Waiting for the proposal.") + } + + select { + case err = <-che: + // We arrived here by a call to n.Proposals.Done(). + if tr, ok := trace.FromContext(ctx); ok { + tr.LazyPrintf("Done with error: %v", err) + } + return err + case <-ctx.Done(): + if tr, ok := trace.FromContext(ctx); ok { + tr.LazyPrintf("External context timed out with error: %v.", ctx.Err()) + } + return ctx.Err() + case <-cctx.Done(): + if tr, ok := trace.FromContext(ctx); ok { + tr.LazyPrintf("Internal context timed out with error: %v. Retrying...", cctx.Err()) + } + return errInternalRetry + } + } + + // Some proposals can be stuck if leader change happens. For e.g. MsgProp message from follower + // to leader can be dropped/end up appearing with empty Data in CommittedEntries. + // Having a timeout here prevents the mutation being stuck forever in case they don't have a + // timeout. We should always try with a timeout and optionally retry. + // + // Let's try 3 times before giving up. + for i := 0; i < 3; i++ { + // Each retry creates a new proposal, which adds to the number of pending proposals. We + // should consider this into account, when adding new proposals to the system. 
+ if err := limiter.incr(ctx, i); err != nil { + return err + } + defer limiter.decr(i) + + if err := propose(newTimeout(i)); err != errInternalRetry { + return err + } + } + return errUnableToServe +} diff --git a/worker/sort.go b/worker/sort.go index 8cd08381b08..5b0e9693ba2 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -201,21 +201,21 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { // We need to reach the last key of this index type. seekKey = x.IndexKey(order.Attr, string(tokenizer.Identifier()+1)) } - it := posting.NewTxnPrefixIterator(txn, iterOpt, indexPrefix, seekKey) - defer it.Close() + itr := txn.NewIterator(iterOpt) + defer itr.Close() BUCKETS: // Outermost loop is over index buckets. - for it.Valid() { - key := it.Key() + for itr.Seek(seekKey); itr.ValidForPrefix(indexPrefix); itr.Next() { + item := itr.Item() + key := item.Key() // No need to copy. select { case <-ctx.Done(): return &sortresult{&emptySortResult, nil, ctx.Err()} default: k := x.Parse(key) if k == nil { - it.Next() continue } @@ -235,7 +235,6 @@ BUCKETS: default: return &sortresult{&emptySortResult, nil, err} } - it.Next() } } diff --git a/worker/stream_lists.go b/worker/stream_lists.go index d3017504d2c..303bc7919ea 100644 --- a/worker/stream_lists.go +++ b/worker/stream_lists.go @@ -219,7 +219,7 @@ func (sl *streamLists) streamKVs(ctx context.Context, prefix string, if err := sl.stream.Send(batch); err != nil { return err } - glog.Infof("%s Created batch of size: %s in %v.\n", + glog.V(2).Infof("%s Created batch of size: %s in %v.\n", prefix, humanize.Bytes(sz), time.Since(t)) return nil } diff --git a/worker/task.go b/worker/task.go index 072fecb915f..3448730e95a 100644 --- a/worker/task.go +++ b/worker/task.go @@ -1598,13 +1598,13 @@ func (cp *countParams) evaluate(out *pb.Result) error { Attr: cp.attr, } countPrefix := pk.CountPrefix(cp.reverse) - it := posting.NewTxnPrefixIterator(txn, itOpt, countPrefix, countKey) - defer it.Close() - for ; 
it.Valid(); it.Next() { - key := it.Key() - nk := make([]byte, len(key)) - copy(nk, key) + itr := txn.NewIterator(itOpt) + defer itr.Close() + + for itr.Seek(countKey); itr.ValidForPrefix(countPrefix); itr.Next() { + item := itr.Item() + key := item.KeyCopy(nil) pl, err := posting.Get(key) if err != nil { return err @@ -1635,33 +1635,9 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { glog.Warningln("has function does not handle reverse iteration.") } - var cachedUids []uint64 - bitr := &posting.BTreeIterator{ - // Reverse: q.Reverse, // We don't handle reverse iteration. - Prefix: prefix, - } - for bitr.Seek(startKey); bitr.Valid(); bitr.Next() { - key := bitr.Key() - pl := posting.GetLru(key) - if pl == nil { - continue - } - pk := x.Parse(key) - var num int - if err := pl.Iterate(q.ReadTs, 0, func(_ *pb.Posting) error { - num++ - return posting.ErrStopIteration - }); err != nil { - return err - } - if num > 0 { - cachedUids = append(cachedUids, pk.Uid) - } - } - result := &pb.List{} var prevKey []byte - var cidx, w int + var w int itOpt := badger.DefaultIteratorOptions itOpt.PrefetchValues = false itOpt.AllVersions = true @@ -1680,19 +1656,6 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { // iterator. pk := x.Parse(item.Key()) - // Pick up all the uids found in the Btree, which are <= this Uid. - for cidx < len(cachedUids) && cachedUids[cidx] < pk.Uid { - result.Uids = append(result.Uids, pk.Uid) - cidx++ - } - if cidx < len(cachedUids) && cachedUids[cidx] == pk.Uid { - // No need to check this key on disk. We already found it in the - // Btree. - result.Uids = append(result.Uids, pk.Uid) - cidx++ - continue - } - // We do need to copy over the key for ReadPostingList. 
l, err := posting.ReadPostingList(item.KeyCopy(nil), it) if err != nil { @@ -1722,11 +1685,6 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { } } } - // Pick up any remaining UIDs from the Btree. - if cidx < len(cachedUids) { - result.Uids = append(result.Uids, cachedUids[cidx:]...) - } - out.UidMatrix = append(out.UidMatrix, result) return nil } diff --git a/worker/tokens.go b/worker/tokens.go index c8af92f7648..10559c89b8e 100644 --- a/worker/tokens.go +++ b/worker/tokens.go @@ -22,7 +22,7 @@ import ( "github.com/dgraph-io/badger" "bytes" - "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" @@ -159,11 +159,15 @@ func getInequalityTokens(readTs uint64, attr, f string, var out []string indexPrefix := x.IndexKey(attr, string(tokenizer.Identifier())) seekKey := x.IndexKey(attr, ineqToken) - it := posting.NewTxnPrefixIterator(txn, itOpt, indexPrefix, seekKey) + + itr := txn.NewIterator(itOpt) + defer itr.Close() + ineqTokenInBytes := []byte(ineqToken) //used for inequality comparison below - defer it.Close() - for ; it.Valid(); it.Next() { - key := it.Key() + + for itr.Seek(seekKey); itr.ValidForPrefix(indexPrefix); itr.Next() { + item := itr.Item() + key := item.Key() k := x.Parse(key) if k == nil { continue diff --git a/x/error.go b/x/error.go index 5e143b3ba15..b5f3da16767 100644 --- a/x/error.go +++ b/x/error.go @@ -60,6 +60,12 @@ func Check2(_ interface{}, err error) { Check(err) } +// Ignore function is used to ignore errors deliberately, while keeping the +// linter happy. +func Ignore(_ error) { + // Do nothing. +} + // AssertTrue asserts that b is true. Otherwise, it would log fatal. func AssertTrue(b bool) { if !b {