-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathpubsub.nim
700 lines (599 loc) · 23.2 KB
/
pubsub.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## Base interface for pubsub protocols
##
## You can `subscribe<#subscribe%2CPubSub%2Cstring%2CTopicHandler>`_ to a topic,
## `publish<#publish.e%2CPubSub%2Cstring%2Cseq%5Bbyte%5D>`_ something on it,
## and eventually `unsubscribe<#unsubscribe%2CPubSub%2Cstring%2CTopicHandler>`_ from it.
{.push raises: [].}
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import chronos/ratelimit
import
./errors as pubsub_errors,
./pubsubpeer,
./rpc/[message, messages, protobuf],
../../switch,
../protocol,
../../crypto/crypto,
../../stream/connection,
../../peerid,
../../peerinfo,
../../errors,
../../utility
import stew/results
export results
export tables, sets
export PubSubPeer
export PubSubObserver
export protocol
export pubsub_errors
# chronicles log scope for this module: sets the `topics` property to
# "libp2p pubsub" for the log statements below.
logScope:
  topics = "libp2p pubsub"
const
  # Comma-separated list of topic names. It is a `strdefine`, so it can be
  # supplied at compile time (`-d:KnownLibP2PTopics=a,b`); empty by default.
  KnownLibP2PTopics* {.strdefine.} = ""
  # `KnownLibP2PTopics` lower-cased and split on commas. With the default
  # empty string this evaluates to `@[""]`.
  KnownLibP2PTopicsSeq* = split(toLowerAscii(KnownLibP2PTopics), ',')
# Metric declarations for the pubsub layer.
#
# Metrics carrying a "topic" label are updated in this module with either the
# topic name (when it is part of `knownTopics`) or a catch-all label.
#
# The help strings of the `received_*` metrics used to be copies of their
# `broadcast_*` / `subscriptions` counterparts; they now describe the metric
# they are attached to. Metric names and labels are unchanged.

# Peer / topic bookkeeping
declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_subscriptions, "pubsub subscription operations")
declareCounter(libp2p_pubsub_unsubscriptions, "pubsub unsubscription operations")
declareGauge(
  libp2p_pubsub_topic_handlers,
  "pubsub subscribed topics handlers count",
  labels = ["topic"],
)

# Validation outcomes (see `validate`)
declareCounter(
  libp2p_pubsub_validation_success, "pubsub successfully validated messages"
)
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages")

# Publishing
declarePublicCounter(
  libp2p_pubsub_messages_published, "published messages", labels = ["topic"]
)
declarePublicCounter(
  libp2p_pubsub_messages_rebroadcasted, "re-broadcasted messages", labels = ["topic"]
)

# Outgoing traffic (see `broadcast` / `sendSubs`)
declarePublicCounter(
  libp2p_pubsub_broadcast_subscriptions,
  "pubsub broadcast subscriptions",
  labels = ["topic"],
)
declarePublicCounter(
  libp2p_pubsub_broadcast_unsubscriptions,
  "pubsub broadcast unsubscriptions",
  labels = ["topic"],
)
declarePublicCounter(
  libp2p_pubsub_broadcast_messages, "pubsub broadcast messages", labels = ["topic"]
)

# Incoming traffic (see `updateMetrics`)
declarePublicCounter(
  libp2p_pubsub_received_subscriptions,
  "pubsub received subscriptions",
  labels = ["topic"],
)
declarePublicCounter(
  libp2p_pubsub_received_unsubscriptions,
  "pubsub received unsubscriptions",
  labels = ["topic"],
)
declarePublicCounter(
  libp2p_pubsub_received_messages, "pubsub received messages", labels = ["topic"]
)

# Outgoing control messages
declarePublicCounter(libp2p_pubsub_broadcast_iwant, "pubsub broadcast iwant")
declarePublicCounter(
  libp2p_pubsub_broadcast_ihave, "pubsub broadcast ihave", labels = ["topic"]
)
declarePublicCounter(
  libp2p_pubsub_broadcast_graft, "pubsub broadcast graft", labels = ["topic"]
)
declarePublicCounter(
  libp2p_pubsub_broadcast_prune, "pubsub broadcast prune", labels = ["topic"]
)

# Incoming control messages
declarePublicCounter(libp2p_pubsub_received_iwant, "pubsub received iwant")
declarePublicCounter(
  libp2p_pubsub_received_ihave, "pubsub received ihave", labels = ["topic"]
)
declarePublicCounter(
  libp2p_pubsub_received_graft, "pubsub received graft", labels = ["topic"]
)
declarePublicCounter(
  libp2p_pubsub_received_prune, "pubsub received prune", labels = ["topic"]
)
type
  # Error type listed in the `raises` of `initPubSub` and `PubSub.init`.
  InitializationError* = object of LPError

  # Error type listed in the `raises` of `rpcHandler` and caught by `handleConn`.
  PeerMessageDecodeError* = object of CatchableError

  # User callback invoked by `handleData` with the topic and the raw payload.
  TopicHandler* {.public.} =
    proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, async: (raises: []).}

  # User callback invoked by `validate` for messages whose topic it was
  # registered for (see `addValidator`).
  ValidatorHandler* {.public.} = proc(
    topic: string, message: Message
  ): Future[ValidationResult] {.gcsafe, async: (raises: []).}

  # Topic/handler pair accepted by the bulk `unsubscribe` overload.
  TopicPair* = tuple[topic: string, handler: TopicHandler]

  # Maps a message to its `MessageId`; the error side of the `Result` is a
  # `ValidationResult`.
  MsgIdProvider* {.public.} = proc(m: Message): Result[MessageId, ValidationResult] {.
    noSideEffect, raises: [], gcsafe
  .}

  SubscriptionValidator* {.public.} = proc(topic: string): bool {.raises: [], gcsafe.}
    ## Every time a peer sends us a subscription (even to an unknown topic),
    ## we have to store it, which may be an attack vector.
    ## This callback can be used to reject topics we're not interested in

  PubSub* {.public.} = ref object of LPProtocol
    switch*: Switch # the switch used to dial/connect to peers
    peerInfo*: PeerInfo # this peer's info
    topics*: Table[string, seq[TopicHandler]] # the topics that _we_ are interested in
    peers*: Table[PeerId, PubSubPeer]
      # Peers that we are interested to gossip with (but not necessarily
      # yet connected to)
    triggerSelf*: bool ## trigger own local handler on publish
    verifySignature*: bool ## enable signature verification
    sign*: bool ## enable message signing
    validators*: Table[string, HashSet[ValidatorHandler]]
      # per-topic validator callbacks, managed by `addValidator`/`removeValidator`
    observers: ref seq[PubSubObserver] # ref as in smart_ptr
      # the same ref is handed to every peer created by `getOrCreatePeer`
    msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
    msgSeqno*: uint64
    anonymize*: bool ## if we omit fromPeer and seqno from RPC messages we send
    subscriptionValidator*: SubscriptionValidator
      # callback used to validate subscriptions
    topicsHigh*: int ## the maximum number of topics a peer is allowed to subscribe to
    maxMessageSize*: int
      ##\
      ## the maximum raw message size we'll globally allow
      ## for finer tuning, check message size on topic validator
      ##
      ## sending a big message to a peer with a lower size limit can
      ## lead to issues, from descoring to connection drops
      ##
      ## defaults to 1 MiB (1024 * 1024, see `init`)
    rng*: ref HmacDrbgContext
    knownTopics*: HashSet[string]
      # filled from `KnownLibP2PTopicsSeq` in `init`; topics in this set get
      # their own metric label
method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
  ## Forget a pubsub peer (used to handle peer disconnects).
  ##
  ## Drops `peerId` from `p.peers` and refreshes the peer-count gauge.
  debug "unsubscribing pubsub peer", peerId

  del(p.peers, peerId)
  libp2p_pubsub_peers.set(int64(p.peers.len))
proc send*(
    p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool
) {.raises: [].} =
  ## Attempt to send one `RPCMsg` to a single remote peer.
  ##
  ## Parameters:
  ## - `p`: the `PubSub` instance; its `anonymize` flag is forwarded to the peer.
  ## - `peer`: the `PubSubPeer` the message is handed to.
  ## - `msg`: the `RPCMsg` to send.
  ## - `isHighPriority`: whether the message should be treated as high
  ##   priority. High priority messages are sent immediately, while low
  ##   priority messages are queued and sent only after all high priority
  ##   messages have been sent.
  trace "sending pubsub message to peer", peer, payload = shortLog(msg)

  let anonymous = p.anonymize
  peer.send(msg, anonymous, isHighPriority)
proc broadcast*(
    p: PubSub,
    sendPeers: auto, # Iterable[PubSubPeer]
    msg: RPCMsg,
    isHighPriority: bool,
) {.raises: [].} =
  ## Attempt to send one `RPCMsg` to a group of peers, updating the
  ## `libp2p_pubsub_broadcast_*` counters along the way.
  ##
  ## Parameters:
  ## - `p`: the `PubSub` instance.
  ## - `sendPeers`: an iterable of `PubSubPeer` (must also provide `len`).
  ## - `msg`: the `RPCMsg` to broadcast.
  ## - `isHighPriority`: whether the message should be treated as high
  ##   priority. High priority messages are sent immediately, while low
  ##   priority messages are queued and sent only after all high priority
  ##   messages have been sent.
  ##
  ## Every counter is increased by the number of recipients; topics that are
  ## not part of `p.knownTopics` are accounted under the "generic" label.

  # Label to report a topic under.
  template metricLabel(topicName: string): string =
    (if p.knownTopics.contains(topicName): topicName else: "generic")

  let recipients = sendPeers.len.int64

  for sub in msg.subscriptions:
    if sub.subscribe:
      libp2p_pubsub_broadcast_subscriptions.inc(
        recipients, labelValues = [metricLabel(sub.topic)]
      )
    else:
      libp2p_pubsub_broadcast_unsubscriptions.inc(
        recipients, labelValues = [metricLabel(sub.topic)]
      )

  for smsg in msg.messages:
    libp2p_pubsub_broadcast_messages.inc(
      recipients, labelValues = [metricLabel(smsg.topic)]
    )

  msg.control.withValue(control):
    libp2p_pubsub_broadcast_iwant.inc(recipients * control.iwant.len.int64)

    for ihave in control.ihave:
      libp2p_pubsub_broadcast_ihave.inc(
        recipients, labelValues = [metricLabel(ihave.topicID)]
      )
    for graft in control.graft:
      libp2p_pubsub_broadcast_graft.inc(
        recipients, labelValues = [metricLabel(graft.topicID)]
      )
    for prune in control.prune:
      libp2p_pubsub_broadcast_prune.inc(
        recipients, labelValues = [metricLabel(prune.topicID)]
      )

  trace "broadcasting messages to peers", peers = sendPeers.len, payload = shortLog(msg)

  let someoneObserves = anyIt(sendPeers, it.hasObservers)
  if not someoneObserves:
    # Fast path that only encodes message once
    let encoded = encodeRpcMsg(msg, p.anonymize)
    for peer in sendPeers:
      asyncSpawn peer.sendEncoded(encoded, isHighPriority)
  else:
    # At least one peer has observers: go through the per-peer `send`.
    for peer in sendPeers:
      p.send(peer, msg, isHighPriority)
proc sendSubs*(
    p: PubSub, peer: PubSubPeer, topics: openArray[string], subscribe: bool
) =
  ## Send a (un)subscription message for `topics` to a remote peer.
  ##
  ## The message is sent as high priority. Afterwards the broadcast
  ## subscription/unsubscription counter is bumped once per topic; topics
  ## outside `p.knownTopics` are accounted under the "generic" label.

  # Label to report a topic under.
  template metricLabel(topicName: string): string =
    (if p.knownTopics.contains(topicName): topicName else: "generic")

  p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)

  for topic in topics:
    if subscribe:
      libp2p_pubsub_broadcast_subscriptions.inc(labelValues = [metricLabel(topic)])
    else:
      libp2p_pubsub_broadcast_unsubscriptions.inc(labelValues = [metricLabel(topic)])
proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
  ## Update the `libp2p_pubsub_received_*` counters for one `RPCMsg`.
  ##
  ## Only the first `p.topicsHigh` subscription entries are counted. Topics
  ## outside `p.knownTopics` are accounted under the "generic" label.

  # Label to report a topic under.
  template metricLabel(topicName: string): string =
    (if p.knownTopics.contains(topicName): topicName else: "generic")

  let countedSubs = min(rpcMsg.subscriptions.len, p.topicsHigh)
  for idx in 0 ..< countedSubs:
    # Entries are read through the index rather than bound to a loop variable.
    if rpcMsg.subscriptions[idx].subscribe:
      libp2p_pubsub_received_subscriptions.inc(
        labelValues = [metricLabel(rpcMsg.subscriptions[idx].topic)]
      )
    else:
      libp2p_pubsub_received_unsubscriptions.inc(
        labelValues = [metricLabel(rpcMsg.subscriptions[idx].topic)]
      )

  for idx in 0 ..< rpcMsg.messages.len():
    let topic = rpcMsg.messages[idx].topic
    libp2p_pubsub_received_messages.inc(labelValues = [metricLabel(topic)])

  rpcMsg.control.withValue(control):
    libp2p_pubsub_received_iwant.inc(control.iwant.len.int64)

    for ihave in control.ihave:
      libp2p_pubsub_received_ihave.inc(labelValues = [metricLabel(ihave.topicID)])
    for graft in control.graft:
      libp2p_pubsub_received_graft.inc(labelValues = [metricLabel(graft.topicID)])
    for prune in control.prune:
      libp2p_pubsub_received_prune.inc(labelValues = [metricLabel(prune.topicID)])
method rpcHandler*(
    p: PubSub, peer: PubSubPeer, data: seq[byte]
): Future[void] {.
    base, async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError])
.} =
  ## Handler that must be overridden by concrete implementation
  ##
  ## `handleConn` installs a per-peer handler that forwards `peer` and the
  ## raw `data` bytes to this method. The base version always fails with an
  ## assertion ("Unimplemented").
  raiseAssert "Unimplemented"
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base, gcsafe.} =
  ## Hook called by `getOrCreatePeer` right after a newly created peer has
  ## been stored in `p.peers`. The base implementation does nothing.
  discard
method onPubSubPeerEvent*(
    p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent
) {.base, gcsafe.} =
  ## React to an event raised by a `PubSubPeer`.
  ##
  ## Peer event is raised for the send connection in particular: when a
  ## stream is opened and we are subscribed to at least one topic, the full
  ## subscription list is sent to that peer. The other events are ignored
  ## by the base implementation.
  case event.kind
  of PubSubPeerEventKind.StreamOpened:
    if p.topics.len > 0:
      sendSubs(p, peer, toSeq(keys(p.topics)), subscribe = true)
  of PubSubPeerEventKind.StreamClosed, PubSubPeerEventKind.DisconnectionRequested:
    discard
method getOrCreatePeer*(
    p: PubSub, peerId: PeerId, protosToDial: seq[string], protoNegotiated: string = ""
): PubSubPeer {.base, gcsafe.} =
  ## Return the `PubSubPeer` registered for `peerId`, creating and
  ## registering it first when there is none.
  ##
  ## - `protosToDial`: protocols passed to `switch.dial` when the peer needs
  ##   a connection.
  ## - `protoNegotiated`: codec stored on a new peer, or on an existing peer
  ##   whose codec is still empty.
  p.peers.withValue(peerId, known):
    # Existing peer: only fill in the codec if it has not been set yet.
    if known[].codec.len == 0:
      known[].codec = protoNegotiated
    return known[]

  # Connection factory handed to the peer; any dial failure is wrapped in a
  # `GetConnDialError` carrying the original error as parent.
  proc dialPeer(): Future[Connection] {.async: (raises: [GetConnDialError]).} =
    try:
      return await p.switch.dial(peerId, protosToDial)
    except CatchableError as e:
      raise (ref GetConnDialError)(parent: e)

  # Route peer events back into this PubSub instance.
  proc forwardEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
    p.onPubSubPeerEvent(peer, event)

  let created =
    PubSubPeer.new(peerId, dialPeer, forwardEvent, protoNegotiated, p.maxMessageSize)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = created
  # All peers share the observer list of this PubSub instance.
  created.observers = p.observers

  onNewPeer(p, created)

  # metrics
  libp2p_pubsub_peers.set(int64(p.peers.len))

  return created
proc handleData*(
    p: PubSub, topic: string, data: seq[byte]
): Future[void] {.async: (raises: [], raw: true).} =
  ## Call every handler registered for `topic` with `data`.
  ##
  ## Returns a future that is already completed when there is no handler for
  ## the topic or when all handler futures were already `completed` after
  ## being called; otherwise the future completes once every pending
  ## handler has finished. Handler failures are logged, not propagated.

  # Start work on all data handlers without copying data into closure like
  # happens on {.async.} transformation
  p.topics.withValue(topic, handlers):
    # Futures of the handlers that are still running after being started.
    var futs = newSeq[Future[void]]()

    for handler in handlers[]:
      if handler != nil: # allow nil handlers
        let fut = handler(topic, data)
        if not fut.completed(): # Fast path for successful sync handlers
          futs.add(fut)

    if futs.len() > 0:
      proc waiter(): Future[void] {.async: (raises: []).} =
        # slow path - we have to wait for the handlers to complete
        try:
          futs = await allFinished(futs)
        except CancelledError:
          # propagate cancellation
          for fut in futs:
            if not (fut.finished):
              fut.cancelSoon()

        # check for errors in futures
        for fut in futs:
          if fut.failed:
            let err = fut.error()
            warn "Error in topic handler", description = err.msg

      return waiter()

  # Fast path - futures finished synchronously or nobody cared about data
  var res = newFuture[void]()
  res.complete()
  return res
method handleConn*(
    p: PubSub, conn: Connection, proto: string
) {.base, async: (raises: [CancelledError]).} =
  ## handle incoming connections
  ##
  ## this proc will:
  ## 1) register a new PubSubPeer for the connection
  ##    (or reuse the one already known for `conn.peerId`)
  ## 2) register a handler with the peer;
  ##    this handler gets called on every rpc message
  ##    that the peer receives
  ## 3) ask the peer to subscribe us to every topic
  ##    that we're interested in
  ##    (NOTE(review): not done in this proc itself; the subscription list
  ##    is sent from `onPubSubPeerEvent` on `StreamOpened` - confirm)
  ##
  ## The connection is always closed (`closeWithEOF`) when the read loop
  ## ends, whether normally, by cancellation or on a decode error.

  proc handler(
      peer: PubSubPeer, data: seq[byte]
  ): Future[void] {.async: (raises: []).} =
    # call pubsub rpc handler
    # NOTE(review): the call below is not `await`ed although `rpcHandler`
    # is declared to raise; confirm how its future/errors are consumed.
    p.rpcHandler(peer, data)

  # No protocols to dial: the peer is created for an incoming connection.
  let peer = p.getOrCreatePeer(conn.peerId, @[], proto)

  try:
    peer.handler = handler
    await peer.handle(conn) # spawn peer read loop
    trace "pubsub peer handler ended", conn
  except CancelledError as exc:
    # cancellation is re-raised unchanged
    raise exc
  except PeerMessageDecodeError as exc:
    # decode errors only end the read loop; they are logged and swallowed
    trace "exception ocurred in pubsub handle", description = exc.msg, conn
  finally:
    await conn.closeWithEOF()
method subscribePeer*(p: PubSub, peer: PeerId) {.base, gcsafe.} =
  ## Subscribe to a remote peer to receive/send pubsub messages.
  ##
  ## Looks up (or creates) the `PubSubPeer` for `peer`, dialing with this
  ## protocol's codecs, and asks it to connect.
  p.getOrCreatePeer(peer, p.codecs).connect()
proc updateTopicMetrics(p: PubSub, topic: string) =
  ## Refresh the topic gauges after a change affecting `topic`.
  ##
  ## - `libp2p_pubsub_topics` is set to the number of subscribed topics.
  ## - For a topic in `p.knownTopics`, `libp2p_pubsub_topic_handlers` (label
  ##   = topic) is set to its handler count, or 0 when no longer subscribed.
  ## - For any other topic, the "other" label is set to the number of
  ##   subscribed topics that are not in `p.knownTopics`.
  libp2p_pubsub_topics.set(int64(p.topics.len))

  if topic notin p.knownTopics:
    var unknownTopics = 0'i64
    for name in p.topics.keys:
      if name notin p.knownTopics:
        inc unknownTopics

    libp2p_pubsub_topic_handlers.set(unknownTopics, labelValues = ["other"])
    return

  p.topics.withValue(topic, handlers):
    libp2p_pubsub_topic_handlers.set(handlers[].len.int64, labelValues = [topic])
  do:
    libp2p_pubsub_topic_handlers.set(0, labelValues = [topic])
method onTopicSubscription*(
    p: PubSub, topic: string, subscribed: bool
) {.base, gcsafe.} =
  ## Called when `subscribe` adds the first handler of a topic
  ## (`subscribed = true`) or when the last handler of a topic is removed
  ## (`subscribed = false`).
  ##
  ## Tells the peers about the change and bumps the matching
  ## subscription/unsubscription counter.
  for remote in p.peers.values:
    # If we don't have a sendConn yet, we will
    # send the full sub list when we get the sendConn,
    # so no need to send it here
    if remote.hasSendConn:
      p.sendSubs(remote, [topic], subscribed)

  if subscribed:
    libp2p_pubsub_subscriptions.inc()
  else:
    libp2p_pubsub_unsubscriptions.inc()
proc unsubscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} =
  ## Remove `handler` from the handlers of ``topic``.
  ##
  ## When no handler is left the topic itself is dropped and
  ## `onTopicSubscription(topic, false)` is triggered. Does nothing when we
  ## are not subscribed to ``topic``.
  p.topics.withValue(topic, registered):
    registered[].keepItIf(it != handler)

    let nothingLeft = registered[].len == 0
    if nothingLeft:
      p.topics.del(topic)
      p.onTopicSubscription(topic, false)

    p.updateTopicMetrics(topic)
proc unsubscribe*(p: PubSub, topics: openArray[TopicPair]) {.public.} =
  ## Unsubscribe every (``topic``, handler) pair of `topics`, in order.
  for pair in topics:
    unsubscribe(p, pair.topic, pair.handler)
proc unsubscribeAll*(p: PubSub, topic: string) {.public, gcsafe.} =
  ## Remove every `handler` of `topic` at once.
  ##
  ## Only logs a debug message when we are not subscribed to `topic`.
  if topic in p.topics:
    p.topics.del(topic)
    p.onTopicSubscription(topic, false)
    p.updateTopicMetrics(topic)
  else:
    debug "unsubscribeAll called for an unknown topic", topic
proc subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} =
  ## Subscribe to a topic.
  ##
  ## ``topic``   - a string topic to subscribe to
  ##
  ## ``handler`` - user provided proc that will be triggered on every
  ##               received message
  ##
  ## The call is ignored (with a warning) when a `subscriptionValidator` is
  ## set and rejects ``topic``.

  # Check that this is an allowed topic
  if not p.subscriptionValidator.isNil and not p.subscriptionValidator(topic):
    warn "Trying to subscribe to a topic not passing validation!", topic
    return

  p.topics.withValue(topic, registered):
    # Already subscribed, just adding another handler
    registered[].add(handler)
  do:
    trace "subscribing to topic", name = topic
    p.topics[topic] = @[handler]

    # Notify on first handler
    p.onTopicSubscription(topic, true)

  p.updateTopicMetrics(topic)
method publish*(
    p: PubSub, topic: string, data: seq[byte]
): Future[int] {.base, async: (raises: [LPError]), public.} =
  ## Publish `data` to a ``topic``.
  ##
  ## The return value is the number of neighbours that we attempted to send
  ## the message to, excluding self. Note that this is an optimistic number
  ## of attempts - the number of peers that actually receive the message
  ## might be lower.
  ##
  ## The base implementation only runs the local handlers (when
  ## `triggerSelf` is set) and reports 0 attempts.
  if p.triggerSelf:
    await p.handleData(topic, data)

  return 0
method initPubSub*(p: PubSub) {.base, raises: [InitializationError].} =
  ## Perform pubsub initialization: allocate a fresh observer list and fall
  ## back to `defaultMsgIdProvider` when no message id provider is set.
  p.observers = new seq[PubSubObserver]

  if p.msgIdProvider.isNil:
    p.msgIdProvider = defaultMsgIdProvider
method addValidator*(
    p: PubSub, topic: varargs[string], hook: ValidatorHandler
) {.base, public, gcsafe.} =
  ## Register `hook` as a validator for each given `topic`. Messages
  ## received on those topics are passed to `hook`, which can return either
  ## `Accept`, `Ignore` or `Reject` (which can descore the peer).
  for name in topic:
    trace "adding validator for topic", topic = name
    incl(p.validators.mgetOrPut(name, HashSet[ValidatorHandler]()), hook)
method removeValidator*(
    p: PubSub, topic: varargs[string], hook: ValidatorHandler
) {.base, public.} =
  ## Unregister `hook` from each given `topic`; a topic whose validator set
  ## becomes empty is removed from `p.validators`. Topics without any
  ## validator are skipped.
  for name in topic:
    p.validators.withValue(name, hooks):
      hooks[].excl(hook)
      if hooks[].len == 0:
        p.validators.del(name)
method validate*(
    p: PubSub, message: Message
): Future[ValidationResult] {.async: (raises: [CancelledError]), base.} =
  ## Run all validators registered for the topic of `message` and combine
  ## their verdicts.
  ##
  ## The result is `Accept` unless a validator says otherwise. Results are
  ## examined in order: a failed future or a `Reject` stops the scan with
  ## `Reject`, an `Ignore` is remembered but the scan goes on. The matching
  ## validation counter is incremented before returning.
  var running: seq[Future[ValidationResult]]
  trace "about to validate message"

  let topic = message.topic
  trace "looking for validators on topic",
    topic = topic, registered = toSeq(p.validators.keys)

  # Start every validator first, then wait for all of them together.
  p.validators.withValue(topic, hooks):
    trace "running validators for topic", topic = topic
    for hook in hooks[]:
      running.add(hook(topic, message))

  var verdict = ValidationResult.Accept
  let finished = await allFinished(running)
  for fut in finished:
    if fut.failed:
      verdict = ValidationResult.Reject
      break

    try:
      let outcome = fut.read()
      case outcome
      of ValidationResult.Accept:
        discard
      of ValidationResult.Reject:
        verdict = outcome
        break # a rejection is final
      of ValidationResult.Ignore:
        verdict = outcome
    except CatchableError as e:
      trace "validator for message could not be executed, ignoring",
        topic = topic, err = e.msg
      verdict = ValidationResult.Ignore

  case verdict
  of ValidationResult.Accept:
    libp2p_pubsub_validation_success.inc()
  of ValidationResult.Reject:
    libp2p_pubsub_validation_failure.inc()
  of ValidationResult.Ignore:
    libp2p_pubsub_validation_ignore.inc()

  verdict
proc init*[PubParams: object | bool](
    P: typedesc[PubSub],
    switch: Switch,
    triggerSelf: bool = false,
    anonymize: bool = false,
    verifySignature: bool = true,
    sign: bool = true,
    msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
    subscriptionValidator: SubscriptionValidator = nil,
    maxMessageSize: int = 1024 * 1024,
    rng: ref HmacDrbgContext = newRng(),
    parameters: PubParams = false,
): P {.raises: [InitializationError], public.} =
  ## Create and initialise a pubsub instance of type `P` on top of `switch`.
  ##
  ## `parameters` is only stored when it is an object; the default `false`
  ## means "no parameters" and the `parameters` field is then not set.
  ## The instance is hooked to the switch so that peers joining are
  ## subscribed and every other registered peer event unsubscribes the peer.
  let instance =
    when PubParams is bool:
      # No protocol-specific parameters supplied.
      P(
        switch: switch,
        peerInfo: switch.peerInfo,
        triggerSelf: triggerSelf,
        anonymize: anonymize,
        verifySignature: verifySignature,
        sign: sign,
        msgIdProvider: msgIdProvider,
        subscriptionValidator: subscriptionValidator,
        maxMessageSize: maxMessageSize,
        rng: rng,
        topicsHigh: int.high,
      )
    else:
      P(
        switch: switch,
        peerInfo: switch.peerInfo,
        triggerSelf: triggerSelf,
        anonymize: anonymize,
        verifySignature: verifySignature,
        sign: sign,
        msgIdProvider: msgIdProvider,
        subscriptionValidator: subscriptionValidator,
        parameters: parameters,
        maxMessageSize: maxMessageSize,
        rng: rng,
        topicsHigh: int.high,
      )

  proc onPeerEvent(peerId: PeerId, event: PeerEvent) {.async.} =
    case event.kind
    of PeerEventKind.Joined:
      instance.subscribePeer(peerId)
    else:
      instance.unsubscribePeer(peerId)

  switch.addPeerEventHandler(onPeerEvent, PeerEventKind.Joined)
  switch.addPeerEventHandler(onPeerEvent, PeerEventKind.Left)

  # Topics that get their own metric label.
  instance.knownTopics = toHashSet(KnownLibP2PTopicsSeq)

  instance.initPubSub()

  return instance
proc addObserver*(p: PubSub, observer: PubSubObserver) {.public.} =
  ## Append `observer` to the observer list of `p`.
  p.observers[].add(observer)
proc removeObserver*(p: PubSub, observer: PubSubObserver) {.public.} =
  ## Remove the first occurrence of `observer` from the observer list of
  ## `p`; does nothing when it is not registered.
  let position = find(p.observers[], observer)
  if position >= 0:
    # `del` does not preserve the order of the remaining observers.
    p.observers[].del(position)