do not send CANCEL to peer we got block from
The serving peer clears the block from the client's wantlist after serving it, making a CANCEL to the serving peer redundant. So, exclude the serving peer when sending cancels after receiving a block.

Closes #694
gammazero committed Jan 8, 2025
1 parent 13869e2 commit 3ae79f9
Showing 9 changed files with 63 additions and 19 deletions.
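
To illustrate the change, here is a minimal, self-contained Go sketch (the helper name and types are hypothetical, not the boxo API): when a block arrives, every peer that was previously sent a want gets a CANCEL except the peer that served the block, mirroring the new `excludePeer` parameter and the `delete(cancelPeers, excludePeer)` call in the diff below.

```go
package main

import "fmt"

// cancelTargets returns the peers that should receive a CANCEL for a block we
// just received. Every peer previously sent a want gets a CANCEL, except the
// peer the block came from: that peer already cleared the entry from its copy
// of our wantlist when it served the block, so a CANCEL would be redundant.
func cancelTargets(wantSentTo map[string]struct{}, servingPeer string) []string {
	targets := make([]string, 0, len(wantSentTo))
	for p := range wantSentTo {
		if p == servingPeer {
			continue // skip the peer that served the block
		}
		targets = append(targets, p)
	}
	return targets
}

func main() {
	wantSentTo := map[string]struct{}{"peerA": {}, "peerB": {}, "peerC": {}}
	// The block arrived from peerB, so only peerA and peerC get a CANCEL.
	fmt.Println(cancelTargets(wantSentTo, "peerB"))
}
```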
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
@@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
-func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
+func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
-pm.pwm.sendCancels(cancelKs)
+pm.pwm.sendCancels(cancelKs, excludePeer)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
49 changes: 46 additions & 3 deletions bitswap/client/internal/peermanager/peermanager_test.go
@@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
-peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
+peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
}

// Send cancels for all cids
-peerManager.SendCancels(ctx, cids)
+peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) {
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 3 {
t.Fatal("Expected cancel to be sent for want-blocks")
}
}

func (s *sess) ID() uint64 {
return s.id
}
@@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
-peerManager.SendCancels(ctx, cancel)
+peerManager.SendCancels(ctx, cancel, "")
}
}
}
3 changes: 2 additions & 1 deletion bitswap/client/internal/peermanager/peerwantmanager.go
@@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
-func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
+func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
if len(cancelKs) == 0 {
return
}
@@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
cancelPeers[p] = struct{}{}
}
}
+delete(cancelPeers, excludePeer)
for p := range cancelPeers {
pws, ok := pwm.peerWants[p]
if !ok {
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
@@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {

// Cancel 1 want-block and 1 want-have that were sent to p0
clearSent(peerQueues)
-pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
+pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
// Should cancel the want-block and want-have
require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
@@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
-pwm.sendCancels(allCids)
+pwm.sendCancels(allCids, "")
// Should cancel the remaining want-blocks and want-haves for p0
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")

@@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids5 := random.Cids(1)
-pwm.sendCancels(append(cids5, cids[0]))
+pwm.sendCancels(append(cids5, cids[0]), "")

require.Equal(t, 7, g.count, "Expected 7 wants")
require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
@@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
require.Zero(t, wbg.count, "Expected 0 want-blocks")

// Cancel one remaining broadcast want-have
-pwm.sendCancels(cids2[:1])
+pwm.sendCancels(cids2[:1], "")
require.Equal(t, 2, g.count, "Expected 2 wants")
require.Zero(t, wbg.count, "Expected 0 want-blocks")
}
@@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")

// Cancel 1 of each group of cids
-pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
+pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")

require.Equal(t, 2, g.count, "Expected 2 wants")
require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
@@ -43,7 +43,7 @@ type PeerManager interface {
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
-SendCancels(context.Context, []cid.Cid)
+SendCancels(context.Context, []cid.Cid, peer.ID)
}

// SessionManager manages all the sessions
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session_test.go
@@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
-func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}
+func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/sessionwantsender_test.go
@@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
return false
}

-func (*mockPeerManager) UnregisterSession(uint64) {}
-func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
-func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}
+func (*mockPeerManager) UnregisterSession(uint64) {}
+func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
+func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
4 changes: 2 additions & 2 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
@@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
}

// Send CANCEL to all peers with want-have / want-block
-sm.peerManager.SendCancels(ctx, blks)
+sm.peerManager.SendCancels(ctx, blks, p)
}

// CancelSessionWants is called when a session cancels wants because a call to
@@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context may already be Done.
-sm.peerManager.SendCancels(sm.ctx, wants)
+sm.peerManager.SendCancels(sm.ctx, wants, "")
}
@@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
-func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
+func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
fpm.cancels = append(fpm.cancels, cancels...)
