Skip to content

Commit

Permalink
bitswap/server: allow overriding peer ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Apr 26, 2024
1 parent cf47211 commit 14cd7bd
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 71 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The following emojis are used to highlight certain changes:
* `NewRemoteCarBackend` allows you to create a gateway backend that uses one or multiple Trustless Gateways as backend. These gateways must support CAR requests (`application/vnd.ipld.car`), as well as the extensions describe in [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/). With this, we also introduced `NewCarBackend`, `NewRemoteCarFetcher` and `NewRetryCarFetcher`.
* `gateway` now sets the [`Content-Location`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Location) header for requests with non-default content format, as a result of content negotiation. This allows generic and misconfigured HTTP caches to store Deserialized, CAR and Block responses separately, under distinct cache keys.
* `gateway` now supports `car-dups`, `car-order` and `car-version` as query parameters in addition to the `application/vnd.ipld.car` parameters sent via `Accept` header. The parameters in the `Accept` header have always priority, but including them in URL simplifies HTTP caching and allows use in `Content-Location` header on CAR responses to maximize interoperability with wide array of HTTP caches.
* `bitswap/server` now has two new options: `WithNotifyNewBlocks` and `WithNoPeerLedger` to allow disabling notifying new blocks and the usage of the peer ledger, respectively.
* `bitswap/server` now allows to override the default peer ledger with `WithPeerLedger`.

### Changed

Expand Down
12 changes: 4 additions & 8 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ func SetSendDontHaves(send bool) Option {
return Option{server.SetSendDontHaves(send)}
}

func WithNotifyNewBlocks(notify bool) Option {
return Option{server.WithNotifyNewBlocks(notify)}
}

func WithNoPeerLedger() Option {
return Option{server.WithNoPeerLedger()}
}

func WithPeerBlockRequestFilter(pbrf server.PeerBlockRequestFilter) Option {
return Option{server.WithPeerBlockRequestFilter(pbrf)}
}
Expand All @@ -67,6 +59,10 @@ func WithScoreLedger(scoreLedger server.ScoreLedger) Option {
return Option{server.WithScoreLedger(scoreLedger)}
}

func WithPeerLedger(peerLedger server.PeerLedger) Option {
return Option{server.WithPeerLedger(peerLedger)}

Check warning on line 63 in bitswap/options.go

View check run for this annotation

Codecov / codecov/patch

bitswap/options.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

func WithTargetMessageSize(tms int) Option {
return Option{server.WithTargetMessageSize(tms)}
}
Expand Down
2 changes: 2 additions & 0 deletions bitswap/server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ type (
TaskInfo = decision.TaskInfo
ScoreLedger = decision.ScoreLedger
ScorePeerFunc = decision.ScorePeerFunc
PeerLedger = decision.PeerLedger
PeerEntry = decision.PeerEntry
)
60 changes: 30 additions & 30 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ type ScoreLedger interface {
Stop()
}

type PeerEntry struct {
Peer peer.ID
Priority int32
WantType pb.Message_Wantlist_WantType
}

// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
Wants(p peer.ID, e wl.Entry)
CancelWant(p peer.ID, k cid.Cid) bool
CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType)
Peers(k cid.Cid) []PeerEntry
CollectPeerIDs() []peer.ID
WantlistSizeForPeer(p peer.ID) int
WantlistForPeer(p peer.ID) []wl.Entry
ClearPeerWantlist(p peer.ID)
PeerDisconnected(p peer.ID)
}

// Engine manages sending requested blocks to peers.
type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
Expand All @@ -149,10 +168,8 @@ type Engine struct {

lock sync.RWMutex // protects the fields immediately below

noPeerLedger bool

// peerLedger saves which peers are waiting for a Cid
peerLedger *peerLedger
peerLedger PeerLedger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger
Expand All @@ -170,8 +187,6 @@ type Engine struct {

sendDontHaves bool

notifyNewBlocks bool

self peer.ID

// metrics gauge for total pending tasks across all workers
Expand Down Expand Up @@ -244,6 +259,13 @@ func WithScoreLedger(scoreledger ScoreLedger) Option {
}
}

// WithPeerLedger sets a custom [PeerLedger] to be used with this [Engine].
func WithPeerLedger(peerLedger PeerLedger) Option {
return func(e *Engine) {
e.peerLedger = peerLedger

Check warning on line 265 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L263-L265

Added lines #L263 - L265 were not covered by tests
}
}

// WithBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func WithBlockstoreWorkerCount(count int) Option {
Expand Down Expand Up @@ -298,24 +320,6 @@ func WithSetSendDontHave(send bool) Option {
}
}

// WithNotifyNewBlocks sets or not whether to notify peers when receiving a block.
// By default, it is true. This can be useful if you want the server to only
// reply if it has a block at the moment it receives a message, and not later.
func WithNotifyNewBlocks(notify bool) Option {
return func(e *Engine) {
e.notifyNewBlocks = notify
}
}

// WithNoPeerLedger disables the usage of a peer ledger to track want lists. This
// means that this engine will not keep track of the CIDs that peers wanted in
// the past.
func WithNoPeerLedger() Option {
return func(e *Engine) {
e.noPeerLedger = true
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand Down Expand Up @@ -380,9 +384,8 @@ func newEngine(
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
notifyNewBlocks: true,
self: self,
peerLedger: newPeerLedger(),
peerLedger: NewDefaultPeerLedger(),
pendingGauge: bmetrics.PendingEngineGauge(ctx),
activeGauge: bmetrics.ActiveEngineGauge(ctx),
targetMessageSize: defaultTargetMessageSize,
Expand Down Expand Up @@ -722,10 +725,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
continue
}

if !e.noPeerLedger {
e.peerLedger.Wants(p, entry.Entry)
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
}
clear := wants[len(filteredWants):]
Expand Down Expand Up @@ -884,7 +884,7 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) {
// NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap
// decide to store those blocks and make them available on the network.
func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
if !e.notifyNewBlocks || len(blks) == 0 {
if len(blks) == 0 {
return
}

Expand Down
41 changes: 20 additions & 21 deletions bitswap/server/internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

type peerLedger struct {
// thoses two maps are inversions of each other
type DefaultPeerLedger struct {
// these two maps are inversions of each other
peers map[peer.ID]map[cid.Cid]entry
cids map[cid.Cid]map[peer.ID]entry
}

func newPeerLedger() *peerLedger {
return &peerLedger{
func NewDefaultPeerLedger() *DefaultPeerLedger {
return &DefaultPeerLedger{
peers: make(map[peer.ID]map[cid.Cid]entry),
cids: make(map[cid.Cid]map[peer.ID]entry),
}
}

func (l *peerLedger) Wants(p peer.ID, e wl.Entry) {
func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) {
cids, ok := l.peers[p]
if !ok {
cids = make(map[cid.Cid]entry)
Expand All @@ -38,7 +38,7 @@ func (l *peerLedger) Wants(p peer.ID, e wl.Entry) {
}

// CancelWant returns true if the cid was present in the wantlist.
func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
wants, ok := l.peers[p]
if !ok {
return false
Expand All @@ -53,7 +53,7 @@ func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
}

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
wants, ok := l.peers[p]
if !ok {
return
Expand All @@ -74,7 +74,7 @@ func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wan
l.removePeerFromCid(p, k)
}

func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
func (l *DefaultPeerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
return
Expand All @@ -85,41 +85,40 @@ func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
}
}

type entryForPeer struct {
Peer peer.ID
entry
}

type entry struct {
Priority int32
WantType pb.Message_Wantlist_WantType
}

func (l *peerLedger) Peers(k cid.Cid) []entryForPeer {
func (l *DefaultPeerLedger) Peers(k cid.Cid) []PeerEntry {
m, ok := l.cids[k]
if !ok {
return nil
}
peers := make([]entryForPeer, 0, len(m))
peers := make([]PeerEntry, 0, len(m))
for p, e := range m {
peers = append(peers, entryForPeer{p, e})
peers = append(peers, PeerEntry{
Peer: p,
Priority: e.Priority,
WantType: e.WantType,
})
}
return peers
}

func (l *peerLedger) CollectPeerIDs() []peer.ID {
func (l *DefaultPeerLedger) CollectPeerIDs() []peer.ID {
peers := make([]peer.ID, 0, len(l.peers))
for p := range l.peers {
peers = append(peers, p)
}
return peers
}

func (l *peerLedger) WantlistSizeForPeer(p peer.ID) int {
func (l *DefaultPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return len(l.peers[p])
}

func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
func (l *DefaultPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
cids, ok := l.peers[p]
if !ok {
return nil
Expand All @@ -139,7 +138,7 @@ func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
// ClearPeerWantlist does not take an effort to fully erase it from memory.
// This is intended when the peer is still connected and the map capacity could
// be reused. If the memory should be freed use PeerDisconnected instead.
func (l *peerLedger) ClearPeerWantlist(p peer.ID) {
func (l *DefaultPeerLedger) ClearPeerWantlist(p peer.ID) {
cids, ok := l.peers[p]
if !ok {
return
Expand All @@ -150,7 +149,7 @@ func (l *peerLedger) ClearPeerWantlist(p peer.ID) {
}
}

func (l *peerLedger) PeerDisconnected(p peer.ID) {
func (l *DefaultPeerLedger) PeerDisconnected(p peer.ID) {
l.ClearPeerWantlist(p)
delete(l.peers, p)
}
14 changes: 3 additions & 11 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,9 @@ func WithScoreLedger(scoreLedger decision.ScoreLedger) Option {
}
}

// WithNotifyNewBlocks configures the engine with [decision.WithNotifyNewBlocks].
func WithNotifyNewBlocks(notify bool) Option {
o := decision.WithNotifyNewBlocks(notify)
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, o)
}
}

// WithNoPeerLedger configures the engine with [decision.WithNoPeerLedger].
func WithNoPeerLedger() Option {
o := decision.WithNoPeerLedger()
// WithPeerLedger configures the engine with a custom [decision.PeerLedger].
func WithPeerLedger(peerLedger decision.PeerLedger) Option {
o := decision.WithPeerLedger(peerLedger)
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, o)

Check warning on line 168 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L165-L168

Added lines #L165 - L168 were not covered by tests
}
Expand Down

0 comments on commit 14cd7bd

Please sign in to comment.