Improve efficiency of readonly transactions by reusing the same read ts (#2604)

Currently, every transaction gets a new timestamp from Zero, which then has to propagate the allocated timestamp to every Alpha so that each Alpha knows when it can service that transaction. An Alpha applies all updates up until that ts, so the txn always returns consistent results.

If a user is only doing read-only transactions, this system causes a lot of unnecessary work, because each txn still gets a new ts from Zero. This PR changes that by allocating a read-only timestamp that can be reused across many read-only transactions, as long as no read-write txns are going on in the system. This speeds up reads, because it avoids the wait for ts propagation from Zero to the Alpha leader to its followers.
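
For illustration, a minimal client-side sketch of how the new read-only path would be used, based on the dgo calls that appear in the diff below (NewReadOnlyTxn, Query, Discard); the address, predicate, and query here are placeholders, not part of this change:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dgraph-io/dgo"
	"github.com/dgraph-io/dgo/protos/api"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder address; point this at any Alpha in your cluster.
	conn, err := grpc.Dial("localhost:9080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

	// A read-only txn never commits, so it can share the cluster's
	// current read-only timestamp instead of leasing a new one.
	txn := dg.NewReadOnlyTxn()
	defer txn.Discard(context.Background())

	resp, err := txn.Query(context.Background(),
		`{ q(func: has(counter.val)) { uid, val: counter.val } }`)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Result: %s [ StartTs: %d ]\n", resp.Json, resp.GetTxn().GetStartTs())
}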

Commits:

* Logic to retrieve read-only timestamps from the Dgraph cluster. This makes read-only queries more efficient by reusing the same read-only timestamp, instead of having to lease and propagate a new MaxAssigned each time (a simplified sketch of the reuse condition follows this list).

* Working code for ReadOnly Txns.

* Add a test to ensure that all read-only txns get the same timestamp. Also, avoid streaming MaxAssigned if nothing new was assigned.
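
For illustration, a minimal sketch of the Zero-side reuse condition, mirroring the lease logic in dgraph/cmd/zero/assign.go further down; the zeroState type and both method names are invented stand-ins, and lease renewal against MaxTxnTs is deliberately omitted:

package main

import (
	"fmt"
	"sync"
)

// zeroState is a simplified stand-in for Zero's lease state; the real server
// keeps these fields on its Server struct and renews leases via Raft proposals.
type zeroState struct {
	mu         sync.Mutex
	nextTxnTs  uint64 // next start/commit timestamp to hand out
	readOnlyTs uint64 // last timestamp handed out for read-only txns
}

// readOnlyTimestamp returns a timestamp for a read-only txn, reusing the
// cached one while nothing newer has been assigned (readOnlyTs == nextTxnTs-1).
func (s *zeroState) readOnlyTimestamp() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.readOnlyTs > 0 && s.readOnlyTs == s.nextTxnTs-1 {
		return s.readOnlyTs // served from memory; no read-write txn in between
	}
	s.readOnlyTs = s.nextTxnTs // otherwise burn one fresh timestamp
	s.nextTxnTs++
	return s.readOnlyTs
}

// commitTimestamp hands out a fresh timestamp for a read-write txn,
// which invalidates the cached read-only timestamp.
func (s *zeroState) commitTimestamp() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	ts := s.nextTxnTs
	s.nextTxnTs++
	return ts
}

func main() {
	s := &zeroState{nextTxnTs: 1}
	fmt.Println(s.readOnlyTimestamp()) // 1
	fmt.Println(s.readOnlyTimestamp()) // 1 again: reused, no writes in between
	s.commitTimestamp()                // a read-write txn advances nextTxnTs
	fmt.Println(s.readOnlyTimestamp()) // 3: a fresh read-only ts is needed
}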
manishrjain authored Sep 20, 2018
1 parent 8f24674 commit fe7b749
Showing 17 changed files with 1,015 additions and 664 deletions.
2 changes: 1 addition & 1 deletion conn/node.go
@@ -452,7 +452,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
}

type linReadReq struct {
// A one-shot chan which we send a raft index upon
// A one-shot chan which we send a raft index upon.
indexCh chan<- uint64
}

2 changes: 1 addition & 1 deletion contrib/integration/bank/main.go
@@ -96,7 +96,7 @@ func (s *State) runTotal(dg *dgo.Dgraph) error {
}
}
`
txn := dg.NewTxn()
txn := dg.NewReadOnlyTxn()
defer txn.Discard(context.Background())
resp, err := txn.Query(context.Background(), query)
if err != nil {
43 changes: 32 additions & 11 deletions contrib/integration/increment/main.go
@@ -26,36 +26,54 @@ import (
var (
addr = flag.String("addr", "localhost:9080", "Address of Dgraph server.")
num = flag.Int("num", 1, "How many times to run.")
ro = flag.Bool("ro", false, "Only read the counter value, don't update it.")
wait = flag.String("wait", "0", "How long to wait.")
pred = flag.String("pred", "counter.val", "Predicate to use for storing the counter.")
)

type Counter struct {
Uid string `json:"uid"`
Val int `json:"val"`

startTs uint64 // Only used for internal testing.
}

func increment(dg *dgo.Dgraph) (int, error) {
func queryCounter(txn *dgo.Txn) (Counter, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

txn := dg.NewTxn()
var counter Counter
query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", *pred, *pred)
resp, err := txn.Query(ctx, query)
if err != nil {
return 0, err
return counter, err
}
m := make(map[string][]Counter)
if err := json.Unmarshal(resp.Json, &m); err != nil {
return 0, err
return counter, err
}
var counter Counter
if len(m["q"]) == 0 {
// Do nothing.
} else if len(m["q"]) == 1 {
counter = m["q"][0]
} else {
log.Fatalf("Invalid response: %q", resp.Json)
panic(fmt.Sprintf("Invalid response: %q", resp.Json))
}
counter.startTs = resp.GetTxn().GetStartTs()
return counter, nil
}

func process(dg *dgo.Dgraph, readOnly bool) (Counter, error) {
if readOnly {
txn := dg.NewReadOnlyTxn()
defer txn.Discard(nil)
return queryCounter(txn)
}

txn := dg.NewTxn()
counter, err := queryCounter(txn)
if err != nil {
return Counter{}, err
}
counter.Val += 1

@@ -64,11 +82,14 @@ func increment(dg *dgo.Dgraph) (int, error) {
counter.Uid = "_:new"
}
mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^<xs:int> .`, counter.Uid, *pred, counter.Val))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = txn.Mutate(ctx, &mu)
if err != nil {
return 0, err
return Counter{}, err
}
return counter.Val, txn.Commit(ctx)
return counter, txn.Commit(ctx)
}

func main() {
@@ -87,13 +108,13 @@ func main() {
}

for *num > 0 {
val, err := increment(dg)
cnt, err := process(dg, *ro)
if err != nil {
fmt.Printf("While trying to increment counter: %v. Retrying...\n", err)
fmt.Printf("While trying to process counter: %v. Retrying...\n", err)
time.Sleep(time.Second)
continue
}
fmt.Printf("Counter SET OK: %d\n", val)
fmt.Printf("Counter VAL: %d [ Ts: %d ]\n", cnt.Val, cnt.startTs)
*num -= 1
time.Sleep(waitDur)
}
105 changes: 105 additions & 0 deletions contrib/integration/increment/main_test.go
@@ -0,0 +1,105 @@
package main

import (
"context"
"log"
"strings"
"sync"
"testing"

"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

const N = 10

func increment(t *testing.T, dg *dgo.Dgraph) int {
var max int
var mu sync.Mutex
storeMax := func(a int) {
mu.Lock()
if max < a {
max = a
}
mu.Unlock()
}

var wg sync.WaitGroup
// N goroutines, process N times each goroutine.
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, false)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
storeMax(cnt.Val)
}
}
}()
}
wg.Wait()
return max
}

func read(t *testing.T, dg *dgo.Dgraph, expected int) {
cnt, err := process(dg, true)
require.NoError(t, err)
ts := cnt.startTs
t.Logf("Readonly stage counter: %+v\n", cnt)

var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, true)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
require.Equal(t, expected, cnt.Val)
require.Equal(t, ts, cnt.startTs)
}
}
}()
}
wg.Wait()
}

func TestIncrement(t *testing.T) {
conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
dc := api.NewDgraphClient(conn)
dg := dgo.NewDgraphClient(dc)

op := api.Operation{DropAll: true}
x.Check(dg.Alter(context.Background(), &op))

cnt, err := process(dg, false)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
t.Logf("Initial value: %d\n", cnt.Val)
}

val := increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
val = increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
}
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/loader.go
@@ -24,7 +24,6 @@ import (

"github.com/dgraph-io/badger"
bo "github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
@@ -180,7 +179,7 @@ func findRDFFiles(dir string) []string {
}

type uidRangeResponse struct {
uids *api.AssignedIds
uids *intern.AssignedIds
err error
}

60 changes: 47 additions & 13 deletions dgraph/cmd/zero/assign.go
@@ -12,14 +12,14 @@ import (

"golang.org/x/net/context"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

var (
emptyNum intern.Num
emptyAssignedIds api.AssignedIds
emptyAssignedIds intern.AssignedIds
)

const (
@@ -48,11 +48,13 @@ func (s *Server) maxTxnTs() uint64 {
return s.state.MaxTxnTs
}

var servedFromMemory = errors.New("Lease was served from memory.")

// lease would either allocate ids or timestamps.
// This function is triggered by an RPC call. We ensure that only leader can assign new UIDs,
// so we can tackle any collisions that might happen with the leasemanager
// In essence, we just want one server to be handing out new uids.
func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.AssignedIds, error) {
func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*intern.AssignedIds, error) {
node := s.Node
// TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be
// based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this
@@ -61,14 +63,36 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.")
}

val := int(num.Val)
if val == 0 {
return &emptyAssignedIds, x.Errorf("Nothing to be marked or assigned")
if num.Val == 0 && !num.ReadOnly {
return &emptyAssignedIds, x.Errorf("Nothing to be leased")
}
if glog.V(3) {
glog.Infof("Got lease request for txn: %v. Num: %+v\n", txn, num)
}

s.leaseLock.Lock()
defer s.leaseLock.Unlock()

if txn {
if num.Val == 0 && num.ReadOnly {
// If we're only asking for a readonly timestamp, we can potentially
// service it directly.
if glog.V(3) {
glog.Infof("Attempting to serve read only txn ts [%d, %d]",
s.readOnlyTs, s.nextTxnTs)
}
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextTxnTs-1 {
return &intern.AssignedIds{ReadOnly: s.readOnlyTs}, servedFromMemory
}
}
// We couldn't service it. So, let's request an extra timestamp for
// readonly transactions, if needed.
}

// If we're asking for more ids than the standard lease bandwidth, then we
// should set howMany generously, so we can service future requests from
// memory, without asking for another lease. Only used if we need to renew
// our lease.
howMany := leaseBandwidth
if num.Val > leaseBandwidth {
howMany = num.Val + leaseBandwidth
Expand All @@ -81,6 +105,8 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
var maxLease, available uint64
var proposal intern.ZeroProposal

// Calculate how many ids do we have available in memory, before we need to
// renew our lease.
if txn {
maxLease = s.maxTxnTs()
available = maxLease - s.nextTxnTs + 1
@@ -91,19 +117,27 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
proposal.MaxLeaseId = maxLease + howMany
}

if available < num.Val {
// If we have less available than what we need, we need to renew our lease.
if available < num.Val+1 { // +1 for a potential readonly ts.
// Blocking propose to get more ids or timestamps.
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil {
return nil, err
}
}

out := &api.AssignedIds{}
out := &intern.AssignedIds{}
if txn {
out.StartId = s.nextTxnTs
out.EndId = out.StartId + num.Val - 1
s.nextTxnTs = out.EndId + 1
s.orc.doneUntil.Begin(out.EndId)
if num.Val > 0 {
out.StartId = s.nextTxnTs
out.EndId = out.StartId + num.Val - 1
s.nextTxnTs = out.EndId + 1
}
if num.ReadOnly {
s.readOnlyTs = s.nextTxnTs
s.nextTxnTs++
out.ReadOnly = s.readOnlyTs
}
s.orc.doneUntil.Begin(x.Max(out.EndId, out.ReadOnly))
} else {
out.StartId = s.nextLeaseId
out.EndId = out.StartId + num.Val - 1
@@ -114,7 +148,7 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass

// AssignUids is used to assign new uids by communicating with the leader of the RAFT group
// responsible for handing out uids.
func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*api.AssignedIds, error) {
func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*intern.AssignedIds, error) {
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
}
(Diffs for the remaining changed files are not shown.)