diff --git a/common/test_params.go b/common/test_params.go new file mode 100644 index 0000000..a89ac0b --- /dev/null +++ b/common/test_params.go @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package common + +import "time" + +// handshake test cases +const ( + HandshakeNormal = iota + Handshake_StopAfterSendVersion + Handshake_StopAfterReceiveVersion + Handshake_StopAfterUpdateKad + Handshake_StopAfterReadKad + Handshake_StopAfterSendAck + Handshake_StopAfterReadAck +) + +var ( + HandshakeLevel uint8 = HandshakeNormal // default normal + HandshakeDuration time.Duration = time.Duration(10) * time.Second // default value: 10 sec +) + +func SetHandshakeLevel(lvl uint8) { + HandshakeLevel = lvl +} +func StopHandshake(lvl uint8) bool { + return HandshakeLevel == lvl +} +func SetHandshakeDuraion(sec int) { + HandshakeDuration = time.Duration(sec) * time.Second +} + +// heartbeat test cases +var HeartbeatBlockHeight uint64 = 358 // default 100000 +func SetHeartbeatBlockHeight(height uint64) { + HeartbeatBlockHeight = height +} diff --git a/config.json b/config.json index 28b0e83..4748ec2 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,16 @@ { + "Seed":[ + "172.168.3.158:20338", + "172.168.3.159:20338", + "172.168.3.160:20338", + "172.168.3.161:20338" + ], + "Sync":[ + "172.168.3.162:20338", + "172.168.3.163:20338", + "172.168.3.164:20338", + "172.168.3.165:20338" + ], "Net":{ "ReservedPeersOnly":false, "ReservedCfg":{ @@ -7,37 +19,29 @@ "1.2.3.5" ], "mask":[ - "2.3.4.5" + "172.168.3.151", + "172.168.3.152", + "172.168.3.153", + "172.168.3.154", + "172.168.3.155", + "172.168.3.156", + "172.168.3.157" ] }, - "NetworkMagic":0, - "NetworkId":0, + "NetworkMagic":299, + "NetworkId":299, "NetworkName":"", - "NodePort":0, - "IsTLS":false + "NodePort":20338, + "HttpInfoPort":20446, + "IsTLS":false, + "MaxHdrSyncReqs":1024, + "MaxConnInBound":1024, + "MaxConnOutBound":1024, + "MaxConnInBoundForSingleIP":1024 }, "Sdk":{ - "JsonRpcAddress":"http://localhost:20336", + "JsonRpcAddress":"http://172.168.3.162:20336", "GasPrice":0, "GasLimit":20000 - }, - "Sync":[ - "172.168.3.151:20338", - "172.168.3.152:20338", - "172.168.3.153:20338", - "172.168.3.154:20338", - "172.168.3.155:20338" - ], - "Seed":[ - "172.168.3.156:20338", - "172.168.3.157:20338", - "172.168.3.158:20338", - "172.168.3.159:20338", - "172.168.3.160:20338", - "172.168.3.161:20338", - "172.168.3.162:20338", - "172.168.3.163:20338", - "172.168.3.164:20338", - "172.168.3.165:20338" - ] + } } \ No newline at end of file diff --git a/config/config.go b/config/config.go index a89da91..d5a30a8 100644 --- a/config/config.go +++ b/config/config.go @@ -22,7 +22,7 @@ import ( "encoding/json" "fmt" log4 "github.com/alecthomas/log4go" - "github.com/ontio/ontology/common/config" + cmf "github.com/ontio/ontology/common/config" "io/ioutil" "os" ) @@ -32,7 +32,7 @@ var DefConfig = NewDHTConfig() type DHTConfig struct { Seed []string Sync []string - Net *config.P2PNodeConfig + Net *cmf.P2PNodeConfig Sdk *SDKConfig } @@ -70,6 +70,7 @@ func (this *DHTConfig) loadConfig(fileName string) error { if err != nil { return fmt.Errorf("json.Unmarshal TestConfig:%s error:%s", data, err) } + cmf.DefConfig.P2PNode = this.Net return nil } diff --git a/dht-tool b/dht-tool index 1433b60..de36451 100755 Binary files a/dht-tool and b/dht-tool differ diff --git a/main.go b/main.go index aa42fe2..e88a332 100644 --- a/main.go +++ b/main.go @@ -39,7 +39,7 @@ var ( func init() { flag.StringVar(&Config, "cfg", "./config.json", "Config of ontology-tool") flag.StringVar(&LogConfig, "lfg", "./log4go.xml", "Log config of ontology-tool") - flag.StringVar(&Methods, "t", "", "methods to run. use ',' to split methods") + flag.StringVar(&Methods, "t", "handshake", "methods to run. use ',' to split methods") flag.Parse() } diff --git a/methods/endpoint.go b/methods/endpoint.go index 5a2e0c1..cddcc6b 100644 --- a/methods/endpoint.go +++ b/methods/endpoint.go @@ -24,4 +24,5 @@ import ( func init() { core.OntTool.RegMethod("demo", Demo) + core.OntTool.RegMethod("handshake", Handshake) } diff --git a/methods/methods.go b/methods/methods.go index fc26714..625d928 100644 --- a/methods/methods.go +++ b/methods/methods.go @@ -20,10 +20,67 @@ package methods import ( log4 "github.com/alecthomas/log4go" + "github.com/ontio/ontology-tool/common" + "github.com/ontio/ontology-tool/config" + "github.com/ontio/ontology-tool/p2pserver/net/netserver" + "github.com/ontio/ontology-tool/p2pserver/net/protocol" + "github.com/ontio/ontology-tool/p2pserver/protocols" + "github.com/ontio/ontology-tool/utils/timer" +) + +var ( + ns *netserver.NetServer + tr *timer.Timer ) func Demo() bool { log4.Info("hello, dht demo") + return true +} + +func setup(protocol p2p.Protocol) { + var err error + + if ns, err = netserver.NewNetServer(protocol, config.DefConfig.Net); err != nil { + log4.Crashf("[NewNetServer] crashed, err %s", err) + } + if err = ns.Start(); err != nil { + log4.Crashf("start netserver failed, err %s", err) + } + + tr = timer.NewTimer(2) +} + +func Handshake() bool { + + // 1. get params from json file + var params struct { + Remote string + HeartbeatTime int + } + if err := getParamsFromJsonFile("./params/Handshake.json", ¶ms); err != nil { + log4.Error("%s", err) + return false + } + + // 2. set common params + common.SetHandshakeDuraion(10) + common.SetHandshakeLevel(common.HandshakeNormal) + common.SetHeartbeatBlockHeight(358) + + // 3. setup p2p.protocols + protocol := protocols.NewOnlyHeartbeatMsgHandler() + setup(protocol) + + // 4. connect and handshake + if err := ns.Connect(params.Remote); err != nil { + log4.Debug("connecting to %s failed, err: %s", params.Remote, err) + return false + } + + // 5. dispatch + dispatch(params.HeartbeatTime) + log4.Info("handshake end!") return true } diff --git a/methods/net_server.go b/methods/net_server.go deleted file mode 100644 index 5225ac5..0000000 --- a/methods/net_server.go +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (C) 2018 The ontology Authors - * This file is part of The ontology library. - * - * The ontology is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The ontology is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with The ontology. If not, see . - */ - -package methods - -import ( - "fmt" - log4 "github.com/alecthomas/log4go" - "github.com/ontio/ontology-tool/config" - "github.com/ontio/ontology-tool/p2pserver/common" - "github.com/ontio/ontology-tool/p2pserver/connect_controller" - "github.com/ontio/ontology-tool/p2pserver/message/types" - netsvr "github.com/ontio/ontology-tool/p2pserver/net/netserver" - commCfg "github.com/ontio/ontology/common/config" - // "github.com/ontio/ontology-tool/p2pserver/net/protocol" - "github.com/ontio/ontology-tool/p2pserver/peer" - "net" - "time" -) - -const Version = "" - -type NetServer struct { - base *peer.PeerInfo - listener net.Listener - NetChan chan *types.MsgPayload - Np *netsvr.NbrPeers - - connCtrl *connect_controller.ConnectController - - stopRecvCh chan bool // To stop sync channel -} - -func NewNetServer() (*NetServer, error) { - n := &NetServer{ - NetChan: make(chan *types.MsgPayload, common.CHAN_CAPABILITY), - base: &peer.PeerInfo{}, - Np: netsvr.NewNbrPeers(), - stopRecvCh: make(chan bool), - } - - err := n.init(config.DefConfig.Net, Version) - if err != nil { - return nil, err - } - return n, nil -} - -func (s *NetServer) init(conf *commCfg.P2PNodeConfig, version string) error { - keyId := common.RandPeerKeyId() - - httpInfo := conf.HttpInfoPort - nodePort := conf.NodePort - if nodePort == 0 { - return fmt.Errorf("[p2p]invalid link port") - } - - s.base = peer.NewPeerInfo(keyId.Id, common.PROTOCOL_VERSION, common.SERVICE_NODE, true, httpInfo, - nodePort, 0, version, "") - - option, err := connect_controller.ConnCtrlOptionFromConfig(conf) - if err != nil { - return err - } - s.connCtrl = connect_controller.NewConnectController(s.base, keyId, option) - - syncPort := s.base.Port - if syncPort == 0 { - return fmt.Errorf("[p2p]sync port invalid") - } - s.listener, err = connect_controller.NewListener(syncPort, conf) - if err != nil { - return fmt.Errorf("[p2p]failed to create sync listener") - } - - log4.Info("[p2p]init peer ID to %s", s.base.Id.ToHexString()) - - return nil -} - -func (this *NetServer) handleClientConnection(conn net.Conn) error { - peerInfo, conn, err := this.connCtrl.AcceptConnect(conn) - if err != nil { - return err - } - remotePeer := createPeer(peerInfo, conn) - remotePeer.AttachChan(this.NetChan) - this.ReplacePeer(remotePeer) - - go remotePeer.Link.Rx() - - // todo - // this.protocol.HandleSystemMessage(this, p2p.PeerConnected{Info: remotePeer.Info}) - return nil -} - -func createPeer(info *peer.PeerInfo, conn net.Conn) *peer.Peer { - remotePeer := peer.NewPeer() - remotePeer.SetInfo(info) - remotePeer.Link.UpdateRXTime(time.Now()) - remotePeer.Link.SetAddr(conn.RemoteAddr().String()) - remotePeer.Link.SetConn(conn) - remotePeer.Link.SetID(info.Id) - - return remotePeer -} - -func (this *NetServer) SendTo(p common.PeerId, msg types.Message) { - // todo - //peer := this.GetPeer(p) - //if peer != nil { - // this.Send(peer, msg) - //} -} - -func (this *NetServer) GetPeer(id common.PeerId) *peer.Peer { - return this.Np.GetPeer(id) -} - -func (this *NetServer) ReplacePeer(remotePeer *peer.Peer) { - // todo - //old := this.Np.ReplacePeer(remotePeer, this) - //if old != nil { - // old.Close() - //} -} diff --git a/methods/utils.go b/methods/utils.go index 9f6c74c..12530ef 100644 --- a/methods/utils.go +++ b/methods/utils.go @@ -17,3 +17,26 @@ */ package methods + +import ( + "encoding/json" + "io/ioutil" + "time" +) + +func getParamsFromJsonFile(fileName string, data interface{}) error { + bz, err := ioutil.ReadFile(fileName) + if err != nil { + return err + } + return json.Unmarshal(bz, data) +} + +func dispatch(sec int) { + expire := time.Duration(sec) * time.Second + stop := make(chan struct{}) + tr.Add(expire, func() { + stop <- struct{}{} + }) + <-stop +} diff --git a/p2pserver/connect_controller/connect_controller_test.go b/p2pserver/connect_controller/connect_controller_test.go index 4152d48..38fa0e0 100644 --- a/p2pserver/connect_controller/connect_controller_test.go +++ b/p2pserver/connect_controller/connect_controller_test.go @@ -19,21 +19,18 @@ package connect_controller import ( "fmt" - "net" - "sort" - "sync" - "testing" - "time" - "github.com/ontio/ontology-tool/p2pserver/common" "github.com/ontio/ontology-tool/p2pserver/handshake" "github.com/ontio/ontology-tool/p2pserver/peer" "github.com/stretchr/testify/assert" + "net" + "sort" + "sync" + "testing" ) func init() { common.Difficulty = 1 - handshake.HANDSHAKE_DURATION = 10 * time.Second } type Transport struct { diff --git a/p2pserver/handshake/handshake.go b/p2pserver/handshake/handshake.go index fe90e2c..35e2364 100644 --- a/p2pserver/handshake/handshake.go +++ b/p2pserver/handshake/handshake.go @@ -19,21 +19,19 @@ package handshake import ( "fmt" - "net" - "time" - "github.com/blang/semver" + tcm "github.com/ontio/ontology-tool/common" "github.com/ontio/ontology-tool/p2pserver/common" "github.com/ontio/ontology-tool/p2pserver/message/types" "github.com/ontio/ontology-tool/p2pserver/peer" common2 "github.com/ontio/ontology/common" + "net" + "time" ) -var HANDSHAKE_DURATION = 10 * time.Second // handshake time can not exceed this duration, or will treat as attack. - func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Conn) (*peer.PeerInfo, error) { version := newVersion(info) - if err := conn.SetDeadline(time.Now().Add(HANDSHAKE_DURATION)); err != nil { + if err := conn.SetDeadline(time.Now().Add(tcm.HandshakeDuration)); err != nil { return nil, err } defer func() { @@ -45,6 +43,10 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if err != nil { return nil, err } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterSendVersion) { + return nil, fmt.Errorf("client handshake stopped after send version") + } // 2. read version msg, _, err := types.ReadMessage(conn) @@ -55,6 +57,10 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if !ok { return nil, fmt.Errorf("expected version message, but got message type: %s", msg.CmdType()) } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterReceiveVersion) { + return nil, fmt.Errorf("client handshake stopped after receive version") + } // 3. update kadId kid := common.PseudoPeerIdFromUint64(receivedVersion.P.Nonce) @@ -63,6 +69,11 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if err != nil { return nil, err } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterUpdateKad) { + return nil, fmt.Errorf("client handshake stopped after update kad") + } + // 4. read kadkeyid msg, _, err = types.ReadMessage(conn) if err != nil { @@ -72,6 +83,10 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if !ok { return nil, fmt.Errorf("handshake failed, expect kad id message, got %s", msg.CmdType()) } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterReadKad) { + return nil, fmt.Errorf("client handshake stopped after read kad") + } kid = kadKeyId.KadKeyId.Id } @@ -81,13 +96,16 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if err != nil { return nil, err } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterSendAck) { + return nil, fmt.Errorf("client handshake stopped after send ack") + } + // 6. receive verack msg, _, err = types.ReadMessage(conn) if err != nil { return nil, err } - - // 6. receive verack if _, ok := msg.(*types.VerACK); !ok { return nil, fmt.Errorf("handshake failed, expect verack message, got %s", msg.CmdType()) } @@ -97,7 +115,7 @@ func HandshakeClient(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con func HandshakeServer(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Conn) (*peer.PeerInfo, error) { ver := newVersion(info) - if err := conn.SetDeadline(time.Now().Add(HANDSHAKE_DURATION)); err != nil { + if err := conn.SetDeadline(time.Now().Add(tcm.HandshakeDuration)); err != nil { return nil, err } defer func() { @@ -113,12 +131,20 @@ func HandshakeServer(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con return nil, fmt.Errorf("[HandshakeServer] expected version message") } version := msg.(*types.Version) + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterReceiveVersion) { + return nil, fmt.Errorf("server handshake stopped after receive version") + } // 2. sendMsg version err = sendMsg(conn, ver) if err != nil { return nil, err } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterSendVersion) { + return nil, fmt.Errorf("server handshake stopped after send version") + } // 3. read update kadkey id kid := common.PseudoPeerIdFromUint64(version.P.Nonce) @@ -132,11 +158,20 @@ func HandshakeServer(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con return nil, fmt.Errorf("[HandshakeServer] expected update kadkeyid message") } kid = kadkeyId.KadKeyId.Id + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterReadKad) { + return nil, fmt.Errorf("server handshake stopped after read kad") + } + // 4. sendMsg update kadkey id err = sendMsg(conn, &types.UpdatePeerKeyId{KadKeyId: selfId}) if err != nil { return nil, err } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterUpdateKad) { + return nil, fmt.Errorf("server handshake stopped after update kad") + } } // 5. read version ack @@ -147,6 +182,10 @@ func HandshakeServer(info *peer.PeerInfo, selfId *common.PeerKeyId, conn net.Con if msg.CmdType() != common.VERACK_TYPE { return nil, fmt.Errorf("[HandshakeServer] expected version ack message") } + // mark: + if tcm.StopHandshake(tcm.Handshake_StopAfterReadAck) { + return nil, fmt.Errorf("server handshake stopped after read ack") + } // 6. sendMsg ack err = sendMsg(conn, &types.VerACK{}) diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index f72b277..3eddd26 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -213,11 +213,8 @@ func (this *NetServer) Send(p *peer.Peer, msg types.Message) error { } //Connect used to connect net address under sync or cons mode -func (this *NetServer) Connect(addr string) { - err := this.connect(addr) - if err != nil { - log4.Debug("%s connecting to %s failed, err: %s", this.base.Addr, addr, err) - } +func (this *NetServer) Connect(addr string) error { + return this.connect(addr) } //Connect used to connect net address under sync or cons mode diff --git a/p2pserver/net/protocol/server.go b/p2pserver/net/protocol/server.go index df0e3ff..f116bfc 100644 --- a/p2pserver/net/protocol/server.go +++ b/p2pserver/net/protocol/server.go @@ -27,7 +27,7 @@ import ( //P2P represent the net interface of p2p package type P2P interface { - Connect(addr string) + Connect(addr string) error GetHostInfo() *peer.PeerInfo GetID() common.PeerId GetNeighbors() []*peer.Peer diff --git a/p2pserver/p2pserver.go b/p2pserver/p2pserver.go index 61050c0..2135a0c 100644 --- a/p2pserver/p2pserver.go +++ b/p2pserver/p2pserver.go @@ -28,7 +28,6 @@ import ( p2pnet "github.com/ontio/ontology-tool/p2pserver/net/protocol" "github.com/ontio/ontology-tool/p2pserver/protocols" "github.com/ontio/ontology/common/config" - "github.com/ontio/ontology/core/ledger" ) //P2PServer control all network activities @@ -37,10 +36,7 @@ type P2PServer struct { } //NewServer return a new p2pserver according to the pubkey -func NewServer() (*P2PServer, error) { - ld := ledger.DefLedger - - protocol := protocols.NewMsgHandler(ld) +func NewServer(protocol *protocols.MsgHandler) (*P2PServer, error) { n, err := netserver.NewNetServer(protocol, config.DefConfig.P2PNode) if err != nil { return nil, err diff --git a/p2pserver/protocols/heatbeat/heartbeat.go b/p2pserver/protocols/heatbeat/heartbeat.go index b90dd1c..ec1aef4 100644 --- a/p2pserver/protocols/heatbeat/heartbeat.go +++ b/p2pserver/protocols/heatbeat/heartbeat.go @@ -21,27 +21,27 @@ import ( "time" log4 "github.com/alecthomas/log4go" + tcm "github.com/ontio/ontology-tool/common" "github.com/ontio/ontology-tool/p2pserver/common" - msgpack "github.com/ontio/ontology-tool/p2pserver/message/msg_pack" + "github.com/ontio/ontology-tool/p2pserver/message/msg_pack" "github.com/ontio/ontology-tool/p2pserver/message/types" - p2p "github.com/ontio/ontology-tool/p2pserver/net/protocol" + "github.com/ontio/ontology-tool/p2pserver/net/protocol" "github.com/ontio/ontology/common/config" - "github.com/ontio/ontology/core/ledger" ) type HeartBeat struct { net p2p.P2P id common.PeerId quit chan bool - ledger *ledger.Ledger //ledger + height uint64 } -func NewHeartBeat(net p2p.P2P, ld *ledger.Ledger) *HeartBeat { +func NewHeartBeat(net p2p.P2P) *HeartBeat { return &HeartBeat{ id: net.GetID(), net: net, quit: make(chan bool), - ledger: ld, + height: tcm.HeartbeatBlockHeight, } } @@ -68,8 +68,11 @@ func (this *HeartBeat) heartBeatService() { } } +// mark: func (this *HeartBeat) ping() { - height := this.ledger.GetCurrentBlockHeight() + height := this.height + // todo how to increase block height + //atomic.AddUint64(&this.height, 1) ping := msgpack.NewPingMsg(uint64(height)) go this.net.Broadcast(ping) } @@ -93,7 +96,7 @@ func (this *HeartBeat) PingHandle(ctx *p2p.Context, ping *types.Ping) { remotePeer.SetHeight(ping.Height) p2p := ctx.Network() - height := ledger.DefLedger.GetCurrentBlockHeight() + height := this.height p2p.SetHeight(uint64(height)) msg := msgpack.NewPongMsg(uint64(height)) diff --git a/p2pserver/protocols/msg_handler.go b/p2pserver/protocols/msg_handler.go index 2413a98..aed4140 100644 --- a/p2pserver/protocols/msg_handler.go +++ b/p2pserver/protocols/msg_handler.go @@ -69,7 +69,8 @@ func (self *MsgHandler) start(net p2p.P2P) { self.discovery = discovery.NewDiscovery(net, config.DefConfig.P2PNode.ReservedCfg.MaskPeers, 0) seeds := config.DefConfig.Genesis.SeedList self.bootstrap = bootstrap.NewBootstrapService(net, seeds) - self.heatBeat = heatbeat.NewHeartBeat(net, self.ledger) + // mark: + self.heatBeat = heatbeat.NewHeartBeat(net) self.persistRecentPeerService = recent_peers.NewPersistRecentPeerService(net) go self.persistRecentPeerService.Start() go self.blockSync.Start() diff --git a/p2pserver/protocols/only_heartbeat.go b/p2pserver/protocols/only_heartbeat.go new file mode 100644 index 0000000..9b617e6 --- /dev/null +++ b/p2pserver/protocols/only_heartbeat.go @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package protocols + +import ( + log4 "github.com/alecthomas/log4go" + msgCommon "github.com/ontio/ontology-tool/p2pserver/common" + msgTypes "github.com/ontio/ontology-tool/p2pserver/message/types" + "github.com/ontio/ontology-tool/p2pserver/net/protocol" + "github.com/ontio/ontology-tool/p2pserver/protocols/heatbeat" +) + +type OnlyHeartbeatMsgHandler struct { + heatBeat *heatbeat.HeartBeat +} + +func NewOnlyHeartbeatMsgHandler() *OnlyHeartbeatMsgHandler { + return &OnlyHeartbeatMsgHandler{} +} + +func (self *OnlyHeartbeatMsgHandler) start(net p2p.P2P) { + self.heatBeat = heatbeat.NewHeartBeat(net) + go self.heatBeat.Start() +} + +func (self *OnlyHeartbeatMsgHandler) stop() { + self.heatBeat.Stop() +} + +func (self *OnlyHeartbeatMsgHandler) HandleSystemMessage(net p2p.P2P, msg p2p.SystemMessage) { + switch m := msg.(type) { + case p2p.NetworkStart: + self.start(net) + case p2p.PeerConnected: + log4.Debug("peer connected, address: %s, id %d", m.Info.Addr, m.Info.Id.ToUint64()) + case p2p.PeerDisConnected: + log4.Debug("peer disconnected, address: %s, id %d", m.Info.Addr, m.Info.Id.ToUint64()) + case p2p.NetworkStop: + self.stop() + } +} + +func (self *OnlyHeartbeatMsgHandler) HandlePeerMessage(ctx *p2p.Context, msg msgTypes.Message) { + log4.Trace("[p2p]receive message, remote address %s, id %d", ctx.Sender().GetAddr(), ctx.Sender().GetID().ToUint64()) + switch m := msg.(type) { + case *msgTypes.Ping: + self.heatBeat.PingHandle(ctx, m) + case *msgTypes.Pong: + self.heatBeat.PongHandle(ctx, m) + case *msgTypes.NotFound: + log4.Debug("[p2p]receive notFound message, hash is %s", m.Hash.ToHexString()) + default: + msgType := msg.CmdType() + if msgType == msgCommon.VERACK_TYPE || msgType == msgCommon.VERSION_TYPE { + log4.Info("receive message: %s from peer %s", msgType, ctx.Sender().GetAddr()) + } else { + log4.Warn("unknown message handler for the msg: %s", msgType) + } + } +} diff --git a/params/Handshake.json b/params/Handshake.json new file mode 100644 index 0000000..10c1444 --- /dev/null +++ b/params/Handshake.json @@ -0,0 +1,4 @@ +{ + "Remote": "172.168.3.158:20338", + "HeartbeatTime": 20 +} \ No newline at end of file diff --git a/utils/timer/timer.go b/utils/timer/timer.go new file mode 100644 index 0000000..93c4d71 --- /dev/null +++ b/utils/timer/timer.go @@ -0,0 +1,257 @@ +package timer + +import ( + log4 "github.com/alecthomas/log4go" + "sync" + "time" +) + +const ( + timerFormat = "2006-01-02 15:04:05" + infiniteDuration = time.Duration(1<<63 - 1) +) + +// TimerData timer data. +type TimerData struct { + Key string + expire time.Time + fn func() + index int + next *TimerData +} + +// Delay delay duration. +func (td *TimerData) Delay() time.Duration { + return time.Until(td.expire) +} + +// ExpireString expire string. +func (td *TimerData) ExpireString() string { + return td.expire.Format(timerFormat) +} + +// Timer timer. +type Timer struct { + lock sync.Mutex + free *TimerData + timers []*TimerData + signal *time.Timer + num int +} + +// NewTimer new a timer. +// A heap must be initialized before any of the heap operations +// can be used. Init is idempotent with respect to the heap invariants +// and may be called whenever the heap invariants may have been invalidated. +// Its complexity is O(n) where n = h.Len(). +// +func NewTimer(num int) (t *Timer) { + t = new(Timer) + t.init(num) + return t +} + +// Init init the timer. +func (t *Timer) Init(num int) { + t.init(num) +} + +func (t *Timer) init(num int) { + t.signal = time.NewTimer(infiniteDuration) + t.timers = make([]*TimerData, 0, num) + t.num = num + t.grow() + go t.start() +} + +func (t *Timer) grow() { + var ( + i int + td *TimerData + tds = make([]TimerData, t.num) + ) + t.free = &(tds[0]) + td = t.free + for i = 1; i < t.num; i++ { + td.next = &(tds[i]) + td = td.next + } + td.next = nil +} + +// get get a free timer data. +func (t *Timer) get() (td *TimerData) { + if td = t.free; td == nil { + t.grow() + td = t.free + } + t.free = td.next + return +} + +// put put back a timer data. +func (t *Timer) put(td *TimerData) { + td.fn = nil + td.next = t.free + t.free = td +} + +// Add add the element x onto the heap. The complexity is +// O(log(n)) where n = h.Len(). +func (t *Timer) Add(expire time.Duration, fn func()) (td *TimerData) { + t.lock.Lock() + td = t.get() + td.expire = time.Now().Add(expire) + td.fn = fn + t.add(td) + t.lock.Unlock() + return +} + +// Del removes the element at index i from the heap. +// The complexity is O(log(n)) where n = h.Len(). +func (t *Timer) Del(td *TimerData) { + t.lock.Lock() + t.del(td) + t.put(td) + t.lock.Unlock() +} + +// Push pushes the element x onto the heap. The complexity is +// O(log(n)) where n = h.Len(). +func (t *Timer) add(td *TimerData) { + var d time.Duration + td.index = len(t.timers) + // add to the minheap last node + t.timers = append(t.timers, td) + t.up(td.index) + if td.index == 0 { + // if first node, signal start goroutine + d = td.Delay() + t.signal.Reset(d) + debug("timer: add reset delay %d ms", int64(d)/int64(time.Millisecond)) + } + debug("timer: push item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index) +} + +func (t *Timer) del(td *TimerData) { + var ( + i = td.index + last = len(t.timers) - 1 + ) + if i < 0 || i > last || t.timers[i] != td { + // already remove, usually by expire + debug("timer del i: %d, last: %d, %p", i, last, td) + return + } + if i != last { + t.swap(i, last) + t.down(i, last) + t.up(i) + } + // remove item is the last node + t.timers[last].index = -1 // for safety + t.timers = t.timers[:last] + debug("timer: remove item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index) +} + +// Set update timer data. +func (t *Timer) Set(td *TimerData, expire time.Duration) { + t.lock.Lock() + t.del(td) + td.expire = time.Now().Add(expire) + t.add(td) + t.lock.Unlock() +} + +// start start the timer. +func (t *Timer) start() { + for { + t.expire() + <-t.signal.C + } +} + +// expire removes the minimum element (according to Less) from the heap. +// The complexity is O(log(n)) where n = max. +// It is equivalent to Del(0). +func (t *Timer) expire() { + var ( + fn func() + td *TimerData + d time.Duration + ) + t.lock.Lock() + for { + if len(t.timers) == 0 { + d = infiniteDuration + debug("timer: no other instance") + break + } + td = t.timers[0] + if d = td.Delay(); d > 0 { + break + } + fn = td.fn + // let caller put back + t.del(td) + t.lock.Unlock() + if fn == nil { + debug("expire timer no fn") + } else { + debug("timer key: %s, expire: %s, index: %d expired, call fn", td.Key, td.ExpireString(), td.index) + fn() + } + t.lock.Lock() + } + t.signal.Reset(d) + debug("timer: expier reset delay %d ms", int64(d)/int64(time.Millisecond)) + t.lock.Unlock() +} + +func (t *Timer) up(j int) { + for { + i := (j - 1) / 2 // parent + if i <= j || !t.less(j, i) { + break + } + t.swap(i, j) + j = i + } +} + +func (t *Timer) down(i, n int) { + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && !t.less(j1, j2) { + j = j2 // = 2*i + 2 // right child + } + if !t.less(j, i) { + break + } + t.swap(i, j) + i = j + } +} + +func (t *Timer) less(i, j int) bool { + return t.timers[i].expire.Before(t.timers[j].expire) +} + +func (t *Timer) swap(i, j int) { + t.timers[i], t.timers[j] = t.timers[j], t.timers[i] + t.timers[i].index = i + t.timers[j].index = j +} + +const debugOpen = false + +func debug(format string, args ...interface{}) { + if debugOpen { + log4.Debug(format, args...) + } +}