Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 18, 2023
1 parent 8173c35 commit e6118a3
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 54 deletions.
51 changes: 29 additions & 22 deletions fullrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"time"

"github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
record "github.com/libp2p/go-libp2p-record"
Expand All @@ -31,20 +32,23 @@ import (
type FullRT struct {
*DHT

cfg *FullRTConfig
cfg *FullRTConfig
queryConfig *coord.QueryConfig
}

type FullRTConfig struct {
*Config
CrawlInterval time.Duration
QuorumFrac float64
CrawlInterval time.Duration
QuorumFrac float64
FindPeerConnectTimeout time.Duration
}

func DefaultFullRTConfig() *FullRTConfig {
return &FullRTConfig{
Config: DefaultConfig(),
CrawlInterval: time.Hour, // MAGIC
QuorumFrac: 0.25, // MAGIC
Config: DefaultConfig(),
CrawlInterval: time.Hour, // MAGIC
QuorumFrac: 0.25, // MAGIC
FindPeerConnectTimeout: 5 * time.Second,
}
}

Expand All @@ -58,15 +62,20 @@ func NewFullRT(h host.Host, cfg *FullRTConfig) (*FullRT, error) {
cfg.Query.DefaultQuorum = int(float64(cfg.BucketSize) * cfg.QuorumFrac)
}

qcfg := coord.DefaultQueryConfig()
qcfg.NumResults = cfg.BucketSize
qcfg.Strategy = &query.StrategyStatic{}

frt := &FullRT{
DHT: d,
cfg: cfg,
DHT: d,
cfg: cfg,
queryConfig: qcfg,
}

return frt, nil
}

var _ routing.Routing = (*FullRT)(nil)
var _ provider.ProvideMany = (*FullRT)(nil)

func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
ctx, span := f.tele.Tracer.Start(ctx, "FullRT.FindPeer")
Expand Down Expand Up @@ -102,7 +111,7 @@ func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro
}

// start the query with a static set of peers (see queryConfig)
_, _, err = f.kad.QueryClosest(ctx, kadt.PeerID(pid).Key(), fn, f.queryConfig())
_, _, err = f.kad.QueryClosest(ctx, kadt.PeerID(pid).Key(), fn, f.queryConfig)
if err != nil {
return peer.AddrInfo{}, fmt.Errorf("failed to run query: %w", err)
}
Expand All @@ -119,7 +128,7 @@ func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro
}

// connect to peer (this also happens in the non-fullrt case)
connCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: put timeout in config
connCtx, cancel := context.WithTimeout(ctx, f.cfg.FindPeerConnectTimeout)
defer cancel()
_ = f.host.Connect(connCtx, peer.AddrInfo{
ID: pid,
Expand Down Expand Up @@ -274,7 +283,7 @@ func (f *FullRT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count
return nil
}

_, _, err = f.kad.QueryMessage(ctx, msg, fn, f.queryConfig())
_, _, err = f.kad.QueryMessage(ctx, msg, fn, f.queryConfig)
if err != nil {
span.RecordError(err)
f.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error()))
Expand Down Expand Up @@ -340,13 +349,6 @@ func (f *FullRT) Bootstrap(ctx context.Context) error {
return f.kad.Crawl(ctx, seed)
}

func (f *FullRT) queryConfig() *coord.QueryConfig {
cfg := coord.DefaultQueryConfig()
cfg.NumResults = f.cfg.BucketSize
cfg.Strategy = &query.StrategyStatic{}
return cfg
}

func (f *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
ctx, span := f.tele.Tracer.Start(ctx, "FullRT.GetValue")
defer span.End()
Expand Down Expand Up @@ -510,7 +512,7 @@ func (f *FullRT) searchValueRoutine(ctx context.Context, backend Backend, ns str
return nil
}

_, _, err := f.kad.QueryMessage(ctx, req, fn, f.queryConfig())
_, _, err := f.kad.QueryMessage(ctx, req, fn, f.queryConfig)
if err != nil {
f.warnErr(err, "Search value query failed")
return
Expand Down Expand Up @@ -568,7 +570,12 @@ func (f *FullRT) ProvideMany(ctx context.Context, mhashes []mh.Multihash) error
keys = append(keys, kadt.NewKey(mhash))
}

return f.kad.BroadcastMany(ctx, keys, msgFn)
// track successes
fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message) {
// TODO
}

return f.kad.BroadcastMany(ctx, keys, fn, msgFn)
}

func (f *FullRT) PutMany(ctx context.Context, keySlice []string, valueSlice [][]byte) error {
Expand Down Expand Up @@ -618,5 +625,5 @@ func (f *FullRT) PutMany(ctx context.Context, keySlice []string, valueSlice [][]
}
}

return f.kad.BroadcastMany(ctx, kadKeys, msgFn)
return f.kad.BroadcastMany(ctx, kadKeys, nil, msgFn)
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.1.2 // indirect
github.com/ipfs/go-cidutil v0.1.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-format v0.5.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipld/go-ipld-prime v0.21.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
Expand Down
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -122,24 +123,39 @@ github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyf
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY=
github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ=
github.com/ipfs/boxo v0.12.0/go.mod h1:xAnfiU6PtxWCnRqu7dcXQ10bB5/kvI1kXRotuGqGBhg=
github.com/ipfs/go-block-format v0.1.2 h1:GAjkfhVx1f4YTODS6Esrj1wt2HhrtwTnhEr+DyPUaJo=
github.com/ipfs/go-block-format v0.1.2/go.mod h1:mACVcrxarQKstUU3Yf/RdwbC4DzPV6++rO2a3d+a/KE=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk=
github.com/ipfs/go-datastore v0.6.1-0.20230901172804-1caa2449ed7c h1:iSyhKydtSJiEkmf5O3KizuySDB0zgyWPth76NACTMVI=
github.com/ipfs/go-datastore v0.6.1-0.20230901172804-1caa2449ed7c/go.mod h1:3Et7HSjOA8tPu9OjYuDZxLAgBLfvlNMD4r8BIuri9eo=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo=
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAFO+Ds=
github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg=
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down Expand Up @@ -223,6 +239,7 @@ github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE=
Expand All @@ -242,11 +259,13 @@ github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6o
github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg=
github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo=
github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q=
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
Expand Down Expand Up @@ -351,6 +370,7 @@ github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
6 changes: 3 additions & 3 deletions internal/coord/behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (w *Waiter[E]) Chan() <-chan WaiterEvent[E] {
}

// A QueryMonitor receives event notifications on the progress of a query
type QueryMonitor[E TerminalQueryEvent] interface {
type QueryMonitor[E TerminalBehaviourEvent] interface {
// NotifyProgressed returns a channel that can be used to send notification that a
// query has made progress. If the notification cannot be sent then it will be
// queued and retried at a later time. If the query completes before the progress
Expand All @@ -168,15 +168,15 @@ type QueryMonitor[E TerminalQueryEvent] interface {

// QueryMonitorHook wraps a [QueryMonitor] interface and provides hooks
// that are invoked before calls to the QueryMonitor methods are forwarded.
type QueryMonitorHook[E TerminalQueryEvent] struct {
type QueryMonitorHook[E TerminalBehaviourEvent] struct {
qm QueryMonitor[E]
BeforeProgressed func()
BeforeFinished func()
}

var _ QueryMonitor[*EventQueryFinished] = (*QueryMonitorHook[*EventQueryFinished])(nil)

func NewQueryMonitorHook[E TerminalQueryEvent](qm QueryMonitor[E]) *QueryMonitorHook[E] {
func NewQueryMonitorHook[E TerminalBehaviourEvent](qm QueryMonitor[E]) *QueryMonitorHook[E] {
return &QueryMonitorHook[E]{
qm: qm,
BeforeProgressed: func() {},
Expand Down
7 changes: 5 additions & 2 deletions internal/coord/brdcst/followup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ type FollowUp[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
// the logic much easier to implement.
pool *query.Pool[K, N, M]

// TODO: ...
// started indicates that this state machine has sent out the first
// message to a node. Even after this state machine has returned a finished
// state, this flag will stay true.
started bool

// the message generator that takes a target key and will return the message
// that we will send to the closest nodes in the follow-up phase
msgFunc func(K) M

// TODO:
// seed holds the nodes from where we should start our query to find closer
// nodes to the target key (held by [ConfigFollowUp]).
seed []N

// the closest nodes to the target key. This will be filled after the query
Expand Down
16 changes: 8 additions & 8 deletions internal/coord/brdcst/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ type StatePoolWaiting struct{}
type StatePoolStoreRecord[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message
NodeID N // the node to send the message to
Target K
Message M // the message that should be sent to the remote node
Target K // the key we want to store a record for
Message M // the message that should be sent to the remote node
}

// StatePoolBroadcastFinished indicates that the broadcast operation with the
Expand Down Expand Up @@ -347,9 +347,9 @@ type EventPoolGetCloserNodesFailure[K kad.Key[K], N kad.NodeID[K]] struct {
type EventPoolStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
QueryID coordt.QueryID // the id of the query that sent the message
NodeID N // the node the message was sent to
Target K
Request M // the message that was sent to the remote node
Response M // the reply we got from the remote node (nil in many cases of the Amino DHT)
Target K // the key we successfully stored a record for
Request M // the message that was sent to the remote node
Response M // the reply we got from the remote node (nil in many cases of the Amino DHT)
}

// EventPoolStoreRecordFailure noties the broadcast [Pool] that storing a record
Expand All @@ -358,9 +358,9 @@ type EventPoolStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message
type EventPoolStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
QueryID coordt.QueryID // the id of the query that sent the message
NodeID N // the node the message was sent to
Target K
Request M // the message that was sent to the remote node
Error error // the error that caused the failure
Target K // the key we failed to store a record for
Request M // the message that was sent to the remote node
Error error // the error that caused the failure
}

// poolEvent() ensures that only events accepted by a broadcast [Pool] can be
Expand Down
24 changes: 16 additions & 8 deletions internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coor

if cfg == nil {
cfg = DefaultQueryConfig()
} else if err := cfg.Validate(); err != nil {
return nil, coordt.QueryStats{}, fmt.Errorf("validate query config: %w", err)
}
// TODO: validate config

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -392,15 +393,15 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor

func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message, seed []kadt.PeerID) error {
msgFunc := func(k kadt.Key) *pb.Message { return msg }
return c.broadcast(ctx, msgFunc, seed, brdcst.DefaultConfigFollowUp(msg.Target()))
return c.broadcast(ctx, msgFunc, seed, coordt.BrdcstFuncNoop, brdcst.DefaultConfigFollowUp(msg.Target()))
}

func (c *Coordinator) BroadcastStatic(ctx context.Context, msg *pb.Message, seed []kadt.PeerID) error {
msgFunc := func(k kadt.Key) *pb.Message { return msg }
return c.broadcast(ctx, msgFunc, seed, brdcst.DefaultConfigOneToMany(msg.Target()))
return c.broadcast(ctx, msgFunc, seed, coordt.BrdcstFuncNoop, brdcst.DefaultConfigOneToMany(msg.Target()))
}

func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, msgFn func(k kadt.Key) *pb.Message) error {
func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, fn coordt.BrdcstFunc, msgFn func(k kadt.Key) *pb.Message) error {
// verify that we have keys to push into the network
if len(keys) == 0 {
return fmt.Errorf("no keys to broadcast")
Expand All @@ -410,10 +411,10 @@ func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, msgFn
seed := c.rt.NearestNodes(keys[0], math.MaxInt)

// start broadcasting
return c.broadcast(ctx, msgFn, seed, brdcst.DefaultConfigManyToMany(keys))
return c.broadcast(ctx, msgFn, seed, fn, brdcst.DefaultConfigManyToMany(keys))
}

func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seed []kadt.PeerID, cfg brdcst.Config) error {
func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seed []kadt.PeerID, fn coordt.BrdcstFunc, cfg brdcst.Config) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.broadcast")
defer span.End()

Expand All @@ -434,7 +435,7 @@ func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *p
// queue the start of the query
c.brdcstBehaviour.Notify(ctx, cmd)

contacted, _, err := c.waitForBroadcast(ctx, waiter)
contacted, _, err := c.waitForBroadcast(ctx, waiter, fn)
if err != nil {
return err
}
Expand Down Expand Up @@ -507,7 +508,7 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID,
}
}

func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWaiter) ([]kadt.PeerID, map[string]struct {
func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWaiter, fn coordt.BrdcstFunc) ([]kadt.PeerID, map[string]struct {
Node kadt.PeerID
Err error
}, error,
Expand All @@ -516,6 +517,13 @@ func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWai
select {
case <-ctx.Done():
return nil, nil, ctx.Err()

case wev, more := <-waiter.Progressed():
if !more {
return nil, nil, ctx.Err()
}
fn(wev.Ctx, wev.Event.NodeID, wev.Event.Response)

case wev, more := <-waiter.Finished():
if !more {
return nil, nil, ctx.Err()
Expand Down
Loading

0 comments on commit e6118a3

Please sign in to comment.