diff --git a/CHANGELOG.md b/CHANGELOG.md index fbc1caed9..36ec53d73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes: - `gateway` Support for custom DNSLink / DoH resolvers on `localhost` to simplify integration with non-ICANN DNS systems [#645](https://github.com/ipfs/boxo/pull/645) ### Changed +- Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784) - `gateway` The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781) - `gateway` The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782) diff --git a/FUNDING.json b/FUNDING.json new file mode 100644 index 000000000..4cabb0313 --- /dev/null +++ b/FUNDING.json @@ -0,0 +1,7 @@ +{ + "drips": { + "filecoin": { + "ownedBy": "0xEF22379b7527762a00FC5820AF55BdE363624f03" + } + } +} diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 2e83949cb..f1a3cf057 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -109,9 +109,7 @@ func WithoutDuplicatedBlockStats() Option { // lookups. The bitswap default ProviderQueryManager uses these options, which // may be more conservative than the ProviderQueryManager defaults: // -// - WithMaxInProcessRequests(16) -// - WithMaxProviders(10) -// - WithMaxTimeout(10 *time.Second) +// - WithMaxProviders(defaults.BitswapClientDefaultMaxProviders) // // To use a custom ProviderQueryManager, set to false and wrap directly the // content router provided with the WithContentRouting() option. Only takes @@ -196,9 +194,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr if bs.providerFinder != nil && bs.defaultProviderQueryManager { // network can do dialing. pqm, err := rpqm.New(network, bs.providerFinder, - rpqm.WithMaxInProcessRequests(16), - rpqm.WithMaxProviders(10), - rpqm.WithMaxTimeout(10*time.Second)) + rpqm.WithMaxProviders(defaults.BitswapClientDefaultMaxProviders)) if err != nil { // Should not be possible to hit this panic(err) diff --git a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go index b88b12bd3..288de6975 100644 --- a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go +++ b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go @@ -15,7 +15,9 @@ type BlockPresenceManager struct { } func New() *BlockPresenceManager { - return &BlockPresenceManager{} + return &BlockPresenceManager{ + presence: make(map[cid.Cid]map[peer.ID]bool), + } } // ReceiveFrom is called when a peer sends us information about which blocks @@ -24,10 +26,6 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav bpm.Lock() defer bpm.Unlock() - if bpm.presence == nil { - bpm.presence = make(map[cid.Cid]map[peer.ID]bool) - } - for _, c := range haves { bpm.updateBlockPresence(p, c, true) } @@ -39,7 +37,8 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav func (bpm *BlockPresenceManager) updateBlockPresence(p peer.ID, c cid.Cid, present bool) { _, ok := bpm.presence[c] if !ok { - bpm.presence[c] = make(map[peer.ID]bool) + bpm.presence[c] = map[peer.ID]bool{p: present} + return } // Make sure not to change HAVE to DONT_HAVE @@ -121,9 +120,6 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) { for _, c := range ks { delete(bpm.presence, c) } - if len(bpm.presence) == 0 { - bpm.presence = nil - } } // RemovePeer removes the given peer from every cid key in the presence map. @@ -137,9 +133,6 @@ func (bpm *BlockPresenceManager) RemovePeer(p peer.ID) { delete(bpm.presence, c) } } - if len(bpm.presence) == 0 { - bpm.presence = nil - } } // HasKey indicates whether the BlockPresenceManager is tracking the given key diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 6a6e44280..a8ab99830 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -125,32 +125,32 @@ func newRecallWantList() recallWantlist { } } -// Add want to the pending list -func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) { +// add want to the pending list +func (r *recallWantlist) add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) { r.pending.Add(c, priority, wtype) } -// Remove wants from both the pending list and the list of sent wants -func (r *recallWantlist) Remove(c cid.Cid) { +// remove wants from both the pending list and the list of sent wants +func (r *recallWantlist) remove(c cid.Cid) { r.pending.Remove(c) r.sent.Remove(c) delete(r.sentAt, c) } -// Remove wants by type from both the pending list and the list of sent wants -func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { +// remove wants by type from both the pending list and the list of sent wants +func (r *recallWantlist) removeType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { r.pending.RemoveType(c, wtype) r.sent.RemoveType(c, wtype) - if _, ok := r.sent.Contains(c); !ok { + if !r.sent.Has(c) { delete(r.sentAt, c) } } -// MarkSent moves the want from the pending to the sent list +// markSent moves the want from the pending to the sent list // // Returns true if the want was marked as sent. Returns false if the want wasn't // pending. -func (r *recallWantlist) MarkSent(e bswl.Entry) bool { +func (r *recallWantlist) markSent(e bswl.Entry) bool { if !r.pending.RemoveType(e.Cid, e.WantType) { return false } @@ -158,33 +158,34 @@ func (r *recallWantlist) MarkSent(e bswl.Entry) bool { return true } -// SentAt records the time at which a want was sent -func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { +// setSentAt records the time at which a want was sent +func (r *recallWantlist) setSentAt(c cid.Cid, at time.Time) { // The want may have been canceled in the interim - if _, ok := r.sent.Contains(c); ok { + if r.sent.Has(c) { if _, ok := r.sentAt[c]; !ok { r.sentAt[c] = at } } } -// ClearSentAt clears out the record of the time a want was sent. +// clearSentAt clears out the record of the time a want was sent. // We clear the sent at time when we receive a response for a key as we // only need the first response for latency measurement. -func (r *recallWantlist) ClearSentAt(c cid.Cid) { +func (r *recallWantlist) clearSentAt(c cid.Cid) { delete(r.sentAt, c) } -// Refresh moves wants from the sent list back to the pending list. +// refresh moves wants from the sent list back to the pending list. // If a want has been sent for longer than the interval, it is moved back to the pending list. // Returns the number of wants that were refreshed. -func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int { +func (r *recallWantlist) refresh(now time.Time, interval time.Duration) int { var refreshed int for _, want := range r.sent.Entries() { - sentAt, ok := r.sentAt[want.Cid] + wantCid := want.Cid + sentAt, ok := r.sentAt[wantCid] if ok && now.Sub(sentAt) >= interval { - r.pending.Add(want.Cid, want.Priority, want.WantType) - r.sent.Remove(want.Cid) + r.sent.Remove(wantCid) + r.pending.Add(wantCid, want.Priority, want.WantType) refreshed++ } } @@ -320,7 +321,7 @@ func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid) { mq.wllock.Lock() for _, c := range wantHaves { - mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have) + mq.bcstWants.add(c, mq.priority, pb.Message_Wantlist_Have) mq.priority-- // We're adding a want-have for the cid, so clear any pending cancel @@ -343,7 +344,7 @@ func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) { mq.wllock.Lock() for _, c := range wantHaves { - mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Have) + mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Have) mq.priority-- // We're adding a want-have for the cid, so clear any pending cancel @@ -351,7 +352,7 @@ func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) { mq.cancels.Remove(c) } for _, c := range wantBlocks { - mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Block) + mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Block) mq.priority-- // We're adding a want-block for the cid, so clear any pending cancel @@ -383,12 +384,12 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { // Remove keys from broadcast and peer wants, and add to cancels for _, c := range cancelKs { // Check if a want for the key was sent - _, wasSentBcst := mq.bcstWants.sent.Contains(c) - _, wasSentPeer := mq.peerWants.sent.Contains(c) + wasSentBcst := mq.bcstWants.sent.Has(c) + wasSentPeer := mq.peerWants.sent.Has(c) // Remove the want from tracking wantlists - mq.bcstWants.Remove(c) - mq.peerWants.Remove(c) + mq.bcstWants.remove(c) + mq.peerWants.remove(c) // Only send a cancel if a want was sent if wasSentBcst || wasSentPeer { @@ -524,7 +525,7 @@ func (mq *MessageQueue) runQueue() { func (mq *MessageQueue) rebroadcastWantlist(now time.Time, interval time.Duration) { mq.wllock.Lock() // Transfer wants from the rebroadcast lists into the pending lists. - toRebroadcast := mq.bcstWants.Refresh(now, interval) + mq.peerWants.Refresh(now, interval) + toRebroadcast := mq.bcstWants.refresh(now, interval) + mq.peerWants.refresh(now, interval) mq.wllock.Unlock() // If some wants were transferred from the rebroadcast list @@ -614,7 +615,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { // Unlikely, but just in case check that the block hasn't been // received in the interim c := entry.Cid - if _, ok := mq.peerWants.sent.Contains(c); ok { + if mq.peerWants.sent.Has(c) { wants = append(wants, c) } } @@ -652,7 +653,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } - mq.bcstWants.ClearSentAt(c) + mq.bcstWants.clearSentAt(c) } if at, ok := mq.peerWants.sentAt[c]; ok { if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { @@ -661,7 +662,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Clear out the sent time for the CID because we only want to // record the latency between the request and the first response // for that CID (not subsequent responses) - mq.peerWants.ClearSentAt(c) + mq.peerWants.clearSentAt(c) } } @@ -752,7 +753,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap // place if possible. for _, e := range peerEntries { if e.WantType == pb.Message_Wantlist_Have { - mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have) + mq.peerWants.removeType(e.Cid, pb.Message_Wantlist_Have) } else { filteredPeerEntries = append(filteredPeerEntries, e) } @@ -817,7 +818,7 @@ FINISH: // message that we've decided to cancel at the last minute. mq.wllock.Lock() for i, e := range peerEntries[:sentPeerEntries] { - if !mq.peerWants.MarkSent(e) { + if !mq.peerWants.markSent(e) { // It changed. mq.msg.Remove(e.Cid) peerEntries[i].Cid = cid.Undef @@ -825,7 +826,7 @@ FINISH: } for i, e := range bcstEntries[:sentBcstEntries] { - if !mq.bcstWants.MarkSent(e) { + if !mq.bcstWants.markSent(e) { mq.msg.Remove(e.Cid) bcstEntries[i].Cid = cid.Undef } @@ -849,13 +850,13 @@ FINISH: for _, e := range peerEntries[:sentPeerEntries] { if e.Cid.Defined() { // Check if want was canceled in the interim - mq.peerWants.SentAt(e.Cid, now) + mq.peerWants.setSentAt(e.Cid, now) } } for _, e := range bcstEntries[:sentBcstEntries] { if e.Cid.Defined() { // Check if want was canceled in the interim - mq.bcstWants.SentAt(e.Cid, now) + mq.bcstWants.setSentAt(e.Cid, now) } } diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index 4634ff164..cd0069bea 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci // SendCancels sends cancels for the given keys to all peers who had previously // received a want for those keys. -func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) { +func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) { pm.pqLk.Lock() defer pm.pqLk.Unlock() // Send a CANCEL to each peer that has been sent a want-block or want-have - pm.pwm.sendCancels(cancelKs) + pm.pwm.sendCancels(cancelKs, excludePeer) } // CurrentWants returns the list of pending wants (both want-haves and want-blocks). diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go index b778c46e3..61226acda 100644 --- a/bitswap/client/internal/peermanager/peermanager_test.go +++ b/bitswap/client/internal/peermanager/peermanager_test.go @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) { collectMessages(msgs, 2*time.Millisecond) // Send cancels for 1 want-block and 1 want-have - peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}) + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "") collected := collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) { } // Send cancels for all cids - peerManager.SendCancels(ctx, cids) + peerManager.SendCancels(ctx, cids, "") collected = collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) { } } +func TestSendCancelsExclude(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msgs := make(chan msg, 16) + peerQueueFactory := makePeerQueueFactory(msgs) + tp := random.Peers(3) + self, peer1, peer2 := tp[0], tp[1], tp[2] + peerManager := New(ctx, peerQueueFactory, self) + cids := random.Cids(4) + + // Connect to peer1 and peer2 + peerManager.Connected(peer1) + peerManager.Connected(peer2) + + // Send 2 want-blocks and 1 want-have to peer1 + peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]}) + + // Clear messages + collectMessages(msgs, 2*time.Millisecond) + + // Send cancels for 1 want-block and 1 want-have + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1) + collected := collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 0 { + t.Fatal("Expected no cancels to be sent to excluded peer") + } + + // Send cancels for all cids + peerManager.SendCancels(ctx, cids, "") + collected = collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 3 { + t.Fatal("Expected cancel to be sent for want-blocks") + } +} + func (s *sess) ID() uint64 { return s.id } @@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) { limit := len(wanted) / 10 cancel := wanted[:limit] wanted = wanted[limit:] - peerManager.SendCancels(ctx, cancel) + peerManager.SendCancels(ctx, cancel, "") } } } diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go index e9fdfbb46..765566155 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager.go +++ b/bitswap/client/internal/peermanager/peerwantmanager.go @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves // sendCancels sends a cancel to each peer to which a corresponding want was // sent -func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { +func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) { if len(cancelKs) == 0 { return } @@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { cancelPeers[p] = struct{}{} } } + delete(cancelPeers, excludePeer) for p := range cancelPeers { pws, ok := pwm.peerWants[p] if !ok { diff --git a/bitswap/client/internal/peermanager/peerwantmanager_test.go b/bitswap/client/internal/peermanager/peerwantmanager_test.go index bfe0c626d..ff9b4ebad 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager_test.go +++ b/bitswap/client/internal/peermanager/peerwantmanager_test.go @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel 1 want-block and 1 want-have that were sent to p0 clearSent(peerQueues) - pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}) + pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "") // Should cancel the want-block and want-have require.Empty(t, pq1.cancels, "Expected no cancels sent to p1") require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled") @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel everything clearSent(peerQueues) allCids := append(allwb, allwh...) - pwm.sendCancels(allCids) + pwm.sendCancels(allCids, "") // Should cancel the remaining want-blocks and want-haves for p0 require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled") @@ -312,7 +312,7 @@ func TestStats(t *testing.T) { // Cancel 1 want-block that was sent to p0 // and 1 want-block that was not sent cids5 := random.Cids(1) - pwm.sendCancels(append(cids5, cids[0])) + pwm.sendCancels(append(cids5, cids[0]), "") require.Equal(t, 7, g.count, "Expected 7 wants") require.Equal(t, 3, wbg.count, "Expected 3 want-blocks") @@ -332,7 +332,7 @@ func TestStats(t *testing.T) { require.Zero(t, wbg.count, "Expected 0 want-blocks") // Cancel one remaining broadcast want-have - pwm.sendCancels(cids2[:1]) + pwm.sendCancels(cids2[:1], "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Zero(t, wbg.count, "Expected 0 want-blocks") } @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) { require.Equal(t, 4, wbg.count, "Expected 4 want-blocks") // Cancel 1 of each group of cids - pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}) + pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Equal(t, 2, wbg.count, "Expected 2 want-blocks") diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index b19763f3c..a64b39223 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -43,7 +43,7 @@ type PeerManager interface { // session discovery) BroadcastWantHaves(context.Context, []cid.Cid) // SendCancels tells the PeerManager to send cancels to all peers - SendCancels(context.Context, []cid.Cid) + SendCancels(context.Context, []cid.Cid, peer.ID) } // SessionManager manages all the sessions diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index c0d26a91d..5b33b351a 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci case <-ctx.Done(): } } -func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {} +func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {} func TestSessionGetBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go index e5589dd58..a43c8af96 100644 --- a/bitswap/client/internal/session/sessionwantsender_test.go +++ b/bitswap/client/internal/session/sessionwantsender_test.go @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool { return false } -func (*mockPeerManager) UnregisterSession(uint64) {} -func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} +func (*mockPeerManager) UnregisterSession(uint64) {} +func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {} func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool { pm.lk.Lock() diff --git a/bitswap/client/internal/sessionmanager/sessionmanager.go b/bitswap/client/internal/sessionmanager/sessionmanager.go index 0d2b24330..179877acc 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager.go @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid } // Send CANCEL to all peers with want-have / want-block - sm.peerManager.SendCancels(ctx, blks) + sm.peerManager.SendCancels(ctx, blks, p) } // CancelSessionWants is called when a session cancels wants because a call to @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) { // Send CANCEL to all peers for blocks that no session is interested in // anymore. // Note: use bitswap context because session context may already be Done. - sm.peerManager.SendCancels(sm.ctx, wants) + sm.peerManager.SendCancels(sm.ctx, wants, "") } diff --git a/bitswap/client/internal/sessionmanager/sessionmanager_test.go b/bitswap/client/internal/sessionmanager/sessionmanager_test.go index bad26ad90..bb9ad4755 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager_test.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager_test.go @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) func (*fakePeerManager) UnregisterSession(uint64) {} func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true } func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { +func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) { fpm.lk.Lock() defer fpm.lk.Unlock() fpm.cancels = append(fpm.cancels, cancels...) diff --git a/bitswap/client/wantlist/wantlist.go b/bitswap/client/wantlist/wantlist.go index 245085af9..a79dbd9fc 100644 --- a/bitswap/client/wantlist/wantlist.go +++ b/bitswap/client/wantlist/wantlist.go @@ -71,14 +71,8 @@ func (w *Wantlist) Add(c cid.Cid, priority int32, wantType pb.Message_Wantlist_W } // Remove removes the given cid from the wantlist. -func (w *Wantlist) Remove(c cid.Cid) bool { - _, ok := w.set[c] - if !ok { - return false - } - +func (w *Wantlist) Remove(c cid.Cid) { w.delete(c) - return true } // Remove removes the given cid from the wantlist, respecting the type: @@ -108,9 +102,14 @@ func (w *Wantlist) put(c cid.Cid, e Entry) { w.set[c] = e } -// Contains returns the entry, if present, for the given CID, plus whether it -// was present. -func (w *Wantlist) Contains(c cid.Cid) (Entry, bool) { +func (w *Wantlist) Has(c cid.Cid) bool { + _, ok := w.set[c] + return ok +} + +// Get returns the entry, if present, for the given CID, plus whether it was +// present. +func (w *Wantlist) Get(c cid.Cid) (Entry, bool) { e, ok := w.set[c] return e, ok } diff --git a/bitswap/client/wantlist/wantlist_test.go b/bitswap/client/wantlist/wantlist_test.go index 901fe0d67..db5542ecf 100644 --- a/bitswap/client/wantlist/wantlist_test.go +++ b/bitswap/client/wantlist/wantlist_test.go @@ -26,54 +26,37 @@ func init() { } type wli interface { - Contains(cid.Cid) (Entry, bool) + Get(cid.Cid) (Entry, bool) + Has(cid.Cid) bool } func assertHasCid(t *testing.T, w wli, c cid.Cid) { - e, ok := w.Contains(c) - if !ok { - t.Fatal("expected to have ", c) - } - if !e.Cid.Equals(c) { - t.Fatal("returned entry had wrong cid value") - } + e, ok := w.Get(c) + require.True(t, ok) + require.Equal(t, c, e.Cid) } func TestBasicWantlist(t *testing.T) { wl := New() - if !wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) { - t.Fatal("expected true") - } + require.True(t, wl.Add(testcids[0], 5, pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[0]) - if !wl.Add(testcids[1], 4, pb.Message_Wantlist_Block) { - t.Fatal("expected true") - } + require.True(t, wl.Add(testcids[1], 4, pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) - if wl.Len() != 2 { - t.Fatal("should have had two items") - } + require.Equal(t, 2, wl.Len()) - if wl.Add(testcids[1], 4, pb.Message_Wantlist_Block) { - t.Fatal("add shouldnt report success on second add") - } + require.False(t, wl.Add(testcids[1], 4, pb.Message_Wantlist_Block), "add should not report success on second add") assertHasCid(t, wl, testcids[0]) assertHasCid(t, wl, testcids[1]) - if wl.Len() != 2 { - t.Fatal("should have had two items") - } + require.Equal(t, 2, wl.Len()) - if !wl.RemoveType(testcids[0], pb.Message_Wantlist_Block) { - t.Fatal("should have gotten true") - } + require.True(t, wl.RemoveType(testcids[0], pb.Message_Wantlist_Block)) assertHasCid(t, wl, testcids[1]) - if _, has := wl.Contains(testcids[0]); has { - t.Fatal("shouldnt have this cid") - } + require.False(t, wl.Has(testcids[0]), "should not have this cid") } func TestAddHaveThenBlock(t *testing.T) { @@ -82,13 +65,9 @@ func TestAddHaveThenBlock(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddBlockThenHave(t *testing.T) { @@ -97,13 +76,9 @@ func TestAddBlockThenHave(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddHaveThenRemoveBlock(t *testing.T) { @@ -112,10 +87,7 @@ func TestAddHaveThenRemoveBlock(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.RemoveType(testcids[0], pb.Message_Wantlist_Block) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestAddBlockThenRemoveHave(t *testing.T) { @@ -124,13 +96,9 @@ func TestAddBlockThenRemoveHave(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.RemoveType(testcids[0], pb.Message_Wantlist_Have) - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected to be ", pb.Message_Wantlist_Block) - } + e, ok := wl.Get(testcids[0]) + require.True(t, ok) + require.Equal(t, pb.Message_Wantlist_Block, e.WantType) } func TestAddHaveThenRemoveAny(t *testing.T) { @@ -139,10 +107,7 @@ func TestAddHaveThenRemoveAny(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Have) wl.Remove(testcids[0]) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestAddBlockThenRemoveAny(t *testing.T) { @@ -151,10 +116,7 @@ func TestAddBlockThenRemoveAny(t *testing.T) { wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) wl.Remove(testcids[0]) - _, ok := wl.Contains(testcids[0]) - if ok { - t.Fatal("expected not to have ", testcids[0]) - } + require.False(t, wl.Has(testcids[0])) } func TestSortEntries(t *testing.T) { diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index 646b56b0d..2c5cc3668 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -10,6 +10,9 @@ const ( // broadcasting outstanding wants for the first time. ProvSearchDelay = time.Second + // Maximum number of providers that are looked up per find request by the + // default bitswap client. 0 value means unlimited. + BitswapClientDefaultMaxProviders = 10 // Number of concurrent workers in decision engine that process requests to the blockstore BitswapEngineBlockstoreWorkerCount = 128 // the total number of simultaneous threads sending outgoing messages diff --git a/bitswap/message/message_test.go b/bitswap/message/message_test.go index 2b943aeb1..11f686a39 100644 --- a/bitswap/message/message_test.go +++ b/bitswap/message/message_test.go @@ -2,6 +2,7 @@ package message import ( "bytes" + "slices" "testing" "github.com/ipfs/boxo/bitswap/client/wantlist" @@ -58,7 +59,7 @@ func TestAppendBlock(t *testing.T) { // assert strings are in proto message for _, blockbytes := range m.ToProtoV0().GetBlocks() { s := bytes.NewBuffer(blockbytes).String() - if !contains(strs, s) { + if !slices.Contains(strs, s) { t.Fail() } } @@ -168,15 +169,6 @@ func wantlistContains(wantlist *pb.Message_Wantlist, c cid.Cid) bool { return false } -func contains(strs []string, x string) bool { - for _, s := range strs { - if s == x { - return true - } - } - return false -} - func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) msg := New(true) diff --git a/routing/providerquerymanager/providerquerymanager.go b/routing/providerquerymanager/providerquerymanager.go index 592f7f814..b37f25fa5 100644 --- a/routing/providerquerymanager/providerquerymanager.go +++ b/routing/providerquerymanager/providerquerymanager.go @@ -20,9 +20,15 @@ import ( var log = logging.Logger("routing/provqrymgr") const ( - defaultMaxInProcessRequests = 16 - defaultMaxProviders = 0 - defaultTimeout = 10 * time.Second + // DefaultMaxInProcessRequests is the default maximum number of requests + // that are processed concurrently. A value of 0 means unlimited. + DefaultMaxInProcessRequests = 8 + // DefaultMaxProviders is the default maximum number of providers that are + // looked up per find request. 0 value means unlimited. + DefaultMaxProviders = 0 + // DefaultTimeout is the limit on the amount of time to spend waiting for + // the maximum number of providers from a find request. + DefaultTimeout = 10 * time.Second ) type inProgressRequestStatus struct { @@ -112,9 +118,9 @@ func WithMaxTimeout(timeout time.Duration) Option { } } -// WithMaxInProcessRequests is the maximum number of requests that can be -// processed in parallel. If this is 0, then the number is unlimited. Default -// is defaultMaxInProcessRequests (16). +// WithMaxInProcessRequests sets maximum number of requests that are processed +// concurrently. A value of 0 means unlimited. Default is +// DefaultMaxInProcessRequests. func WithMaxInProcessRequests(count int) Option { return func(mgr *ProviderQueryManager) error { mgr.maxInProcessRequests = count @@ -122,9 +128,9 @@ func WithMaxInProcessRequests(count int) Option { } } -// WithMaxProviders is the maximum number of providers that will be looked up -// per query. We only return providers that we can connect to. Defaults to 0, -// which means unbounded. +// WithMaxProviders sets the maximum number of providers that are looked up per +// find request. Only providers that we can connect to are returned. Defaults +// to 0, which means unlimited. func WithMaxProviders(count int) Option { return func(mgr *ProviderQueryManager) error { mgr.maxProviders = count @@ -140,9 +146,9 @@ func New(dialer ProviderQueryDialer, router ProviderQueryRouter, opts ...Option) dialer: dialer, router: router, providerQueryMessages: make(chan providerQueryMessage), - findProviderTimeout: defaultTimeout, - maxInProcessRequests: defaultMaxInProcessRequests, - maxProviders: defaultMaxProviders, + findProviderTimeout: DefaultTimeout, + maxInProcessRequests: DefaultMaxInProcessRequests, + maxProviders: DefaultMaxProviders, } for _, o := range opts {