Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support incoming and outgoing duplicates #249

Merged
merged 24 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5996325
WIP
hannahhoward May 23, 2023
8e75735
feat(http): support IPFS 412 parameters
hannahhoward May 24, 2023
baff9da
refactor(duplicateadder): use more flexible block stream to handle tr…
hannahhoward May 24, 2023
91c6482
fix(storage): close underlying writer
hannahhoward May 24, 2023
80c25a8
style(lint): fix lint issues
hannahhoward May 24, 2023
fbfd04d
fix(storage): abort duplicate adder on context error
hannahhoward May 24, 2023
7b1bb6e
style(lint): fix lint issues
hannahhoward May 24, 2023
7ae1acd
fix(verifier): use max blocks
hannahhoward May 25, 2023
05af04b
fix(http): lower log output to info
hannahhoward May 25, 2023
2aace5b
refactor(http): move car closing to after http
hannahhoward May 25, 2023
6e8c1e4
refactor(itest): add log
hannahhoward May 25, 2023
6f4e2d8
fix(testpeer): remove superfluous write
hannahhoward May 25, 2023
8984b60
style(http): add comment
hannahhoward May 25, 2023
9d20893
fix(itest): fix merged diff
hannahhoward May 25, 2023
041c644
Update pkg/server/http/ipfs.go
hannahhoward May 25, 2023
69d1b08
Update pkg/server/http/ipfs.go
hannahhoward May 25, 2023
cf2585a
Update pkg/storage/duplicateaddercar.go
hannahhoward May 25, 2023
af14b93
Update pkg/storage/duplicateaddercar_test.go
hannahhoward May 25, 2023
d15734d
Update pkg/storage/duplicateaddercar_test.go
hannahhoward May 25, 2023
ab14be6
Update pkg/storage/duplicateaddercar_test.go
hannahhoward May 25, 2023
2a33b36
refactor(verifiedcar): rename allow dupes to expect dupes
hannahhoward May 25, 2023
325e9c2
style(lint): fix lint errors
hannahhoward May 25, 2023
a4974af
fix(verifiedcar): move interface to top
hannahhoward May 25, 2023
c7fd62b
fix(storage): remove uncertainty on car close
hannahhoward May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ func Fetch(cctx *cli.Context) error {
} else {
carWriter = storage.NewDeferredCarWriterForPath(rootCid, outfile)
}
carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempDir)
tempStore := storage.NewDeferredStorageCar(tempDir)
carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore)
defer carStore.Close()

var blockCount int
Expand Down
75 changes: 73 additions & 2 deletions pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/lassie/pkg/internal/itest/mocknet"
"github.com/filecoin-project/lassie/pkg/internal/itest/testpeer"
"github.com/filecoin-project/lassie/pkg/internal/testutil"
"github.com/filecoin-project/lassie/pkg/lassie"
httpserver "github.com/filecoin-project/lassie/pkg/server/http"
"github.com/filecoin-project/lassie/pkg/verifiedcar"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
unixfs "github.com/ipfs/go-unixfsnode/testutil"
Expand All @@ -30,6 +32,8 @@ import (
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
Expand All @@ -49,7 +53,10 @@ func TestHttpFetch(t *testing.T) {
blockQuery := func(q url.Values, _ []testpeer.TestPeer) {
q.Set("dag-scope", "block")
}

noDups := func(header http.Header) {
header.Set("Accept", "application/vnd.ipld.car;order=dfs;version=1;dups=n;")
}
type headerSetter func(http.Header)
type queryModifier func(url.Values, []testpeer.TestPeer)
type bodyValidator func(*testing.T, unixfs.DirEntry, []byte)

Expand All @@ -66,6 +73,7 @@ func TestHttpFetch(t *testing.T) {
modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig
generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry
paths []string
setHeader headerSetter
modifyQueries []queryModifier
validateBodies []bodyValidator
}{
Expand Down Expand Up @@ -196,6 +204,27 @@ func TestHttpFetch(t *testing.T) {
validateCarBody(t, body, srcData.Root, wantCids, true)
}},
},
{
name: "http max block limit",
httpRemotes: 1,
expectUncleanEnd: true,
modifyHttpConfig: func(cfg httpserver.HttpServerConfig) httpserver.HttpServerConfig {
cfg.MaxBlocksPerRequest = 3
return cfg
},
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)}
},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
// 3 blocks max, start at the root and then two blocks into the sharded data
wantCids := []cid.Cid{
srcData.Root,
srcData.SelfCids[0],
srcData.SelfCids[1],
}
validateCarBody(t, body, srcData.Root, wantCids, true)
}},
},
{
// dag-scope entity fetch should get the same DAG as full for a plain file
name: "graphsync large sharded file, dag-scope entity",
Expand Down Expand Up @@ -646,6 +675,43 @@ func TestHttpFetch(t *testing.T) {
v.Set("providers", strings.Join(maStrings, ","))
}},
},
{
name: "http large sharded file with dups",
httpRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)}
},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
store := &testutil.CorrectedMemStore{Store: &memstore.Store{
Bag: make(map[string][]byte),
}}
lsys := cidlink.DefaultLinkSystem()
lsys.SetReadStorage(store)
lsys.SetWriteStorage(store)
_, _, err := verifiedcar.Config{
Root: srcData.Root,
Selector: selectorparse.CommonSelector_ExploreAllRecursively,
ExpectDuplicatesIn: true,
}.VerifyCar(context.Background(), bytes.NewReader(body), lsys)
require.NoError(t, err)
}},
},
{
name: "http large sharded file with dups, no dups response requested",
httpRemotes: 1,
setHeader: noDups,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)}
},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
wantCids := []cid.Cid{
srcData.Root, // "/""
srcData.SelfCids[1],
srcData.SelfCids[len(srcData.SelfCids)-1],
}
validateCarBody(t, body, srcData.Root, wantCids, true)
}},
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -712,7 +778,11 @@ func TestHttpFetch(t *testing.T) {
addr := fmt.Sprintf("http://%s/ipfs/%s%s", httpServer.Addr(), srcData[i].Root.String(), path)
getReq, err := http.NewRequest("GET", addr, nil)
req.NoError(err)
getReq.Header.Add("Accept", "application/vnd.ipld.car")
if testCase.setHeader == nil {
getReq.Header.Add("Accept", "application/vnd.ipld.car")
} else {
testCase.setHeader(getReq.Header)
}
if testCase.modifyQueries != nil && testCase.modifyQueries[i] != nil {
q := getReq.URL.Query()
testCase.modifyQueries[i](q, mrn.Remotes)
Expand Down Expand Up @@ -778,6 +848,7 @@ func TestHttpFetch(t *testing.T) {
req.NoError(err)

if DEBUG_DATA {
t.Logf("Creating CAR %s in temp dir", fmt.Sprintf("%s_received%d.car", testCase.name, i))
dstf, err := os.CreateTemp("", fmt.Sprintf("%s_received%d.car", testCase.name, i))
req.NoError(err)
t.Logf("Writing received data to CAR @ %s", dstf.Name())
Expand Down
22 changes: 20 additions & 2 deletions pkg/internal/itest/testpeer/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,24 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res
unixfsPath = "/" + strings.Join(urlPath[2:], "/")
}

acceptTypes := strings.Split(req.Header.Get("Accept"), ",")
includeDupes := false
for _, acceptType := range acceptTypes {
typeParts := strings.Split(acceptType, ";")
if typeParts[0] == "application/vnd.ipld.car" {
for _, nextPart := range typeParts[1:] {
pair := strings.Split(nextPart, "=")
if len(pair) == 2 {
attr := strings.TrimSpace(pair[0])
value := strings.TrimSpace(pair[1])
if attr == "dups" && value == "y" {
includeDupes = true
}
}
}
}
}

// We're always providing the dag-scope parameter, so add a failure case if we stop
// providing it in the future
if !req.URL.Query().Has("dag-scope") {
Expand Down Expand Up @@ -384,7 +402,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res
}

// Write to response writer
carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false))
carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(includeDupes))
if err != nil {
http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -433,7 +451,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res

err = progress.WalkAdv(rootNode, sel, visitNoop)
if err != nil {
http.Error(res, fmt.Sprintf("Failed to traverse from root node: %v", err), http.StatusInternalServerError)
// if we loaded the first block, we can't write headers any more
return
}
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/internal/testutil/correctedmemstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package testutil

import (
"context"
"io"

format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-ipld-prime/storage/memstore"
)

// TODO: remove when this is fixed in IPLD prime
type CorrectedMemStore struct {
*memstore.Store
}

func (cms *CorrectedMemStore) Get(ctx context.Context, key string) ([]byte, error) {
data, err := cms.Store.Get(ctx, key)
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
}
return data, err
}

func (cms *CorrectedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rc, err := cms.Store.GetStream(ctx, key)
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
}
return rc, err
}
9 changes: 9 additions & 0 deletions pkg/internal/testutil/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,12 @@ func GenerateRetrievalIDs(t *testing.T, n int) []types.RetrievalID {
}
return retrievalIDs
}

type ZeroReader struct{}

func (ZeroReader) Read(b []byte) (n int, err error) {
for i := range b {
b[i] = 0
}
return len(b), nil
}
61 changes: 61 additions & 0 deletions pkg/internal/testutil/toblocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package testutil

import (
"bytes"
"io"
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/stretchr/testify/require"
)

// ToBlocks makes a block array from ordered blocks in a traversal
func ToBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, selNode datamodel.Node) []blocks.Block {
sel, err := selector.CompileSelector(selNode)
require.NoError(t, err)
traversedBlocks := make([]blocks.Block, 0)
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)
osro := lsys.StorageReadOpener
lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
r, err := osro(lc, l)
if err != nil {
return nil, err
}
byts, err := io.ReadAll(r)
if err != nil {
return nil, err
}
blk, err := blocks.NewBlockWithCid(byts, l.(cidlink.Link).Cid)
if err != nil {
return nil, err
}
traversedBlocks = append(traversedBlocks, blk)
return bytes.NewReader(byts), nil
}
var proto datamodel.NodePrototype = basicnode.Prototype.Any
if root.Prefix().Codec == cid.DagProtobuf {
proto = dagpb.Type.PBNode
}
rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto)
require.NoError(t, err)
prog := traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser),
},
}
vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil }
err = prog.WalkAdv(rootNode, sel, vf)
require.NoError(t, err)

return traversedBlocks
}
26 changes: 2 additions & 24 deletions pkg/retriever/bitswapretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ipfs/go-cid"
gstestutil "github.com/ipfs/go-graphsync/testutil"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking"
Expand All @@ -32,7 +31,7 @@ import (
func TestBitswapRetriever(t *testing.T) {
ctx := context.Background()

store := &correctedMemStore{&memstore.Store{
store := &testutil.CorrectedMemStore{Store: &memstore.Store{
Bag: make(map[string][]byte),
}}
lsys := cidlink.DefaultLinkSystem()
Expand Down Expand Up @@ -514,7 +513,7 @@ func makeLsys(blocks []blocks.Block) *linking.LinkSystem {
bag[cidlink.Link{Cid: block.Cid()}.Binary()] = block.RawData()
}
lsys := cidlink.DefaultLinkSystem()
store := &correctedMemStore{&memstore.Store{Bag: bag}}
store := &testutil.CorrectedMemStore{Store: &memstore.Store{Bag: bag}}
lsys.SetReadStorage(store)
lsys.SetWriteStorage(store)
return &lsys
Expand All @@ -528,27 +527,6 @@ func sizeOf(blocks []blocks.Block) uint64 {
return total
}

// TODO: remove when this is fixed in IPLD prime
type correctedMemStore struct {
*memstore.Store
}

func (cms *correctedMemStore) Get(ctx context.Context, key string) ([]byte, error) {
data, err := cms.Store.Get(ctx, key)
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
}
return data, err
}

func (cms *correctedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rc, err := cms.Store.GetStream(ctx, key)
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
}
return rc, err
}

type mockInProgressCids struct {
incremented []cid.Cid
decremented []cid.Cid
Expand Down
15 changes: 5 additions & 10 deletions pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/filecoin-project/lassie/pkg/types"
"github.com/filecoin-project/lassie/pkg/verifiedcar"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/metadata"
"github.com/multiformats/go-multicodec"
)
Expand Down Expand Up @@ -114,18 +113,14 @@ func (ph *ProtocolHttp) Retrieve(
ttfb = retrieval.Clock.Since(phaseStartTime)
shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate))
})

sel, err := selector.CompileSelector(retrieval.request.GetSelector())
if err != nil {
return nil, err
}

cfg := verifiedcar.Config{
Root: retrieval.request.Cid,
Selector: sel,
Root: retrieval.request.Cid,
Selector: retrieval.request.GetSelector(),
ExpectDuplicatesIn: true,
MaxBlocks: retrieval.request.MaxBlocks,
}

blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem)
blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func TestHTTPRetriever(t *testing.T) {
ctx := context.Background()

store := &correctedMemStore{&memstore.Store{
store := &testutil.CorrectedMemStore{Store: &memstore.Store{
Bag: make(map[string][]byte),
}}
lsys := cidlink.DefaultLinkSystem()
Expand Down
Loading