Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share): Shwap Bitwap composition #3421

Merged
merged 40 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6fad566
composition redesign
Wondertan May 27, 2024
7d14e2b
remove generics and type assert
walldiss May 27, 2024
3041174
cleaning progress
Wondertan May 28, 2024
b3b64ba
duplicates test
Wondertan May 30, 2024
7cf5c63
populate missing containers
walldiss May 30, 2024
4943855
support for multipe populators
Wondertan May 30, 2024
108f450
remove populatorsMap and fix cid mapping for blocks
walldiss May 30, 2024
60b3a25
beutify
Wondertan May 30, 2024
a44b8b3
fix context
Wondertan May 30, 2024
94b8519
abort irrelevant change
Wondertan May 30, 2024
4ad1ac7
comment and API fixes
Wondertan Jun 4, 2024
e14c010
cid over the wire
Wondertan Jun 4, 2024
21bd564
simplify and deflakify(some cases) duplicate test
Wondertan Jun 5, 2024
3ac32ab
simplify note
Wondertan Jun 5, 2024
f1b3c49
extract readCID func
Wondertan Jun 11, 2024
9f8b0ec
generalize protobuf message and respective refactorings
Wondertan Jun 12, 2024
01439ef
migrate to Accessor
Wondertan Jun 12, 2024
5af5300
extract blockstore
Wondertan Jun 12, 2024
11ee5c7
split BlockFromEDS into several methods
Wondertan Jun 14, 2024
b25d590
populate -> marshal confusion
Wondertan Jun 14, 2024
9bd193f
improve bitswap tests and ensure there are multiple servers in the setup
Wondertan Jun 15, 2024
963b3da
notifyblock, cleanup on defer and revert duplicates and atomics
Wondertan Jun 15, 2024
e84e349
improve test for samples
Wondertan Jun 15, 2024
7732ce2
synchronize access to marshallers
Wondertan Jun 15, 2024
a6ec650
remove redundant allowlist
Wondertan Jun 15, 2024
ef45389
sessions and fetch limits
Wondertan Jun 20, 2024
491163d
pb comment
Wondertan Jun 20, 2024
502dbae
improve docs, logging and errors
Wondertan Jun 20, 2024
3e8ac7b
micro fix
Wondertan Jun 20, 2024
297e726
improve comments
Wondertan Jun 20, 2024
2b13d15
review comments
Wondertan Jun 24, 2024
9459c89
avoid nolint directive
Wondertan Jun 24, 2024
c947ce8
verify blocks aren't empty
Wondertan Jun 24, 2024
ff00d94
WithFetcher and WithStore options
Wondertan Jun 24, 2024
77fc979
improve comment
Wondertan Jun 24, 2024
b1d7285
untagle Block unmarshalling and extract proto funcs in a separate file
Wondertan Jun 25, 2024
99821a9
report number of empty blocks
Wondertan Jun 25, 2024
5ad3d76
introduce test block and avoid testing Fetch functions over real blocks
Wondertan Jun 25, 2024
4842fcb
comment improve and simplyf test construction
Wondertan Jun 26, 2024
f88f5ed
simplify and remove IsEmpty
Wondertan Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions share/shwap/p2p/bitswap/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package bitswap

import (
"context"

"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/share"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

var log = logger.Logger("shwap/bitswap")

// Block represents Bitswap compatible generalization over Shwap containers.
// All Shwap containers must have a registered wrapper
// implementing the interface in order to be compatible with Bitswap.
// NOTE: This is not a Blockchain block, but an IPFS/Bitswap block.
type Block interface {
	// CID returns Shwap ID of the Block formatted as CID.
	CID() cid.Cid
	// Height reports the Height of the Shwap container behind the Block.
	Height() uint64

	// IsEmpty reports whether the Block does not hold its respective Shwap container yet.
	// Fetch skips Blocks for which IsEmpty returns false, treating them as already populated.
	IsEmpty() bool
	// Populate fills up the Block with the Shwap container getting it out of the EDS
	// Accessor.
	Populate(context.Context, eds.Accessor) error
	// Marshal serializes bytes of the Shwap Container the Block holds.
	// MUST exclude the Shwap ID.
	Marshal() ([]byte, error)
	// UnmarshalFn returns a closure that unmarshals the Shwap container into the Block.
	// Unmarshalling involves data validation against the given Root.
	UnmarshalFn(*share.Root) UnmarshalFn
}

// UnmarshalFn is a closure produced by a Block (see Block.UnmarshalFn).
// It unmarshals and validates the given serialized bytes of a Shwap container
// and, on success, populates the Block that produced it with the container.
// A non-nil error means the bytes were rejected.
type UnmarshalFn func([]byte) error
222 changes: 222 additions & 0 deletions share/shwap/p2p/bitswap/block_fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package bitswap

import (
"context"
"crypto/sha256"
"fmt"
"sync"

"github.com/ipfs/boxo/exchange"
"github.com/ipfs/go-cid"

"github.com/celestiaorg/celestia-node/share"
bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb"
)

// WithFetchSession opens a new fetch session on the given exchange and returns
// a child context that carries it.
// Fetch calls made with the returned context go through the session, which
// reuses peers known to have Blocks without rediscovering them.
func WithFetchSession(ctx context.Context, exchg exchange.SessionExchange) context.Context {
	return context.WithValue(ctx, fetcherKey, exchg.NewSession(ctx))
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// Fetch fetches and populates given Blocks using Fetcher wrapping Bitswap.
//
// Validates Block against the given Root and skips Blocks that are already populated.
// Gracefully synchronize identical Blocks requested simultaneously.
// Blocks until either context is canceled or all Blocks are fetched and populated.
func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks ...Block) error {
for from, to := 0, 0; to < len(blks); { //nolint:wastedassign // it's not actually wasted
from, to = to, to+maxPerFetch
if to >= len(blks) {
to = len(blks)
}

err := fetch(ctx, exchg, root, blks[from:to]...)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}

return ctx.Err()
}

// maxPerFetch sets the limit for maximum items in a single fetch.
// It's a heuristic coming from Bitswap, which apparently can't process more than ~1024 CIDs
// in a single GetBlocks call. Going beyond that stalls the call indefinitely.
// Fetch uses it as the batch size when splitting larger requests.
const maxPerFetch = 1024
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// fetch fetches given Blocks.
// See [Fetch] for detailed description.
func fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks ...Block) error {
fetcher := getFetcher(ctx, exchg)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
cids := make([]cid.Cid, 0, len(blks))
duplicates := make(map[cid.Cid]Block)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
for _, blk := range blks {
if !blk.IsEmpty() {
continue // skip populated Blocks
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

cid := blk.CID() // memoize CID for reuse as it ain't free
cids = append(cids, cid)
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// store the UnmarshalFn s.t. hasher can access it
// and fill in the Block
unmarshalFn := blk.UnmarshalFn(root)
_, exists := unmarshalFns.LoadOrStore(cid, &unmarshalEntry{UnmarshalFn: unmarshalFn})
if exists {
// the unmarshalFn has already been stored for the cid
// means there is ongoing fetch happening for the same cid
duplicates[cid] = blk // so mark the Block as duplicate
} else {
// cleanup are by the original requester and
// only after we are sure we got the block
defer unmarshalFns.Delete(cid)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
}
}

blkCh, err := fetcher.GetBlocks(ctx, cids)
if err != nil {
return fmt.Errorf("requesting Bitswap blocks: %w", err)
}

for bitswapBlk := range blkCh { // GetBlocks closes blkCh on ctx cancellation
if err := exchg.NotifyNewBlocks(ctx, bitswapBlk); err != nil {
log.Error("failed to notify new Bitswap blocks: %w", err)
}

blk, ok := duplicates[bitswapBlk.Cid()]
if !ok {
// common case: the block was populated by the hasher, so skip
continue
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved
walldiss marked this conversation as resolved.
Show resolved Hide resolved
// uncommon duplicate case: concurrent fetching of the same block,
// so we have to unmarshal it ourselves instead of the hasher,
walldiss marked this conversation as resolved.
Show resolved Hide resolved
unmarshalFn := blk.UnmarshalFn(root)
_, err := unmarshal(unmarshalFn, bitswapBlk.RawData())
if err != nil {
// this means verification succeeded in the hasher but failed here
// this case should never happen in practice
// and if so something is really wrong
panic(fmt.Sprintf("unmarshaling duplicate block: %s", err))
}
// NOTE: This approach has a downside that we redo deserialization and computationally
// expensive computation for as many duplicates. We tried solutions that doesn't have this
// problem, but they are *much* more complex. Considering this a rare edge-case the tradeoff
// towards simplicity has been made.
walldiss marked this conversation as resolved.
Show resolved Hide resolved
}

return ctx.Err()
}

// unmarshal decodes a serialized Bitswap message, validates the CID it carries
// and passes the Shwap container bytes to an UnmarshalFn.
// If unmarshalFn is nil, the one registered in the global unmarshalFns for the
// message CID is used, and the entry's lock is held while it runs.
// On success, returns the ID extracted from the CID.
func unmarshal(unmarshalFn UnmarshalFn, data []byte) ([]byte, error) {
	var msg bitswappb.Block
	if err := msg.Unmarshal(data); err != nil {
		return nil, fmt.Errorf("unmarshalling block: %w", err)
	}

	blkCID, err := cid.Cast(msg.Cid)
	if err != nil {
		return nil, fmt.Errorf("casting cid: %w", err)
	}
	// get the ID out of the CID, validating it
	id, err := extractCID(blkCID)
	if err != nil {
		return nil, fmt.Errorf("validating cid: %w", err)
	}

	if unmarshalFn == nil {
		// look up the UnmarshalFn registered by fetch and use it to check data
		// validity and pass the data to the Fetch caller
		registered, found := unmarshalFns.Load(blkCID)
		if !found {
			return nil, fmt.Errorf("no unmarshallers registered for %s", blkCID.String())
		}
		entry := registered.(*unmarshalEntry)

		// serialize concurrent invocations of the same registered UnmarshalFn
		entry.Lock()
		defer entry.Unlock()
		unmarshalFn = entry.UnmarshalFn
	}

	if err := unmarshalFn(msg.Container); err != nil {
		return nil, fmt.Errorf("verifying data: %w", err)
	}
	return id, nil
}

// unmarshalFns exists to communicate between Fetch and hasher, and it's global as a necessity.
//
// Fetch registers UnmarshalFns, keyed by CID, that hasher then uses to validate and unmarshal
// Block responses coming through Bitswap. Values are *unmarshalEntry.
//
// Bitswap does not provide *stateful* verification out of the box and by default
// messages are verified by their respective MultiHashes that are registered globally.
// For every Block type there is a global hasher registered that accesses the stored UnmarshalFn
// once a message is received. It then uses the UnmarshalFn to validate and fill in the
// respective Block.
//
// sync.Map is used to minimize contention for disjoint keys.
var unmarshalFns sync.Map

// unmarshalEntry is the value type stored in unmarshalFns.
// The embedded Mutex is held by unmarshal while the entry's UnmarshalFn runs.
type unmarshalEntry struct {
	sync.Mutex
	UnmarshalFn
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// hasher implements hash.Hash to be registered as a custom multihash.
// hasher is the *hack* to inject custom verification logic into Bitswap:
// instead of hashing, Write validates the received data and Sum reports
// the ID obtained from it.
type hasher struct {
	// IDSize of the respective Shwap container; reported by Size.
	IDSize int // to be set during hasher registration

	// sum holds the ID produced by the last successful Write; nil after Reset.
	sum []byte
}

// Write validates the given serialized Bitswap message via unmarshal, using
// the UnmarshalFn registered for its CID, instead of hashing the bytes.
// On success, the ID extracted from the message becomes the hasher's sum and
// len(data) is returned. On failure, the error is logged and returned wrapped,
// with zero bytes reported as written.
func (h *hasher) Write(data []byte) (int, error) {
	id, err := unmarshal(nil, data)
	if err == nil {
		// set the id as resulting sum
		// it's required for the sum to match the requested ID
		// to satisfy hash contract and signal to Bitswap that data is correct
		h.sum = id
		return len(data), nil
	}

	err = fmt.Errorf("hasher: %w", err)
	log.Error(err)
	return 0, fmt.Errorf("shwap/bitswap: %w", err)
}

// Sum appends the ID stored by the last successful Write to b and returns the
// resulting slice, as the hash.Hash contract requires.
// It does not modify the hasher state. With a nil b and no stored ID the
// result is nil.
func (h *hasher) Sum(b []byte) []byte {
	return append(b, h.sum...)
}

// Reset drops the stored sum so the hasher can be reused; IDSize is kept.
func (h *hasher) Reset() {
	h.sum = nil
}

// Size returns the configured IDSize, i.e. the length in bytes that the
// hasher advertises for its sum.
func (h *hasher) Size() int {
	return h.IDSize
}

// BlockSize returns sha256.BlockSize.
// NOTE(review): presumably a placeholder to satisfy hash.Hash, since Write
// does not process data in blocks — confirm.
func (h *hasher) BlockSize() int {
	return sha256.BlockSize
}

// fetcherSessionKey is the unexported context key type for the session
// Fetcher, preventing collisions with keys defined in other packages.
type fetcherSessionKey struct{}

// fetcherKey is the context key under which WithFetchSession stores the
// session and getFetcher looks it up.
var fetcherKey fetcherSessionKey

// getFetcher picks the Fetcher to use for a request.
// A Fetcher stored on the context under fetcherKey (see WithFetchSession)
// takes precedence; otherwise the given fetcher is returned.
func getFetcher(ctx context.Context, fetcher exchange.Fetcher) exchange.Fetcher {
	if session := ctx.Value(fetcherKey); session != nil {
		return session.(exchange.Fetcher)
	}
	return fetcher
}
129 changes: 129 additions & 0 deletions share/shwap/p2p/bitswap/block_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package bitswap

import (
"context"
"math/rand/v2"
"sync"
"testing"
"time"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/routing/offline"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

// TestFetchDuplicates starts 100 concurrent Fetch calls that all request the
// same set of row Blocks and checks that every call populates all of its
// Blocks and that no entries are left behind in the global unmarshalFns.
func TestFetchDuplicates(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()

	square := edstest.RandEDS(t, 4)
	root, err := share.NewRoot(square)
	require.NoError(t, err)
	exchg := fetcher(ctx, t, square)

	var wg sync.WaitGroup
	for range 100 {
		// every iteration builds Blocks with the same IDs as the others
		blks := make([]Block, square.Width())
		for rowIdx := range blks {
			blk, err := NewEmptyRowBlock(1, rowIdx, root)
			require.NoError(t, err)
			blks[rowIdx] = blk
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			// random delay ensures fetches aren't started simultaneously,
			// allowing to check for edge-cases
			delay := time.Millisecond * time.Duration(rand.IntN(10))
			time.Sleep(delay)

			assert.NoError(t, Fetch(ctx, exchg, root, blks...))
			for _, blk := range blks {
				assert.False(t, blk.IsEmpty())
			}
		}()
	}
	wg.Wait()

	// count leftovers and clean them up so they do not leak into other tests
	var leftovers int
	unmarshalFns.Range(func(key, _ any) bool {
		unmarshalFns.Delete(key)
		leftovers++
		return true
	})
	require.Zero(t, leftovers)
}

// fetcher builds a three-host mock network in which two hosts run Bitswap
// servers and the third runs the Bitswap client that is returned.
// All three share a Blockstore backed by the given EDS.
func fetcher(ctx context.Context, t *testing.T, rsmt2dEds *rsmt2d.ExtendedDataSquare) exchange.SessionExchange {
	accessor := eds.Rsmt2D{ExtendedDataSquare: rsmt2dEds}
	bstore := &Blockstore{Accessors: testAccessors{Accessor: accessor}}

	mn, err := mocknet.FullMeshLinked(3)
	require.NoError(t, err)

	hosts := mn.Hosts()
	newServer(ctx, hosts[0], bstore)
	newServer(ctx, hosts[1], bstore)
	bitswapClient := newClient(ctx, hosts[2], bstore)

	require.NoError(t, mn.ConnectAllButSelf())
	return bitswapClient
}

// newServer starts a Bitswap server on the given host serving blocks from store.
// It uses an offline router over an in-memory datastore, with providing and
// DONT_HAVE responses disabled.
func newServer(ctx context.Context, host host.Host, store blockstore.Blockstore) {
	memStore := dssync.MutexWrap(ds.NewMapDatastore())
	router := offline.NewOfflineRouter(memStore, record.NamespacedValidator{})
	bsnet := network.NewFromIpfsHost(host, router)

	srv := server.New(
		ctx,
		bsnet,
		store,
		server.TaskWorkerCount(2),
		server.EngineTaskWorkerCount(2),
		server.ProvideEnabled(false),
		server.SetSendDontHaves(false),
	)
	bsnet.Start(srv)
}

// newClient starts a Bitswap client on the given host backed by store and
// returns it. It uses an offline router over an in-memory datastore.
func newClient(ctx context.Context, host host.Host, store blockstore.Blockstore) *client.Client {
	memStore := dssync.MutexWrap(ds.NewMapDatastore())
	router := offline.NewOfflineRouter(memStore, record.NamespacedValidator{})
	bsnet := network.NewFromIpfsHost(host, router)

	cl := client.New(ctx, bsnet, store)
	bsnet.Start(cl)
	return cl
}

// testAccessors is a test stub that wraps a single eds.Accessor and hands it
// out from Get.
type testAccessors struct {
	eds.Accessor
}

// Get returns the wrapped Accessor, ignoring both of its arguments.
// It never returns an error.
func (t testAccessors) Get(context.Context, uint64) (eds.Accessor, error) {
	return t.Accessor, nil
}
Loading
Loading