From e6118a3f1564048e976cbdf8f4feb5dab6026fd2 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Wed, 18 Oct 2023 11:02:24 +0200 Subject: [PATCH] WIP --- fullrt.go | 51 +++++++++++++++++------------- go.mod | 6 ++++ go.sum | 20 ++++++++++++ internal/coord/behaviour.go | 6 ++-- internal/coord/brdcst/followup.go | 7 ++-- internal/coord/brdcst/pool.go | 16 +++++----- internal/coord/coordinator.go | 24 +++++++++----- internal/coord/coordt/coretypes.go | 7 +++- internal/coord/event.go | 34 +++++++++++++++----- internal/coord/query.go | 2 +- internal/coord/routing.go | 1 - 11 files changed, 120 insertions(+), 54 deletions(-) diff --git a/fullrt.go b/fullrt.go index 60805bc..493118d 100644 --- a/fullrt.go +++ b/fullrt.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/ipfs/boxo/provider" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" record "github.com/libp2p/go-libp2p-record" @@ -31,20 +32,23 @@ import ( type FullRT struct { *DHT - cfg *FullRTConfig + cfg *FullRTConfig + queryConfig *coord.QueryConfig } type FullRTConfig struct { *Config - CrawlInterval time.Duration - QuorumFrac float64 + CrawlInterval time.Duration + QuorumFrac float64 + FindPeerConnectTimeout time.Duration } func DefaultFullRTConfig() *FullRTConfig { return &FullRTConfig{ - Config: DefaultConfig(), - CrawlInterval: time.Hour, // MAGIC - QuorumFrac: 0.25, // MAGIC + Config: DefaultConfig(), + CrawlInterval: time.Hour, // MAGIC + QuorumFrac: 0.25, // MAGIC + FindPeerConnectTimeout: 5 * time.Second, } } @@ -58,15 +62,20 @@ func NewFullRT(h host.Host, cfg *FullRTConfig) (*FullRT, error) { cfg.Query.DefaultQuorum = int(float64(cfg.BucketSize) * cfg.QuorumFrac) } + qcfg := coord.DefaultQueryConfig() + qcfg.NumResults = cfg.BucketSize + qcfg.Strategy = &query.StrategyStatic{} + frt := &FullRT{ - DHT: d, - cfg: cfg, + DHT: d, + cfg: cfg, + queryConfig: qcfg, } return frt, nil } -var _ routing.Routing = (*FullRT)(nil) +var _ provider.ProvideMany = (*FullRT)(nil) func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { ctx, span := f.tele.Tracer.Start(ctx, "FullRT.FindPeer") @@ -102,7 +111,7 @@ func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro } // start the query with a static set of peers (see queryConfig) - _, _, err = f.kad.QueryClosest(ctx, kadt.PeerID(pid).Key(), fn, f.queryConfig()) + _, _, err = f.kad.QueryClosest(ctx, kadt.PeerID(pid).Key(), fn, f.queryConfig) if err != nil { return peer.AddrInfo{}, fmt.Errorf("failed to run query: %w", err) } @@ -119,7 +128,7 @@ func (f *FullRT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro } // connect to peer (this also happens in the non-fullrt case) - connCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: put timeout in config + connCtx, cancel := context.WithTimeout(ctx, f.cfg.FindPeerConnectTimeout) defer cancel() _ = f.host.Connect(connCtx, peer.AddrInfo{ ID: pid, @@ -274,7 +283,7 @@ func (f *FullRT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count return nil } - _, _, err = f.kad.QueryMessage(ctx, msg, fn, f.queryConfig()) + _, _, err = f.kad.QueryMessage(ctx, msg, fn, f.queryConfig) if err != nil { span.RecordError(err) f.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error())) @@ -340,13 +349,6 @@ func (f *FullRT) Bootstrap(ctx context.Context) error { return f.kad.Crawl(ctx, seed) } -func (f *FullRT) queryConfig() *coord.QueryConfig { - cfg := coord.DefaultQueryConfig() - cfg.NumResults = f.cfg.BucketSize - cfg.Strategy = &query.StrategyStatic{} - return cfg -} - func (f *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { ctx, span := f.tele.Tracer.Start(ctx, "FullRT.GetValue") defer span.End() @@ -510,7 +512,7 @@ func (f *FullRT) searchValueRoutine(ctx context.Context, backend Backend, ns str return nil } - _, _, err := f.kad.QueryMessage(ctx, req, fn, f.queryConfig()) + _, _, err := f.kad.QueryMessage(ctx, req, fn, f.queryConfig) if err != nil { f.warnErr(err, "Search value query failed") return @@ -568,7 +570,12 @@ func (f *FullRT) ProvideMany(ctx context.Context, mhashes []mh.Multihash) error keys = append(keys, kadt.NewKey(mhash)) } - return f.kad.BroadcastMany(ctx, keys, msgFn) + // track successes + fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message) { + // TODO + } + + return f.kad.BroadcastMany(ctx, keys, fn, msgFn) } func (f *FullRT) PutMany(ctx context.Context, keySlice []string, valueSlice [][]byte) error { @@ -618,5 +625,5 @@ func (f *FullRT) PutMany(ctx context.Context, keySlice []string, valueSlice [][] } } - return f.kad.BroadcastMany(ctx, kadKeys, msgFn) + return f.kad.BroadcastMany(ctx, kadKeys, nil, msgFn) } diff --git a/go.mod b/go.mod index b909bbf..2550395 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,13 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/huin/goupnp v1.2.0 // indirect + github.com/ipfs/bbloom v0.0.4 // indirect + github.com/ipfs/go-block-format v0.1.2 // indirect + github.com/ipfs/go-cidutil v0.1.0 // indirect + github.com/ipfs/go-ipfs-util v0.0.2 // indirect + github.com/ipfs/go-ipld-format v0.5.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect + github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect diff --git a/go.sum b/go.sum index 0a21e7b..25b6a2e 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,7 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -122,10 +123,16 @@ github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyf github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY= github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= +github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ= github.com/ipfs/boxo v0.12.0/go.mod h1:xAnfiU6PtxWCnRqu7dcXQ10bB5/kvI1kXRotuGqGBhg= +github.com/ipfs/go-block-format v0.1.2 h1:GAjkfhVx1f4YTODS6Esrj1wt2HhrtwTnhEr+DyPUaJo= +github.com/ipfs/go-block-format v0.1.2/go.mod h1:mACVcrxarQKstUU3Yf/RdwbC4DzPV6++rO2a3d+a/KE= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q= +github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA= github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.1-0.20230901172804-1caa2449ed7c h1:iSyhKydtSJiEkmf5O3KizuySDB0zgyWPth76NACTMVI= github.com/ipfs/go-datastore v0.6.1-0.20230901172804-1caa2449ed7c/go.mod h1:3Et7HSjOA8tPu9OjYuDZxLAgBLfvlNMD4r8BIuri9eo= @@ -133,13 +140,22 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= +github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= +github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= +github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= +github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= +github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAFO+Ds= +github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= +github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= +github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= +github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -223,6 +239,7 @@ github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= @@ -242,11 +259,13 @@ github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6o github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= +github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo= github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= @@ -351,6 +370,7 @@ github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/internal/coord/behaviour.go b/internal/coord/behaviour.go index 28819aa..0fa8228 100644 --- a/internal/coord/behaviour.go +++ b/internal/coord/behaviour.go @@ -150,7 +150,7 @@ func (w *Waiter[E]) Chan() <-chan WaiterEvent[E] { } // A QueryMonitor receives event notifications on the progress of a query -type QueryMonitor[E TerminalQueryEvent] interface { +type QueryMonitor[E TerminalBehaviourEvent] interface { // NotifyProgressed returns a channel that can be used to send notification that a // query has made progress. If the notification cannot be sent then it will be // queued and retried at a later time. If the query completes before the progress @@ -168,7 +168,7 @@ type QueryMonitor[E TerminalQueryEvent] interface { // QueryMonitorHook wraps a [QueryMonitor] interface and provides hooks // that are invoked before calls to the QueryMonitor methods are forwarded. -type QueryMonitorHook[E TerminalQueryEvent] struct { +type QueryMonitorHook[E TerminalBehaviourEvent] struct { qm QueryMonitor[E] BeforeProgressed func() BeforeFinished func() @@ -176,7 +176,7 @@ type QueryMonitorHook[E TerminalQueryEvent] struct { var _ QueryMonitor[*EventQueryFinished] = (*QueryMonitorHook[*EventQueryFinished])(nil) -func NewQueryMonitorHook[E TerminalQueryEvent](qm QueryMonitor[E]) *QueryMonitorHook[E] { +func NewQueryMonitorHook[E TerminalBehaviourEvent](qm QueryMonitor[E]) *QueryMonitorHook[E] { return &QueryMonitorHook[E]{ qm: qm, BeforeProgressed: func() {}, diff --git a/internal/coord/brdcst/followup.go b/internal/coord/brdcst/followup.go index e8a3aa6..6790de0 100644 --- a/internal/coord/brdcst/followup.go +++ b/internal/coord/brdcst/followup.go @@ -49,14 +49,17 @@ type FollowUp[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { // the logic much easier to implement. pool *query.Pool[K, N, M] - // TODO: ... + // started indicates that this state machine has sent out the first + // message to a node. Even after this state machine has returned a finished + // state, this flag will stay true. started bool // the message generator that takes a target key and will return the message // that we will send to the closest nodes in the follow-up phase msgFunc func(K) M - // TODO: + // seed holds the nodes from where we should start our query to find closer + // nodes to the target key (held by [ConfigFollowUp]). seed []N // the closest nodes to the target key. This will be filled after the query diff --git a/internal/coord/brdcst/pool.go b/internal/coord/brdcst/pool.go index ae3497c..38785fe 100644 --- a/internal/coord/brdcst/pool.go +++ b/internal/coord/brdcst/pool.go @@ -258,8 +258,8 @@ type StatePoolWaiting struct{} type StatePoolStoreRecord[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message NodeID N // the node to send the message to - Target K - Message M // the message that should be sent to the remote node + Target K // the key we want to store a record for + Message M // the message that should be sent to the remote node } // StatePoolBroadcastFinished indicates that the broadcast operation with the @@ -347,9 +347,9 @@ type EventPoolGetCloserNodesFailure[K kad.Key[K], N kad.NodeID[K]] struct { type EventPoolStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { QueryID coordt.QueryID // the id of the query that sent the message NodeID N // the node the message was sent to - Target K - Request M // the message that was sent to the remote node - Response M // the reply we got from the remote node (nil in many cases of the Amino DHT) + Target K // the key we successfully stored a record for + Request M // the message that was sent to the remote node + Response M // the reply we got from the remote node (nil in many cases of the Amino DHT) } // EventPoolStoreRecordFailure noties the broadcast [Pool] that storing a record @@ -358,9 +358,9 @@ type EventPoolStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message type EventPoolStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct { QueryID coordt.QueryID // the id of the query that sent the message NodeID N // the node the message was sent to - Target K - Request M // the message that was sent to the remote node - Error error // the error that caused the failure + Target K // the key we failed to store a record for + Request M // the message that was sent to the remote node + Error error // the error that caused the failure } // poolEvent() ensures that only events accepted by a broadcast [Pool] can be diff --git a/internal/coord/coordinator.go b/internal/coord/coordinator.go index d7d310a..091803b 100644 --- a/internal/coord/coordinator.go +++ b/internal/coord/coordinator.go @@ -309,8 +309,9 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coor if cfg == nil { cfg = DefaultQueryConfig() + } else if err := cfg.Validate(); err != nil { + return nil, coordt.QueryStats{}, fmt.Errorf("validate query config: %w", err) } - // TODO: validate config ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -392,15 +393,15 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message, seed []kadt.PeerID) error { msgFunc := func(k kadt.Key) *pb.Message { return msg } - return c.broadcast(ctx, msgFunc, seed, brdcst.DefaultConfigFollowUp(msg.Target())) + return c.broadcast(ctx, msgFunc, seed, coordt.BrdcstFuncNoop, brdcst.DefaultConfigFollowUp(msg.Target())) } func (c *Coordinator) BroadcastStatic(ctx context.Context, msg *pb.Message, seed []kadt.PeerID) error { msgFunc := func(k kadt.Key) *pb.Message { return msg } - return c.broadcast(ctx, msgFunc, seed, brdcst.DefaultConfigOneToMany(msg.Target())) + return c.broadcast(ctx, msgFunc, seed, coordt.BrdcstFuncNoop, brdcst.DefaultConfigOneToMany(msg.Target())) } -func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, msgFn func(k kadt.Key) *pb.Message) error { +func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, fn coordt.BrdcstFunc, msgFn func(k kadt.Key) *pb.Message) error { // verify that we have keys to push into the network if len(keys) == 0 { return fmt.Errorf("no keys to broadcast") @@ -410,10 +411,10 @@ func (c *Coordinator) BroadcastMany(ctx context.Context, keys []kadt.Key, msgFn seed := c.rt.NearestNodes(keys[0], math.MaxInt) // start broadcasting - return c.broadcast(ctx, msgFn, seed, brdcst.DefaultConfigManyToMany(keys)) + return c.broadcast(ctx, msgFn, seed, fn, brdcst.DefaultConfigManyToMany(keys)) } -func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seed []kadt.PeerID, cfg brdcst.Config) error { +func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *pb.Message, seed []kadt.PeerID, fn coordt.BrdcstFunc, cfg brdcst.Config) error { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.broadcast") defer span.End() @@ -434,7 +435,7 @@ func (c *Coordinator) broadcast(ctx context.Context, msgFunc func(k kadt.Key) *p // queue the start of the query c.brdcstBehaviour.Notify(ctx, cmd) - contacted, _, err := c.waitForBroadcast(ctx, waiter) + contacted, _, err := c.waitForBroadcast(ctx, waiter, fn) if err != nil { return err } @@ -507,7 +508,7 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, } } -func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWaiter) ([]kadt.PeerID, map[string]struct { +func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWaiter, fn coordt.BrdcstFunc) ([]kadt.PeerID, map[string]struct { Node kadt.PeerID Err error }, error, @@ -516,6 +517,13 @@ func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *BroadcastWai select { case <-ctx.Done(): return nil, nil, ctx.Err() + + case wev, more := <-waiter.Progressed(): + if !more { + return nil, nil, ctx.Err() + } + fn(wev.Ctx, wev.Event.NodeID, wev.Event.Response) + case wev, more := <-waiter.Finished(): if !more { return nil, nil, ctx.Err() diff --git a/internal/coord/coordt/coretypes.go b/internal/coord/coordt/coretypes.go index 640c658..87072b4 100644 --- a/internal/coord/coordt/coretypes.go +++ b/internal/coord/coordt/coretypes.go @@ -40,7 +40,7 @@ var ( // Query stops entirely and returns that error. // // The stats argument contains statistics on the progress of the query so far. -type QueryFunc func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats QueryStats) error +type QueryFunc func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats QueryStats) error // TODO: move to query package? type QueryStats struct { Start time.Time // Start is the time the query began executing. @@ -51,6 +51,11 @@ type QueryStats struct { Exhausted bool // Exhausted is true if the query ended after visiting every node it could. } +// BrdcstFunc is the type of the function called when broadcasting to the network after we have received a response from a node. +type BrdcstFunc func(ctx context.Context, id kadt.PeerID, resp *pb.Message) + +func BrdcstFuncNoop(ctx context.Context, id kadt.PeerID, resp *pb.Message) {} + var ( // ErrSkipNode is used as a return value from a QueryFunc to indicate that the node is to be skipped. ErrSkipNode = errors.New("skip node") diff --git a/internal/coord/event.go b/internal/coord/event.go index cecfa35..d16ce4e 100644 --- a/internal/coord/event.go +++ b/internal/coord/event.go @@ -45,10 +45,10 @@ type RoutingNotification interface { routingNotification() } -// TerminalQueryEvent is a type of [BehaviourEvent] that indicates a query has completed. -type TerminalQueryEvent interface { +// TerminalBehaviourEvent is a type of [BehaviourEvent] that indicates a query has completed. +type TerminalBehaviourEvent interface { BehaviourEvent - terminalQueryEvent() + terminalBehaviourEvent() } type EventStartBootstrap struct { @@ -192,8 +192,8 @@ type EventQueryFinished struct { ClosestNodes []kadt.PeerID } -func (*EventQueryFinished) behaviourEvent() {} -func (*EventQueryFinished) terminalQueryEvent() {} +func (*EventQueryFinished) behaviourEvent() {} +func (*EventQueryFinished) terminalBehaviourEvent() {} // EventRoutingUpdated is emitted by the coordinator when a new node has been verified and added to the routing table. type EventRoutingUpdated struct { @@ -258,7 +258,7 @@ type BrdcstCommand interface { brdcstCommand() } -// EventStartBroadcast starts a new +// EventStartBroadcast starts a new broadcast operation type EventStartBroadcast struct { QueryID coordt.QueryID MsgFunc func(k kadt.Key) *pb.Message @@ -269,6 +269,24 @@ type EventStartBroadcast struct { func (*EventStartBroadcast) behaviourEvent() {} +type EventStopBroadcast struct { + QueryID coordt.QueryID +} + +func (*EventStopBroadcast) behaviourEvent() {} +func (*EventStopBroadcast) queryCommand() {} + +// EventBroadcastProgressed is emitted by the coordinator when a broadcast +// operation has progressed. +type EventBroadcastProgressed struct { + QueryID coordt.QueryID + NodeID kadt.PeerID + Response *pb.Message + Stats query.QueryStats +} + +func (*EventBroadcastProgressed) behaviourEvent() {} + // EventBroadcastFinished is emitted by the coordinator when a broadcasting // a record to the network has finished, either through running to completion or // by being canceled. @@ -281,5 +299,5 @@ type EventBroadcastFinished struct { } } -func (*EventBroadcastFinished) behaviourEvent() {} -func (*EventBroadcastFinished) terminalQueryEvent() {} +func (*EventBroadcastFinished) behaviourEvent() {} +func (*EventBroadcastFinished) terminalBehaviourEvent() {} diff --git a/internal/coord/query.go b/internal/coord/query.go index 4b60429..960ba73 100644 --- a/internal/coord/query.go +++ b/internal/coord/query.go @@ -445,7 +445,7 @@ func (p *QueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) { }) } -type queryNotifier[E TerminalQueryEvent] struct { +type queryNotifier[E TerminalBehaviourEvent] struct { monitor QueryMonitor[E] pending []CtxEvent[*EventQueryProgressed] stopping bool diff --git a/internal/coord/routing.go b/internal/coord/routing.go index a23489c..47e7721 100644 --- a/internal/coord/routing.go +++ b/internal/coord/routing.go @@ -558,7 +558,6 @@ func (r *RoutingBehaviour) perfomNextInbound() (BehaviourEvent, bool) { NodeID: ev.NodeID, } } else { - // TODO: apply ttl cmd = &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{ NodeID: ev.NodeID, }