From 720d617eb2545059f231fa40b34e3d98f707434b Mon Sep 17 00:00:00 2001
From: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Date: Thu, 16 Jan 2025 06:17:08 -1000
Subject: [PATCH 1/5] do not send cancel message to peer that sent block (#784)

* do not send CANCEL to peer we got block from

The serving peer removes the block from its copy of the client's wantlist
after serving it, so sending a CANCEL back to that peer is redundant.
Exclude the serving peer when sending cancels after receiving a block.

Closes #694
---
 CHANGELOG.md                                  |  1 +
 .../internal/peermanager/peermanager.go       |  4 +-
 .../internal/peermanager/peermanager_test.go  | 49 +++++++++++++++++--
 .../internal/peermanager/peerwantmanager.go   |  3 +-
 .../peermanager/peerwantmanager_test.go       | 10 ++--
 bitswap/client/internal/session/session.go    |  2 +-
 .../client/internal/session/session_test.go   |  2 +-
 .../session/sessionwantsender_test.go         |  6 +--
 .../internal/sessionmanager/sessionmanager.go |  4 +-
 .../sessionmanager/sessionmanager_test.go     |  2 +-
 10 files changed, 64 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3f98ac052..da94b7837 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
 - `gateway` Support for custom DNSLink / DoH resolvers on `localhost` to simplify integration with non-ICANN DNS systems [#645](https://github.com/ipfs/boxo/pull/645)
 
 ### Changed
+- Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)
 - `gateway` The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
 - `gateway` The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)

diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go
index 4634ff164..cd0069bea 100644
--- a/bitswap/client/internal/peermanager/peermanager.go
+++ b/bitswap/client/internal/peermanager/peermanager.go
@@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
 
 // SendCancels sends cancels for the given keys to all peers who had previously
 // received a want for those keys.
-func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
+func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 
 	// Send a CANCEL to each peer that has been sent a want-block or want-have
-	pm.pwm.sendCancels(cancelKs)
+	pm.pwm.sendCancels(cancelKs, excludePeer)
 }
 
 // CurrentWants returns the list of pending wants (both want-haves and want-blocks).
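For orientation, here is a minimal, self-contained sketch of the exclusion logic this patch threads through to `peerwantmanager.sendCancels` (the `delete(cancelPeers, excludePeer)` line later in this diff). All names in the sketch (`sendCancelsExcluding`, `wantsByPeer`) are illustrative, not boxo APIs.

```go
package example

import (
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

// sendCancelsExcluding collects every peer that was previously sent a want
// for one of cancelKs, drops excludePeer, and returns the peers that should
// still receive a CANCEL.
func sendCancelsExcluding(wantsByPeer map[peer.ID]map[cid.Cid]struct{}, cancelKs []cid.Cid, excludePeer peer.ID) []peer.ID {
	cancelPeers := make(map[peer.ID]struct{})
	for p, wants := range wantsByPeer {
		for _, c := range cancelKs {
			if _, ok := wants[c]; ok {
				cancelPeers[p] = struct{}{}
				break
			}
		}
	}
	// The peer the block came from has already dropped these wants, so a
	// CANCEL to it would be redundant.
	delete(cancelPeers, excludePeer)

	out := make([]peer.ID, 0, len(cancelPeers))
	for p := range cancelPeers {
		out = append(out, p)
	}
	return out
}
```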
diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go index b778c46e3..61226acda 100644 --- a/bitswap/client/internal/peermanager/peermanager_test.go +++ b/bitswap/client/internal/peermanager/peermanager_test.go @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) { collectMessages(msgs, 2*time.Millisecond) // Send cancels for 1 want-block and 1 want-have - peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}) + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "") collected := collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) { } // Send cancels for all cids - peerManager.SendCancels(ctx, cids) + peerManager.SendCancels(ctx, cids, "") collected = collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) { } } +func TestSendCancelsExclude(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msgs := make(chan msg, 16) + peerQueueFactory := makePeerQueueFactory(msgs) + tp := random.Peers(3) + self, peer1, peer2 := tp[0], tp[1], tp[2] + peerManager := New(ctx, peerQueueFactory, self) + cids := random.Cids(4) + + // Connect to peer1 and peer2 + peerManager.Connected(peer1) + peerManager.Connected(peer2) + + // Send 2 want-blocks and 1 want-have to peer1 + peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]}) + + // Clear messages + collectMessages(msgs, 2*time.Millisecond) + + // Send cancels for 1 want-block and 1 want-have + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1) + collected := collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 0 { + t.Fatal("Expected no cancels to be sent to excluded peer") + } + + // Send cancels for all cids + peerManager.SendCancels(ctx, cids, "") + collected = collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 3 { + t.Fatal("Expected cancel to be sent for want-blocks") + } +} + func (s *sess) ID() uint64 { return s.id } @@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) { limit := len(wanted) / 10 cancel := wanted[:limit] wanted = wanted[limit:] - peerManager.SendCancels(ctx, cancel) + peerManager.SendCancels(ctx, cancel, "") } } } diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go index e9fdfbb46..765566155 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager.go +++ b/bitswap/client/internal/peermanager/peerwantmanager.go @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves // sendCancels sends a cancel to each peer to which a corresponding want was // sent -func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { +func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) { if len(cancelKs) == 0 { return } @@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { cancelPeers[p] = struct{}{} } } + delete(cancelPeers, excludePeer) for p := range cancelPeers { pws, ok := pwm.peerWants[p] if !ok { diff --git 
a/bitswap/client/internal/peermanager/peerwantmanager_test.go b/bitswap/client/internal/peermanager/peerwantmanager_test.go index bfe0c626d..ff9b4ebad 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager_test.go +++ b/bitswap/client/internal/peermanager/peerwantmanager_test.go @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel 1 want-block and 1 want-have that were sent to p0 clearSent(peerQueues) - pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}) + pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "") // Should cancel the want-block and want-have require.Empty(t, pq1.cancels, "Expected no cancels sent to p1") require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled") @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel everything clearSent(peerQueues) allCids := append(allwb, allwh...) - pwm.sendCancels(allCids) + pwm.sendCancels(allCids, "") // Should cancel the remaining want-blocks and want-haves for p0 require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled") @@ -312,7 +312,7 @@ func TestStats(t *testing.T) { // Cancel 1 want-block that was sent to p0 // and 1 want-block that was not sent cids5 := random.Cids(1) - pwm.sendCancels(append(cids5, cids[0])) + pwm.sendCancels(append(cids5, cids[0]), "") require.Equal(t, 7, g.count, "Expected 7 wants") require.Equal(t, 3, wbg.count, "Expected 3 want-blocks") @@ -332,7 +332,7 @@ func TestStats(t *testing.T) { require.Zero(t, wbg.count, "Expected 0 want-blocks") // Cancel one remaining broadcast want-have - pwm.sendCancels(cids2[:1]) + pwm.sendCancels(cids2[:1], "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Zero(t, wbg.count, "Expected 0 want-blocks") } @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) { require.Equal(t, 4, wbg.count, "Expected 4 want-blocks") // Cancel 1 of each group of cids - pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}) + pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Equal(t, 2, wbg.count, "Expected 2 want-blocks") diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index b19763f3c..a64b39223 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -43,7 +43,7 @@ type PeerManager interface { // session discovery) BroadcastWantHaves(context.Context, []cid.Cid) // SendCancels tells the PeerManager to send cancels to all peers - SendCancels(context.Context, []cid.Cid) + SendCancels(context.Context, []cid.Cid, peer.ID) } // SessionManager manages all the sessions diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index c0d26a91d..5b33b351a 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci case <-ctx.Done(): } } -func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {} +func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {} func TestSessionGetBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go index e5589dd58..a43c8af96 100644 --- 
a/bitswap/client/internal/session/sessionwantsender_test.go +++ b/bitswap/client/internal/session/sessionwantsender_test.go @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool { return false } -func (*mockPeerManager) UnregisterSession(uint64) {} -func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} +func (*mockPeerManager) UnregisterSession(uint64) {} +func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {} func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool { pm.lk.Lock() diff --git a/bitswap/client/internal/sessionmanager/sessionmanager.go b/bitswap/client/internal/sessionmanager/sessionmanager.go index 0d2b24330..179877acc 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager.go @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid } // Send CANCEL to all peers with want-have / want-block - sm.peerManager.SendCancels(ctx, blks) + sm.peerManager.SendCancels(ctx, blks, p) } // CancelSessionWants is called when a session cancels wants because a call to @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) { // Send CANCEL to all peers for blocks that no session is interested in // anymore. // Note: use bitswap context because session context may already be Done. - sm.peerManager.SendCancels(sm.ctx, wants) + sm.peerManager.SendCancels(sm.ctx, wants, "") } diff --git a/bitswap/client/internal/sessionmanager/sessionmanager_test.go b/bitswap/client/internal/sessionmanager/sessionmanager_test.go index bad26ad90..bb9ad4755 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager_test.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager_test.go @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) func (*fakePeerManager) UnregisterSession(uint64) {} func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true } func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { +func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) { fpm.lk.Lock() defer fpm.lk.Unlock() fpm.cancels = append(fpm.cancels, cancels...) 
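A hedged sketch of the two call shapes the `sessionmanager.go` hunks above end up with: exclude the peer the blocks arrived from, or pass the zero `peer.ID` when there is no peer to exclude. The `canceler` interface and helper names below are illustrative only.

```go
package example

import (
	"context"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

// canceler mirrors the updated SendCancels signature from this patch; the
// interface itself is illustrative.
type canceler interface {
	SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID)
}

// onBlocksReceived mirrors SessionManager.ReceiveFrom: the peer the blocks
// came from is excluded from the CANCEL fan-out.
func onBlocksReceived(ctx context.Context, pm canceler, from peer.ID, blks []cid.Cid) {
	pm.SendCancels(ctx, blks, from)
}

// onWantsCancelled mirrors SessionManager.cancelWants: there is no serving
// peer in this path, so the zero-value peer.ID ("") disables the exclusion.
func onWantsCancelled(ctx context.Context, pm canceler, wants []cid.Cid) {
	pm.SendCancels(ctx, wants, "")
}
```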
From b62b60e0ecb8353d1c3e32621b687737a72a30d0 Mon Sep 17 00:00:00 2001
From: dashangcun <907225865@qq.com>
Date: Fri, 17 Jan 2025 06:08:09 +0100
Subject: [PATCH 2/5] refactor: using slices.Contains to simplify the code (#791)

Signed-off-by: dashangcun <907225865@qq.com>
Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
---
 bitswap/message/message_test.go | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/bitswap/message/message_test.go b/bitswap/message/message_test.go
index 2b943aeb1..11f686a39 100644
--- a/bitswap/message/message_test.go
+++ b/bitswap/message/message_test.go
@@ -2,6 +2,7 @@ package message
 
 import (
 	"bytes"
+	"slices"
 	"testing"
 
 	"github.com/ipfs/boxo/bitswap/client/wantlist"
@@ -58,7 +59,7 @@ func TestAppendBlock(t *testing.T) {
 	// assert strings are in proto message
 	for _, blockbytes := range m.ToProtoV0().GetBlocks() {
 		s := bytes.NewBuffer(blockbytes).String()
-		if !contains(strs, s) {
+		if !slices.Contains(strs, s) {
 			t.Fail()
 		}
 	}
@@ -168,15 +169,6 @@ func wantlistContains(wantlist *pb.Message_Wantlist, c cid.Cid) bool {
 	return false
 }
 
-func contains(strs []string, x string) bool {
-	for _, s := range strs {
-		if s == x {
-			return true
-		}
-	}
-	return false
-}
-
 func TestDuplicates(t *testing.T) {
 	b := blocks.NewBlock([]byte("foo"))
 	msg := New(true)

From 7fa984613dccc687b05a65ad7127d5034533b06b Mon Sep 17 00:00:00 2001
From: Cameron Wood
Date: Fri, 17 Jan 2025 16:04:13 +0100
Subject: [PATCH 3/5] Create FUNDING.json [skip changelog] (#795)

Add FUNDING.json indicating that this repo is owned/maintained by Shipyard.
---
 FUNDING.json | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 FUNDING.json

diff --git a/FUNDING.json b/FUNDING.json
new file mode 100644
index 000000000..4cabb0313
--- /dev/null
+++ b/FUNDING.json
@@ -0,0 +1,7 @@
+{
+  "drips": {
+    "filecoin": {
+      "ownedBy": "0xEF22379b7527762a00FC5820AF55BdE363624f03"
+    }
+  }
+}

From 8ca0ca28612a37df47deb72bc9d7e188e5c2b96d Mon Sep 17 00:00:00 2001
From: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Date: Fri, 17 Jan 2025 05:51:11 -1000
Subject: [PATCH 4/5] Do not return unused values from wantlists (#792)

- Do not return unused value from Remove or from Contains
- Do not export functions unnecessarily
- Remove unnecessary setting of empty map to nil

The presence map in blockPresenceManager is never empty during normal use,
so do not bother setting the map to nil for GC if its size becomes zero.
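A hedged sketch of the call pattern this patch enables in the wantlist package: `Has` for a plain membership test and `Get` when the entry itself is needed, both replacing the old `Contains`, whose returned `Entry` many callers ignored. The helper name `hasWant` is illustrative; the `Wantlist` methods shown exist only after this patch series.

```go
package example

import (
	cid "github.com/ipfs/go-cid"

	"github.com/ipfs/boxo/bitswap/client/wantlist"
)

// hasWant demonstrates the split API: Has when only membership matters,
// Get when the entry (priority, want type) is also needed.
func hasWant(wl *wantlist.Wantlist, c cid.Cid) bool {
	if !wl.Has(c) { // membership only
		return false
	}
	e, ok := wl.Get(c) // entry needed
	return ok && e.Cid.Equals(c)
}
```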
--- .../blockpresencemanager.go | 17 ++-- .../internal/messagequeue/messagequeue.go | 71 +++++++-------- bitswap/client/wantlist/wantlist.go | 19 ++-- bitswap/client/wantlist/wantlist_test.go | 86 ++++++------------- 4 files changed, 74 insertions(+), 119 deletions(-) diff --git a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go index b88b12bd3..288de6975 100644 --- a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go +++ b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go @@ -15,7 +15,9 @@ type BlockPresenceManager struct { } func New() *BlockPresenceManager { - return &BlockPresenceManager{} + return &BlockPresenceManager{ + presence: make(map[cid.Cid]map[peer.ID]bool), + } } // ReceiveFrom is called when a peer sends us information about which blocks @@ -24,10 +26,6 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav bpm.Lock() defer bpm.Unlock() - if bpm.presence == nil { - bpm.presence = make(map[cid.Cid]map[peer.ID]bool) - } - for _, c := range haves { bpm.updateBlockPresence(p, c, true) } @@ -39,7 +37,8 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav func (bpm *BlockPresenceManager) updateBlockPresence(p peer.ID, c cid.Cid, present bool) { _, ok := bpm.presence[c] if !ok { - bpm.presence[c] = make(map[peer.ID]bool) + bpm.presence[c] = map[peer.ID]bool{p: present} + return } // Make sure not to change HAVE to DONT_HAVE @@ -121,9 +120,6 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) { for _, c := range ks { delete(bpm.presence, c) } - if len(bpm.presence) == 0 { - bpm.presence = nil - } } // RemovePeer removes the given peer from every cid key in the presence map. 
@@ -137,9 +133,6 @@ func (bpm *BlockPresenceManager) RemovePeer(p peer.ID) { delete(bpm.presence, c) } } - if len(bpm.presence) == 0 { - bpm.presence = nil - } } // HasKey indicates whether the BlockPresenceManager is tracking the given key diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 6a6e44280..a8ab99830 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -125,32 +125,32 @@ func newRecallWantList() recallWantlist { } } -// Add want to the pending list -func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) { +// add want to the pending list +func (r *recallWantlist) add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) { r.pending.Add(c, priority, wtype) } -// Remove wants from both the pending list and the list of sent wants -func (r *recallWantlist) Remove(c cid.Cid) { +// remove wants from both the pending list and the list of sent wants +func (r *recallWantlist) remove(c cid.Cid) { r.pending.Remove(c) r.sent.Remove(c) delete(r.sentAt, c) } -// Remove wants by type from both the pending list and the list of sent wants -func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { +// remove wants by type from both the pending list and the list of sent wants +func (r *recallWantlist) removeType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { r.pending.RemoveType(c, wtype) r.sent.RemoveType(c, wtype) - if _, ok := r.sent.Contains(c); !ok { + if !r.sent.Has(c) { delete(r.sentAt, c) } } -// MarkSent moves the want from the pending to the sent list +// markSent moves the want from the pending to the sent list // // Returns true if the want was marked as sent. Returns false if the want wasn't // pending. -func (r *recallWantlist) MarkSent(e bswl.Entry) bool { +func (r *recallWantlist) markSent(e bswl.Entry) bool { if !r.pending.RemoveType(e.Cid, e.WantType) { return false } @@ -158,33 +158,34 @@ func (r *recallWantlist) MarkSent(e bswl.Entry) bool { return true } -// SentAt records the time at which a want was sent -func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { +// setSentAt records the time at which a want was sent +func (r *recallWantlist) setSentAt(c cid.Cid, at time.Time) { // The want may have been canceled in the interim - if _, ok := r.sent.Contains(c); ok { + if r.sent.Has(c) { if _, ok := r.sentAt[c]; !ok { r.sentAt[c] = at } } } -// ClearSentAt clears out the record of the time a want was sent. +// clearSentAt clears out the record of the time a want was sent. // We clear the sent at time when we receive a response for a key as we // only need the first response for latency measurement. -func (r *recallWantlist) ClearSentAt(c cid.Cid) { +func (r *recallWantlist) clearSentAt(c cid.Cid) { delete(r.sentAt, c) } -// Refresh moves wants from the sent list back to the pending list. +// refresh moves wants from the sent list back to the pending list. // If a want has been sent for longer than the interval, it is moved back to the pending list. // Returns the number of wants that were refreshed. 
-func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int { +func (r *recallWantlist) refresh(now time.Time, interval time.Duration) int { var refreshed int for _, want := range r.sent.Entries() { - sentAt, ok := r.sentAt[want.Cid] + wantCid := want.Cid + sentAt, ok := r.sentAt[wantCid] if ok && now.Sub(sentAt) >= interval { - r.pending.Add(want.Cid, want.Priority, want.WantType) - r.sent.Remove(want.Cid) + r.sent.Remove(wantCid) + r.pending.Add(wantCid, want.Priority, want.WantType) refreshed++ } } @@ -320,7 +321,7 @@ func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid) { mq.wllock.Lock() for _, c := range wantHaves { - mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have) + mq.bcstWants.add(c, mq.priority, pb.Message_Wantlist_Have) mq.priority-- // We're adding a want-have for the cid, so clear any pending cancel @@ -343,7 +344,7 @@ func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) { mq.wllock.Lock() for _, c := range wantHaves { - mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Have) + mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Have) mq.priority-- // We're adding a want-have for the cid, so clear any pending cancel @@ -351,7 +352,7 @@ func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) { mq.cancels.Remove(c) } for _, c := range wantBlocks { - mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Block) + mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Block) mq.priority-- // We're adding a want-block for the cid, so clear any pending cancel @@ -383,12 +384,12 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { // Remove keys from broadcast and peer wants, and add to cancels for _, c := range cancelKs { // Check if a want for the key was sent - _, wasSentBcst := mq.bcstWants.sent.Contains(c) - _, wasSentPeer := mq.peerWants.sent.Contains(c) + wasSentBcst := mq.bcstWants.sent.Has(c) + wasSentPeer := mq.peerWants.sent.Has(c) // Remove the want from tracking wantlists - mq.bcstWants.Remove(c) - mq.peerWants.Remove(c) + mq.bcstWants.remove(c) + mq.peerWants.remove(c) // Only send a cancel if a want was sent if wasSentBcst || wasSentPeer { @@ -524,7 +525,7 @@ func (mq *MessageQueue) runQueue() { func (mq *MessageQueue) rebroadcastWantlist(now time.Time, interval time.Duration) { mq.wllock.Lock() // Transfer wants from the rebroadcast lists into the pending lists. 
- toRebroadcast := mq.bcstWants.Refresh(now, interval) + mq.peerWants.Refresh(now, interval) + toRebroadcast := mq.bcstWants.refresh(now, interval) + mq.peerWants.refresh(now, interval) mq.wllock.Unlock() // If some wants were transferred from the rebroadcast list @@ -614,7 +615,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { // Unlikely, but just in case check that the block hasn't been // received in the interim c := entry.Cid - if _, ok := mq.peerWants.sent.Contains(c); ok { + if mq.peerWants.sent.Has(c) { wants = append(wants, c) } } @@ -652,7 +653,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } - mq.bcstWants.ClearSentAt(c) + mq.bcstWants.clearSentAt(c) } if at, ok := mq.peerWants.sentAt[c]; ok { if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { @@ -661,7 +662,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Clear out the sent time for the CID because we only want to // record the latency between the request and the first response // for that CID (not subsequent responses) - mq.peerWants.ClearSentAt(c) + mq.peerWants.clearSentAt(c) } } @@ -752,7 +753,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap // place if possible. for _, e := range peerEntries { if e.WantType == pb.Message_Wantlist_Have { - mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have) + mq.peerWants.removeType(e.Cid, pb.Message_Wantlist_Have) } else { filteredPeerEntries = append(filteredPeerEntries, e) } @@ -817,7 +818,7 @@ FINISH: // message that we've decided to cancel at the last minute. mq.wllock.Lock() for i, e := range peerEntries[:sentPeerEntries] { - if !mq.peerWants.MarkSent(e) { + if !mq.peerWants.markSent(e) { // It changed. mq.msg.Remove(e.Cid) peerEntries[i].Cid = cid.Undef @@ -825,7 +826,7 @@ FINISH: } for i, e := range bcstEntries[:sentBcstEntries] { - if !mq.bcstWants.MarkSent(e) { + if !mq.bcstWants.markSent(e) { mq.msg.Remove(e.Cid) bcstEntries[i].Cid = cid.Undef } @@ -849,13 +850,13 @@ FINISH: for _, e := range peerEntries[:sentPeerEntries] { if e.Cid.Defined() { // Check if want was canceled in the interim - mq.peerWants.SentAt(e.Cid, now) + mq.peerWants.setSentAt(e.Cid, now) } } for _, e := range bcstEntries[:sentBcstEntries] { if e.Cid.Defined() { // Check if want was canceled in the interim - mq.bcstWants.SentAt(e.Cid, now) + mq.bcstWants.setSentAt(e.Cid, now) } } diff --git a/bitswap/client/wantlist/wantlist.go b/bitswap/client/wantlist/wantlist.go index 245085af9..a79dbd9fc 100644 --- a/bitswap/client/wantlist/wantlist.go +++ b/bitswap/client/wantlist/wantlist.go @@ -71,14 +71,8 @@ func (w *Wantlist) Add(c cid.Cid, priority int32, wantType pb.Message_Wantlist_W } // Remove removes the given cid from the wantlist. -func (w *Wantlist) Remove(c cid.Cid) bool { - _, ok := w.set[c] - if !ok { - return false - } - +func (w *Wantlist) Remove(c cid.Cid) { w.delete(c) - return true } // Remove removes the given cid from the wantlist, respecting the type: @@ -108,9 +102,14 @@ func (w *Wantlist) put(c cid.Cid, e Entry) { w.set[c] = e } -// Contains returns the entry, if present, for the given CID, plus whether it -// was present. -func (w *Wantlist) Contains(c cid.Cid) (Entry, bool) { +func (w *Wantlist) Has(c cid.Cid) bool { + _, ok := w.set[c] + return ok +} + +// Get returns the entry, if present, for the given CID, plus whether it was +// present. 
+func (w *Wantlist) Get(c cid.Cid) (Entry, bool) { e, ok := w.set[c] return e, ok } diff --git a/bitswap/client/wantlist/wantlist_test.go b/bitswap/client/wantlist/wantlist_test.go index 901fe0d67..db5542ecf 100644 --- a/bitswap/client/wantlist/wantlist_test.go +++ b/bitswap/client/wantlist/wantlist_test.go @@ -26,54 +26,37 @@ func init() { } type wli interface { - Contains(cid.Cid) (Entry, bool) + Get(cid.Cid) (Entry, bool) + Has(cid.Cid) bool } func assertHasCid(t *testing.T, w wli, c cid.Cid) { - e, ok := w.Contains(c) - if !ok { - t.Fatal("expected to have ", c) - } - if !e.Cid.Equals(c) { - t.Fatal("returned entry had wrong cid value") - } + e, ok := w.Get(c) + require.True(t, ok) + require.Equal(t, c, e.Cid) } func TestBasicWantlist(t *testing.T) { wl := New() - if !wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) { - t.Fatal("expected true") - } + require.True(t, wl.Add(testcids[0], 5, pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[0]) - if !wl.Add(testcids[1], 4, pb.Message_Wantlist_Block) { - t.Fatal("expected true") - } + require.True(t, wl.Add(testcids[1], 4, pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) - if wl.Len() != 2 { - t.Fatal("should have had two items") - } + require.Equal(t, 2, wl.Len()) - if wl.Add(testcids[1], 4, pb.Message_Wantlist_Block) { - t.Fatal("add shouldnt report success on second add") - } + require.False(t, wl.Add(testcids[1], 4, pb.Message_Wantlist_Block), "add should not report success on second add") assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) - if wl.Len() != 2 { - t.Fatal("should have had two items") - } + require.Equal(t, 2, wl.Len()) - if !wl.RemoveType(testcids[0], pb.Message_Wantlist_Block) { - t.Fatal("should have gotten true") - } + require.True(t, wl.RemoveType(testcids[0], pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[1]) - if _, has := wl.Contains(testcids[0]); has { - t.Fatal("shouldnt have this cid") - } + require.False(t, wl.Has(testcids[0]), "should not have this cid") } func TestAddHaveThenBlock(t *testing.T) { @@ -82,13 +65,9 @@ func TestAddHaveThenBlock(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddBlockThenHave(t *testing.T) { @@ -97,13 +76,9 @@ func TestAddBlockThenHave(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddHaveThenRemoveBlock(t *testing.T) { @@ -112,10 +87,7 @@ func TestAddHaveThenRemoveBlock(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.RemoveType(testcids[0], pb.Message_Wantlist_Block) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestAddBlockThenRemoveHave(t *testing.T) { @@ -124,13 +96,9 @@ func TestAddBlockThenRemoveHave(t 
*testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.RemoveType(testcids[0], pb.Message_Wantlist_Have) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddHaveThenRemoveAny(t *testing.T) { @@ -139,10 +107,7 @@ func TestAddHaveThenRemoveAny(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.Remove(testcids[0]) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestAddBlockThenRemoveAny(t *testing.T) { @@ -151,10 +116,7 @@ func TestAddBlockThenRemoveAny(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.Remove(testcids[0]) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestSortEntries(t *testing.T) { From de52bc8101ad80f0dcc414547d05470d4e1f7aeb Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Mon, 20 Jan 2025 05:50:58 -1000 Subject: [PATCH 5/5] reduce default number of routing in-process requests (#793) * reduce default number of routing in-process requests Reduce the routing ProviderQueryManager default MaxInProcessRequests from 16 to 8. This prevents a situation where goroutines can be created faster than they can be completed, leading to OOM. * export providerquerymanager default consts * Only set providerquerymanager options if they differ from default * Get default bitswap client settings from defaults.go --- bitswap/client/client.go | 8 ++--- bitswap/internal/defaults/defaults.go | 3 ++ .../providerquerymanager.go | 30 +++++++++++-------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 2e83949cb..f1a3cf057 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -109,9 +109,7 @@ func WithoutDuplicatedBlockStats() Option { // lookups. The bitswap default ProviderQueryManager uses these options, which // may be more conservative than the ProviderQueryManager defaults: // -// - WithMaxInProcessRequests(16) -// - WithMaxProviders(10) -// - WithMaxTimeout(10 *time.Second) +// - WithMaxProviders(defaults.BitswapClientDefaultMaxProviders) // // To use a custom ProviderQueryManager, set to false and wrap directly the // content router provided with the WithContentRouting() option. Only takes @@ -196,9 +194,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr if bs.providerFinder != nil && bs.defaultProviderQueryManager { // network can do dialing. pqm, err := rpqm.New(network, bs.providerFinder, - rpqm.WithMaxInProcessRequests(16), - rpqm.WithMaxProviders(10), - rpqm.WithMaxTimeout(10*time.Second)) + rpqm.WithMaxProviders(defaults.BitswapClientDefaultMaxProviders)) if err != nil { // Should not be possible to hit this panic(err) diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index 646b56b0d..2c5cc3668 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -10,6 +10,9 @@ const ( // broadcasting outstanding wants for the first time. 
ProvSearchDelay = time.Second + // Maximum number of providers that are looked up per find request by the + // default bitswap client. 0 value means unlimited. + BitswapClientDefaultMaxProviders = 10 // Number of concurrent workers in decision engine that process requests to the blockstore BitswapEngineBlockstoreWorkerCount = 128 // the total number of simultaneous threads sending outgoing messages diff --git a/routing/providerquerymanager/providerquerymanager.go b/routing/providerquerymanager/providerquerymanager.go index 592f7f814..b37f25fa5 100644 --- a/routing/providerquerymanager/providerquerymanager.go +++ b/routing/providerquerymanager/providerquerymanager.go @@ -20,9 +20,15 @@ import ( var log = logging.Logger("routing/provqrymgr") const ( - defaultMaxInProcessRequests = 16 - defaultMaxProviders = 0 - defaultTimeout = 10 * time.Second + // DefaultMaxInProcessRequests is the default maximum number of requests + // that are processed concurrently. A value of 0 means unlimited. + DefaultMaxInProcessRequests = 8 + // DefaultMaxProviders is the default maximum number of providers that are + // looked up per find request. 0 value means unlimited. + DefaultMaxProviders = 0 + // DefaultTimeout is the limit on the amount of time to spend waiting for + // the maximum number of providers from a find request. + DefaultTimeout = 10 * time.Second ) type inProgressRequestStatus struct { @@ -112,9 +118,9 @@ func WithMaxTimeout(timeout time.Duration) Option { } } -// WithMaxInProcessRequests is the maximum number of requests that can be -// processed in parallel. If this is 0, then the number is unlimited. Default -// is defaultMaxInProcessRequests (16). +// WithMaxInProcessRequests sets maximum number of requests that are processed +// concurrently. A value of 0 means unlimited. Default is +// DefaultMaxInProcessRequests. func WithMaxInProcessRequests(count int) Option { return func(mgr *ProviderQueryManager) error { mgr.maxInProcessRequests = count @@ -122,9 +128,9 @@ func WithMaxInProcessRequests(count int) Option { } } -// WithMaxProviders is the maximum number of providers that will be looked up -// per query. We only return providers that we can connect to. Defaults to 0, -// which means unbounded. +// WithMaxProviders sets the maximum number of providers that are looked up per +// find request. Only providers that we can connect to are returned. Defaults +// to 0, which means unlimited. func WithMaxProviders(count int) Option { return func(mgr *ProviderQueryManager) error { mgr.maxProviders = count @@ -140,9 +146,9 @@ func New(dialer ProviderQueryDialer, router ProviderQueryRouter, opts ...Option) dialer: dialer, router: router, providerQueryMessages: make(chan providerQueryMessage), - findProviderTimeout: defaultTimeout, - maxInProcessRequests: defaultMaxInProcessRequests, - maxProviders: defaultMaxProviders, + findProviderTimeout: DefaultTimeout, + maxInProcessRequests: DefaultMaxInProcessRequests, + maxProviders: DefaultMaxProviders, } for _, o := range opts {
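To close, a usage sketch of the reworked ProviderQueryManager options (the exported defaults, `WithMaxInProcessRequests`, `WithMaxProviders`, `WithMaxTimeout`). It assumes `New` returns `(*ProviderQueryManager, error)` as the `client.go` call site implies; `newQueryManager` and its arguments are placeholders for whatever dialer and router the caller already has.

```go
package example

import (
	"time"

	"github.com/ipfs/boxo/routing/providerquerymanager"
)

// newQueryManager combines the options shown in this patch.
func newQueryManager(dialer providerquerymanager.ProviderQueryDialer, router providerquerymanager.ProviderQueryRouter) (*providerquerymanager.ProviderQueryManager, error) {
	return providerquerymanager.New(dialer, router,
		// Lower concurrency default from this patch (8); 0 means unlimited.
		providerquerymanager.WithMaxInProcessRequests(providerquerymanager.DefaultMaxInProcessRequests),
		// The bitswap client now caps lookups at 10 providers per request.
		providerquerymanager.WithMaxProviders(10),
		// Bound how long to wait for that many providers.
		providerquerymanager.WithMaxTimeout(10*time.Second),
	)
}
```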