package zikade

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ipfs/go-datastore/trace"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"golang.org/x/exp/slog"

	"github.com/plprobelab/zikade/internal/coord"
	"github.com/plprobelab/zikade/internal/coord/routing"
	"github.com/plprobelab/zikade/kadt"
	"github.com/plprobelab/zikade/tele"
)
// DHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type DHT struct {
	// host holds a reference to the underlying libp2p host
	host host.Host

	// cfg holds a reference to the DHT configuration struct
	cfg *Config

	// mode indicates the current mode the DHT operates in. This can differ
	// from the desired mode if set to auto-client or auto-server. The desired
	// mode can be configured via the Config struct.
	modeMu sync.RWMutex
	mode   mode

	// kad is a reference to the coordinator
	kad *coord.Coordinator

	// rt holds a reference to the routing table implementation. This can be
	// configured via the Config struct.
	rt routing.RoutingTableCplNormalized[kadt.Key, kadt.PeerID]

	// backends maps record namespaces to their backend implementations
	backends map[string]Backend

	// log is a convenience accessor to the logging instance. It gets the
	// value of the logger field from the configuration.
	log *slog.Logger

	// sub holds a subscription to the libp2p event bus. The DHT subscribes to
	// these events in networkEventsSubscription and consumes them
	// asynchronously in consumeNetworkEvents.
	sub event.Subscription

	// tele holds a reference to a telemetry struct
	tele *Telemetry

	// stopped indicates whether this DHT instance was stopped ([DHT.Close]
	// was called).
	stopped atomic.Bool
}

// New constructs a new [DHT] for the given underlying host and with the given
// configuration. Use [DefaultConfig] to construct a configuration.
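//
// A minimal usage sketch, assuming a running libp2p host h (error handling
// shortened for brevity):
//
//	cfg := DefaultConfig()
//	d, err := New(h, cfg)
//	if err != nil {
//		panic(err)
//	}
//	defer d.Close()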
func New(h host.Host, cfg *Config) (*DHT, error) {
	var err error
	if cfg == nil {
		cfg = DefaultConfig()
	} else if err = cfg.Validate(); err != nil {
		return nil, fmt.Errorf("validate DHT config: %w", err)
	}

	d := &DHT{
		host: h,
		cfg:  cfg,
		log:  cfg.Logger,
	}

	nid := kadt.PeerID(d.host.ID())

	// Use the configured routing table if it was provided
	if cfg.RoutingTable != nil {
		d.rt = cfg.RoutingTable
	} else if d.rt, err = DefaultRoutingTable(nid, cfg.BucketSize); err != nil {
		return nil, fmt.Errorf("new normalized routing table: %w", err)
	}

	// initialize a new telemetry struct
	d.tele, err = NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
	if err != nil {
		return nil, fmt.Errorf("init telemetry: %w", err)
	}

	// initialize backends
	if len(cfg.Backends) != 0 {
		d.backends = cfg.Backends
	} else if cfg.ProtocolID == ProtocolIPFS {
		d.backends, err = d.initAminoBackends()
		if err != nil {
			return nil, fmt.Errorf("init amino backends: %w", err)
		}
	}

	// wrap all backends with tracing
	for ns, be := range d.backends {
		d.backends[ns] = traceWrapBackend(ns, be, d.tele.Tracer)
	}

	// instantiate a new Kademlia DHT coordinator.
	coordCfg := coord.DefaultCoordinatorConfig()
	coordCfg.Clock = cfg.Clock
	coordCfg.Logger = cfg.Logger
	coordCfg.MeterProvider = cfg.MeterProvider
	coordCfg.TracerProvider = cfg.TracerProvider

	coordCfg.Query.Clock = cfg.Clock
	coordCfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery")
	coordCfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
	coordCfg.Query.Concurrency = cfg.Query.Concurrency
	coordCfg.Query.Timeout = cfg.Query.Timeout
	coordCfg.Query.RequestConcurrency = cfg.Query.RequestConcurrency
	coordCfg.Query.RequestTimeout = cfg.Query.RequestTimeout

	coordCfg.Routing.Clock = cfg.Clock
	coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
	coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
	coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

	rtr := &router{
		host:       h,
		protocolID: cfg.ProtocolID,
		tele:       d.tele,
		clk:        cfg.Clock,
		tracer:     d.tele.Tracer,
	}

	d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), rtr, d.rt, coordCfg)
	if err != nil {
		return nil, fmt.Errorf("new coordinator: %w", err)
	}

	// determine mode to start in
	switch cfg.Mode {
	case ModeOptClient, ModeOptAutoClient:
		d.setClientMode()
	case ModeOptServer, ModeOptAutoServer:
		d.setServerMode()
	default:
		// should never happen because of the configuration validation above
		return nil, fmt.Errorf("invalid dht mode %s", cfg.Mode)
	}

	// create subscription to various network events
	d.sub, err = d.networkEventsSubscription()
	if err != nil {
		return nil, fmt.Errorf("failed subscribing to event bus: %w", err)
	}

	// consume these events asynchronously
	go d.consumeNetworkEvents(d.sub)

	return d, nil
}

// initAminoBackends initializes the default backends for the Amino DHT. This
// includes the ipns, public key, and providers backends. A [DHT] with these
// backends will support these three record types.
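//
// A minimal configuration sketch for callers who want persistent records
// instead of the in-memory default, assuming ds is a [Datastore]
// implementation the caller already owns:
//
//	cfg := DefaultConfig()
//	cfg.Datastore = ds
//	d, err := New(h, cfg)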
func (d *DHT) initAminoBackends() (map[string]Backend, error) {
	var (
		err    error
		dstore Datastore
	)

	if d.cfg.Datastore != nil {
		dstore = d.cfg.Datastore
	} else if dstore, err = InMemoryDatastore(); err != nil {
		return nil, fmt.Errorf("new default datastore: %w", err)
	}

	// wrap datastore in open telemetry tracing
	dstore = trace.New(dstore, d.tele.Tracer)

	pbeCfg, err := DefaultProviderBackendConfig()
	if err != nil {
		return nil, fmt.Errorf("default provider config: %w", err)
	}
	pbeCfg.Logger = d.cfg.Logger
	pbeCfg.AddressFilter = d.cfg.AddressFilter
	pbeCfg.Tele = d.tele
	pbeCfg.clk = d.cfg.Clock

	pbe, err := NewBackendProvider(d.host.Peerstore(), dstore, pbeCfg)
	if err != nil {
		return nil, fmt.Errorf("new provider backend: %w", err)
	}

	rbeCfg, err := DefaultRecordBackendConfig()
	if err != nil {
		return nil, fmt.Errorf("default record config: %w", err)
	}
	rbeCfg.Logger = d.cfg.Logger
	rbeCfg.Tele = d.tele
	rbeCfg.clk = d.cfg.Clock

	ipnsBe, err := NewBackendIPNS(dstore, d.host.Peerstore(), rbeCfg)
	if err != nil {
		return nil, fmt.Errorf("new ipns backend: %w", err)
	}

	pkBe, err := NewBackendPublicKey(dstore, rbeCfg)
	if err != nil {
		return nil, fmt.Errorf("new public key backend: %w", err)
	}

	return map[string]Backend{
		namespaceIPNS:      ipnsBe,
		namespacePublicKey: pkBe,
		namespaceProviders: pbe,
	}, nil
}

// Close cleans up all resources associated with this DHT.
func (d *DHT) Close() error {
	if d.stopped.Swap(true) {
		return nil
	}

	if err := d.sub.Close(); err != nil {
		d.debugErr(err, "failed closing event bus subscription")
	}

	if err := d.kad.Close(); err != nil {
		d.debugErr(err, "failed closing coordinator")
	}

	for ns, b := range d.backends {
		closer, ok := b.(io.Closer)
		if !ok {
			continue
		}

		if err := closer.Close(); err != nil {
			d.warnErr(err, "failed closing backend", "namespace", ns)
		}
	}

	// TODO: improve the following.
	// If the protocol is the IPFS Kademlia protocol and the user didn't
	// provide a datastore implementation, we have initialized an in-memory
	// datastore and assigned it to all backends. Below we check whether that
	// was the case and, if so, get hold of a reference to that datastore by
	// looking it up in our backends map and casting the entry to one of our
	// known backend types.
	if d.cfg.ProtocolID == ProtocolIPFS && d.cfg.Datastore == nil {
		if pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders); err == nil {
			if err := pbe.datastore.Close(); err != nil {
				d.warnErr(err, "failed closing in memory datastore")
			}
		}
	}

	// kill all active streams using the DHT protocol.
	for _, c := range d.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if s.Protocol() != d.cfg.ProtocolID {
				continue
			}

			if err := s.Reset(); err != nil {
				d.debugErr(err, "failed closing stream")
			}
		}
	}

	return nil
}

// setServerMode advertises (via libp2p identify updates) that we are able to
// respond to DHT queries for the configured protocol and sets the appropriate
// stream handler. This method is safe to call even if the DHT is already in
// server mode.
func (d *DHT) setServerMode() {
	d.modeMu.Lock()
	defer d.modeMu.Unlock()

	d.log.Info("Activating DHT server mode")

	d.mode = modeServer
	d.host.SetStreamHandler(d.cfg.ProtocolID, d.streamHandler)
}

// setClientMode stops advertising (and rescinds advertisements via libp2p
// identify updates) that we are able to respond to DHT queries for the
// configured protocol and removes the registered stream handlers. We also kill
// all inbound streams that were using the handled protocol. This method is a
// no-op and safe to call if the DHT is already in client mode.
func (d *DHT) setClientMode() {
	d.modeMu.Lock()
	defer d.modeMu.Unlock()

	d.log.Info("Activating DHT client mode")

	d.mode = modeClient
	d.host.RemoveStreamHandler(d.cfg.ProtocolID)

	// kill all active inbound streams using the DHT protocol. Note that if we
	// request something from a remote peer behind a NAT that succeeds with a
	// connection reversal, the connection would be inbound but the stream
	// would still be outbound and therefore not reset here.
	for _, c := range d.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if s.Protocol() != d.cfg.ProtocolID {
				continue
			}

			switch s.Stat().Direction {
			case network.DirUnknown:
				// reset streams of unknown direction as well
			case network.DirInbound:
				// inbound streams are reset below
			case network.DirOutbound:
				// don't reset outbound streams because these carry queries
				// that we have initiated.
				continue
			}

			if err := s.Reset(); err != nil {
				d.debugErr(err, "failed closing stream")
			}
		}
	}
}

// warnErr is a helper method that uses the slogger of the DHT and writes a
// warning log line with the given message alongside the error. args is a list
// of key/value pairs or slog.Attrs that will be included with the log message.
// If the error is nil, this method is a no-op.
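//
// For example:
//
//	d.warnErr(err, "failed closing backend", "namespace", ns)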
func (d *DHT) warnErr(err error, msg string, args ...any) {
	if err == nil {
		return
	}

	if len(args) == 0 {
		d.log.Warn(msg, tele.LogAttrError(err))
		return
	}

	d.log.With(args...).Warn(msg, tele.LogAttrError(err))
}

// debugErr is a helper method that uses the slogger of the DHT and writes a
// debug log line with the given message alongside the error. args is a list
// of key/value pairs or slog.Attrs that will be included with the log message.
// If the error is nil, this method is a no-op.
func (d *DHT) debugErr(err error, msg string, args ...any) {
	if err == nil {
		return
	}

	if len(args) == 0 {
		d.log.Debug(msg, tele.LogAttrError(err))
		return
	}

	d.log.With(args...).Debug(msg, tele.LogAttrError(err))
}

// AddAddresses suggests peers and their associated addresses to be added to
// the routing table. Addresses will be added to the peerstore with the
// supplied time to live.
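//
// A minimal sketch, assuming ai is a peer.AddrInfo obtained elsewhere (for
// example from a bootstrap list) and ctx is the caller's context.Context:
//
//	err := d.AddAddresses(ctx, []peer.AddrInfo{ai}, time.Hour)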
func (d *DHT) AddAddresses(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error {
	ctx, span := d.tele.Tracer.Start(ctx, "DHT.AddAddresses")
	defer span.End()

	ids := make([]kadt.PeerID, 0, len(ais))
	ps := d.host.Peerstore()
	for _, ai := range ais {
		// TODO: apply address filter
		ps.AddAddrs(ai.ID, ai.Addrs, ttl)
		ids = append(ids, kadt.PeerID(ai.ID))
	}

	return d.kad.AddNodes(ctx, ids)
}

// typedBackend returns the backend at the given namespace, cast to the
// provided type. If the namespace doesn't exist or the type cast failed, this
// function returns an error. Can't be a method on [DHT] because of the generic
// type constraint [0].
//
// This function is only used in tests and the [DHT.Close] method. It would be
// great if we didn't need it.
//
// [0]: https://github.com/golang/go/issues/49085
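//
// For example, to access the providers backend:
//
//	pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders)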
func typedBackend[T Backend](d *DHT, namespace string) (T, error) {
	// check if a backend was registered for the namespace
	be, found := d.backends[namespace]
	if !found {
		return *new(T), fmt.Errorf("backend for namespace %s not found", namespace)
	}

	// try to cast to the desired type
	cbe, ok := be.(T) // cast backend
	if !ok {
		// that didn't work... check if the desired backend was wrapped
		// into a traced backend
		tbe, ok := be.(*tracedBackend)
		if !ok {
			return *new(T), fmt.Errorf("backend at namespace %s is neither a traced backend nor a %T", namespace, *new(T))
		}

		cbe, ok := tbe.backend.(T)
		if !ok {
			return *new(T), fmt.Errorf("traced backend doesn't contain a %T", *new(T))
		}

		return cbe, nil
	}

	return cbe, nil
}