Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a FilestoreNode object to carry PosInfo #3314

Merged
merged 2 commits into from
Oct 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ type SizeFile interface {

Size() (int64, error)
}

type FileInfo interface {
FullPath() string
Stat() os.FileInfo
}
12 changes: 11 additions & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,12 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{file: file, out: adder.Out}
rdr := &progressReader{file: file, out: adder.Out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
reader = rdr
}
}

dagnode, err := adder.add(reader)
Expand Down Expand Up @@ -520,3 +525,8 @@ func (i *progressReader) Read(p []byte) (int, error) {

return n, err
}

type progressReader2 struct {
*progressReader
files.FileInfo
}
116 changes: 115 additions & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package coreunix

import (
"bytes"
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin/gc"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/config"
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
"github.com/ipfs/go-ipfs/thirdparty/testutil"

"context"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
)

Expand Down Expand Up @@ -162,3 +168,111 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}
}

func testAddWPosInfo(t *testing.T, rawLeaves bool) {
r := &repo.Mock{
C: config.Config{
Identity: config.Identity{
PeerID: "Qmfoo", // required by offline node
},
},
D: testutil.ThreadSafeCloserMapDatastore(),
}
node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
if err != nil {
t.Fatal(err)
}

bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
bserv := blockservice.New(bs, node.Exchange)
dserv := dag.NewDAGService(bserv)
adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
if err != nil {
t.Fatal(err)
}
adder.Out = make(chan interface{})
adder.Progress = true
adder.RawLeaves = rawLeaves

data := make([]byte, 5*1024*1024)
rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
fileData := ioutil.NopCloser(bytes.NewBuffer(data))
fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo)

go func() {
defer close(adder.Out)
err = adder.AddFile(file)
if err != nil {
t.Fatal(err)
}
}()
for _ = range adder.Out {
}

if bs.countAtOffsetZero != 2 {
t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
}
if bs.countAtOffsetNonZero != 19 {
// note: the exact number will depend on the size and the sharding algo. used
t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
}
}

func TestAddWPosInfo(t *testing.T) {
testAddWPosInfo(t, false)
}

func TestAddWPosInfoAndRawLeafs(t *testing.T) {
testAddWPosInfo(t, true)
}


type testBlockstore struct {
blockstore.GCBlockstore
expectedPath string
t *testing.T
countAtOffsetZero int
countAtOffsetNonZero int
}

func (bs *testBlockstore) Put(block blocks.Block) error {
bs.CheckForPosInfo(block)
return bs.GCBlockstore.Put(block)
}

func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
for _, blk := range blocks {
bs.CheckForPosInfo(blk)
}
return bs.GCBlockstore.PutMany(blocks)
}

func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error {
fsn, ok := block.(*pi.FilestoreNode)
if ok {
posInfo := fsn.PosInfo
if posInfo.FullPath != bs.expectedPath {
bs.t.Fatal("PosInfo does not have the expected path")
}
if posInfo.Offset == 0 {
bs.countAtOffsetZero += 1
} else {
bs.countAtOffsetNonZero += 1
}
}
return nil
}

type dummyFileInfo struct {
name string
size int64
modTime time.Time
}

func (fi *dummyFileInfo) Name() string { return fi.name }
func (fi *dummyFileInfo) Size() int64 { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool { return false }
func (fi *dummyFileInfo) Sys() interface{} { return nil }
12 changes: 9 additions & 3 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
)

func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
var offset uint64 = 0
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

nroot := h.NewUnixfsNode()
db.SetPosInfo(nroot, 0)

// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
Expand All @@ -22,11 +24,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
}

// fill it up.
if err := fillNodeRec(db, nroot, level); err != nil {
if err := fillNodeRec(db, nroot, level, offset); err != nil {
return nil, err
}

offset = nroot.FileSize()
root = nroot

}
if root == nil {
root = h.NewUnixfsNode()
Expand All @@ -50,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
Expand All @@ -69,15 +73,17 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
db.SetPosInfo(child, offset)

err := fillNodeRec(db, child, depth-1)
err := fillNodeRec(db, child, depth-1, offset)
if err != nil {
return err
}

if err := node.AddChild(child, db); err != nil {
return err
}
offset += child.FileSize()
}

return nil
Expand Down
10 changes: 8 additions & 2 deletions importer/chunk/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
var IpfsRabinPoly = chunker.Pol(17437180132763653)

type Rabin struct {
r *chunker.Chunker
r *chunker.Chunker
reader io.Reader
}

func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
Expand All @@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)

return &Rabin{
r: ch,
r: ch,
reader: r,
}
}

Expand All @@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {

return ch.Data, nil
}

func (r *Rabin) Reader() io.Reader {
return r.reader
}
5 changes: 5 additions & 0 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var log = logging.Logger("chunk")
var DefaultBlockSize int64 = 1024 * 256

type Splitter interface {
Reader() io.Reader
NextBytes() ([]byte, error)
}

Expand Down Expand Up @@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {

return buf[:n], nil
}

func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
44 changes: 37 additions & 7 deletions importer/helpers/dagbuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package helpers

import (
"io"
"os"

"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"

Expand All @@ -17,6 +21,8 @@ type DagBuilderHelper struct {
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
fullPath string
stat os.FileInfo
}

type DagBuilderParams struct {
Expand All @@ -34,44 +40,58 @@ type DagBuilderParams struct {
// Generate a new DagBuilderHelper from the given params, which data source comes
// from chunks object
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
db := &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
if fi, ok := spl.Reader().(files.FileInfo); ok {
db.fullPath = fi.FullPath()
db.stat = fi.Stat()
}
return db
}

// prepareNext consumes the next item from the splitter and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
func (db *DagBuilderHelper) prepareNext() {
// if we already have data waiting to be consumed, we're ready
if db.nextData != nil {
if db.nextData != nil || db.recvdErr != nil {
return
}

// TODO: handle err (which wasn't handled either when the splitter was channeled)
db.nextData, _ = db.spl.NextBytes()
db.nextData, db.recvdErr = db.spl.NextBytes()
if db.recvdErr == io.EOF {
db.recvdErr = nil
}
}

// Done returns whether or not we're done consuming the incoming data.
func (db *DagBuilderHelper) Done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
if db.recvdErr != nil {
return false
}
return db.nextData == nil
}

// Next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *DagBuilderHelper) Next() []byte {
func (db *DagBuilderHelper) Next() ([]byte, error) {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
if db.recvdErr != nil {
return nil, db.recvdErr
} else {
return d, nil
}
}

// GetDagServ returns the dagservice object this Helper is using
Expand Down Expand Up @@ -100,7 +120,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
}

func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data := db.Next()
data, err := db.Next()
if err != nil {
return nil, err
}

if data == nil { // we're done!
return nil, nil
}
Expand All @@ -121,6 +145,12 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
}
}

func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.stat != nil {
node.SetPosInfo(offset, db.fullPath, db.stat)
}
}

func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
Expand Down
Loading