Release v0.24.0 #683

Status: Merged (30 commits, Oct 3, 2024)

Commits
171b0b7  Merge pull request #670 from ipfs/release (lidel, Sep 5, 2024)
43eb24b  feat: add protocol filtering (2color, Sep 12, 2024)
30b853c  test: improve tests (2color, Sep 12, 2024)
a920f23  fix: remove negative filter tests and fix filter (2color, Sep 13, 2024)
3dc1a58  chore: add query params conditionally (2color, Sep 17, 2024)
496805d  fix: tests (2color, Sep 17, 2024)
a4e9def  chore: update changelog (2color, Sep 17, 2024)
da4a194  fix: ensure protocol filter is case-insensitive (2color, Sep 17, 2024)
3a6de23  feat: add generic filter iterator and use for fitlering (2color, Sep 24, 2024)
b9958d0  feat: proto & addr filter in peer routing endpoint (2color, Sep 24, 2024)
0a8bebd  fix: use PeerRecord in the FindPeers (2color, Sep 24, 2024)
825d82f  chore: ignore check (2color, Sep 24, 2024)
628b0f6  ipld/unixfs/hamt: catch panic in walkChildren (#393) (Jorropo, Sep 24, 2024)
b6ed0dc  fix: conversion from bitswap record (2color, Sep 25, 2024)
f210c0d  test: add addr and protocol tests to peer handler (2color, Sep 25, 2024)
ef3f6b6  Apply suggestions from code review (2color, Sep 25, 2024)
9811e83  fix: return nil when a record doesnt pass filter (2color, Sep 25, 2024)
51f200a  fix: rename to protocolsAllowed for readability (2color, Sep 25, 2024)
d8edbe6  fix: include addresses that passed negative filters (2color, Sep 25, 2024)
3d2a8e5  docs: improve comments and add test (2color, Sep 25, 2024)
61b5def  Merge branch 'main' into add-protocol-filtering (2color, Sep 25, 2024)
f13c862  test: add real world test case (lidel, Sep 26, 2024)
137d34f  Apply suggestions from code review (2color, Sep 27, 2024)
4af06fd  Merge pull request #671 from ipfs/add-protocol-filtering (2color, Sep 27, 2024)
19a402b  feat: option to not read size of blocks for want-have requests (#672) (gammazero, Sep 27, 2024)
4d0ae45  feat: add protocol and address filtering to delegated routing api (#678) (2color, Oct 1, 2024)
d62e031  Update to latest go-libp2p (#681) (gammazero, Oct 3, 2024)
f61a371  update version (gammazero, Oct 3, 2024)
8464618  docs(changelog): v0.24.0 (lidel, Oct 3, 2024)
eac6c25  chore: update go-multiaddr-dns (#684) (gammazero, Oct 3, 2024)
CHANGELOG.md (20 additions, 0 deletions)

@@ -24,6 +24,26 @@ The following emojis are used to highlight certain changes:

### Security

## [v0.24.0]

### Added

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
* `routing/http`:
* added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678)
* added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client. Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client.

### Changed

### Removed

### Fixed

- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)

### Security

## [v0.23.0]

### Added
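Editor's note: the changelog entries above describe the new delegated routing filters only in prose. Below is a minimal sketch of how a client might opt into them. The option names `WithFilterAddrs` and `WithFilterProtocols` come from the changelog; the endpoint URL, the variadic string signatures, and the surrounding wiring are assumptions for illustration, not part of this PR.

```go
// Minimal sketch (assumptions noted above): creating a delegated routing
// HTTP client that asks the server for IPIP-484 filtered results.
package main

import (
	"log"

	"github.com/ipfs/boxo/routing/http/client"
)

func main() {
	// Example endpoint; substitute your own delegated routing server.
	c, err := client.New(
		"https://delegated-ipfs.dev",
		// Only return providers that speak bitswap (assumed variadic string API).
		client.WithFilterProtocols("transport-bitswap"),
		// Only return addresses using these transports.
		client.WithFilterAddrs("webtransport", "wss"),
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = c // use c.FindProviders / c.FindPeers as usual
}
```

Per IPIP-484, the same filtering is expressed as `filter-addrs` and `filter-protocols` query parameters on the `/routing/v1` HTTP endpoints, which is what these client options are expected to populate.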
bitswap/internal/defaults/defaults.go (3 additions, 0 deletions)

@@ -37,4 +37,7 @@ const (
// RebroadcastDelay is the default delay to trigger broadcast of
// random CIDs in the wantlist.
RebroadcastDelay = time.Minute

// DefaultWantHaveReplaceSize controls the implicit behavior of WithWantHaveReplaceSize.
DefaultWantHaveReplaceSize = 1024
)
bitswap/options.go (7 additions, 0 deletions)

@@ -71,6 +71,13 @@
return Option{server.WithTaskComparator(comparator)}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which the bitswap server will replace a WantHave with a WantBlock response.
// See [server.WithWantHaveReplaceSize] for details.
func WithWantHaveReplaceSize(size int) Option {
return Option{server.WithWantHaveReplaceSize(size)}

}

func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return Option{client.ProviderSearchDelay(newProvSearchDelay)}
}
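Editor's note: to make the doc comment above concrete, here is a small sketch of how a caller might pass the new option. Only `bitswap.WithWantHaveReplaceSize` itself comes from this PR; the helper function and the rest of the `bitswap.New` wiring are assumed and elided.

```go
// Sketch under the assumptions above: building the options slice that is
// passed as the trailing variadic arguments to bitswap.New.
package main

import (
	"fmt"

	"github.com/ipfs/boxo/bitswap"
)

// bitswapOptions builds the option slice that would be handed to bitswap.New.
func bitswapOptions() []bitswap.Option {
	return []bitswap.Option{
		// Answer WantHave with the full block only for blocks up to 256 bytes
		// (the release's default limit is DefaultWantHaveReplaceSize = 1024).
		bitswap.WithWantHaveReplaceSize(256),
		// bitswap.WithWantHaveReplaceSize(0) would disable replacement entirely;
		// block sizes are then not read when processing WantHave requests.
	}
}

func main() {
	fmt.Println("configured", len(bitswapOptions()), "bitswap option(s)")
}
```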
bitswap/server/internal/decision/blockstoremanager.go (36 additions, 0 deletions)

@@ -121,6 +121,42 @@
return res, nil
}

func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) {
if len(ks) == 0 {
return nil, nil
}

hasBlocks := make([]bool, len(ks))

var count atomic.Int32
err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) {
has, err := bsm.bs.Has(ctx, c)
if err != nil {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Has(%s) error: %s", c, err)
return
}

if has {
hasBlocks[i] = true
count.Add(1)
}
})
if err != nil {
return nil, err
}

results := count.Load()
if results == 0 {
return nil, nil
}

res := make(map[cid.Cid]struct{}, results)
for i, ok := range hasBlocks {
if ok {
res[ks[i]] = struct{}{}
}
}
return res, nil
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
if len(ks) == 0 {
return nil, nil
bitswap/server/internal/decision/blockstoremanager_test.go (4 additions, 11 deletions)

@@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes, err := bsm.getBlockSizes(ctx, cids)
hasBlocks, err := bsm.hasBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
if len(hasBlocks) != len(blks)-1 {
t.Fatal("Wrong response length")
}

for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]

// Only the last key should be missing
_, ok := hasBlocks[c]
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
t.Fatal("Block should be in hasBlocks")
}
}
}
bitswap/server/internal/decision/engine.go (76 additions, 54 deletions)

@@ -77,10 +77,6 @@
// queuedTagWeight is the default weight for peers that have work queued
// on their behalf.
queuedTagWeight = 10

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock = 1024
)

// Envelope contains a message for a Peer.
@@ -202,9 +198,9 @@

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
// wantHaveReplaceSize is the maximum size of the block in bytes up to
// which to replace a WantHave with a WantBlock.
wantHaveReplaceSize int

sendDontHaves bool

@@ -343,6 +339,14 @@
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which to replace a WantHave with a WantBlock response.
func WithWantHaveReplaceSize(size int) Option {
return func(e *Engine) {
e.wantHaveReplaceSize = size
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
@@ -369,32 +373,14 @@
}

// NewEngine creates a new block sending engine for the given block store.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxBlockSizeReplaceHasWithBlock,
opts...,
)
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
opts ...Option,
) *Engine {
e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
Expand All @@ -404,7 +390,7 @@
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize,
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
@@ -445,6 +431,12 @@

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

if e.wantHaveReplaceSize == 0 {
log.Info("Replace WantHave with WantBlock is disabled")
} else {
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
}

return e
}

@@ -689,16 +681,38 @@
return true
}

noReplace := e.wantHaveReplaceSize == 0

// Get block sizes for unique CIDs.
wantKs := cid.NewSet()
wantKs := make([]cid.Cid, 0, len(wants))
var haveKs []cid.Cid
for _, entry := range wants {
wantKs.Add(entry.Cid)
if noReplace && entry.WantType == pb.Message_Wantlist_Have {
haveKs = append(haveKs, entry.Cid)
} else {
wantKs = append(wantKs, entry.Cid)
}
}
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}
if len(haveKs) != 0 {
hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}

if len(hasBlocks) != 0 {
if blockSizes == nil {
blockSizes = make(map[cid.Cid]int, len(hasBlocks))
}
for blkCid := range hasBlocks {
blockSizes[blkCid] = 0
}
}
}

e.lock.Lock()

@@ -707,20 +721,7 @@
}

var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}
wants, overflow = e.filterOverflow(p, wants, overflow)

if len(overflow) != 0 {
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
@@ -764,7 +765,7 @@
sendDontHave(entry)
}

// For each want-have / want-block
// For each want-block
for _, entry := range wants {
c := entry.Cid
blockSize, found := blockSizes[c]
@@ -776,7 +777,10 @@
continue
}
// The block was found, add it to the queue
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

// Check if this is a want-block or a have-block that can be converted
// to a want-block.
isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)

@@ -810,6 +814,25 @@
return false
}

func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(wants) == 0 {
return wants, overflow
}

filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
return filteredWants, overflow
}

// handleOverflow processes incoming wants that could not be added to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
@@ -913,17 +936,17 @@
continue
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

// Do not take more wants that can be handled.
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
wants = append(wants, et)
@@ -1057,8 +1080,7 @@
// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
isWantBlock := wantType == pb.Message_Wantlist_Block
return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
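Editor's note: as a reading aid for the engine changes above, the following standalone sketch mirrors the WantHave handling that `sendAsBlock` and the `blockSize != 0` guard now implement. It is an illustration of the behavior, not the engine's actual code path.

```go
// Illustration only: how a WantHave is answered under the new option.
package main

import "fmt"

// answerWantHave mirrors the engine's rule for WantHave requests: send the
// full block when replacement is enabled (size > 0) and the block is at most
// wantHaveReplaceSize bytes; otherwise send a HAVE.
func answerWantHave(blockSize, wantHaveReplaceSize int) string {
	if wantHaveReplaceSize != 0 && blockSize <= wantHaveReplaceSize {
		return "send full block"
	}
	return "send HAVE"
}

func main() {
	fmt.Println(answerWantHave(512, 1024))  // small block, default limit: send full block
	fmt.Println(answerWantHave(4096, 1024)) // too large: send HAVE
	fmt.Println(answerWantHave(512, 0))     // replacement disabled: send HAVE
}
```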
bitswap/server/internal/decision/engine_test.go (3 additions, 9 deletions)

@@ -188,17 +188,11 @@ func newEngineForTesting(
bs blockstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
wantHaveReplaceSize int,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxReplaceSize,
opts...,
)
opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize))
return NewEngine(ctx, bs, peerTagger, self, opts...)
}

func TestOutboxClosedWhenEngineClosed(t *testing.T) {