Skip to content

Commit

Permalink
Merge branch 'EN-5556-antiflood-feedback-loop-conn-mngr' into frankie…
Browse files Browse the repository at this point in the history
…1-week1
  • Loading branch information
iulianpascalau committed Jan 3, 2020
2 parents aaad02b + aadfc74 commit 7e59fa8
Show file tree
Hide file tree
Showing 137 changed files with 5,311 additions and 748 deletions.
14 changes: 14 additions & 0 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@
Size = 1000
Type = "LRU"

[Antiflood]
PeerMaxMessagesPerSecond = 75
PeerMaxTotalSizePerSecond = 2097152
MaxMessagesPerSecond = 400
MaxTotalSizePerSecond = 4194304
[Antiflood.Cache]
Size = 5000
Type = "LRU"
[Antiflood.BlackList]
ThresholdNumMessagesPerSecond = 150
ThresholdSizePerSecond = 4194304
NumFloodingRounds = 10
PeerBanDurationInSeconds = 300

[Logger]
Path = "logs"
StackTraceDepth = 2
Expand Down
124 changes: 120 additions & 4 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/ntp"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/p2p/antiflood"
"github.com/ElrondNetwork/elrond-go/p2p/libp2p"
factoryP2P "github.com/ElrondNetwork/elrond-go/p2p/libp2p/factory"
"github.com/ElrondNetwork/elrond-go/p2p/loadBalancer"
Expand All @@ -73,15 +74,17 @@ import (
"github.com/ElrondNetwork/elrond-go/process/smartContract"
"github.com/ElrondNetwork/elrond-go/process/smartContract/hooks"
processSync "github.com/ElrondNetwork/elrond-go/process/sync"
processAntiflood "github.com/ElrondNetwork/elrond-go/process/throttle/antiflood"
"github.com/ElrondNetwork/elrond-go/process/transaction"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/ElrondNetwork/elrond-go/statusHandler"
"github.com/ElrondNetwork/elrond-go/statusHandler/p2pQuota"
"github.com/ElrondNetwork/elrond-go/storage"
storageFactory "github.com/ElrondNetwork/elrond-go/storage/factory"
"github.com/ElrondNetwork/elrond-go/storage/memorydb"
"github.com/ElrondNetwork/elrond-go/storage/storageUnit"
"github.com/ElrondNetwork/elrond-go/storage/timecache"
"github.com/ElrondNetwork/elrond-vm-common"
vmcommon "github.com/ElrondNetwork/elrond-vm-common"
"github.com/btcsuite/btcd/btcec"
libp2pCrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/urfave/cli"
Expand Down Expand Up @@ -117,7 +120,8 @@ var timeSpanForBadHeaders = time.Minute * 2

// Network struct holds the network components of the Elrond protocol
type Network struct {
NetMessenger p2p.Messenger
NetMessenger p2p.Messenger
AntifloodHandler consensus.P2PAntifloodHandler
}

// Core struct holds the core components of the Elrond protocol
Expand Down Expand Up @@ -473,7 +477,7 @@ func CryptoComponentsFactory(args *cryptoComponentsFactoryArgs) (*Crypto, error)
}

// NetworkComponentsFactory creates the network components
func NetworkComponentsFactory(p2pConfig *config.P2PConfig, log logger.Logger, core *Core) (*Network, error) {
func NetworkComponentsFactory(p2pConfig *config.P2PConfig, mainConfig *config.Config, log logger.Logger, core *Core) (*Network, error) {
var randReader io.Reader
if p2pConfig.Node.Seed != "" {
randReader = NewSeedRandReader(core.Hasher.Compute(p2pConfig.Node.Seed))
Expand All @@ -486,11 +490,119 @@ func NetworkComponentsFactory(p2pConfig *config.P2PConfig, log logger.Logger, co
return nil, err
}

antifloodHandler, p2pPeerBlackList, err := createAntifloodAndBlackListComponents(mainConfig, core.StatusHandler)
if err != nil {
return nil, err
}

err = netMessenger.ApplyOptions(
libp2p.WithPeerBlackList(p2pPeerBlackList),
)
if err != nil {
return nil, err
}

return &Network{
NetMessenger: netMessenger,
NetMessenger: netMessenger,
AntifloodHandler: antifloodHandler,
}, nil
}

func createAntifloodAndBlackListComponents(
mainConfig *config.Config,
status core.AppStatusHandler,
) (consensus.P2PAntifloodHandler, p2p.BlacklistHandler, error) {

cacheConfig := storageFactory.GetCacherFromConfig(mainConfig.Antiflood.Cache)
antifloodCache, err := storageUnit.NewCache(cacheConfig.Type, cacheConfig.Size, cacheConfig.Shards)
if err != nil {
return nil, nil, err
}

blackListCache, err := storageUnit.NewCache(cacheConfig.Type, cacheConfig.Size, cacheConfig.Shards)
if err != nil {
return nil, nil, err
}

peerMaxMessagesPerSecond := mainConfig.Antiflood.PeerMaxMessagesPerSecond
peerMaxTotalSizePerSecond := mainConfig.Antiflood.PeerMaxTotalSizePerSecond
maxMessagesPerSecond := mainConfig.Antiflood.MaxMessagesPerSecond
maxTotalSizePerSecond := mainConfig.Antiflood.MaxTotalSizePerSecond

quotaProcessor, err := p2pQuota.NewP2pQuotaProcessor(status)
if err != nil {
return nil, nil, err
}

peerBanInSeconds := mainConfig.Antiflood.BlackList.PeerBanDurationInSeconds
if peerBanInSeconds == 0 {
return nil, nil, fmt.Errorf("Antiflood.BlackList.PeerBanDurationInSeconds should be greater than 0")
}

p2pPeerBlackList := timecache.NewTimeCache(time.Second * time.Duration(peerBanInSeconds))
blackListProcessor, err := processAntiflood.NewP2pBlackListProcessor(
blackListCache,
p2pPeerBlackList,
mainConfig.Antiflood.BlackList.ThresholdNumMessagesPerSecond,
mainConfig.Antiflood.BlackList.ThresholdSizePerSecond,
mainConfig.Antiflood.BlackList.NumFloodingRounds,
)
if err != nil {
return nil, nil, err
}

floodPreventer, err := processAntiflood.NewQuotaFloodPreventer(
antifloodCache,
[]processAntiflood.QuotaStatusHandler{quotaProcessor, blackListProcessor},
peerMaxMessagesPerSecond,
peerMaxTotalSizePerSecond,
maxMessagesPerSecond,
maxTotalSizePerSecond,
)
if err != nil {
return nil, nil, err
}

log.Debug("started antiflood & blacklist components",
"peerMaxMessagesPerSecond", peerMaxMessagesPerSecond,
"peerMaxTotalSizePerSecond", core.ConvertBytes(peerMaxTotalSizePerSecond),
"maxMessagesPerSecond", maxMessagesPerSecond,
"maxTotalSizePerSecond", core.ConvertBytes(maxTotalSizePerSecond),
"peerBanDurationInSeconds", peerBanInSeconds,
"thresholdNumMessagesPerSecond", mainConfig.Antiflood.BlackList.ThresholdNumMessagesPerSecond,
"thresholdSizePerSecond", mainConfig.Antiflood.BlackList.ThresholdSizePerSecond,
"numFloodingRounds", mainConfig.Antiflood.BlackList.NumFloodingRounds,
)

startResetingFloodPreventer(floodPreventer)
startSweepingP2pPeerBlackList(p2pPeerBlackList)

p2pAntiflood, err := antiflood.NewP2pAntiflood(floodPreventer)
if err != nil {
return nil, nil, err
}

return p2pAntiflood, p2pPeerBlackList, nil
}

func startResetingFloodPreventer(floodPreventer p2p.FloodPreventer) {
go func() {
for {
time.Sleep(time.Second)
floodPreventer.Reset()
}
}()
}

func startSweepingP2pPeerBlackList(p2pPeerBlackList process.BlackListHandler) {
go func() {
for {
time.Sleep(time.Second * 5)
p2pPeerBlackList.Sweep()
}
}()
}

type processComponentsFactoryArgs struct {
coreComponents *coreComponentsFactoryArgs
genesisConfig *sharding.Genesis
Expand Down Expand Up @@ -1301,6 +1413,7 @@ func newShardInterceptorAndResolverContainerFactory(
headerSigVerifier,
core.ChainID,
sizeCheckDelta,
network.AntifloodHandler,
)
if err != nil {
return nil, nil, nil, err
Expand All @@ -1320,6 +1433,7 @@ func newShardInterceptorAndResolverContainerFactory(
core.Uint64ByteSliceConverter,
dataPacker,
sizeCheckDelta,
network.AntifloodHandler,
)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -1362,6 +1476,7 @@ func newMetaInterceptorAndResolverContainerFactory(
headerSigVerifier,
core.ChainID,
sizeCheckDelta,
network.AntifloodHandler,
)
if err != nil {
return nil, nil, nil, err
Expand All @@ -1381,6 +1496,7 @@ func newMetaInterceptorAndResolverContainerFactory(
core.Uint64ByteSliceConverter,
dataPacker,
sizeCheckDelta,
network.AntifloodHandler,
)
if err != nil {
return nil, nil, nil, err
Expand Down
3 changes: 2 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
log.LogIfError(err)

log.Trace("creating network components")
networkComponents, err := factory.NetworkComponentsFactory(p2pConfig, log, coreComponents)
networkComponents, err := factory.NetworkComponentsFactory(p2pConfig, generalConfig, log, coreComponents)
if err != nil {
return err
}
Expand Down Expand Up @@ -1177,6 +1177,7 @@ func createNode(
node.WithHeaderSigVerifier(process.HeaderSigVerifier),
node.WithValidatorStatistics(process.ValidatorsStatistics),
node.WithChainID(core.ChainID),
node.WithAntifloodHandler(network.AntifloodHandler),
)
if err != nil {
return nil, errors.New("error creating node: " + err.Error())
Expand Down
19 changes: 19 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Config struct {
ShardHeadersDataPool CacheConfig
MetaHeaderNoncesDataPool CacheConfig

Antiflood AntifloodConfig
EpochStartConfig EpochStartConfig
Logger LoggerConfig
Address AddressConfig
Expand Down Expand Up @@ -192,3 +193,21 @@ type FacadeConfig struct {
RestApiInterface string
PprofEnabled bool
}

// BlackListConfig will hold the p2p peer black list threshold values
type BlackListConfig struct {
ThresholdNumMessagesPerSecond uint32
ThresholdSizePerSecond uint64
NumFloodingRounds uint32
PeerBanDurationInSeconds uint32
}

// AntifloodConfig will hold all p2p antiflood parameters
type AntifloodConfig struct {
Cache CacheConfig
BlackList BlackListConfig
PeerMaxMessagesPerSecond uint32
PeerMaxTotalSizePerSecond uint64
MaxMessagesPerSecond uint32
MaxTotalSizePerSecond uint64
}
8 changes: 8 additions & 0 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/p2p"
)

// Rounder defines the actions which should be handled by a round implementation
Expand Down Expand Up @@ -67,3 +68,10 @@ type P2PMessenger interface {
Broadcast(topic string, buff []byte)
IsInterfaceNil() bool
}

// P2PAntifloodHandler defines the behavior of a component able to signal that the system is too busy (or flooded) processing
// p2p messages
type P2PAntifloodHandler interface {
CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer p2p.PeerID) error
IsInterfaceNil() bool
}
15 changes: 15 additions & 0 deletions consensus/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mock

import "github.com/ElrondNetwork/elrond-go/p2p"

type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer p2p.PeerID) error
}

func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer p2p.PeerID) error {
return p2pahs.CanProcessMessageCalled(message, fromConnectedPeer)
}

func (p2pahs *P2PAntifloodHandlerStub) IsInterfaceNil() bool {
return p2pahs == nil
}
2 changes: 1 addition & 1 deletion consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (sposWorkerMock *SposWorkerMock) RemoveAllReceivedMessagesCalls() {
sposWorkerMock.RemoveAllReceivedMessagesCallsCalled()
}

func (sposWorkerMock *SposWorkerMock) ProcessReceivedMessage(message p2p.MessageP2P, _ func(buffToSend []byte)) error {
func (sposWorkerMock *SposWorkerMock) ProcessReceivedMessage(message p2p.MessageP2P, _ p2p.PeerID) error {
return sposWorkerMock.ProcessReceivedMessageCalled(message)
}

Expand Down
3 changes: 3 additions & 0 deletions consensus/spos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ var ErrNilExecuteStoredMessages = errors.New("executeStoredMessages is nil")
// ErrNilAppStatusHandler defines the error for setting a nil AppStatusHandler
var ErrNilAppStatusHandler = errors.New("nil AppStatusHandler")

// ErrNilAntifloodHandler signals that a nil antiflood handler has been provided
var ErrNilAntifloodHandler = errors.New("nil antiflood handler")

// ErrNilHeaderSigVerifier signals that a nil header sig verifier has been provided
var ErrNilHeaderSigVerifier = errors.New("nil header sig verifier")

Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type WorkerHandler interface {
//RemoveAllReceivedMessagesCalls removes all the functions handlers
RemoveAllReceivedMessagesCalls()
//ProcessReceivedMessage method redirects the received message to the channel which should handle it
ProcessReceivedMessage(message p2p.MessageP2P, broadcastHandler func(buffToSend []byte)) error
ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer p2p.PeerID) error
//Extend does an extension for the subround with subroundId
Extend(subroundId int)
//GetConsensusStateChangedChannel gets the channel for the consensusStateChanged
Expand Down
Loading

0 comments on commit 7e59fa8

Please sign in to comment.