forked from cometbft/cometbft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase_reactor.go
66 lines (54 loc) · 2.33 KB
/
base_reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package p2p
import (
"github.com/cometbft/cometbft/libs/service"
)
// Reactor handles the messages arriving on one or more channels.
//
// Lifecycle, as driven by the Switch:
//   - StreamDescriptors is called when the reactor is added to the switch.
//   - InitPeer and then AddPeer are called when a new peer joins our node.
//   - RemovePeer is called when the peer is stopped.
//   - Receive is called when a message is received on a channel associated
//     with this reactor.
//
// To send a message to a peer, use Peer#Send or Peer#TrySend.
type Reactor interface {
	service.Service // Start, Stop

	// SetSwitch allows setting a switch.
	SetSwitch(sw *Switch)

	// StreamDescriptors returns the list of stream descriptors. Every ID must
	// be unique across all the reactors added to the switch.
	StreamDescriptors() []StreamDescriptor

	// InitPeer is called by the switch before the peer is started. It is the
	// place to initialize data for the peer (e.g. peer state).
	//
	// NOTE: if the switch fails to start the peer, neither AddPeer nor
	// RemovePeer is called. Any peer data stored in the reactor itself would
	// then never be cleaned up, so avoid keeping such data there.
	InitPeer(peer Peer) Peer

	// AddPeer is called by the switch after the peer is added and successfully
	// started. It is the place to start goroutines communicating with the peer.
	AddPeer(peer Peer)

	// RemovePeer is called by the switch when the peer is stopped (due to an
	// error or any other reason).
	RemovePeer(peer Peer, reason any)

	// Receive is called by the switch when an envelope is received from any
	// connected peer on any of the channels registered by the reactor.
	Receive(e Envelope)
}
// --------------------------------------
// BaseReactor combines an embedded service.BaseService with a reference to
// the Switch. Its StreamDescriptors, AddPeer, RemovePeer and Receive methods
// do nothing (StreamDescriptors returns nil) and InitPeer returns the peer it
// was given.
type BaseReactor struct {
	// Provides Start, Stop, .Quit
	service.BaseService

	// Switch is nil after NewBaseReactor and is assigned by SetSwitch.
	Switch *Switch
}
// NewBaseReactor returns a BaseReactor whose embedded BaseService is a copy of
// the value built by service.NewBaseService(nil, name, impl). The Switch field
// is left at its zero value (nil) until SetSwitch is called.
func NewBaseReactor(name string, impl Reactor) *BaseReactor {
	base := service.NewBaseService(nil, name, impl)
	return &BaseReactor{BaseService: *base}
}
// SetSwitch stores sw in the reactor's Switch field, replacing any previous
// value.
func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw }
// StreamDescriptors returns nil, i.e. no stream descriptors.
func (*BaseReactor) StreamDescriptors() []StreamDescriptor {
	return nil
}
// AddPeer is a no-op; the peer is ignored.
func (*BaseReactor) AddPeer(_ Peer) {}
// RemovePeer is a no-op; both the peer and the reason are ignored.
func (*BaseReactor) RemovePeer(_ Peer, _ any) {}
// Receive is a no-op; the envelope is discarded.
func (*BaseReactor) Receive(_ Envelope) {}
// InitPeer returns the given peer unchanged.
func (*BaseReactor) InitPeer(peer Peer) Peer {
	return peer
}