From 8bb436447595a6001f7505c217975365dd013175 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Fri, 15 Nov 2024 12:25:02 +0700 Subject: [PATCH] fix: utp initial in test case --- cmd/shisui/main.go | 21 ++++++------------- internal/debug/flags.go | 8 +++---- p2p/discover/portal_protocol.go | 15 ++++++++----- p2p/discover/portal_protocol_test.go | 10 ++++----- p2p/discover/portal_utp.go | 20 ++++++------------ portalnetwork/beacon/test_utils.go | 4 +++- portalnetwork/history/history_network_test.go | 5 +++-- 7 files changed, 36 insertions(+), 47 deletions(-) diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index 7f1aff73e413..9f369cc4797e 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -123,9 +123,6 @@ func shisui(ctx *cli.Context) error { if err != nil { return err } - go func() { - debug.Setup(ctx) - }() // Start metrics export if enabled utils.SetupMetrics(ctx) @@ -397,11 +394,9 @@ func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, local conn, localNode, discV5, + utp, contentStorage, - contentQueue, - func(p *discover.PortalProtocol) { - p.Utp = utp - }) + contentQueue) if err != nil { return nil, err @@ -450,11 +445,9 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN conn, localNode, discV5, + utp, contentStorage, - contentQueue, - func(p *discover.PortalProtocol) { - p.Utp = utp - }) + contentQueue) if err != nil { return nil, err @@ -496,11 +489,9 @@ func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNo conn, localNode, discV5, + utp, stateStore, - contentQueue, - func(p *discover.PortalProtocol) { - p.Utp = utp - }) + contentQueue) if err != nil { return nil, err diff --git a/internal/debug/flags.go b/internal/debug/flags.go index 8c98e6fbdcd3..5e4f97e074db 100644 --- a/internal/debug/flags.go +++ b/internal/debug/flags.go @@ -252,8 +252,8 @@ func Setup(ctx *cli.Context) error { glogger = log.NewGlogHandler(handler) // logging - //verbosity := log.FromLegacyLevel(ctx.Int(verbosityFlag.Name)) - //glogger.Verbosity(verbosity) + verbosity := log.FromLegacyLevel(ctx.Int(verbosityFlag.Name)) + glogger.Verbosity(verbosity) vmodule := ctx.String(logVmoduleFlag.Name) if vmodule == "" { // Retain backwards compatibility with `--vmodule` flag if `--log.vmodule` not set @@ -264,7 +264,7 @@ func Setup(ctx *cli.Context) error { } glogger.Vmodule(vmodule) - //log.SetDefault(log.NewLogger(glogger)) + log.SetDefault(log.NewLogger(glogger)) // profiling, tracing runtime.MemProfileRate = memprofilerateFlag.Value @@ -312,7 +312,7 @@ func StartPProf(address string, withMetrics bool) { } log.Info("Starting pprof server", "addr", fmt.Sprintf("http://%s/debug/pprof", address)) go func() { - if err := http.ListenAndServe(address, nil); err != nil { + if err := http.ListenAndServe("0.0.0.0:8080", nil); err != nil { log.Error("Failure in running pprof server", "err", err) } }() diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index ec96b38f4c26..b1be63233c4e 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -201,7 +201,7 @@ func defaultContentIdFunc(contentKey []byte) []byte { return digest[:] } -func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.ProtocolId, privateKey *ecdsa.PrivateKey, conn UDPConn, localNode *enode.LocalNode, discV5 *UDPv5, storage storage.ContentStorage, contentQueue chan *ContentElement, opts ...PortalProtocolOption) (*PortalProtocol, error) { +func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.ProtocolId, privateKey *ecdsa.PrivateKey, conn UDPConn, localNode *enode.LocalNode, discV5 *UDPv5, utp *PortalUtp, storage storage.ContentStorage, contentQueue chan *ContentElement, opts ...PortalProtocolOption) (*PortalProtocol, error) { closeCtx, cancelCloseCtx := context.WithCancel(context.Background()) protocol := &PortalProtocol{ @@ -222,6 +222,7 @@ func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.Proto offerQueue: make(chan *OfferRequestWithNode, concurrentOffers), conn: conn, DiscV5: discV5, + Utp: utp, NAT: config.NAT, clock: config.clock, connIdGen: libutp.NewConnIdGenerator(), @@ -247,7 +248,9 @@ func (p *PortalProtocol) Start() error { } p.DiscV5.RegisterTalkHandler(p.protocolId, p.handleTalkRequest) - err = p.Utp.Start() + if p.Utp != nil { + err = p.Utp.Start() + } if err != nil { return err } @@ -268,7 +271,9 @@ func (p *PortalProtocol) Stop() { p.cancelCloseCtx() p.table.close() p.DiscV5.Close() - p.Utp.Stop() + if p.Utp != nil { + p.Utp.Stop() + } } func (p *PortalProtocol) RoutingTableInfo() [][]string { p.table.mutex.Lock() @@ -1146,7 +1151,7 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque }(p.closeCtx, connectionId) idBuffer := make([]byte, 2) - binary.BigEndian.PutUint16(idBuffer, uint16(connectionId.SendId())) + binary.BigEndian.PutUint16(idBuffer, connectionId.SendId()) connIdMsg := &portalwire.ConnectionId{ Id: idBuffer, } @@ -1286,7 +1291,7 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po } }(p.closeCtx, connectionId) - binary.BigEndian.PutUint16(idBuffer, uint16(connectionId.SendId())) + binary.BigEndian.PutUint16(idBuffer, connectionId.SendId()) } else { binary.BigEndian.PutUint16(idBuffer, uint16(0)) } diff --git a/p2p/discover/portal_protocol_test.go b/p2p/discover/portal_protocol_test.go index b7db661257d7..c212677dab75 100644 --- a/p2p/discover/portal_protocol_test.go +++ b/p2p/discover/portal_protocol_test.go @@ -94,11 +94,9 @@ func setupLocalPortalNode(addr string, bootNodes []*enode.Node) (*PortalProtocol conn, localNode, discV5, + utpSocket, &storage.MockStorage{Db: make(map[string][]byte)}, - contentQueue, - func(p *PortalProtocol) { - p.Utp = utpSocket - }) + contentQueue) if err != nil { return nil, err } @@ -195,7 +193,7 @@ func TestPortalWireProtocolUdp(t *testing.T) { _ = connWithConnId.Close() } }() - connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid1.SendId())) + connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), cid1.SendId()) if err != nil { panic(err) } @@ -218,7 +216,7 @@ func TestPortalWireProtocolUdp(t *testing.T) { _ = ConnId2Conn.Close() } }() - ConnId2Conn, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid2.SendId())) + ConnId2Conn, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), cid2.SendId()) if err != nil && err != io.EOF { panic(err) } diff --git a/p2p/discover/portal_utp.go b/p2p/discover/portal_utp.go index 8278972f8265..e8c8e8f74ccd 100644 --- a/p2p/discover/portal_utp.go +++ b/p2p/discover/portal_utp.go @@ -42,26 +42,20 @@ func NewPortalUtp(ctx context.Context, config *PortalProtocolConfig, discV5 *UDP } func (p *PortalUtp) Start() error { - errCh := make(chan error, 1) - p.startOnce.Do(func() { - defer func() { - close(errCh) - }() - laddr := p.getLocalAddr() - - p.packetRouter = utp.NewPacketRouter(p.packetRouterFunc) - + var err error + go p.startOnce.Do(func() { var logger *zap.Logger - var err error if p.log.Enabled(p.ctx, log.LevelDebug) || p.log.Enabled(p.ctx, log.LevelTrace) { logger, err = zap.NewDevelopmentConfig().Build() } else { logger, err = zap.NewProductionConfig().Build() } if err != nil { - errCh <- err return } + + laddr := p.getLocalAddr() + p.packetRouter = utp.NewPacketRouter(p.packetRouterFunc) p.utpSm, err = utp.NewSocketManagerWithOptions( "utp", laddr, @@ -70,12 +64,10 @@ func (p *PortalUtp) Start() error { utp.WithPacketRouter(p.packetRouter), utp.WithMaxPacketSize(1145)) if err != nil { - errCh <- err return } p.listener, err = utp.ListenUTPOptions("utp", (*utp.Addr)(laddr), utp.WithSocketManager(p.utpSm)) if err != nil { - errCh <- err return } p.lAddr = p.listener.Addr().(*utp.Addr) @@ -84,7 +76,7 @@ func (p *PortalUtp) Start() error { p.discV5.RegisterTalkHandler(string(portalwire.Utp), p.handleUtpTalkRequest) }) - return <-errCh + return err } func (p *PortalUtp) Stop() { diff --git a/portalnetwork/beacon/test_utils.go b/portalnetwork/beacon/test_utils.go index 9f0550e54871..5235b79ffdf4 100644 --- a/portalnetwork/beacon/test_utils.go +++ b/portalnetwork/beacon/test_utils.go @@ -2,6 +2,7 @@ package beacon import ( "bytes" + "context" "fmt" "net" "os" @@ -65,7 +66,8 @@ func SetupBeaconNetwork(addr string, bootNodes []*enode.Node) (*BeaconNetwork, e contentQueue := make(chan *discover.ContentElement, 50) - portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.Beacon, privKey, conn, localNode, discV5, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue) + utpSocket := discover.NewPortalUtp(context.Background(), conf, discV5, conn) + portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.Beacon, privKey, conn, localNode, discV5, utpSocket, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue) if err != nil { return nil, err } diff --git a/portalnetwork/history/history_network_test.go b/portalnetwork/history/history_network_test.go index e1bba686313a..38b816e9cb10 100644 --- a/portalnetwork/history/history_network_test.go +++ b/portalnetwork/history/history_network_test.go @@ -2,6 +2,7 @@ package history import ( "bytes" + "context" "crypto/sha256" _ "embed" "encoding/json" @@ -334,8 +335,8 @@ func genHistoryNetwork(addr string, bootNodes []*enode.Node) (*HistoryNetwork, e } contentQueue := make(chan *discover.ContentElement, 50) - - portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.History, privKey, conn, localNode, discV5, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue) + utpSocket := discover.NewPortalUtp(context.Background(), conf, discV5, conn) + portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.History, privKey, conn, localNode, discV5, utpSocket, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue) if err != nil { return nil, err }