diff --git a/CHANGELOG.md b/CHANGELOG.md index a7f85d984..141361958 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ The following emojis are used to highlight certain changes: * `NewRemoteCarBackend` allows you to create a gateway backend that uses one or multiple Trustless Gateways as backend. These gateways must support CAR requests (`application/vnd.ipld.car`), as well as the extensions describe in [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/). With this, we also introduced `NewCarBackend`, `NewRemoteCarFetcher` and `NewRetryCarFetcher`. * `gateway` now sets the [`Content-Location`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Location) header for requests with non-default content format, as a result of content negotiation. This allows generic and misconfigured HTTP caches to store Deserialized, CAR and Block responses separately, under distinct cache keys. * `gateway` now supports `car-dups`, `car-order` and `car-version` as query parameters in addition to the `application/vnd.ipld.car` parameters sent via `Accept` header. The parameters in the `Accept` header have always priority, but including them in URL simplifies HTTP caching and allows use in `Content-Location` header on CAR responses to maximize interoperability with wide array of HTTP caches. -* `bitswap/server` now has two new options: `WithNotifyNewBlocks` and `WithNoPeerLedger` to allow disabling notifying new blocks and the usage of the peer ledger, respectively. +* `bitswap/server` now allows to override the default peer ledger with `WithPeerLedger`. ### Changed diff --git a/bitswap/options.go b/bitswap/options.go index 276119872..11e89fdf9 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -51,14 +51,6 @@ func SetSendDontHaves(send bool) Option { return Option{server.SetSendDontHaves(send)} } -func WithNotifyNewBlocks(notify bool) Option { - return Option{server.WithNotifyNewBlocks(notify)} -} - -func WithNoPeerLedger() Option { - return Option{server.WithNoPeerLedger()} -} - func WithPeerBlockRequestFilter(pbrf server.PeerBlockRequestFilter) Option { return Option{server.WithPeerBlockRequestFilter(pbrf)} } @@ -67,6 +59,10 @@ func WithScoreLedger(scoreLedger server.ScoreLedger) Option { return Option{server.WithScoreLedger(scoreLedger)} } +func WithPeerLedger(peerLedger server.PeerLedger) Option { + return Option{server.WithPeerLedger(peerLedger)} +} + func WithTargetMessageSize(tms int) Option { return Option{server.WithTargetMessageSize(tms)} } diff --git a/bitswap/server/forward.go b/bitswap/server/forward.go index ee353da19..c1ee1c44b 100644 --- a/bitswap/server/forward.go +++ b/bitswap/server/forward.go @@ -11,4 +11,6 @@ type ( TaskInfo = decision.TaskInfo ScoreLedger = decision.ScoreLedger ScorePeerFunc = decision.ScorePeerFunc + PeerLedger = decision.PeerLedger + PeerEntry = decision.PeerEntry ) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index cd4fac785..a3be94691 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -123,6 +123,25 @@ type ScoreLedger interface { Stop() } +type PeerEntry struct { + Peer peer.ID + Priority int32 + WantType pb.Message_Wantlist_WantType +} + +// PeerLedger is an external ledger dealing with peers and their want lists. +type PeerLedger interface { + Wants(p peer.ID, e wl.Entry) + CancelWant(p peer.ID, k cid.Cid) bool + CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) + Peers(k cid.Cid) []PeerEntry + CollectPeerIDs() []peer.ID + WantlistSizeForPeer(p peer.ID) int + WantlistForPeer(p peer.ID) []wl.Entry + ClearPeerWantlist(p peer.ID) + PeerDisconnected(p peer.ID) +} + // Engine manages sending requested blocks to peers. type Engine struct { // peerRequestQueue is a priority queue of requests received from peers. @@ -149,10 +168,8 @@ type Engine struct { lock sync.RWMutex // protects the fields immediately below - noPeerLedger bool - // peerLedger saves which peers are waiting for a Cid - peerLedger *peerLedger + peerLedger PeerLedger // an external ledger dealing with peer scores scoreLedger ScoreLedger @@ -170,8 +187,6 @@ type Engine struct { sendDontHaves bool - notifyNewBlocks bool - self peer.ID // metrics gauge for total pending tasks across all workers @@ -244,6 +259,13 @@ func WithScoreLedger(scoreledger ScoreLedger) Option { } } +// WithPeerLedger sets a custom [PeerLedger] to be used with this [Engine]. +func WithPeerLedger(peerLedger PeerLedger) Option { + return func(e *Engine) { + e.peerLedger = peerLedger + } +} + // WithBlockstoreWorkerCount sets the number of worker threads used for // blockstore operations in the decision engine func WithBlockstoreWorkerCount(count int) Option { @@ -298,24 +320,6 @@ func WithSetSendDontHave(send bool) Option { } } -// WithNotifyNewBlocks sets or not whether to notify peers when receiving a block. -// By default, it is true. This can be useful if you want the server to only -// reply if it has a block at the moment it receives a message, and not later. -func WithNotifyNewBlocks(notify bool) Option { - return func(e *Engine) { - e.notifyNewBlocks = notify - } -} - -// WithNoPeerLedger disables the usage of a peer ledger to track want lists. This -// means that this engine will not keep track of the CIDs that peers wanted in -// the past. -func WithNoPeerLedger() Option { - return func(e *Engine) { - e.noPeerLedger = true - } -} - // wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { return func(a, b *peertask.QueueTask) bool { @@ -380,9 +384,8 @@ func newEngine( maxBlockSizeReplaceHasWithBlock: maxReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, - notifyNewBlocks: true, self: self, - peerLedger: newPeerLedger(), + peerLedger: NewDefaultPeerLedger(), pendingGauge: bmetrics.PendingEngineGauge(ctx), activeGauge: bmetrics.ActiveEngineGauge(ctx), targetMessageSize: defaultTargetMessageSize, @@ -722,10 +725,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap continue } - if !e.noPeerLedger { - e.peerLedger.Wants(p, entry.Entry) - } - + e.peerLedger.Wants(p, entry.Entry) filteredWants = append(filteredWants, entry) } clear := wants[len(filteredWants):] @@ -884,7 +884,7 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) { // NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap // decide to store those blocks and make them available on the network. func (e *Engine) NotifyNewBlocks(blks []blocks.Block) { - if !e.notifyNewBlocks || len(blks) == 0 { + if len(blks) == 0 { return } diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index cc7a5e1ac..4acea7ee1 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -8,20 +8,20 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -type peerLedger struct { - // thoses two maps are inversions of each other +type DefaultPeerLedger struct { + // these two maps are inversions of each other peers map[peer.ID]map[cid.Cid]entry cids map[cid.Cid]map[peer.ID]entry } -func newPeerLedger() *peerLedger { - return &peerLedger{ +func NewDefaultPeerLedger() *DefaultPeerLedger { + return &DefaultPeerLedger{ peers: make(map[peer.ID]map[cid.Cid]entry), cids: make(map[cid.Cid]map[peer.ID]entry), } } -func (l *peerLedger) Wants(p peer.ID, e wl.Entry) { +func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) { cids, ok := l.peers[p] if !ok { cids = make(map[cid.Cid]entry) @@ -38,7 +38,7 @@ func (l *peerLedger) Wants(p peer.ID, e wl.Entry) { } // CancelWant returns true if the cid was present in the wantlist. -func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool { +func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool { wants, ok := l.peers[p] if !ok { return false @@ -53,7 +53,7 @@ func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool { } // CancelWantWithType will not cancel WantBlock if we sent a HAVE message. -func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) { +func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) { wants, ok := l.peers[p] if !ok { return @@ -74,7 +74,7 @@ func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wan l.removePeerFromCid(p, k) } -func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) { +func (l *DefaultPeerLedger) removePeerFromCid(p peer.ID, k cid.Cid) { m, ok := l.cids[k] if !ok { return @@ -85,29 +85,28 @@ func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) { } } -type entryForPeer struct { - Peer peer.ID - entry -} - type entry struct { Priority int32 WantType pb.Message_Wantlist_WantType } -func (l *peerLedger) Peers(k cid.Cid) []entryForPeer { +func (l *DefaultPeerLedger) Peers(k cid.Cid) []PeerEntry { m, ok := l.cids[k] if !ok { return nil } - peers := make([]entryForPeer, 0, len(m)) + peers := make([]PeerEntry, 0, len(m)) for p, e := range m { - peers = append(peers, entryForPeer{p, e}) + peers = append(peers, PeerEntry{ + Peer: p, + Priority: e.Priority, + WantType: e.WantType, + }) } return peers } -func (l *peerLedger) CollectPeerIDs() []peer.ID { +func (l *DefaultPeerLedger) CollectPeerIDs() []peer.ID { peers := make([]peer.ID, 0, len(l.peers)) for p := range l.peers { peers = append(peers, p) @@ -115,11 +114,11 @@ func (l *peerLedger) CollectPeerIDs() []peer.ID { return peers } -func (l *peerLedger) WantlistSizeForPeer(p peer.ID) int { +func (l *DefaultPeerLedger) WantlistSizeForPeer(p peer.ID) int { return len(l.peers[p]) } -func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry { +func (l *DefaultPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry { cids, ok := l.peers[p] if !ok { return nil @@ -139,7 +138,7 @@ func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry { // ClearPeerWantlist does not take an effort to fully erase it from memory. // This is intended when the peer is still connected and the map capacity could // be reused. If the memory should be freed use PeerDisconnected instead. -func (l *peerLedger) ClearPeerWantlist(p peer.ID) { +func (l *DefaultPeerLedger) ClearPeerWantlist(p peer.ID) { cids, ok := l.peers[p] if !ok { return @@ -150,7 +149,7 @@ func (l *peerLedger) ClearPeerWantlist(p peer.ID) { } } -func (l *peerLedger) PeerDisconnected(p peer.ID) { +func (l *DefaultPeerLedger) PeerDisconnected(p peer.ID) { l.ClearPeerWantlist(p) delete(l.peers, p) } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 0dd2bc259..1e723ddce 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -161,17 +161,9 @@ func WithScoreLedger(scoreLedger decision.ScoreLedger) Option { } } -// WithNotifyNewBlocks configures the engine with [decision.WithNotifyNewBlocks]. -func WithNotifyNewBlocks(notify bool) Option { - o := decision.WithNotifyNewBlocks(notify) - return func(bs *Server) { - bs.engineOptions = append(bs.engineOptions, o) - } -} - -// WithNoPeerLedger configures the engine with [decision.WithNoPeerLedger]. -func WithNoPeerLedger() Option { - o := decision.WithNoPeerLedger() +// WithPeerLedger configures the engine with a custom [decision.PeerLedger]. +func WithPeerLedger(peerLedger decision.PeerLedger) Option { + o := decision.WithPeerLedger(peerLedger) return func(bs *Server) { bs.engineOptions = append(bs.engineOptions, o) }