diff --git a/.github/workflows/gh-translator.yml b/.github/workflows/gh-translator.yml new file mode 100644 index 000000000..c3a0f6d92 --- /dev/null +++ b/.github/workflows/gh-translator.yml @@ -0,0 +1,23 @@ +name: 'gh-translator' + +on: + issues: + types: [opened] + pull_request: + types: [opened] + issue_comment: + types: [created, edited] + discussion: + types: [created, edited, answered] + discussion_comment: + types: [created, edited] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: usthe/issues-translate-action@v2.7 + with: + BOT_GITHUB_TOKEN: ${{ secrets.GH_TRANSLATOR_TOKEN }} + IS_MODIFY_TITLE: true + CUSTOM_BOT_NOTE: 🤖 Non-English text detected, translating... diff --git a/acceptor_unix.go b/acceptor_unix.go index cb752c60b..13177a8b8 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -23,6 +23,7 @@ import ( "golang.org/x/sys/unix" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/errors" "github.com/panjf2000/gnet/v2/pkg/logging" @@ -51,9 +52,9 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { el := eng.eventLoops.next(remoteAddr) c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) - err = el.poller.UrgentTrigger(el.register, c) + err = el.poller.Trigger(queue.HighPriority, el.register, c) if err != nil { - eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err) + eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err) _ = unix.Close(nfd) c.release() } diff --git a/acceptor_windows.go b/acceptor_windows.go index 13d551f82..25717e59f 100644 --- a/acceptor_windows.go +++ b/acceptor_windows.go @@ -69,7 +69,7 @@ func (eng *engine) listen() (err error) { } el := eng.eventLoops.next(tc.RemoteAddr()) c := newTCPConn(tc, el) - el.ch <- c + el.ch <- &openConn{c: c} go func(c *conn, tc net.Conn, el *eventloop) { var buffer [0x10000]byte for { 
diff --git a/client_test.go b/client_test.go index 91f226e03..27c2b9ff5 100644 --- a/client_test.go +++ b/client_test.go @@ -4,6 +4,7 @@ package gnet import ( + "bytes" "io" "math/rand" "net" @@ -21,12 +22,17 @@ import ( goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" ) +type connHandler struct { + network string + rspCh chan []byte + data []byte +} + type clientEvents struct { *BuiltinEventEngine tester *testing.T svr *testClientServer packetLen int - rspChMap sync.Map } func (ev *clientEvents) OnBoot(e Engine) Action { @@ -37,11 +43,11 @@ func (ev *clientEvents) OnBoot(e Engine) Action { return None } -func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) { - c.SetContext([]byte{}) - rspCh := make(chan []byte, 1) - ev.rspChMap.Store(c.LocalAddr().String(), rspCh) - return nil, None +var pingMsg = []byte("PING\r\n") + +func (ev *clientEvents) OnOpen(Conn) (out []byte, action Action) { + out = pingMsg + return } func (ev *clientEvents) OnClose(Conn, error) Action { @@ -54,24 +60,18 @@ func (ev *clientEvents) OnClose(Conn, error) Action { } func (ev *clientEvents) OnTraffic(c Conn) (action Action) { - ctx := c.Context() - var p []byte - if ctx != nil { - p = ctx.([]byte) - } else { // UDP - ev.packetLen = 1024 + handler := c.Context().(*connHandler) + if handler.network == "udp" { + ev.packetLen = datagramLen } buf, err := c.Next(-1) assert.NoError(ev.tester, err) - p = append(p, buf...) - if len(p) < ev.packetLen { - c.SetContext(p) + handler.data = append(handler.data, buf...) 
+ if len(handler.data) < ev.packetLen { return } - v, _ := ev.rspChMap.Load(c.LocalAddr().String()) - rspCh := v.(chan []byte) - rspCh <- p - c.SetContext([]byte{}) + handler.rspCh <- handler.data + handler.data = nil return } @@ -199,20 +199,20 @@ func TestServeWithGnetClient(t *testing.T) { type testClientServer struct { *BuiltinEventEngine - client *Client - clientEV *clientEvents - tester *testing.T - eng Engine - network string - addr string - multicore bool - async bool - nclients int - started int32 - connected int32 - clientActive int32 - disconnected int32 - workerPool *goPool.Pool + client *Client + tester *testing.T + eng Engine + network string + addr string + multicore bool + async bool + nclients int + started int32 + connected int32 + clientActive int32 + disconnected int32 + workerPool *goPool.Pool + udpReadHeader int32 } func (s *testClientServer) OnBoot(eng Engine) (action Action) { @@ -221,7 +221,7 @@ func (s *testClientServer) OnBoot(eng Engine) (action Action) { } func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) { - c.SetContext(c) + c.SetContext(&sync.Once{}) atomic.AddInt32(&s.connected, 1) require.NotNil(s.tester, c.LocalAddr(), "nil local addr") require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr") @@ -233,7 +233,7 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) { logging.Debugf("error occurred on closed, %v\n", err) } if s.network != "udp" { - require.Equal(s.tester, c.Context(), c, "invalid context") + require.IsType(s.tester, c.Context(), new(sync.Once), "invalid context") } atomic.AddInt32(&s.disconnected, 1) @@ -246,7 +246,25 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) { return } +func (s *testClientServer) OnShutdown(Engine) { + if s.network == "udp" { + require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader)) + } +} + func (s *testClientServer) OnTraffic(c Conn) (action Action) { + readHeader := func() { + ping := make([]byte, 
len(pingMsg)) + n, err := io.ReadFull(c, ping) + require.NoError(s.tester, err) + require.EqualValues(s.tester, len(pingMsg), n) + require.Equal(s.tester, string(pingMsg), string(ping), "bad header") + } + v := c.Context() + if v != nil { + v.(*sync.Once).Do(readHeader) + } + if s.async { buf := bbPool.Get() _, _ = c.WriteTo(buf) @@ -257,14 +275,30 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) { _ = c.OutboundBuffered() _, _ = c.Discard(1) } + if v == nil && bytes.Equal(buf.Bytes(), pingMsg) { + atomic.AddInt32(&s.udpReadHeader, 1) + buf.Reset() + } _ = s.workerPool.Submit( func() { - _ = c.AsyncWrite(buf.Bytes(), nil) + if buf.Len() > 0 { + err := c.AsyncWrite(buf.Bytes(), nil) + require.NoError(s.tester, err) + } }) return } + buf, _ := c.Next(-1) - _, _ = c.Write(buf) + if v == nil && bytes.Equal(buf, pingMsg) { + atomic.AddInt32(&s.udpReadHeader, 1) + buf = nil + } + if len(buf) > 0 { + n, err := c.Write(buf) + require.NoError(s.tester, err) + require.EqualValues(s.tester, len(buf), n) + } return } @@ -277,7 +311,7 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) { if i%2 == 0 { netConn = true } - go startGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async, netConn) + go startGnetClient(s.tester, s.client, s.network, s.addr, s.multicore, s.async, netConn) } } if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 { @@ -298,9 +332,9 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus workerPool: goPool.Default(), } var err error - ts.clientEV = &clientEvents{tester: t, packetLen: streamLen, svr: ts} + clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts} ts.client, err = NewClient( - ts.clientEV, + clientEV, WithLogLevel(logging.DebugLevel), WithLockOSThread(true), WithTicker(true), @@ -324,51 +358,36 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus assert.NoError(t, err) } -func startGnetClient(t 
*testing.T, cli *Client, ev *clientEvents, network, addr string, multicore, async, netDial bool) { +func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore, async, netDial bool) { rand.Seed(time.Now().UnixNano()) var ( c Conn err error ) + handler := &connHandler{ + network: network, + rspCh: make(chan []byte, 1), + } if netDial { var netConn net.Conn netConn, err = NetDial(network, addr) require.NoError(t, err) - c, err = cli.Enroll(netConn) + c, err = cli.EnrollContext(netConn, handler) } else { - c, err = cli.Dial(network, addr) + c, err = cli.DialContext(network, addr, handler) } require.NoError(t, err) defer c.Close() err = c.Wake(nil) require.NoError(t, err) - var rspCh chan []byte - if network == "udp" { - rspCh = make(chan []byte, 1) - ev.rspChMap.Store(c.LocalAddr().String(), rspCh) - } else { - var ( - v interface{} - ok bool - ) - start := time.Now() - for time.Since(start) < time.Second { - v, ok = ev.rspChMap.Load(c.LocalAddr().String()) - if ok { - break - } - time.Sleep(10 * time.Millisecond) - } - require.True(t, ok) - rspCh = v.(chan []byte) - } + rspCh := handler.rspCh duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 t.Logf("test duration: %dms", duration/time.Millisecond) start := time.Now() for time.Since(start) < duration { reqData := make([]byte, streamLen) if network == "udp" { - reqData = reqData[:1024] + reqData = reqData[:datagramLen] } _, err = rand.Read(reqData) require.NoError(t, err) diff --git a/client_unix.go b/client_unix.go index 70bf37c7c..7c50d0608 100644 --- a/client_unix.go +++ b/client_unix.go @@ -30,6 +30,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/math" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/buffer/ring" errorx "github.com/panjf2000/gnet/v2/pkg/errors" @@ -126,7 +127,7 @@ func (cli *Client) Start() error { // Stop stops the client 
event-loop. func (cli *Client) Stop() (err error) { - logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)) + logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)) // Stop the ticker. if cli.opts.Ticker { cli.el.engine.ticker.cancel() @@ -140,15 +141,25 @@ func (cli *Client) Stop() (err error) { // Dial is like net.Dial(). func (cli *Client) Dial(network, address string) (Conn, error) { + return cli.DialContext(network, address, nil) +} + +// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context. +func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) { c, err := net.Dial(network, address) if err != nil { return nil, err } - return cli.Enroll(c) + return cli.EnrollContext(c, ctx) } // Enroll converts a net.Conn to gnet.Conn and then adds it into Client. func (cli *Client) Enroll(c net.Conn) (Conn, error) { + return cli.EnrollContext(c, nil) +} + +// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context. 
+func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) { defer c.Close() sc, ok := c.(syscall.Conn) @@ -184,7 +195,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) { var ( sockAddr unix.Sockaddr - gc Conn + gc *conn ) switch c.(type) { case *net.UnixConn: @@ -217,10 +228,18 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) { default: return nil, errorx.ErrUnsupportedProtocol } - err = cli.el.poller.UrgentTrigger(cli.el.register, gc) + gc.ctx = ctx + + connOpened := make(chan struct{}) + ccb := &connWithCallback{c: gc, cb: func() { + close(connOpened) + }} + err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb) if err != nil { gc.Close() return nil, err } + + <-connOpened return gc, nil } diff --git a/client_windows.go b/client_windows.go index d90951b53..07d2294b2 100644 --- a/client_windows.go +++ b/client_windows.go @@ -118,6 +118,10 @@ func unixAddr(addr string) string { } func (cli *Client) Dial(network, addr string) (Conn, error) { + return cli.DialContext(network, addr, nil) +} + +func (cli *Client) DialContext(network, addr string, ctx interface{}) (Conn, error) { var ( c net.Conn err error @@ -135,10 +139,15 @@ func (cli *Client) Dial(network, addr string) (Conn, error) { return nil, err } } - return cli.Enroll(c) + return cli.EnrollContext(c, ctx) } func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { + return cli.EnrollContext(nc, nil) +} + +func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err error) { + connOpened := make(chan struct{}) switch v := nc.(type) { case *net.TCPConn: if cli.opts.TCPNoDelay == TCPNoDelay { @@ -156,7 +165,8 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { } c := newTCPConn(nc, cli.el) - cli.el.ch <- c + c.SetContext(ctx) + cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }} go func(c *conn, tc net.Conn, el *eventloop) { var buffer [0x10000]byte for { @@ -171,7 +181,8 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, 
err error) { gc = c case *net.UnixConn: c := newTCPConn(nc, cli.el) - cli.el.ch <- c + c.SetContext(ctx) + cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }} go func(c *conn, uc net.Conn, el *eventloop) { var buffer [0x10000]byte for { @@ -192,7 +203,9 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { gc = c case *net.UDPConn: c := newUDPConn(cli.el, nc.LocalAddr(), nc.RemoteAddr()) + c.SetContext(ctx) c.rawConn = nc + cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }} go func(uc net.Conn, el *eventloop) { var buffer [0x10000]byte for { @@ -201,6 +214,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { return } c := newUDPConn(cli.el, uc.LocalAddr(), uc.RemoteAddr()) + c.SetContext(ctx) c.rawConn = uc el.ch <- packUDPConn(c, buffer[:n]) } @@ -210,5 +224,6 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { return nil, errorx.ErrUnsupportedProtocol } + <-connOpened return } diff --git a/connection_unix.go b/connection_unix.go index 44cd351d4..18a33d2fe 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -29,6 +29,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/gfd" gio "github.com/panjf2000/gnet/v2/internal/io" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/buffer/elastic" errorx "github.com/panjf2000/gnet/v2/pkg/errors" @@ -84,6 +85,7 @@ func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, con } func (c *conn) release() { + c.opened = false c.ctx = nil c.buffer = nil if addr, ok := c.localAddr.(*net.TCPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 { @@ -102,7 +104,6 @@ func (c *conn) release() { c.remoteAddr = nil c.pollAttachment.FD, c.pollAttachment.Callback = 0, nil if !c.isDatagram { - c.opened = false c.peer = nil c.inboundBuffer.Done() c.outboundBuffer.Release() @@ -110,6 +111,10 @@ func (c 
*conn) release() { } func (c *conn) open(buf []byte) error { + if c.isDatagram && c.peer == nil { + return unix.Send(c.fd, buf, 0) + } + n, err := unix.Write(c.fd, buf) if err != nil && err == unix.EAGAIN { _, _ = c.outboundBuffer.Write(buf) @@ -438,18 +443,18 @@ func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error { } return err } - return c.loop.poller.Trigger(c.asyncWrite, &asyncWriteHook{callback, buf}) + return c.loop.poller.Trigger(queue.HighPriority, c.asyncWrite, &asyncWriteHook{callback, buf}) } func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error { if c.isDatagram { return errorx.ErrUnsupportedOp } - return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs}) + return c.loop.poller.Trigger(queue.HighPriority, c.asyncWritev, &asyncWritevHook{callback, bs}) } func (c *conn) Wake(callback AsyncCallback) error { - return c.loop.poller.UrgentTrigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.wake(c) if callback != nil { _ = callback(c, err) @@ -459,7 +464,7 @@ func (c *conn) Wake(callback AsyncCallback) error { } func (c *conn) CloseWithCallback(callback AsyncCallback) error { - return c.loop.poller.Trigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.close(c, nil) if callback != nil { _ = callback(c, err) @@ -469,7 +474,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error { } func (c *conn) Close() error { - return c.loop.poller.Trigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.close(c, nil) return }, nil) diff --git a/connection_windows.go b/connection_windows.go index c41a3f94f..61619b5bb 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -42,6 +42,12 @@ type udpConn struct { c *conn } +type openConn struct { + c *conn + 
cb func() + isDatagram bool +} + type conn struct { ctx interface{} // user-defined context loop *eventloop // owner event-loop diff --git a/engine_unix.go b/engine_unix.go index 3d2fc9cf2..5040a3fe1 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -27,6 +27,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/gfd" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/pkg/errors" ) @@ -202,17 +203,17 @@ func (eng *engine) stop(s Engine) { eng.eventHandler.OnShutdown(s) // Notify all event-loops to exit. - eng.eventLoops.iterate(func(_ int, el *eventloop) bool { - err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) + eng.eventLoops.iterate(func(i int, el *eventloop) bool { + err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { - eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err) + eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", i, err) } return true }) if eng.acceptor != nil { - err := eng.acceptor.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) + err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { - eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err) + eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err) } } @@ -299,7 +300,7 @@ func (eng *engine) sendCmd(cmd *asyncCmd, urgent bool) error { return errors.ErrInvalidConn } if urgent { - return el.poller.UrgentTrigger(el.execCmd, cmd) + return el.poller.Trigger(queue.LowPriority, el.execCmd, cmd) } - return el.poller.Trigger(el.execCmd, cmd) + return el.poller.Trigger(queue.LowPriority, el.execCmd, cmd) } diff --git a/eventloop_unix.go b/eventloop_unix.go
index d434267cd..f3082f14f 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -30,6 +30,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/io" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" errorx "github.com/panjf2000/gnet/v2/pkg/errors" "github.com/panjf2000/gnet/v2/pkg/logging" ) @@ -61,8 +62,19 @@ func (el *eventloop) closeConns() { }) } +type connWithCallback struct { + c *conn + cb func() +} + func (el *eventloop) register(itf interface{}) error { - c := itf.(*conn) + c, ok := itf.(*conn) + if !ok { + ccb := itf.(*connWithCallback) + c = ccb.c + defer ccb.cb() + } + if err := el.poller.AddRead(&c.pollAttachment); err != nil { _ = unix.Close(c.fd) c.release() @@ -71,7 +83,7 @@ func (el *eventloop) register(itf interface{}) error { el.connections.addConn(c, el.idx) - if c.isDatagram { + if c.isDatagram && c.peer != nil { return nil } return el.open(c) @@ -242,8 +254,10 @@ func (el *eventloop) ticker(ctx context.Context) { switch action { case None: case Shutdown: - err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil) - el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err) + // It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes + // to finish up before shutting down the service. 
+ err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil) + el.getLogger().Debugf("failed to enqueue shutdown signal of low-priority for event-loop(%d): %v", el.idx, err) } if timer == nil { timer = time.NewTimer(delay) diff --git a/eventloop_windows.go b/eventloop_windows.go index 5ae95ca3d..074318c58 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -67,7 +67,7 @@ func (el *eventloop) run() (err error) { err = v case *netErr: err = el.close(v.c, v.err) - case *conn: + case *openConn: err = el.open(v) case *tcpConn: unpackTCPConn(v) @@ -90,9 +90,16 @@ func (el *eventloop) run() (err error) { return nil } -func (el *eventloop) open(c *conn) error { - el.connections[c] = struct{}{} - el.incConn(1) +func (el *eventloop) open(oc *openConn) error { + if oc.cb != nil { + defer oc.cb() + } + + c := oc.c + if !oc.isDatagram { + el.connections[c] = struct{}{} + el.incConn(1) + } out, action := el.eventHandler.OnOpen(c) if out != nil { diff --git a/gnet_test.go b/gnet_test.go index 36bba8c73..e6150023d 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -25,7 +25,10 @@ import ( goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" ) -var streamLen = 1024 * 1024 +var ( + datagramLen = 1024 + streamLen = 1024 * 1024 +) func TestServe(t *testing.T) { // start an engine @@ -415,7 +418,7 @@ func startClient(t *testing.T, network, addr string, multicore, async bool) { for time.Since(start) < duration { reqData := make([]byte, streamLen) if network == "udp" { - reqData = reqData[:1024] + reqData = reqData[:datagramLen] } _, err = rand.Read(reqData) require.NoError(t, err) diff --git a/internal/netpoll/epoll_default_poller.go b/internal/netpoll/epoll_default_poller.go index a83613aaf..b8e686192 100644 --- a/internal/netpoll/epoll_default_poller.go +++ b/internal/netpoll/epoll_default_poller.go @@ -33,12 +33,13 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct { - fd int // epoll fd - efd int // eventfd - efdBuf []byte // efd buffer to read an 8-byte integer - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int // epoll fd + efd int // eventfd + efdBuf []byte // efd buffer to read an 8-byte integer + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -63,6 +64,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -81,31 +83,22 @@ var ( b = (*(*[8]byte)(unsafe.Pointer(&u)))[:] ) -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will be enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any tasks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
+func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.efd, b); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("write", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Write(p.efd, b); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/epoll_optimized_poller.go b/internal/netpoll/epoll_optimized_poller.go index c4fb008ea..2c5db7353 100644 --- a/internal/netpoll/epoll_optimized_poller.go +++ b/internal/netpoll/epoll_optimized_poller.go @@ -32,12 +32,13 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. 
type Poller struct { - fd int // epoll fd - epa *PollAttachment // PollAttachment for waking events - efdBuf []byte // efd buffer to read an 8-byte integer - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int // epoll fd + epa *PollAttachment // PollAttachment for waking events + efdBuf []byte // efd buffer to read an 8-byte integer + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -64,6 +65,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -82,31 +84,22 @@ var ( b = (*(*[8]byte)(unsafe.Pointer(&u)))[:] ) -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will be enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any tasks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
+func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("write", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/kqueue_default_poller.go b/internal/netpoll/kqueue_default_poller.go index 3431c4286..12f29b443 100644 --- a/internal/netpoll/kqueue_default_poller.go +++ b/internal/netpoll/kqueue_default_poller.go @@ -32,10 +32,11 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. 
type Poller struct { - fd int - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -58,6 +59,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -72,31 +74,22 @@ var note = []unix.Kevent_t{{ Fflags: unix.NOTE_TRIGGER, }} -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will be enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any tasks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
+func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("kevent trigger", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/kqueue_optimized_poller.go b/internal/netpoll/kqueue_optimized_poller.go index 5b56bcd2e..1b5b69e7b 100644 --- a/internal/netpoll/kqueue_optimized_poller.go +++ b/internal/netpoll/kqueue_optimized_poller.go @@ -33,10 +33,11 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. 
type Poller struct { - fd int - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -59,6 +60,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -73,31 +75,22 @@ var note = []unix.Kevent_t{{ Fflags: unix.NOTE_TRIGGER, }} -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will be enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. Once that happens, +// any tasks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a low-priority queue whose size may grow large and tasks in it may backlog.
+func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("kevent trigger", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. 
-func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { err = nil diff --git a/internal/queue/lock_free_queue.go b/internal/queue/lock_free_queue.go index a089ddbde..16099fecd 100644 --- a/internal/queue/lock_free_queue.go +++ b/internal/queue/lock_free_queue.go @@ -157,6 +157,10 @@ func (q *lockFreeQueue) IsEmpty() bool { return atomic.LoadInt32(&q.length) == 0 } +func (q *lockFreeQueue) Length() int32 { + return atomic.LoadInt32(&q.length) +} + func load(p *unsafe.Pointer) (n *node) { return (*node)(atomic.LoadPointer(p)) } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 194799a53..826f1843f 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -43,4 +43,17 @@ type AsyncTaskQueue interface { Enqueue(*Task) Dequeue() *Task IsEmpty() bool + Length() int32 } + +// EventPriority is the priority of an event. +type EventPriority int + +const ( + // HighPriority is for the tasks expected to be executed + // as soon as possible. + HighPriority EventPriority = iota + // LowPriority is for the tasks that won't matter much + // even if they are deferred a little bit. + LowPriority +)