Merge branch 'main' into fix/ipns-protobuf
gammazero authored Jan 20, 2025
2 parents 8875c82 + de52bc8 commit c13acda
Showing 19 changed files with 170 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
- `gateway` Support for custom DNSLink / DoH resolvers on `localhost` to simplify integration with non-ICANN DNS systems [#645](https://github.com/ipfs/boxo/pull/645)

### Changed
- Do not send a CANCEL to the peer that a block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)

- `gateway` The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
- `gateway` The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)
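The rationale for the CANCEL change in [#784](https://github.com/ipfs/boxo/pull/784) above: the peer a block arrived from already knows our want is satisfied, so sending it a CANCEL carries no information. A minimal sketch of the resulting call shape, assuming a hypothetical session type (`onBlockReceived` and `s.pm` are illustrative names; the `SendCancels` signature matches the peermanager.go change later in this commit):

// Sketch only: "session" and "onBlockReceived" are hypothetical;
// SendCancels(ctx, keys, excludePeer) is the signature introduced below.
func (s *session) onBlockReceived(ctx context.Context, from peer.ID, c cid.Cid) {
	// Cancel the want with every peer it was sent to, but skip the
	// sender: a CANCEL to it would be redundant.
	s.pm.SendCancels(ctx, []cid.Cid{c}, from)
}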
7 changes: 7 additions & 0 deletions FUNDING.json
@@ -0,0 +1,7 @@
{
"drips": {
"filecoin": {
"ownedBy": "0xEF22379b7527762a00FC5820AF55BdE363624f03"
}
}
}
8 changes: 2 additions & 6 deletions bitswap/client/client.go
@@ -109,9 +109,7 @@ func WithoutDuplicatedBlockStats() Option {
// lookups. The bitswap default ProviderQueryManager uses these options, which
// may be more conservative than the ProviderQueryManager defaults:
//
// - WithMaxInProcessRequests(16)
// - WithMaxProviders(10)
// - WithMaxTimeout(10 *time.Second)
// - WithMaxProviders(defaults.BitswapClientDefaultMaxProviders)
//
// To use a custom ProviderQueryManager, set to false and wrap directly the
// content router provided with the WithContentRouting() option. Only takes
@@ -196,9 +194,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
pqm, err := rpqm.New(network, bs.providerFinder,
rpqm.WithMaxInProcessRequests(16),
rpqm.WithMaxProviders(10),
rpqm.WithMaxTimeout(10*time.Second))
rpqm.WithMaxProviders(defaults.BitswapClientDefaultMaxProviders))
if err != nil {
// Should not be possible to hit this
panic(err)
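With the change above, the default ProviderQueryManager is tuned only via defaults.BitswapClientDefaultMaxProviders. Callers who want the older, explicit tuning can still construct the manager themselves, per the doc comment's note about WithContentRouting(). A hedged sketch using only option names that appear in this diff ("network" and "providerFinder" are the values passed to New above; this is not presented as boxo's exact wiring):

// Hedged sketch: reproducing the pre-change tuning by hand.
pqm, err := rpqm.New(network, providerFinder,
	rpqm.WithMaxInProcessRequests(16),
	rpqm.WithMaxProviders(defaults.BitswapClientDefaultMaxProviders),
	rpqm.WithMaxTimeout(10*time.Second))
if err != nil {
	panic(err) // mirrors the "should not be possible" handling above
}
_ = pqm // wrap the content router with it, per the comment above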
bitswap/client/internal/blockpresencemanager/blockpresencemanager.go
@@ -15,7 +15,9 @@ type BlockPresenceManager struct {
}

func New() *BlockPresenceManager {
return &BlockPresenceManager{}
return &BlockPresenceManager{
presence: make(map[cid.Cid]map[peer.ID]bool),
}
}

// ReceiveFrom is called when a peer sends us information about which blocks
@@ -24,10 +26,6 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav
bpm.Lock()
defer bpm.Unlock()

if bpm.presence == nil {
bpm.presence = make(map[cid.Cid]map[peer.ID]bool)
}

for _, c := range haves {
bpm.updateBlockPresence(p, c, true)
}
@@ -39,7 +37,8 @@
func (bpm *BlockPresenceManager) updateBlockPresence(p peer.ID, c cid.Cid, present bool) {
_, ok := bpm.presence[c]
if !ok {
bpm.presence[c] = make(map[peer.ID]bool)
bpm.presence[c] = map[peer.ID]bool{p: present}
return
}

// Make sure not to change HAVE to DONT_HAVE
@@ -121,9 +120,6 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
for _, c := range ks {
delete(bpm.presence, c)
}
if len(bpm.presence) == 0 {
bpm.presence = nil
}
}

// RemovePeer removes the given peer from every cid key in the presence map.
@@ -137,9 +133,6 @@ func (bpm *BlockPresenceManager) RemovePeer(p peer.ID) {
delete(bpm.presence, c)
}
}
if len(bpm.presence) == 0 {
bpm.presence = nil
}
}

// HasKey indicates whether the BlockPresenceManager is tracking the given key
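Two behavioral notes on this file's change: the presence map is now allocated eagerly in New() instead of lazily in ReceiveFrom (and is no longer nil'ed out when it empties), and updateBlockPresence still never downgrades a recorded HAVE to DONT_HAVE. A hedged usage sketch of that invariant (the package is internal to boxo, so calling it from outside is illustrative only; p and c stand for any peer.ID and cid.Cid):

bpm := blockpresencemanager.New()     // presence map allocated up front
bpm.ReceiveFrom(p, []cid.Cid{c}, nil) // records HAVE for (c, p)
bpm.ReceiveFrom(p, nil, []cid.Cid{c}) // a later DONT_HAVE does not overwrite the HAVE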
71 changes: 36 additions & 35 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -125,66 +125,67 @@ func newRecallWantList() recallWantlist {
}
}

// Add want to the pending list
func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) {
// add want to the pending list
func (r *recallWantlist) add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) {
r.pending.Add(c, priority, wtype)
}

// Remove wants from both the pending list and the list of sent wants
func (r *recallWantlist) Remove(c cid.Cid) {
// remove wants from both the pending list and the list of sent wants
func (r *recallWantlist) remove(c cid.Cid) {
r.pending.Remove(c)
r.sent.Remove(c)
delete(r.sentAt, c)
}

// Remove wants by type from both the pending list and the list of sent wants
func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
// remove wants by type from both the pending list and the list of sent wants
func (r *recallWantlist) removeType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
r.pending.RemoveType(c, wtype)
r.sent.RemoveType(c, wtype)
if _, ok := r.sent.Contains(c); !ok {
if !r.sent.Has(c) {
delete(r.sentAt, c)
}
}

// MarkSent moves the want from the pending to the sent list
// markSent moves the want from the pending to the sent list
//
// Returns true if the want was marked as sent. Returns false if the want wasn't
// pending.
func (r *recallWantlist) MarkSent(e bswl.Entry) bool {
func (r *recallWantlist) markSent(e bswl.Entry) bool {
if !r.pending.RemoveType(e.Cid, e.WantType) {
return false
}
r.sent.Add(e.Cid, e.Priority, e.WantType)
return true
}

// SentAt records the time at which a want was sent
func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) {
// setSentAt records the time at which a want was sent
func (r *recallWantlist) setSentAt(c cid.Cid, at time.Time) {
// The want may have been canceled in the interim
if _, ok := r.sent.Contains(c); ok {
if r.sent.Has(c) {
if _, ok := r.sentAt[c]; !ok {
r.sentAt[c] = at
}
}
}

// ClearSentAt clears out the record of the time a want was sent.
// clearSentAt clears out the record of the time a want was sent.
// We clear the sent at time when we receive a response for a key as we
// only need the first response for latency measurement.
func (r *recallWantlist) ClearSentAt(c cid.Cid) {
func (r *recallWantlist) clearSentAt(c cid.Cid) {
delete(r.sentAt, c)
}

// Refresh moves wants from the sent list back to the pending list.
// refresh moves wants from the sent list back to the pending list.
// If a want has been sent for longer than the interval, it is moved back to the pending list.
// Returns the number of wants that were refreshed.
func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int {
func (r *recallWantlist) refresh(now time.Time, interval time.Duration) int {
var refreshed int
for _, want := range r.sent.Entries() {
sentAt, ok := r.sentAt[want.Cid]
wantCid := want.Cid
sentAt, ok := r.sentAt[wantCid]
if ok && now.Sub(sentAt) >= interval {
r.pending.Add(want.Cid, want.Priority, want.WantType)
r.sent.Remove(want.Cid)
r.sent.Remove(wantCid)
r.pending.Add(wantCid, want.Priority, want.WantType)
refreshed++
}
}
@@ -320,7 +321,7 @@ func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid) {
mq.wllock.Lock()

for _, c := range wantHaves {
mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
mq.bcstWants.add(c, mq.priority, pb.Message_Wantlist_Have)
mq.priority--

// We're adding a want-have for the cid, so clear any pending cancel
@@ -343,15 +344,15 @@ func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) {
mq.wllock.Lock()

for _, c := range wantHaves {
mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Have)
mq.priority--

// We're adding a want-have for the cid, so clear any pending cancel
// for the cid
mq.cancels.Remove(c)
}
for _, c := range wantBlocks {
mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Block)
mq.peerWants.add(c, mq.priority, pb.Message_Wantlist_Block)
mq.priority--

// We're adding a want-block for the cid, so clear any pending cancel
@@ -383,12 +384,12 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
// Remove keys from broadcast and peer wants, and add to cancels
for _, c := range cancelKs {
// Check if a want for the key was sent
_, wasSentBcst := mq.bcstWants.sent.Contains(c)
_, wasSentPeer := mq.peerWants.sent.Contains(c)
wasSentBcst := mq.bcstWants.sent.Has(c)
wasSentPeer := mq.peerWants.sent.Has(c)

// Remove the want from tracking wantlists
mq.bcstWants.Remove(c)
mq.peerWants.Remove(c)
mq.bcstWants.remove(c)
mq.peerWants.remove(c)

// Only send a cancel if a want was sent
if wasSentBcst || wasSentPeer {
@@ -524,7 +525,7 @@ func (mq *MessageQueue) runQueue() {
func (mq *MessageQueue) rebroadcastWantlist(now time.Time, interval time.Duration) {
mq.wllock.Lock()
// Transfer wants from the rebroadcast lists into the pending lists.
toRebroadcast := mq.bcstWants.Refresh(now, interval) + mq.peerWants.Refresh(now, interval)
toRebroadcast := mq.bcstWants.refresh(now, interval) + mq.peerWants.refresh(now, interval)
mq.wllock.Unlock()

// If some wants were transferred from the rebroadcast list
@@ -614,7 +615,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// Unlikely, but just in case check that the block hasn't been
// received in the interim
c := entry.Cid
if _, ok := mq.peerWants.sent.Contains(c); ok {
if mq.peerWants.sent.Has(c) {
wants = append(wants, c)
}
}
@@ -652,7 +653,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
earliest = at
}
mq.bcstWants.ClearSentAt(c)
mq.bcstWants.clearSentAt(c)
}
if at, ok := mq.peerWants.sentAt[c]; ok {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
Expand All @@ -661,7 +662,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
// Clear out the sent time for the CID because we only want to
// record the latency between the request and the first response
// for that CID (not subsequent responses)
mq.peerWants.ClearSentAt(c)
mq.peerWants.clearSentAt(c)
}
}

@@ -752,7 +753,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
// place if possible.
for _, e := range peerEntries {
if e.WantType == pb.Message_Wantlist_Have {
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
mq.peerWants.removeType(e.Cid, pb.Message_Wantlist_Have)
} else {
filteredPeerEntries = append(filteredPeerEntries, e)
}
@@ -817,15 +818,15 @@ FINISH:
// message that we've decided to cancel at the last minute.
mq.wllock.Lock()
for i, e := range peerEntries[:sentPeerEntries] {
if !mq.peerWants.MarkSent(e) {
if !mq.peerWants.markSent(e) {
// It changed.
mq.msg.Remove(e.Cid)
peerEntries[i].Cid = cid.Undef
}
}

for i, e := range bcstEntries[:sentBcstEntries] {
if !mq.bcstWants.MarkSent(e) {
if !mq.bcstWants.markSent(e) {
mq.msg.Remove(e.Cid)
bcstEntries[i].Cid = cid.Undef
}
@@ -849,13 +850,13 @@

for _, e := range peerEntries[:sentPeerEntries] {
if e.Cid.Defined() { // Check if want was canceled in the interim
mq.peerWants.SentAt(e.Cid, now)
mq.peerWants.setSentAt(e.Cid, now)
}
}

for _, e := range bcstEntries[:sentBcstEntries] {
if e.Cid.Defined() { // Check if want was canceled in the interim
mq.bcstWants.SentAt(e.Cid, now)
mq.bcstWants.setSentAt(e.Cid, now)
}
}

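The latency bookkeeping in this file is subtle: setSentAt records only the first send time, clearSentAt drops it on the first response, and handleResponse counts a sample only when the response arrived within maxValidLatency. A self-contained sketch mirroring the selection condition in handleResponse above (names are illustrative, not the boxo types):

import "time"

// earliestValid returns the earliest send time that still yields a valid
// latency sample, mirroring the check in handleResponse.
func earliestValid(sentAts []time.Time, now time.Time, maxValid time.Duration) (time.Time, bool) {
	var earliest time.Time
	for _, at := range sentAts {
		if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < maxValid {
			earliest = at
		}
	}
	return earliest, !earliest.IsZero()
}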
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
@@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
pm.pwm.sendCancels(cancelKs)
pm.pwm.sendCancels(cancelKs, excludePeer)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
49 changes: 46 additions & 3 deletions bitswap/client/internal/peermanager/peermanager_test.go
@@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -250,7 +250,7 @@
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids)
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -261,6 +261,49 @@
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 3 {
t.Fatal("Expected cancel to be sent for want-blocks")
}
}

func (s *sess) ID() uint64 {
return s.id
}
@@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel)
peerManager.SendCancels(ctx, cancel, "")
}
}
}
