diff --git a/traversal/traversing_linksystem.go b/traversal/traversing_linksystem.go
new file mode 100644
index 00000000..1be687b1
--- /dev/null
+++ b/traversal/traversing_linksystem.go
@@ -0,0 +1,191 @@
+package traversal
+
+import (
+	"errors"
+	"io"
+
+	"github.com/ipld/go-ipld-prime"
+	"github.com/ipld/go-ipld-prime/datamodel"
+	"github.com/ipld/go-ipld-prime/linking"
+)
+
+// pathNode is one node of a tree keyed by path segment. It records the link
+// registered for the path that leads to it, if any.
+type pathNode struct {
+	link     datamodel.Link
+	children map[datamodel.PathSegment]*pathNode
+}
+
+// newPath returns a pathNode holding link, with an empty set of children.
+func newPath(link datamodel.Link) *pathNode {
+	return &pathNode{
+		link:     link,
+		children: make(map[datamodel.PathSegment]*pathNode),
+	}
+}
+
+// addPath registers link at the position p, relative to pn. Nodes missing
+// along the way are created without a link. An empty p is a no-op.
+func (pn *pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link) {
+	if len(p) == 0 {
+		return
+	}
+	child, ok := pn.children[p[0]]
+	if !ok {
+		child = newPath(nil)
+		pn.children[p[0]] = child
+	}
+	if len(p) > 1 {
+		child.addPath(p[1:], link)
+		return
+	}
+	// p ends here. Keep a link that is already registered, but fill one in
+	// if the node was first created as a step of a longer path.
+	if child.link == nil {
+		child.link = link
+	}
+}
+
+// allLinks returns the links registered at pn and at every node below it.
+// Nodes without a link contribute nothing. Children are kept in a map, so the
+// relative order of links from different children is unspecified.
+func (pn *pathNode) allLinks() []datamodel.Link {
+	var links []datamodel.Link
+	if pn.link != nil {
+		links = append(links, pn.link)
+	}
+	for _, child := range pn.children {
+		links = append(links, child.allLinks()...)
+	}
+	return links
+}
+
+// getLinks returns the links registered in the tree rooted at pn for the path
+// root. An empty root yields only the link of pn itself, if it has one. Any
+// other root yields every link at or below the node it leads to, or nothing
+// if that path was never registered.
+func (pn *pathNode) getLinks(root datamodel.Path) []datamodel.Link {
+	segs := root.Segments()
+	if len(segs) == 0 {
+		if pn.link != nil {
+			return []datamodel.Link{pn.link}
+		}
+		return nil
+	}
+	child, ok := pn.children[segs[0]]
+	if !ok {
+		// not a registered sub-path.
+		return nil
+	}
+	if len(segs) == 1 {
+		// reached the requested node: collect everything at or below it.
+		return child.allLinks()
+	}
+	return child.getLinks(datamodel.NewPathNocopy(segs[1:]))
+}
+
+// TraverseResumer allows resuming a progress from a previously encountered path in the selector.
+type TraverseResumer func(from datamodel.Path) error
+
+// traversalState is the bookkeeping shared by the wrapping block opener
+// (traverse) and the resumer handed back to the caller (resume).
+//
+// underlyingOpener is the opener that was installed before wrapping. position
+// counts the links recorded since the last resume, pathOrder maps each such
+// position to the path of the link recorded there, and pathTree holds every
+// recorded link keyed by its path. target is the path being replayed towards
+// and is nil when not replaying. progress is the Progress whose SeenLinks are
+// maintained.
+type traversalState struct {
+	underlyingOpener linking.BlockReadOpener
+	position         int
+	pathOrder        map[int]datamodel.Path
+	pathTree         *pathNode
+	target           *datamodel.Path
+	progress         *Progress
+}
+
+// resume resets the recorded position and the SeenLinks of the progress, and
+// arranges for the next walk to replay up to from. An empty from restarts
+// from the beginning without entering replay mode.
+func (ts *traversalState) resume(from datamodel.Path) error {
+	if ts.progress == nil {
+		return errors.New("traversal: resumer has no progress to reset")
+	}
+	// reset progress and traverse until target.
+	ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
+	ts.position = 0
+	if len(from.Segments()) == 0 {
+		// An empty target would only be matched by a link loaded at the
+		// empty path; treat it as a plain restart so that link recording is
+		// not suspended while waiting for that match.
+		ts.target = nil
+		return nil
+	}
+	ts.target = &from
+	return nil
+}
+
+// traverse is the block opener installed in place of the original one. While
+// no target is set it records every link it is asked for and delegates to the
+// original opener. While replaying towards a target it skips links that are
+// not on the way to the target.
+func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
+	// when not in replay mode, we track metadata
+	if ts.target == nil {
+		ts.pathOrder[ts.position] = lc.LinkPath
+		ts.pathTree.addPath(lc.LinkPath.Segments(), l)
+		ts.position++
+		return ts.underlyingOpener(lc, l)
+	}
+
+	// if we reach the target, we exit replay mode (by removing target)
+	if lc.LinkPath.String() == ts.target.String() {
+		ts.target = nil
+		return ts.underlyingOpener(lc, l)
+	}
+
+	// when replaying, we skip links not of our direct ancestor,
+	// and add all links on the path under them as 'seen'
+	targetSegments := ts.target.Segments()
+	seg := lc.LinkPath.Segments()
+	for i, s := range seg {
+		if i >= len(targetSegments) {
+			break
+		}
+		if targetSegments[i].String() != s.String() {
+			// NOTE(review): this looks up the shared prefix seg[:i], not the
+			// diverging path seg[:i+1]; confirm that is intended.
+			for _, seen := range ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[:i])) {
+				ts.progress.SeenLinks[seen] = struct{}{}
+			}
+			return nil, SkipMe{}
+		}
+	}
+
+	// descend.
+	return ts.underlyingOpener(lc, l)
+}
+
+// WithTraversingLinksystem extends a progress for traversal such that it can
+// subsequently resume and perform subsets of the walk efficiently from
+// an arbitrary position within the selector traversal.
+//
+// It replaces the StorageReadOpener of p.Cfg.LinkSystem with a wrapper, and
+// returns an error if p has no Cfg or no StorageReadOpener to wrap.
+func WithTraversingLinksystem(p *Progress) (TraverseResumer, error) {
+	if p == nil || p.Cfg == nil {
+		return nil, errors.New("traversal: progress has no config to wrap")
+	}
+	if p.Cfg.LinkSystem.StorageReadOpener == nil {
+		return nil, errors.New("traversal: link system has no StorageReadOpener to wrap")
+	}
+	ts := &traversalState{
+		underlyingOpener: p.Cfg.LinkSystem.StorageReadOpener,
+		pathOrder:        make(map[int]datamodel.Path),
+		pathTree:         newPath(nil),
+		progress:         p,
+	}
+	p.Cfg.LinkSystem.StorageReadOpener = ts.traverse
+	return ts.resume, nil
+}
diff --git a/traversal/traversing_linksystem_test.go b/traversal/traversing_linksystem_test.go
new file mode 100644
index 00000000..49ae2663
--- /dev/null
+++ b/traversal/traversing_linksystem_test.go
@@ -0,0 +1,133 @@
+package traversal_test
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/ipld/go-ipld-prime/datamodel"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	"github.com/ipld/go-ipld-prime/node/basicnode"
+	"github.com/ipld/go-ipld-prime/traversal"
+	"github.com/ipld/go-ipld-prime/traversal/selector"
+	selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
+)
+
+func TestWalkResume(t *testing.T) {
+	seen := 0
+	count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error {
+		seen++
+		return nil
+	}
+
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetReadStorage(&store)
+	p := traversal.Progress{
+		Cfg: &traversal.Config{
+			LinkSystem:                     lsys,
+			LinkTargetNodePrototypeChooser: basicnode.Chooser,
+		},
+	}
+	resumer, err := traversal.WithTraversingLinksystem(&p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sd := selectorparse.CommonSelector_ExploreAllRecursively
+	s, err := selector.CompileSelector(sd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected total traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from beginning.
+	if err := resumer(datamodel.NewPath(nil)); err != nil {
+		t.Fatal(err)
+	}
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected resumed traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	if err := resumer(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")})); err != nil {
+		t.Fatal(err)
+	}
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	// one less: will not visit 'linkedString' before linked map.
+	if seen != 13 {
+		t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	if err := resumer(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedList")})); err != nil {
+		t.Fatal(err)
+	}
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	// will not visit 'linkedString' or 'linkedMap' before linked list.
+	if seen != 7 {
+		t.Fatalf("expected resumed traversal to visit 7 nodes, got %d", seen)
+	}
+}
+
+func TestWalkResumePartialWalk(t *testing.T) {
+	seen := 0
+	limit := 0
+	countUntil := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error {
+		seen++
+		if seen >= limit {
+			return traversal.SkipMe{}
+		}
+		return nil
+	}
+
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetReadStorage(&store)
+	p := traversal.Progress{
+		Cfg: &traversal.Config{
+			LinkSystem:                     lsys,
+			LinkTargetNodePrototypeChooser: basicnode.Chooser,
+		},
+	}
+	resumer, err := traversal.WithTraversingLinksystem(&p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sd := selectorparse.CommonSelector_ExploreAllRecursively
+	s, err := selector.CompileSelector(sd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	limit = 9
+	if err := p.WalkAdv(rootNode, s, countUntil); !errors.Is(err, traversal.SkipMe{}) {
+		t.Fatalf("expected partial traversal to stop with SkipMe, got %v", err)
+	}
+	if seen != limit {
+		t.Fatalf("expected partial traversal, got %d", seen)
+	}
+
+	// resume.
+	if err := resumer(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")})); err != nil {
+		t.Fatal(err)
+	}
+	seen = 0
+	limit = 14
+	if err := p.WalkAdv(rootNode, s, countUntil); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 13 {
+		t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen)
+	}
+}
diff --git a/traversal/walk.go b/traversal/walk.go
index 80250170..00959a58 100644
--- a/traversal/walk.go
+++ b/traversal/walk.go
@@ -147,6 +147,23 @@ func (prog Progress) WalkAdv(n datamodel.Node, s selector.Selector, fn AdvVisitF
 	return prog.walkAdv(n, s, fn)
 }
 
+// ResumeWalkAdv restarts a WalkAdv from the root node n. It carries over the
+// Cfg, Budget and SeenLinks of prog, starts from an empty Path, and reports
+// visits to fn. The specified node n should be the root of the traversal.
+//
+// NOTE(review): skipping ahead to the resume position is presumably left to the
+// link system wrapper installed by WithTraversingLinksystem; confirm.
+func (prog Progress) ResumeWalkAdv(n datamodel.Node, s selector.Selector, fn AdvVisitFn) error {
+	innerProg := Progress{
+		Cfg:       prog.Cfg,
+		Path:      datamodel.NewPath(nil),
+		Budget:    prog.Budget,
+		SeenLinks: prog.SeenLinks,
+	}
+
+	return innerProg.walkAdv(n, s, fn)
+}
+
 func (prog Progress) walkAdv(n datamodel.Node, s selector.Selector, fn AdvVisitFn) error {
 	// Check the budget!
 	if prog.Budget != nil {