Skip to content

Commit

Permalink
Optimize and refactor facet filtering (#2829)
Browse files Browse the repository at this point in the history
* Refactor how facet filtering is done.
* Avoid panics during facetsMatrix iteration.
* Check facet param instead of length. Keep it consistent
* balance uidMatrix with facetMatrix
* Fix the tests
  • Loading branch information
manishrjain authored Dec 18, 2018
1 parent 734650b commit fee3ac6
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 62 deletions.
6 changes: 6 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,12 @@ func (l *List) hasPendingTxn() bool {
return false
}

func (l *List) ApproxLen() int {
l.RLock()
defer l.RUnlock()
return len(l.mutationMap) + codec.ApproxLen(l.plist.Pack)
}

// Uids returns the UIDs given some query params.
// We have to apply the filtering before applying (offset, count).
// WARNING: Calling this function just to get Uids is expensive
Expand Down
16 changes: 13 additions & 3 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
// Can happen in recurse query.
continue
}
if len(pc.facetsMatrix) > 0 && len(pc.facetsMatrix) != len(pc.uidMatrix) {
return x.Errorf("length of facetsMatrix and uidMatrix mismatch: %d vs %d",
len(pc.facetsMatrix), len(pc.uidMatrix))
}

idx := algo.IndexOf(pc.SrcUIDs, uid)
if idx < 0 {
Expand All @@ -411,8 +415,10 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
fieldName := pc.fieldName()
if len(pc.counts) > 0 {
addCount(pc, uint64(pc.counts[idx]), dst)

} else if pc.SrcFunc != nil && pc.SrcFunc.Name == "checkpwd" {
addCheckPwd(pc, pc.valueMatrix[idx].Values, dst)

} else if idx < len(pc.uidMatrix) && len(pc.uidMatrix[idx].Uids) > 0 {
var fcsList []*pb.Facets
if pc.Params.Facet != nil {
Expand Down Expand Up @@ -485,7 +491,7 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
continue
}

if pc.Params.Facet != nil && len(pc.facetsMatrix[idx].FacetsList) > 0 {
if len(pc.facetsMatrix) > idx && len(pc.facetsMatrix[idx].FacetsList) > 0 {
// in case of Value we have only one Facets
for _, f := range pc.facetsMatrix[idx].FacetsList[0].Facets {
fVal, err := facets.ValFor(f)
Expand Down Expand Up @@ -1279,7 +1285,7 @@ func (sg *SubGraph) populatePostAggregation(doneVars map[string]varValue, path [

// Filters might have updated the destuids. facetMatrix should also be updated.
func (sg *SubGraph) updateFacetMatrix() {
if sg.Params.Facet == nil {
if len(sg.facetsMatrix) != len(sg.uidMatrix) {
return
}

Expand Down Expand Up @@ -2205,9 +2211,13 @@ func (sg *SubGraph) updateDestUids() {
}

func (sg *SubGraph) sortAndPaginateUsingFacet(ctx context.Context) error {
if sg.facetsMatrix == nil {
if len(sg.facetsMatrix) == 0 {
return nil
}
if len(sg.facetsMatrix) != len(sg.uidMatrix) {
return x.Errorf("Facet matrix and UID matrix mismatch: %d vs %d",
len(sg.facetsMatrix), len(sg.uidMatrix))
}
orderby := sg.Params.FacetOrder
for i := 0; i < len(sg.uidMatrix); i++ {
ul := sg.uidMatrix[i]
Expand Down
2 changes: 1 addition & 1 deletion query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (sg *SubGraph) getCost(matrix, list int) (cost float64,
fcs *pb.Facets, rerr error) {

cost = 1.0
if sg.Params.Facet == nil {
if len(sg.facetsMatrix) <= matrix {
return cost, fcs, rerr
}
fcsList := sg.facetsMatrix[matrix].FacetsList
Expand Down
83 changes: 37 additions & 46 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ import (
)

var (
emptyUIDList pb.List
emptyResult pb.Result
emptyValueList = pb.ValueList{Values: []*pb.TaskValue{}}
emptyUIDList pb.List
emptyFacetsList pb.FacetsList
emptyResult pb.Result
emptyValueList = pb.ValueList{Values: []*pb.TaskValue{}}
)

func invokeNetworkRequest(
Expand Down Expand Up @@ -276,11 +277,6 @@ func needsIndex(fnType FuncType) bool {
}
}

type result struct {
uid uint64
facets []*api.Facet
}

type funcArgs struct {
q *pb.Query
gid uint32
Expand Down Expand Up @@ -382,11 +378,11 @@ func handleValuePostings(ctx context.Context, args funcArgs) error {

if err == posting.ErrNoValue || len(vals) == 0 {
out.UidMatrix = append(out.UidMatrix, &emptyUIDList)
out.FacetMatrix = append(out.FacetMatrix, &emptyFacetsList)
if q.DoCount {
out.Counts = append(out.Counts, 0)
} else {
out.ValueMatrix = append(out.ValueMatrix, &emptyValueList)
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
Expand Down Expand Up @@ -445,6 +441,8 @@ func handleValuePostings(ctx context.Context, args funcArgs) error {
}
out.FacetMatrix = append(out.FacetMatrix,
&pb.FacetsList{FacetsList: []*pb.Facets{{Facets: fs}}})
} else {
out.FacetMatrix = append(out.FacetMatrix, &emptyFacetsList)
}

switch {
Expand Down Expand Up @@ -575,39 +573,6 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
return err
}

// get filtered uids and facets.
var filteredRes []*result

var perr error
filteredRes = make([]*result, 0)
err = pl.Postings(opts, func(p *pb.Posting) error {
res := true
res, perr = applyFacetsTree(p.Facets, facetsTree)
if perr != nil {
return posting.ErrStopIteration
}
if res {
filteredRes = append(filteredRes, &result{
uid: p.Uid,
facets: facets.CopyFacets(p.Facets, q.FacetParam)})
}
return nil // continue iteration.
})
if err != nil {
return err
} else if perr != nil {
return perr
}

// add facets to result.
if q.FacetParam != nil {
var fcsList []*pb.Facets
for _, fres := range filteredRes {
fcsList = append(fcsList, &pb.Facets{Facets: fres.facets})
}
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
}

switch {
case q.DoCount:
if i == 0 {
Expand Down Expand Up @@ -667,12 +632,38 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
if i == 0 {
span.Annotate(nil, "default")
}
// The more usual case: Getting the UIDs.
uidList := new(pb.List)
for _, fres := range filteredRes {
uidList.Uids = append(uidList.Uids, fres.uid)

uidList := &pb.List{
Uids: make([]uint64, 0, pl.ApproxLen()),
}

var fcsList []*pb.Facets
err = pl.Postings(opts, func(p *pb.Posting) error {
pick, err := applyFacetsTree(p.Facets, facetsTree)
if err != nil {
return err
}
if pick {
// TODO: This way of picking Uids differs from how
// pl.Uids works. So, have a look to see if we're
// catching all the edge cases here.
uidList.Uids = append(uidList.Uids, p.Uid)
if q.FacetParam != nil {
fcsList = append(fcsList, &pb.Facets{
Facets: facets.CopyFacets(p.Facets, q.FacetParam),
})
}
}
return nil // continue iteration.
})
if err != nil {
return err
}

out.UidMatrix = append(out.UidMatrix, uidList)
if q.FacetParam != nil {
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
}
}
}
return nil
Expand Down
24 changes: 12 additions & 12 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestProcessTaskIndexMLayer(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{10, 12},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand All @@ -180,9 +180,9 @@ func TestProcessTaskIndexMLayer(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{12},
nil,
{},
{10},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand Down Expand Up @@ -211,8 +211,8 @@ func TestProcessTaskIndexMLayer(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))

query = newQuery("friend", nil, []string{"anyofterms", "", "photon notphoton ignored"})
Expand All @@ -221,8 +221,8 @@ func TestProcessTaskIndexMLayer(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))
}

Expand All @@ -236,7 +236,7 @@ func TestProcessTaskIndex(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{10, 12},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand All @@ -258,9 +258,9 @@ func TestProcessTaskIndex(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{12},
nil,
{},
{10},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand Down Expand Up @@ -289,8 +289,8 @@ func TestProcessTaskIndex(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))
}

Expand Down

0 comments on commit fee3ac6

Please sign in to comment.