Skip to content

Commit

Permalink
unixfs: allow use of raw merkledag nodes for unixfs files
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Oct 17, 2016
1 parent 9cb6ac1 commit 92c74f0
Show file tree
Hide file tree
Showing 23 changed files with 386 additions and 147 deletions.
7 changes: 5 additions & 2 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ func NewBlock(data []byte) *BasicBlock {
// we are able to be confident that the data is correct
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
if u.Debug {
// TODO: fix assumptions
chkc := cid.NewCidV0(u.Hash(data))
chkc, err := c.Prefix().Sum(data)
if err != nil {
return nil, err
}

if !chkc.Equals(c) {
return nil, ErrWrongHash
}
Expand Down
22 changes: 13 additions & 9 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
)

var AddCmd = &cmds.Command{
Expand Down Expand Up @@ -78,6 +79,7 @@ You can now refer to the added file in a gateway, like so:
cmds.BoolOption(hiddenOptionName, "H", "Include files that are hidden. Only takes effect on recursive add.").Default(false),
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
},
PreRun: func(req cmds.Request) error {
if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet {
Expand Down Expand Up @@ -135,6 +137,7 @@ You can now refer to the added file in a gateway, like so:
silent, _, _ := req.Option(silentOptionName).Bool()
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()

if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
Expand Down Expand Up @@ -174,6 +177,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Wrap = wrap
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks

if hash {
md := dagtest.Mock()
Expand Down
15 changes: 11 additions & 4 deletions core/corehttp/gateway_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package corehttp

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -18,10 +19,10 @@ import (
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"

"context"
routing "gx/ipfs/QmNUgVQTYnXQVrGT2rajZYsuKV8GYdiL91cdZSQDKNPNgE/go-libp2p-routing"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)

const (
Expand All @@ -45,7 +46,7 @@ func newGatewayHandler(node *core.IpfsNode, conf GatewayConfig) *gatewayHandler
}

// TODO(cryptix): find these helpers somewhere else
func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.ProtoNode, error) {
func (i *gatewayHandler) newDagFromReader(r io.Reader) (node.Node, error) {
// TODO(cryptix): change and remove this helper once PR1136 is merged
// return ufs.AddFromReader(i.node, r.Body)
return importer.BuildDagFromReader(
Expand Down Expand Up @@ -353,7 +354,7 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}

var newnode *dag.ProtoNode
var newnode node.Node
if rsegs[len(rsegs)-1] == "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" {
newnode = uio.NewEmptyDirectory()
} else {
Expand Down Expand Up @@ -417,8 +418,14 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}

pbnewnode, ok := newnode.(*dag.ProtoNode)
if !ok {
webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest)
return
}

// object set-data case
pbnd.SetData(newnode.Data())
pbnd.SetData(pbnewnode.Data())

newcid, err = i.node.DAG.Add(pbnd)
if err != nil {
Expand Down
34 changes: 19 additions & 15 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coreunix

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -13,16 +14,18 @@ import (
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer"
balanced "github.com/ipfs/go-ipfs/importer/balanced"
"github.com/ipfs/go-ipfs/importer/chunk"
ihelper "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
"github.com/ipfs/go-ipfs/pin"
unixfs "github.com/ipfs/go-ipfs/unixfs"

context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)
Expand Down Expand Up @@ -97,6 +100,7 @@ type Adder struct {
Hidden bool
Pin bool
Trickle bool
RawLeaves bool
Silent bool
Wrap bool
Chunker string
Expand All @@ -111,22 +115,22 @@ func (adder *Adder) SetMfsRoot(r *mfs.Root) {
}

// Perform the actual add & pin locally, outputting results to reader
func (adder Adder) add(reader io.Reader) (*dag.ProtoNode, error) {
func (adder Adder) add(reader io.Reader) (node.Node, error) {
chnk, err := chunk.FromString(reader, adder.Chunker)
if err != nil {
return nil, err
}
params := ihelper.DagBuilderParams{
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
}

if adder.Trickle {
return importer.BuildTrickleDagFromReader(
adder.dagService,
chnk,
)
}
return importer.BuildDagFromReader(
adder.dagService,
chnk,
)
return trickle.TrickleLayout(params.New(chnk))
}

return balanced.BalancedLayout(params.New(chnk))
}

func (adder *Adder) RootNode() (*dag.ProtoNode, error) {
Expand Down Expand Up @@ -331,7 +335,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Pr
return gopath.Join(c.String(), filename), dagnode, nil
}

func (adder *Adder) addNode(node *dag.ProtoNode, path string) error {
func (adder *Adder) addNode(node node.Node, path string) error {
// patch it into the root
if path == "" {
path = node.Cid().String()
Expand Down Expand Up @@ -456,7 +460,7 @@ func (adder *Adder) maybePauseForGC() error {
}

// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.ProtoNode) error {
func outputDagnode(out chan interface{}, name string, dn node.Node) error {
if out == nil {
return nil
}
Expand All @@ -482,7 +486,7 @@ func NewMemoryDagService() dag.DAGService {
}

// from core/commands/object.go
func getOutput(dagnode *dag.ProtoNode) (*Object, error) {
func getOutput(dagnode node.Node) (*Object, error) {
c := dagnode.Cid()

output := &Object{
Expand Down
7 changes: 6 additions & 1 deletion importer/balanced/balanced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error)
Maxlinks: h.DefaultLinksPerBlock,
}

return BalancedLayout(dbp.New(spl))
nd, err := BalancedLayout(dbp.New(spl))
if err != nil {
return nil, err
}

return nd.(*dag.ProtoNode), nil
}

func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.ProtoNode, []byte) {
Expand Down
16 changes: 12 additions & 4 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"errors"

h "github.com/ipfs/go-ipfs/importer/helpers"
dag "github.com/ipfs/go-ipfs/merkledag"

node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)

func BalancedLayout(db *h.DagBuilderHelper) (*dag.ProtoNode, error) {
func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

Expand Down Expand Up @@ -56,14 +57,21 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {

// Base case
if depth <= 0 { // catch accidental -1's in case error above is removed.
return db.FillNodeWithData(node)
child, err := db.GetNextDataNode()
if err != nil {
return err
}

node.Set(child)
return nil
}

// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()

if err := fillNodeRec(db, child, depth-1); err != nil {
err := fillNodeRec(db, child, depth-1)
if err != nil {
return err
}

Expand Down
53 changes: 34 additions & 19 deletions importer/helpers/dagbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ package helpers
import (
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"

node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)

// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
spl chunk.Splitter
recvdErr error
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
dserv dag.DAGService
spl chunk.Splitter
recvdErr error
rawLeaves bool
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
}

type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int

// RawLeaves signifies that the importer should use raw ipld nodes as leaves
// instead of using the unixfs TRaw type
RawLeaves bool

// DAGService to write blocks to (required)
Dagserv dag.DAGService
}
Expand All @@ -28,10 +35,11 @@ type DagBuilderParams struct {
// from chunks object
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
}

Expand Down Expand Up @@ -78,9 +86,8 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {

// while we have room AND we're not done
for node.NumChildren() < db.maxlinks && !db.Done() {
child := NewUnixfsBlock()

if err := db.FillNodeWithData(child); err != nil {
child, err := db.GetNextDataNode()
if err != nil {
return err
}

Expand All @@ -92,21 +99,29 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
return nil
}

func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data := db.Next()
if data == nil { // we're done!
return nil
return nil, nil
}

if len(data) > BlockSizeLimit {
return ErrSizeLimitExceeded
return nil, ErrSizeLimitExceeded
}

node.SetData(data)
return nil
if db.rawLeaves {
return &UnixfsNode{
rawnode: dag.NewRawNode(data),
raw: true,
}, nil
} else {
blk := NewUnixfsBlock()
blk.SetData(data)
return blk, nil
}
}

func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.ProtoNode, error) {
func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 92c74f0

Please sign in to comment.