From 332b36510a924db38178629bee0b2890ad196684 Mon Sep 17 00:00:00 2001 From: Alexander Pevzner Date: Mon, 2 Dec 2024 18:03:08 +0300 Subject: [PATCH] Device initialization timeout handling revisited When a new device is added, ipp-usb performs several HTTP requests (IPP/eSCL) to obtain device parameters for DNS-SD advertising. Previously, the entire initialization process was governed by a single timeout, which placed different requests in the sequence at an unequal disadvantage. Additionally, fetching unnecessary response bodies was not covered by the timeout at all, which could cause ipp-usb to keep a failed USB interface busy indefinitely and prevent a clean shutdown (see #32 for details). The updated implementation uses individual timeouts for each HTTP request made during initialization and also includes a timeout for body reception. --- device.go | 15 +++---- usbtransport.go | 105 +++++++++++++++++++++++++----------------------- 2 files changed, 61 insertions(+), 59 deletions(-) diff --git a/device.go b/device.go index f9bf388..114011f 100644 --- a/device.go +++ b/device.go @@ -13,7 +13,6 @@ import ( "fmt" "net" "net/http" - "time" ) // Device object brings all parts together, namely: @@ -74,12 +73,10 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) { goto ERROR } - // Create HTTP server - dev.UsbTransport.SetDeadline( - time.Now(). - Add(DevInitTimeout). - Add(dev.UsbTransport.Quirks().GetInitDelay())) + // Configure transport for init + dev.UsbTransport.SetTimeout(DevInitTimeout) + // Create HTTP server dev.HTTPProxy = NewHTTPProxy(dev.Log, listener, dev.UsbTransport) // Obtain DNS-SD info for IPP @@ -96,7 +93,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) { log.Flush() - if dev.UsbTransport.DeadlineExpired() { + if dev.UsbTransport.TimeoutExpired() { err = ErrInitTimedOut goto ERROR } @@ -125,7 +122,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) { log.Flush() - if dev.UsbTransport.DeadlineExpired() { + if dev.UsbTransport.TimeoutExpired() { err = ErrInitTimedOut goto ERROR } @@ -183,7 +180,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) { }) // Enable handling incoming requests - dev.UsbTransport.SetDeadline(time.Time{}) + dev.UsbTransport.SetTimeout(DevInitTimeout) dev.HTTPProxy.Enable() // Start DNS-SD publisher diff --git a/usbtransport.go b/usbtransport.go index e87af6e..abb3c70 100644 --- a/usbtransport.go +++ b/usbtransport.go @@ -28,17 +28,18 @@ import ( // UsbTransport implements HTTP transport functionality over USB type UsbTransport struct { - addr UsbAddr // Device address - info UsbDeviceInfo // USB device info - log *Logger // Device's own logger - dev *UsbDevHandle // Underlying USB device - connPool chan *usbConn // Pool of idle connections - connList []*usbConn // List of all connections - connReleased chan struct{} // Signalled when connection released - shutdown chan struct{} // Closed by Shutdown() - connstate *usbConnState // Connections state tracker - quirks Quirks // Device quirks - deadline time.Time // Deadline for requests + addr UsbAddr // Device address + info UsbDeviceInfo // USB device info + log *Logger // Device's own logger + dev *UsbDevHandle // Underlying USB device + connPool chan *usbConn // Pool of idle connections + connList []*usbConn // List of all connections + connReleased chan struct{} // Signalled when connection released + shutdown chan struct{} // Closed by Shutdown() + connstate *usbConnState // Connections state tracker + quirks Quirks // Device quirks + timeout time.Duration // Timeout for requests (0 is none) + timeoutExpired uint32 // Atomic non-zero, if timeout expired } // NewUsbTransport creates new http.RoundTripper backed by IPP-over-USB @@ -237,26 +238,21 @@ func (transport *UsbTransport) connInUse() int { return cap(transport.connPool) - len(transport.connPool) } -// SetDeadline sets the deadline for all requests, submitted -// via RoundTrip and RoundTripWithSession methods -// -// A deadline is an absolute time after which request processing -// will fail instead of blocking +// SetTimeout sets the timeout for all subsequent requests. // // This is useful only at initialization time and if some requests // were failed due to timeout, device reset is required, because -// at this case synchronization with device will probably be lost +// at this case synchronization with device will probably be lost. // // A zero value for t means no timeout -func (transport *UsbTransport) SetDeadline(t time.Time) { - transport.deadline = t +func (transport *UsbTransport) SetTimeout(t time.Duration) { + transport.timeout = t } -// DeadlineExpired reports if deadline previously set by SetDeadline() -// is already expired -func (transport *UsbTransport) DeadlineExpired() bool { - deadline := transport.deadline - return !deadline.IsZero() && time.Until(deadline) <= 0 +// TimeoutExpired returns true if one or more of the preceding HTTP request +// has failed due to timeout. +func (transport *UsbTransport) TimeoutExpired() bool { + return atomic.LoadUint32(&transport.timeoutExpired) != 0 } // closeShutdownChan closes the transport.shutdown, which effectively @@ -455,6 +451,20 @@ func (transport *UsbTransport) RoundTripWithSession(session int, time.Sleep(delay) } + // Set read/write Context. This effectively sets request timeout. + // + // This is important that context is is set after inter-request + // or initial delay is already done, so we don't need to bother + // with adjusting the timeout. + rwctx := context.Background() + if transport.timeout != 0 { + var cancel context.CancelFunc + rwctx, cancel = context.WithTimeout(rwctx, transport.timeout) + defer cancel() + } + + conn.setRWCtx(rwctx) + // Send request and receive a response err = outreq.Write(conn) if err != nil { @@ -640,14 +650,15 @@ func (wrap *usbResponseBodyWrapper) Close() error { // usbConn implements an USB connection type usbConn struct { - transport *UsbTransport // Transport that owns the connection - index int // Connection index (for logging) - iface *UsbInterface // Underlying interface - reader *bufio.Reader // For http.ReadResponse - delayUntil time.Time // Delay till this time before next request - delayInterval time.Duration // Pause between requests - cntRecv int // Total bytes received - cntSent int // Total bytes sent + transport *UsbTransport // Transport that owns the connection + index int // Connection index (for logging) + iface *UsbInterface // Underlying interface + reader *bufio.Reader // For http.ReadResponse + rwctx context.Context // For usbConn.Read and usbConn.Write + delayUntil time.Time // Delay till this time before next request + delayInterval time.Duration // Pause between requests + cntRecv int // Total bytes received + cntSent int // Total bytes sent } // Open usbConn @@ -697,6 +708,11 @@ ERROR: return nil, err } +// setRWCtx sets context.Context for subsequent Read and Write operations +func (conn *usbConn) setRWCtx(ctx context.Context) { + conn.rwctx = ctx +} + // Read from USB func (conn *usbConn) Read(b []byte) (int, error) { conn.transport.connstate.beginRead(conn) @@ -719,16 +735,9 @@ func (conn *usbConn) Read(b []byte) (int, error) { } // Setup deadline - ctx := context.Background() - if !conn.transport.deadline.IsZero() { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, conn.transport.deadline) - defer cancel() - } - backoff := time.Millisecond * 10 for { - n, err := conn.iface.Recv(ctx, b) + n, err := conn.iface.Recv(conn.rwctx, b) conn.cntRecv += n conn.transport.log.Add(LogTraceHTTP, '<', @@ -742,13 +751,15 @@ func (conn *usbConn) Read(b []byte) (int, error) { "USB[%d]: recv: %s", conn.index, err) if err == context.DeadlineExceeded { - err = ErrInitTimedOut + atomic.StoreUint32( + &conn.transport.timeoutExpired, 1) } } if n != 0 || err != nil { return n, err } + conn.transport.log.Debug(' ', "USB[%d]: zero-size read", conn.index) @@ -766,14 +777,7 @@ func (conn *usbConn) Write(b []byte) (int, error) { defer conn.transport.connstate.doneWrite(conn) // Setup deadline - ctx := context.Background() - if !conn.transport.deadline.IsZero() { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, conn.transport.deadline) - defer cancel() - } - - n, err := conn.iface.Send(context.Background(), b) + n, err := conn.iface.Send(conn.rwctx, b) conn.cntSent += n conn.transport.log.Add(LogTraceHTTP, '>', @@ -787,7 +791,8 @@ func (conn *usbConn) Write(b []byte) (int, error) { "USB[%d]: send: %s", conn.index, err) if err == context.DeadlineExceeded { - err = ErrInitTimedOut + atomic.StoreUint32( + &conn.transport.timeoutExpired, 1) } }