From 5ba947bb2bc2db41808a5ffbbcb63822c573691d Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 26 Jul 2023 11:57:17 +0200 Subject: [PATCH] bitswap/client: add basic traceable blocks (#308) Co-authored-by: hannahhoward --- CHANGELOG.md | 1 + bitswap/client/bitswap_with_sessions_test.go | 30 +++++++++++++++---- bitswap/client/client.go | 15 +++++----- .../internal/notifications/notifications.go | 16 +++++++--- .../notifications/notifications_test.go | 22 +++++++++----- bitswap/client/traceability/block.go | 21 +++++++++++++ 6 files changed, 80 insertions(+), 25 deletions(-) create mode 100644 bitswap/client/traceability/block.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8388c06f9..da229502a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The following emojis are used to highlight certain changes: * The only change to the default behavior on CAR responses is that we follow IPIP-412 and make `order=dfs;dups=n` explicit in the returned `Content-Type` HTTP header. +* ✨ While the call signature remains the same, the blocks that Bitswap returns can now be cast to [traceability.Block](./bitswap/client/traceability/block.go), which will additionally tell you where the Block came from and how long it took to fetch. This helps consumers of Bitswap collect better metrics on Bitswap behavior. ### Changed diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index e28379113..2191a5a90 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/boxo/bitswap" "github.com/ipfs/boxo/bitswap/client/internal/session" + "github.com/ipfs/boxo/bitswap/client/traceability" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" "github.com/ipfs/boxo/internal/test" @@ -17,6 +18,7 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" tu "github.com/libp2p/go-libp2p-testing/etc" + "github.com/libp2p/go-libp2p/core/peer" ) func getVirtualNetwork() tn.Network { @@ -71,9 +73,18 @@ func TestBasicSessions(t *testing.T) { if !blkout.Cid().Equals(block.Cid()) { t.Fatal("got wrong block") } + + traceBlock, ok := blkout.(traceability.Block) + if !ok { + t.Fatal("did not get tracable block") + } + + if traceBlock.From != b.Peer { + t.Fatal("should have received block from peer B, did not") + } } -func assertBlockLists(got, exp []blocks.Block) error { +func assertBlockListsFrom(from peer.ID, got, exp []blocks.Block) error { if len(got) != len(exp) { return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp)) } @@ -81,6 +92,13 @@ func assertBlockLists(got, exp []blocks.Block) error { h := cid.NewSet() for _, b := range got { h.Add(b.Cid()) + traceableBlock, ok := b.(traceability.Block) + if !ok { + return fmt.Errorf("not a traceable block: %s", b.Cid()) + } + if traceableBlock.From != from { + return fmt.Errorf("incorrect peer sent block, expect %s, got %s", from, traceableBlock.From) + } } for _, b := range exp { if !h.Has(b.Cid()) { @@ -133,7 +151,7 @@ func TestSessionBetweenPeers(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[0].Peer, got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -192,7 +210,7 @@ func TestSessionSplitFetch(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[i].Peer, got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -238,7 +256,7 @@ func TestFetchNotConnected(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockLists(got, blks); err != nil { + if err := assertBlockListsFrom(other.Peer, got, blks); err != nil { t.Fatal(err) } } @@ -289,7 +307,7 @@ func TestFetchAfterDisconnect(t *testing.T) { got = append(got, b) } - if err := assertBlockLists(got, blks[:5]); err != nil { + if err := assertBlockListsFrom(peerA.Peer, got, blks[:5]); err != nil { t.Fatal(err) } @@ -318,7 +336,7 @@ func TestFetchAfterDisconnect(t *testing.T) { } } - if err := assertBlockLists(got, blks); err != nil { + if err := assertBlockListsFrom(peerA.Peer, got, blks); err != nil { t.Fatal(err) } } diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 4e5c57b82..d29eb6faf 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -5,14 +5,9 @@ package client import ( "context" "errors" - "sync" "time" - delay "github.com/ipfs/go-ipfs-delay" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager" bsgetter "github.com/ipfs/boxo/bitswap/client/internal/getter" bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue" @@ -33,11 +28,14 @@ import ( exchange "github.com/ipfs/boxo/exchange" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logging.Logger("bitswap-client") @@ -239,6 +237,7 @@ type counters struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. +// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block]. func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) { ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String()))) defer span.End() @@ -248,6 +247,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) // GetBlocks returns a channel where the caller may receive blocks that // correspond to the provided |keys|. Returns an error if BitSwap is unable to // begin this request within the deadline enforced by the context. +// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block]. // // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one @@ -284,7 +284,8 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Publish the block to any Bitswap clients that had requested blocks. // (the sessions use this pubsub mechanism to inform clients of incoming // blocks) - bs.notif.Publish(blks...) + var zero peer.ID + bs.notif.Publish(zero, blks...) return nil } @@ -325,7 +326,7 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl // (the sessions use this pubsub mechanism to inform clients of incoming // blocks) for _, b := range wanted { - bs.notif.Publish(b) + bs.notif.Publish(from, b) } for _, b := range wanted { diff --git a/bitswap/client/internal/notifications/notifications.go b/bitswap/client/internal/notifications/notifications.go index ed4b79f57..dc6dda899 100644 --- a/bitswap/client/internal/notifications/notifications.go +++ b/bitswap/client/internal/notifications/notifications.go @@ -3,10 +3,13 @@ package notifications import ( "context" "sync" + "time" pubsub "github.com/cskr/pubsub" + "github.com/ipfs/boxo/bitswap/client/traceability" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" ) const bufferSize = 16 @@ -15,7 +18,7 @@ const bufferSize = 16 // for cids. It's used internally by bitswap to decouple receiving blocks // and actually providing them back to the GetBlocks caller. type PubSub interface { - Publish(blocks ...blocks.Block) + Publish(from peer.ID, blocks ...blocks.Block) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block Shutdown() } @@ -35,7 +38,7 @@ type impl struct { closed chan struct{} } -func (ps *impl) Publish(blocks ...blocks.Block) { +func (ps *impl) Publish(from peer.ID, blocks ...blocks.Block) { ps.lk.RLock() defer ps.lk.RUnlock() select { @@ -45,7 +48,7 @@ func (ps *impl) Publish(blocks ...blocks.Block) { } for _, block := range blocks { - ps.wrapped.Pub(block, block.Cid().KeyString()) + ps.wrapped.Pub(traceability.Block{Block: block, From: from}, block.Cid().KeyString()) } } @@ -84,6 +87,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl default: } + subscribe := time.Now() + // AddSubOnceEach listens for each key in the list, and closes the channel // once all keys have been received ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...) @@ -113,10 +118,13 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl if !ok { return } - block, ok := val.(blocks.Block) + block, ok := val.(traceability.Block) if !ok { + // FIXME: silently dropping errors wtf ? return } + block.Delay = time.Since(subscribe) + select { case <-ctx.Done(): return diff --git a/bitswap/client/internal/notifications/notifications_test.go b/bitswap/client/internal/notifications/notifications_test.go index 09c8eb806..25b580f6a 100644 --- a/bitswap/client/internal/notifications/notifications_test.go +++ b/bitswap/client/internal/notifications/notifications_test.go @@ -10,10 +10,12 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p/core/peer" ) func TestDuplicates(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id b1 := blocks.NewBlock([]byte("1")) b2 := blocks.NewBlock([]byte("2")) @@ -22,16 +24,16 @@ func TestDuplicates(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid()) - n.Publish(b1) + n.Publish(zero, b1) blockRecvd, ok := <-ch if !ok { t.Fail() } assertBlocksEqual(t, b1, blockRecvd) - n.Publish(b1) // ignored duplicate + n.Publish(zero, b1) // ignored duplicate - n.Publish(b2) + n.Publish(zero, b2) blockRecvd, ok = <-ch if !ok { t.Fail() @@ -41,6 +43,7 @@ func TestDuplicates(t *testing.T) { func TestPublishSubscribe(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id blockSent := blocks.NewBlock([]byte("Greetings from The Interval")) @@ -48,7 +51,7 @@ func TestPublishSubscribe(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), blockSent.Cid()) - n.Publish(blockSent) + n.Publish(zero, blockSent) blockRecvd, ok := <-ch if !ok { t.Fail() @@ -60,6 +63,7 @@ func TestPublishSubscribe(t *testing.T) { func TestSubscribeMany(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id e1 := blocks.NewBlock([]byte("1")) e2 := blocks.NewBlock([]byte("2")) @@ -68,14 +72,14 @@ func TestSubscribeMany(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid()) - n.Publish(e1) + n.Publish(zero, e1) r1, ok := <-ch if !ok { t.Fatal("didn't receive first expected block") } assertBlocksEqual(t, e1, r1) - n.Publish(e2) + n.Publish(zero, e2) r2, ok := <-ch if !ok { t.Fatal("didn't receive second expected block") @@ -87,6 +91,7 @@ func TestSubscribeMany(t *testing.T) { // would be requested twice at the same time. func TestDuplicateSubscribe(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id e1 := blocks.NewBlock([]byte("1")) @@ -95,7 +100,7 @@ func TestDuplicateSubscribe(t *testing.T) { ch1 := n.Subscribe(context.Background(), e1.Cid()) ch2 := n.Subscribe(context.Background(), e1.Cid()) - n.Publish(e1) + n.Publish(zero, e1) r1, ok := <-ch1 if !ok { t.Fatal("didn't receive first expected block") @@ -158,6 +163,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id g := blocksutil.NewBlockGenerator() ctx, cancel := context.WithCancel(context.Background()) @@ -179,7 +185,7 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("cancel context before any blocks published") cancel() for _, b := range bs { - n.Publish(b) + n.Publish(zero, b) } t.Log("publishing the large number of blocks to the ignored channel must not deadlock") diff --git a/bitswap/client/traceability/block.go b/bitswap/client/traceability/block.go new file mode 100644 index 000000000..28b52118d --- /dev/null +++ b/bitswap/client/traceability/block.go @@ -0,0 +1,21 @@ +package traceability + +import ( + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/libp2p/go-libp2p/core/peer" +) + +// Block is a block whose provenance has been tracked. +type Block struct { + blocks.Block + + // From contains the peer id of the node who sent us the block. + // It will be the zero value if we did not downloaded this block from the + // network. (such as by getting the block from NotifyNewBlocks). + From peer.ID + // Delay contains how long did we had to wait between when we started being + // intrested and when we actually got the block. + Delay time.Duration +}