Skip to content

Commit

Permalink
Keep track of offsets in the DAG builder and not the splitter.
Browse files Browse the repository at this point in the history
This completely elements the need for AdvReader and cleans up a lot of
other code.

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
  • Loading branch information
kevina committed May 23, 2016
1 parent b54f2f4 commit f9a51b1
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 139 deletions.
37 changes: 0 additions & 37 deletions commands/files/adv_reader.go

This file was deleted.

11 changes: 11 additions & 0 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ type SizeFile interface {

Size() (int64, error)
}

type FileInfo interface {
FullPath() string
Stat() os.FileInfo
}

type PosInfo struct {
Offset uint64
FullPath string
Stat os.FileInfo // can be nil
}
5 changes: 1 addition & 4 deletions commands/files/multipartfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type MultipartFile struct {
Part *multipart.Part
Reader *multipart.Reader
Mediatype string
offset int64
}

func NewFileFromPart(part *multipart.Part) (File, error) {
Expand Down Expand Up @@ -97,9 +96,7 @@ func (f *MultipartFile) Read(p []byte) (int, error) {
if f.IsDirectory() {
return 0, ErrNotReader
}
res, err := f.Part.Read(p)
f.offset += int64(res)
return res, err
return f.Part.Read(p)
}

func (f *MultipartFile) Close() error {
Expand Down
11 changes: 2 additions & 9 deletions commands/files/readerfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ type ReaderFile struct {
fullpath string
reader io.ReadCloser
stat os.FileInfo
offset uint64
}

func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
return &ReaderFile{filename, path, reader, stat, 0}
return &ReaderFile{filename, path, reader, stat}
}

func (f *ReaderFile) IsDirectory() bool {
Expand All @@ -36,14 +35,8 @@ func (f *ReaderFile) FullPath() string {
return f.fullpath
}

func (f *ReaderFile) PosInfo() *PosInfo {
return &PosInfo{f.offset,f.fullpath,f.stat}
}

func (f *ReaderFile) Read(p []byte) (int, error) {
res, err := f.reader.Read(p)
f.offset += uint64(res)
return res, err
return f.reader.Read(p)
}

func (f *ReaderFile) Close() error {
Expand Down
27 changes: 15 additions & 12 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type Adder struct {
}

// Perform the actual add & pin locally, outputting results to reader
func (adder Adder) add(reader files.AdvReader) (*dag.Node, error) {
func (adder Adder) add(reader io.Reader) (*dag.Node, error) {
chnk, err := chunk.FromString(reader, adder.Chunker)
if err != nil {
return nil, err
Expand Down Expand Up @@ -249,9 +249,7 @@ func Add(n *core.DataServices, r io.Reader) (string, error) {
return "", err
}

ar := files.AdvReaderAdapter(r)

node, err := fileAdder.add(ar)
node, err := fileAdder.add(r)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -400,9 +398,14 @@ func (adder *Adder) addFile(file files.File) error {
// case for regular file
// if the progress flag was specified, wrap the file so that we can send
// progress updates to the client (over the output channel)
reader := files.AdvReaderAdapter(file)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{reader: reader, filename: file.FileName(), out: adder.out}
rdr := &progressReader{file: file, out: adder.out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
reader = rdr
}
}

dagnode, err := adder.add(reader)
Expand Down Expand Up @@ -513,28 +516,28 @@ func getOutput(dagnode *dag.Node) (*Object, error) {
}

type progressReader struct {
reader files.AdvReader
filename string
file files.File
out chan interface{}
bytes int64
lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
n, err := i.reader.Read(p)
n, err := i.file.Read(p)

i.bytes += int64(n)
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes
i.out <- &AddedObject{
Name: i.filename,
Name: i.file.FileName(),
Bytes: i.bytes,
}
}

return n, err
}

func (i *progressReader) PosInfo() *files.PosInfo {
return i.reader.PosInfo()
type progressReader2 struct {
*progressReader
files.FileInfo
}
2 changes: 1 addition & 1 deletion filestore/support/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type FilestoreBlock struct {
blocks.BasicBlock
AltData []byte
files.PosInfo
*files.PosInfo
Size uint64
}

Expand Down
15 changes: 10 additions & 5 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
)

func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
var offset uint64 = 0
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

nroot := h.NewUnixfsNode()
db.SetPosInfo(nroot, 0)

// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
Expand All @@ -22,17 +24,18 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
}

// fill it up.
if err := fillNodeRec(db, nroot, level); err != nil {
if err := fillNodeRec(db, nroot, level, offset); err != nil {
return nil, err
}

offset = nroot.FileSize()
root = nroot

}
if root == nil {
root = h.NewUnixfsNode()
}

db.SetAsRoot(root)
out, err := db.Add(root)
if err != nil {
return nil, err
Expand All @@ -51,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
Expand All @@ -64,14 +67,16 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
db.SetPosInfo(child,offset)

if err := fillNodeRec(db, child, depth-1); err != nil {
if err := fillNodeRec(db, child, depth-1, offset); err != nil {
return err
}

if err := node.AddChild(child, db); err != nil {
return err
}
offset += child.FileSize()
}

return nil
Expand Down
14 changes: 10 additions & 4 deletions importer/chunk/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package chunk
import (
"hash/fnv"
"io"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker"
)

var IpfsRabinPoly = chunker.Pol(17437180132763653)

type Rabin struct {
r *chunker.Chunker
reader io.Reader
}

func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
Expand All @@ -26,14 +27,19 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {

return &Rabin{
r: ch,
reader: r,
}
}

func (r *Rabin) NextBytes() (Bytes, error) {
func (r *Rabin) NextBytes() ([]byte, error) {
ch, err := r.r.Next()
if err != nil {
return Bytes{}, err
return nil, err
}

return Bytes{nil, ch.Data}, nil
return ch.Data, nil
}

func (r *Rabin) Reader() io.Reader {
return r.reader
}
4 changes: 2 additions & 2 deletions importer/chunk/rabin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestRabinChunking(t *testing.T) {
t.Fatal(err)
}

chunks = append(chunks, chunk.Data)
chunks = append(chunks, chunk)
}

fmt.Printf("average block size: %d\n", len(data)/len(chunks))
Expand All @@ -53,7 +53,7 @@ func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block {
t.Fatal(err)
}

b := blocks.NewBlock(blk.Data)
b := blocks.NewBlock(blk)
blkmap[b.Key()] = b
}

Expand Down
28 changes: 13 additions & 15 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ package chunk
import (
"io"

"github.com/ipfs/go-ipfs/commands/files"
logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log"
)

var log = logging.Logger("chunk")

var DefaultBlockSize int64 = 1024 * 256

type Bytes struct {
PosInfo *files.PosInfo
Data []byte
}

type Splitter interface {
NextBytes() (Bytes, error)
Reader() io.Reader
NextBytes() ([]byte, error)
}

type SplitterGen func(r io.Reader) Splitter
Expand Down Expand Up @@ -48,29 +43,28 @@ func Chan(s Splitter) (<-chan []byte, <-chan error) {
return
}

out <- b.Data
out <- b
}
}()
return out, errs
}

type sizeSplitterv2 struct {
r files.AdvReader
r io.Reader
size int64
err error
}

func NewSizeSplitter(r io.Reader, size int64) Splitter {
return &sizeSplitterv2{
r: files.AdvReaderAdapter(r),
r: r,
size: size,
}
}

func (ss *sizeSplitterv2) NextBytes() (Bytes, error) {
posInfo := ss.r.PosInfo()
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
if ss.err != nil {
return Bytes{posInfo, nil}, ss.err
return nil, ss.err
}
buf := make([]byte, ss.size)
n, err := io.ReadFull(ss.r, buf)
Expand All @@ -79,8 +73,12 @@ func (ss *sizeSplitterv2) NextBytes() (Bytes, error) {
err = nil
}
if err != nil {
return Bytes{posInfo, nil}, err
return nil, err
}

return Bytes{posInfo, buf[:n]}, nil
return buf[:n], nil
}

func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
Loading

0 comments on commit f9a51b1

Please sign in to comment.