Skip to content

Commit

Permalink
Merge pull request #57 from BoostryJP/fix/#52
Browse files Browse the repository at this point in the history
eth: better active protocol handler tracking
  • Loading branch information
YoshihitoAso authored Sep 12, 2023
2 parents 93eee0c + b83a9e5 commit 56381cf
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 36 deletions.
86 changes: 65 additions & 21 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ type handler struct {

chainSync *chainSyncer
wg sync.WaitGroup
peerWG sync.WaitGroup

handlerStartCh chan struct{}
handlerDoneCh chan struct{}

// Quorum
raftMode bool
Expand All @@ -173,15 +175,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
// Quorum
authorizationList: config.AuthorizationList,
raftMode: config.RaftMode,
Expand Down Expand Up @@ -285,22 +289,57 @@ func newHandler(config *handlerConfig) (*handler, error) {
return h, nil
}

// protoTracker tracks the number of active protocol handlers.
func (h *handler) protoTracker() {
defer h.wg.Done()
var active int
for {
select {
case <-h.handlerStartCh:
active++
case <-h.handlerDoneCh:
active--
case <-h.quitSync:
// Wait for all active handlers to finish.
for ; active > 0; active-- {
<-h.handlerDoneCh
}
return
}
}
}

// incHandlers signals to increment the number of active handlers if not
// quitting.
func (h *handler) incHandlers() bool {
select {
case h.handlerStartCh <- struct{}{}:
return true
case <-h.quitSync:
return false
}
}

// decHandlers signals to decrement the number of active handlers.
func (h *handler) decHandlers() {
h.handlerDoneCh <- struct{}{}
}

// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
// various subsistems and starts handling messages.
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if !h.incHandlers() {
return p2p.DiscQuitting
}
defer h.decHandlers()

// If the peer has a `snap` extension, wait for it to connect so we can have
// a uniform initialization/teardown mechanism
snap, err := h.peers.waitSnapExtension(peer)
if err != nil {
peer.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer) {
return p2p.DiscQuitting
}
h.peerWG.Add(1)
defer h.peerWG.Done()

// Execute the Ethereum handshake
var (
Expand Down Expand Up @@ -366,7 +405,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return err
}
}
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
Expand Down Expand Up @@ -411,8 +450,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// `eth`, all subsystem registrations and lifecycle management will be done by
// the main `eth` handler to prevent strange races.
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
h.peerWG.Add(1)
defer h.peerWG.Done()
if !h.incHandlers() {
return p2p.DiscQuitting
}
defer h.decHandlers()

if err := h.peers.registerSnapExtension(peer); err != nil {
peer.Log().Error("Snapshot extension registration failed", "err", err)
Expand Down Expand Up @@ -492,6 +533,10 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(2)
go h.chainSync.loop()
go h.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.

// start peer handler tracker
h.wg.Add(1)
go h.protoTracker()
}

func (h *handler) Stop() {
Expand All @@ -504,14 +549,13 @@ func (h *handler) Stop() {
// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(h.quitSync)
h.wg.Wait()

// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to h.peers yet
// will exit when they try to register.
h.peers.close()
h.peerWG.Wait()
h.wg.Wait()

log.Info("Ethereum protocol stopped")
}
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()
}
return nil
}
21 changes: 12 additions & 9 deletions eth/handler_qlight_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,18 @@ func newQLightClientHandler(config *handlerConfig) (*handler, error) {
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
// various subsistems and starts handling messages.
func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error {
if !h.incHandlers() {
return p2p.DiscQuitting
}
defer h.decHandlers()

// If the peer has a `snap` extension, wait for it to connect so we can have
// a uniform initialization/teardown mechanism
snap, err := h.peers.waitSnapExtension(peer.EthPeer)
if err != nil {
peer.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer.EthPeer) {
return p2p.DiscQuitting
}
h.peerWG.Add(1)
defer h.peerWG.Done()

// Execute the Ethereum handshake
var (
Expand Down Expand Up @@ -271,7 +270,7 @@ func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightprot
return err
}
}
h.chainSync.handlePeerEvent(peer.EthPeer)
h.chainSync.handlePeerEvent()

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
Expand Down Expand Up @@ -325,6 +324,10 @@ func (h *handler) StartQLightClient() {
// start sync handlers
h.wg.Add(1)
go h.chainSync.loop()

// start peer handler tracker
h.wg.Add(1)
go h.protoTracker()
}

func (h *handler) StopQLightClient() {
Expand All @@ -334,14 +337,14 @@ func (h *handler) StopQLightClient() {
// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(h.quitSync)
h.wg.Wait()

// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to h.peers yet
// will exit when they try to register.
h.peers.close()
h.peerWG.Wait()
h.wg.Wait()

log.Info("QLight client protocol stopped")
}

Expand Down
14 changes: 10 additions & 4 deletions eth/handler_qlight_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ func newQLightServerHandler(config *handlerConfig) (*handler, error) {
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
// various subsistems and starts handling messages.
func (h *handler) runQLightServerPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error {
h.peerWG.Add(1)
defer h.peerWG.Done()
if !h.incHandlers() {
return p2p.DiscQuitting
}
defer h.decHandlers()

// Execute the Ethereum handshake
var (
Expand Down Expand Up @@ -184,20 +186,24 @@ func (h *handler) StartQLightServer(maxPeers int) {
h.wg.Add(1)
go h.newBlockBroadcastLoop()

// start peer handler tracker
h.wg.Add(1)
go h.protoTracker()

h.authProvider.Initialize()
}

func (h *handler) StopQLightServer() {
h.txsSub.Unsubscribe()
close(h.quitSync)
h.wg.Wait()

// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to h.peers yet
// will exit when they try to register.
h.peers.close()
h.peerWG.Wait()
h.wg.Wait()

log.Info("QLight server protocol stopped")
}

Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newChainSyncer(handler *handler) *chainSyncer {
// handlePeerEvent notifies the syncer about a change in the peer set.
// This is called for new peers and every time a peer announces a new
// chain head.
func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool {
func (cs *chainSyncer) handlePeerEvent() bool {
select {
case cs.peerEventCh <- struct{}{}:
return true
Expand Down

0 comments on commit 56381cf

Please sign in to comment.