From 73f9a90cbf439acc89d9740d6ed507861a6cd5b6 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 16 Oct 2016 21:06:28 -0400 Subject: [PATCH 1/2] Create a FilestoreNode object to carry PosInfo When doing a filestore add, we wrap whatever nodes we create in a FilestoreNode object and add the PosInfo to it so that the filestore will be able to extract information as needed. Edited by whyrusleeping License: MIT Signed-off-by: Kevin Atkinson --- commands/files/file.go | 5 ++++ core/coreunix/add.go | 12 +++++++++- importer/balanced/builder.go | 12 +++++++--- importer/chunk/rabin.go | 10 ++++++-- importer/chunk/splitting.go | 5 ++++ importer/helpers/dagbuilder.go | 44 ++++++++++++++++++++++++++++------ importer/helpers/helpers.go | 23 ++++++++++++++++++ thirdparty/posinfo/posinfo.go | 18 ++++++++++++++ 8 files changed, 116 insertions(+), 13 deletions(-) create mode 100644 thirdparty/posinfo/posinfo.go diff --git a/commands/files/file.go b/commands/files/file.go index c2185153c78..22d8ac2d00a 100644 --- a/commands/files/file.go +++ b/commands/files/file.go @@ -55,3 +55,8 @@ type SizeFile interface { Size() (int64, error) } + +type FileInfo interface { + FullPath() string + Stat() os.FileInfo +} diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 34702f088a4..275ab1c95b4 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -398,7 +398,12 @@ func (adder *Adder) addFile(file files.File) error { // progress updates to the client (over the output channel) var reader io.Reader = file if adder.Progress { - reader = &progressReader{file: file, out: adder.Out} + rdr := &progressReader{file: file, out: adder.Out} + if fi, ok := file.(files.FileInfo); ok { + reader = &progressReader2{rdr, fi} + } else { + reader = rdr + } } dagnode, err := adder.add(reader) @@ -520,3 +525,8 @@ func (i *progressReader) Read(p []byte) (int, error) { return n, err } + +type progressReader2 struct { + *progressReader + files.FileInfo +} diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go index 70b3c55b009..057126c1c3d 100644 --- a/importer/balanced/builder.go +++ b/importer/balanced/builder.go @@ -9,10 +9,12 @@ import ( ) func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) { + var offset uint64 = 0 var root *h.UnixfsNode for level := 0; !db.Done(); level++ { nroot := h.NewUnixfsNode() + db.SetPosInfo(nroot, 0) // add our old root as a child of the new root. if root != nil { // nil if it's the first node. @@ -22,11 +24,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) { } // fill it up. - if err := fillNodeRec(db, nroot, level); err != nil { + if err := fillNodeRec(db, nroot, level, offset); err != nil { return nil, err } + offset = nroot.FileSize() root = nroot + } if root == nil { root = h.NewUnixfsNode() @@ -50,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) { // it returns the total dataSize of the node, and a potential error // // warning: **children** pinned indirectly, but input node IS NOT pinned. -func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { +func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error { if depth < 0 { return errors.New("attempt to fillNode at depth < 0") } @@ -69,8 +73,9 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { // while we have room AND we're not done for node.NumChildren() < db.Maxlinks() && !db.Done() { child := h.NewUnixfsNode() + db.SetPosInfo(child, offset) - err := fillNodeRec(db, child, depth-1) + err := fillNodeRec(db, child, depth-1, offset) if err != nil { return err } @@ -78,6 +83,7 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { if err := node.AddChild(child, db); err != nil { return err } + offset += child.FileSize() } return nil diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go index ce9b5fc5679..d2d71460d34 100644 --- a/importer/chunk/rabin.go +++ b/importer/chunk/rabin.go @@ -10,7 +10,8 @@ import ( var IpfsRabinPoly = chunker.Pol(17437180132763653) type Rabin struct { - r *chunker.Chunker + r *chunker.Chunker + reader io.Reader } func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin { @@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max) return &Rabin{ - r: ch, + r: ch, + reader: r, } } @@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) { return ch.Data, nil } + +func (r *Rabin) Reader() io.Reader { + return r.reader +} diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index f3256c4587c..6fd55e22da8 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -12,6 +12,7 @@ var log = logging.Logger("chunk") var DefaultBlockSize int64 = 1024 * 256 type Splitter interface { + Reader() io.Reader NextBytes() ([]byte, error) } @@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { return buf[:n], nil } + +func (ss *sizeSplitterv2) Reader() io.Reader { + return ss.r +} diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 696b90e4b70..c2683bd9bf9 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -1,6 +1,10 @@ package helpers import ( + "io" + "os" + + "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" @@ -17,6 +21,8 @@ type DagBuilderHelper struct { nextData []byte // the next item to return. maxlinks int batch *dag.Batch + fullPath string + stat os.FileInfo } type DagBuilderParams struct { @@ -34,13 +40,18 @@ type DagBuilderParams struct { // Generate a new DagBuilderHelper from the given params, which data source comes // from chunks object func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { - return &DagBuilderHelper{ + db := &DagBuilderHelper{ dserv: dbp.Dagserv, spl: spl, rawLeaves: dbp.RawLeaves, maxlinks: dbp.Maxlinks, batch: dbp.Dagserv.Batch(), } + if fi, ok := spl.Reader().(files.FileInfo); ok { + db.fullPath = fi.FullPath() + db.stat = fi.Stat() + } + return db } // prepareNext consumes the next item from the splitter and puts it @@ -48,12 +59,14 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { // it will do nothing. func (db *DagBuilderHelper) prepareNext() { // if we already have data waiting to be consumed, we're ready - if db.nextData != nil { + if db.nextData != nil || db.recvdErr != nil { return } - // TODO: handle err (which wasn't handled either when the splitter was channeled) - db.nextData, _ = db.spl.NextBytes() + db.nextData, db.recvdErr = db.spl.NextBytes() + if db.recvdErr == io.EOF { + db.recvdErr = nil + } } // Done returns whether or not we're done consuming the incoming data. @@ -61,17 +74,24 @@ func (db *DagBuilderHelper) Done() bool { // ensure we have an accurate perspective on data // as `done` this may be called before `next`. db.prepareNext() // idempotent + if db.recvdErr != nil { + return false + } return db.nextData == nil } // Next returns the next chunk of data to be inserted into the dag // if it returns nil, that signifies that the stream is at an end, and // that the current building operation should finish -func (db *DagBuilderHelper) Next() []byte { +func (db *DagBuilderHelper) Next() ([]byte, error) { db.prepareNext() // idempotent d := db.nextData db.nextData = nil // signal we've consumed it - return d + if db.recvdErr != nil { + return nil, db.recvdErr + } else { + return d, nil + } } // GetDagServ returns the dagservice object this Helper is using @@ -100,7 +120,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { } func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { - data := db.Next() + data, err := db.Next() + if err != nil { + return nil, err + } + if data == nil { // we're done! return nil, nil } @@ -121,6 +145,12 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { } } +func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) { + if db.stat != nil { + node.SetPosInfo(offset, db.fullPath, db.stat) + } +} + func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) { dn, err := node.GetDagNode() if err != nil { diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index e374507b72e..1c3921b87b9 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -3,9 +3,11 @@ package helpers import ( "context" "fmt" + "os" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" + pi "github.com/ipfs/go-ipfs/thirdparty/posinfo" ft "github.com/ipfs/go-ipfs/unixfs" node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" @@ -43,6 +45,7 @@ type UnixfsNode struct { rawnode *dag.RawNode node *dag.ProtoNode ufmt *ft.FSNode + posInfo *pi.PosInfo } // NewUnixfsNode creates a new Unixfs node to represent a file @@ -144,9 +147,29 @@ func (n *UnixfsNode) FileSize() uint64 { return n.ufmt.FileSize() } +func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) { + n.posInfo = &pi.PosInfo{offset, fullPath, stat} +} + // getDagNode fills out the proper formatting for the unixfs node // inside of a DAG node and returns the dag node func (n *UnixfsNode) GetDagNode() (node.Node, error) { + nd, err := n.getBaseDagNode() + if err != nil { + return nil, err + } + + if n.posInfo != nil { + return &pi.FilestoreNode{ + Node: nd, + PosInfo: n.posInfo, + }, nil + } + + return nd, nil +} + +func (n *UnixfsNode) getBaseDagNode() (node.Node, error) { if n.raw { return n.rawnode, nil } diff --git a/thirdparty/posinfo/posinfo.go b/thirdparty/posinfo/posinfo.go new file mode 100644 index 00000000000..4c8912e3c81 --- /dev/null +++ b/thirdparty/posinfo/posinfo.go @@ -0,0 +1,18 @@ +package posinfo + +import ( + "os" + + node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" +) + +type PosInfo struct { + Offset uint64 + FullPath string + Stat os.FileInfo // can be nil +} + +type FilestoreNode struct { + node.Node + PosInfo *PosInfo +} From 65ffff241806279547372a9bd4bb40f4af6b8b96 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Mon, 24 Oct 2016 16:37:57 -0400 Subject: [PATCH 2/2] Add tests that PosInfo() is getting set. License: MIT Signed-off-by: Kevin Atkinson --- core/coreunix/add_test.go | 116 +++++++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index abe82f0d51e..fbfab82ea77 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -2,20 +2,26 @@ package coreunix import ( "bytes" + "context" "io" "io/ioutil" + "math/rand" + "os" "testing" "time" + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/blockstore" + "github.com/ipfs/go-ipfs/blockservice" "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/core" dag "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/pin/gc" "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/config" + pi "github.com/ipfs/go-ipfs/thirdparty/posinfo" "github.com/ipfs/go-ipfs/thirdparty/testutil" - "context" cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid" ) @@ -162,3 +168,111 @@ func TestAddGCLive(t *testing.T) { t.Fatal(err) } } + +func testAddWPosInfo(t *testing.T, rawLeaves bool) { + r := &repo.Mock{ + C: config.Config{ + Identity: config.Identity{ + PeerID: "Qmfoo", // required by offline node + }, + }, + D: testutil.ThreadSafeCloserMapDatastore(), + } + node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r}) + if err != nil { + t.Fatal(err) + } + + bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t} + bserv := blockservice.New(bs, node.Exchange) + dserv := dag.NewDAGService(bserv) + adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv) + if err != nil { + t.Fatal(err) + } + adder.Out = make(chan interface{}) + adder.Progress = true + adder.RawLeaves = rawLeaves + + data := make([]byte, 5*1024*1024) + rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error + fileData := ioutil.NopCloser(bytes.NewBuffer(data)) + fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()} + file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo) + + go func() { + defer close(adder.Out) + err = adder.AddFile(file) + if err != nil { + t.Fatal(err) + } + }() + for _ = range adder.Out { + } + + if bs.countAtOffsetZero != 2 { + t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero) + } + if bs.countAtOffsetNonZero != 19 { + // note: the exact number will depend on the size and the sharding algo. used + t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero) + } +} + +func TestAddWPosInfo(t *testing.T) { + testAddWPosInfo(t, false) +} + +func TestAddWPosInfoAndRawLeafs(t *testing.T) { + testAddWPosInfo(t, true) +} + + +type testBlockstore struct { + blockstore.GCBlockstore + expectedPath string + t *testing.T + countAtOffsetZero int + countAtOffsetNonZero int +} + +func (bs *testBlockstore) Put(block blocks.Block) error { + bs.CheckForPosInfo(block) + return bs.GCBlockstore.Put(block) +} + +func (bs *testBlockstore) PutMany(blocks []blocks.Block) error { + for _, blk := range blocks { + bs.CheckForPosInfo(blk) + } + return bs.GCBlockstore.PutMany(blocks) +} + +func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error { + fsn, ok := block.(*pi.FilestoreNode) + if ok { + posInfo := fsn.PosInfo + if posInfo.FullPath != bs.expectedPath { + bs.t.Fatal("PosInfo does not have the expected path") + } + if posInfo.Offset == 0 { + bs.countAtOffsetZero += 1 + } else { + bs.countAtOffsetNonZero += 1 + } + } + return nil +} + +type dummyFileInfo struct { + name string + size int64 + modTime time.Time +} + +func (fi *dummyFileInfo) Name() string { return fi.name } +func (fi *dummyFileInfo) Size() int64 { return fi.size } +func (fi *dummyFileInfo) Mode() os.FileMode { return 0 } +func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime } +func (fi *dummyFileInfo) IsDir() bool { return false } +func (fi *dummyFileInfo) Sys() interface{} { return nil }