Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve efficiency of readonly transactions by reusing the same read ts #2604

Merged
merged 4 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
}

type linReadReq struct {
// A one-shot chan which we send a raft index upon
// A one-shot chan which we send a raft index upon.
indexCh chan<- uint64
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/integration/bank/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *State) runTotal(dg *dgo.Dgraph) error {
}
}
`
txn := dg.NewTxn()
txn := dg.NewReadOnlyTxn()
defer txn.Discard(context.Background())
resp, err := txn.Query(context.Background(), query)
if err != nil {
Expand Down
43 changes: 32 additions & 11 deletions contrib/integration/increment/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,54 @@ import (
var (
addr = flag.String("addr", "localhost:9080", "Address of Dgraph server.")
num = flag.Int("num", 1, "How many times to run.")
ro = flag.Bool("ro", false, "Only read the counter value, don't update it.")
wait = flag.String("wait", "0", "How long to wait.")
pred = flag.String("pred", "counter.val", "Predicate to use for storing the counter.")
)

type Counter struct {
Uid string `json:"uid"`
Val int `json:"val"`

startTs uint64 // Only used for internal testing.
}

func increment(dg *dgo.Dgraph) (int, error) {
func queryCounter(txn *dgo.Txn) (Counter, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

txn := dg.NewTxn()
var counter Counter
query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", *pred, *pred)
resp, err := txn.Query(ctx, query)
if err != nil {
return 0, err
return counter, err
}
m := make(map[string][]Counter)
if err := json.Unmarshal(resp.Json, &m); err != nil {
return 0, err
return counter, err
}
var counter Counter
if len(m["q"]) == 0 {
// Do nothing.
} else if len(m["q"]) == 1 {
counter = m["q"][0]
} else {
log.Fatalf("Invalid response: %q", resp.Json)
panic(fmt.Sprintf("Invalid response: %q", resp.Json))
}
counter.startTs = resp.GetTxn().GetStartTs()
return counter, nil
}

func process(dg *dgo.Dgraph, readOnly bool) (Counter, error) {
if readOnly {
txn := dg.NewReadOnlyTxn()
defer txn.Discard(nil)
return queryCounter(txn)
}

txn := dg.NewTxn()
counter, err := queryCounter(txn)
if err != nil {
return Counter{}, err
}
counter.Val += 1

Expand All @@ -64,11 +82,14 @@ func increment(dg *dgo.Dgraph) (int, error) {
counter.Uid = "_:new"
}
mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^<xs:int> .`, counter.Uid, *pred, counter.Val))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = txn.Mutate(ctx, &mu)
if err != nil {
return 0, err
return Counter{}, err
}
return counter.Val, txn.Commit(ctx)
return counter, txn.Commit(ctx)
}

func main() {
Expand All @@ -87,13 +108,13 @@ func main() {
}

for *num > 0 {
val, err := increment(dg)
cnt, err := process(dg, *ro)
if err != nil {
fmt.Printf("While trying to increment counter: %v. Retrying...\n", err)
fmt.Printf("While trying to process counter: %v. Retrying...\n", err)
time.Sleep(time.Second)
continue
}
fmt.Printf("Counter SET OK: %d\n", val)
fmt.Printf("Counter VAL: %d [ Ts: %d ]\n", cnt.Val, cnt.startTs)
*num -= 1
time.Sleep(waitDur)
}
Expand Down
105 changes: 105 additions & 0 deletions contrib/integration/increment/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"context"
"log"
"strings"
"sync"
"testing"

"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

const N = 10

func increment(t *testing.T, dg *dgo.Dgraph) int {
var max int
var mu sync.Mutex
storeMax := func(a int) {
mu.Lock()
if max < a {
max = a
}
mu.Unlock()
}

var wg sync.WaitGroup
// N goroutines, process N times each goroutine.
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, false)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
storeMax(cnt.Val)
}
}
}()
}
wg.Wait()
return max
}

func read(t *testing.T, dg *dgo.Dgraph, expected int) {
cnt, err := process(dg, true)
require.NoError(t, err)
ts := cnt.startTs
t.Logf("Readonly stage counter: %+v\n", cnt)

var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, true)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
require.Equal(t, expected, cnt.Val)
require.Equal(t, ts, cnt.startTs)
}
}
}()
}
wg.Wait()
}

func TestIncrement(t *testing.T) {
conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
dc := api.NewDgraphClient(conn)
dg := dgo.NewDgraphClient(dc)

op := api.Operation{DropAll: true}
x.Check(dg.Alter(context.Background(), &op))

cnt, err := process(dg, false)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
t.Logf("Initial value: %d\n", cnt.Val)
}

val := increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
val = increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
}
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/dgraph-io/badger"
bo "github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -180,7 +179,7 @@ func findRDFFiles(dir string) []string {
}

type uidRangeResponse struct {
uids *api.AssignedIds
uids *intern.AssignedIds
err error
}

Expand Down
60 changes: 47 additions & 13 deletions dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

"golang.org/x/net/context"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

var (
emptyNum intern.Num
emptyAssignedIds api.AssignedIds
emptyAssignedIds intern.AssignedIds
)

const (
Expand Down Expand Up @@ -48,11 +48,13 @@ func (s *Server) maxTxnTs() uint64 {
return s.state.MaxTxnTs
}

var servedFromMemory = errors.New("Lease was served from memory.")

// lease would either allocate ids or timestamps.
// This function is triggered by an RPC call. We ensure that only leader can assign new UIDs,
// so we can tackle any collisions that might happen with the leasemanager
// In essence, we just want one server to be handing out new uids.
func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.AssignedIds, error) {
func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*intern.AssignedIds, error) {
node := s.Node
// TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be
// based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this
Expand All @@ -61,14 +63,36 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.")
}

val := int(num.Val)
if val == 0 {
return &emptyAssignedIds, x.Errorf("Nothing to be marked or assigned")
if num.Val == 0 && !num.ReadOnly {
return &emptyAssignedIds, x.Errorf("Nothing to be leased")
}
if glog.V(3) {
glog.Infof("Got lease request for txn: %v. Num: %+v\n", txn, num)
}

s.leaseLock.Lock()
defer s.leaseLock.Unlock()

if txn {
if num.Val == 0 && num.ReadOnly {
// If we're only asking for a readonly timestamp, we can potentially
// service it directly.
if glog.V(3) {
glog.Infof("Attempting to serve read only txn ts [%d, %d]",
s.readOnlyTs, s.nextTxnTs)
}
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextTxnTs-1 {
return &intern.AssignedIds{ReadOnly: s.readOnlyTs}, servedFromMemory
}
}
// We couldn't service it. So, let's request an extra timestamp for
// readonly transactions, if needed.
}

// If we're asking for more ids than the standard lease bandwidth, then we
// should set howMany generously, so we can service future requests from
// memory, without asking for another lease. Only used if we need to renew
// our lease.
howMany := leaseBandwidth
if num.Val > leaseBandwidth {
howMany = num.Val + leaseBandwidth
Expand All @@ -81,6 +105,8 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
var maxLease, available uint64
var proposal intern.ZeroProposal

// Calculate how many ids do we have available in memory, before we need to
// renew our lease.
if txn {
maxLease = s.maxTxnTs()
available = maxLease - s.nextTxnTs + 1
Expand All @@ -91,19 +117,27 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
proposal.MaxLeaseId = maxLease + howMany
}

if available < num.Val {
// If we have less available than what we need, we need to renew our lease.
if available < num.Val+1 { // +1 for a potential readonly ts.
// Blocking propose to get more ids or timestamps.
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil {
return nil, err
}
}

out := &api.AssignedIds{}
out := &intern.AssignedIds{}
if txn {
out.StartId = s.nextTxnTs
out.EndId = out.StartId + num.Val - 1
s.nextTxnTs = out.EndId + 1
s.orc.doneUntil.Begin(out.EndId)
if num.Val > 0 {
out.StartId = s.nextTxnTs
out.EndId = out.StartId + num.Val - 1
s.nextTxnTs = out.EndId + 1
}
if num.ReadOnly {
s.readOnlyTs = s.nextTxnTs
s.nextTxnTs++
out.ReadOnly = s.readOnlyTs
}
s.orc.doneUntil.Begin(x.Max(out.EndId, out.ReadOnly))
} else {
out.StartId = s.nextLeaseId
out.EndId = out.StartId + num.Val - 1
Expand All @@ -114,7 +148,7 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass

// AssignUids is used to assign new uids by communicating with the leader of the RAFT group
// responsible for handing out uids.
func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*api.AssignedIds, error) {
func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*intern.AssignedIds, error) {
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
}
Expand Down
Loading