Skip to content

Commit

Permalink
Device initialization timeout handling revisited
Browse files Browse the repository at this point in the history
When a new device is added, ipp-usb performs several HTTP requests
(IPP/eSCL) to obtain device parameters for DNS-SD advertising.

Previously, the entire initialization process was governed by a single
timeout, which placed different requests in the sequence at an unequal
disadvantage.

Additionally, fetching unnecessary response bodies was not covered by
the timeout at all, which could cause ipp-usb to keep a failed USB
interface busy indefinitely and prevent a clean shutdown (see #32 for
details).

The updated implementation uses individual timeouts for each HTTP
request made during initialization and also includes a timeout for body
reception.
  • Loading branch information
alexpevzner committed Dec 2, 2024
1 parent e76c669 commit 332b365
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 59 deletions.
15 changes: 6 additions & 9 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"fmt"
"net"
"net/http"
"time"
)

// Device object brings all parts together, namely:
Expand Down Expand Up @@ -74,12 +73,10 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) {
goto ERROR
}

// Create HTTP server
dev.UsbTransport.SetDeadline(
time.Now().
Add(DevInitTimeout).
Add(dev.UsbTransport.Quirks().GetInitDelay()))
// Configure transport for init
dev.UsbTransport.SetTimeout(DevInitTimeout)

// Create HTTP server
dev.HTTPProxy = NewHTTPProxy(dev.Log, listener, dev.UsbTransport)

// Obtain DNS-SD info for IPP
Expand All @@ -96,7 +93,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) {

log.Flush()

if dev.UsbTransport.DeadlineExpired() {
if dev.UsbTransport.TimeoutExpired() {
err = ErrInitTimedOut
goto ERROR
}
Expand Down Expand Up @@ -125,7 +122,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) {

log.Flush()

if dev.UsbTransport.DeadlineExpired() {
if dev.UsbTransport.TimeoutExpired() {
err = ErrInitTimedOut
goto ERROR
}
Expand Down Expand Up @@ -183,7 +180,7 @@ func NewDevice(desc UsbDeviceDesc) (*Device, error) {
})

// Enable handling incoming requests
dev.UsbTransport.SetDeadline(time.Time{})
dev.UsbTransport.SetTimeout(DevInitTimeout)
dev.HTTPProxy.Enable()

// Start DNS-SD publisher
Expand Down
105 changes: 55 additions & 50 deletions usbtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@ import (

// UsbTransport implements HTTP transport functionality over USB
type UsbTransport struct {
addr UsbAddr // Device address
info UsbDeviceInfo // USB device info
log *Logger // Device's own logger
dev *UsbDevHandle // Underlying USB device
connPool chan *usbConn // Pool of idle connections
connList []*usbConn // List of all connections
connReleased chan struct{} // Signalled when connection released
shutdown chan struct{} // Closed by Shutdown()
connstate *usbConnState // Connections state tracker
quirks Quirks // Device quirks
deadline time.Time // Deadline for requests
addr UsbAddr // Device address
info UsbDeviceInfo // USB device info
log *Logger // Device's own logger
dev *UsbDevHandle // Underlying USB device
connPool chan *usbConn // Pool of idle connections
connList []*usbConn // List of all connections
connReleased chan struct{} // Signalled when connection released
shutdown chan struct{} // Closed by Shutdown()
connstate *usbConnState // Connections state tracker
quirks Quirks // Device quirks
timeout time.Duration // Timeout for requests (0 is none)
timeoutExpired uint32 // Atomic non-zero, if timeout expired
}

// NewUsbTransport creates new http.RoundTripper backed by IPP-over-USB
Expand Down Expand Up @@ -237,26 +238,21 @@ func (transport *UsbTransport) connInUse() int {
return cap(transport.connPool) - len(transport.connPool)
}

// SetDeadline sets the deadline for all requests, submitted
// via RoundTrip and RoundTripWithSession methods
//
// A deadline is an absolute time after which request processing
// will fail instead of blocking
// SetTimeout sets the timeout for all subsequent requests.
//
// This is useful only at initialization time and if some requests
// were failed due to timeout, device reset is required, because
// at this case synchronization with device will probably be lost
// at this case synchronization with device will probably be lost.
//
// A zero value for t means no timeout
func (transport *UsbTransport) SetDeadline(t time.Time) {
transport.deadline = t
func (transport *UsbTransport) SetTimeout(t time.Duration) {
transport.timeout = t
}

// DeadlineExpired reports if deadline previously set by SetDeadline()
// is already expired
func (transport *UsbTransport) DeadlineExpired() bool {
deadline := transport.deadline
return !deadline.IsZero() && time.Until(deadline) <= 0
// TimeoutExpired returns true if one or more of the preceding HTTP request
// has failed due to timeout.
func (transport *UsbTransport) TimeoutExpired() bool {
return atomic.LoadUint32(&transport.timeoutExpired) != 0
}

// closeShutdownChan closes the transport.shutdown, which effectively
Expand Down Expand Up @@ -455,6 +451,20 @@ func (transport *UsbTransport) RoundTripWithSession(session int,
time.Sleep(delay)
}

// Set read/write Context. This effectively sets request timeout.
//
// This is important that context is is set after inter-request
// or initial delay is already done, so we don't need to bother
// with adjusting the timeout.
rwctx := context.Background()
if transport.timeout != 0 {
var cancel context.CancelFunc
rwctx, cancel = context.WithTimeout(rwctx, transport.timeout)
defer cancel()
}

conn.setRWCtx(rwctx)

// Send request and receive a response
err = outreq.Write(conn)
if err != nil {
Expand Down Expand Up @@ -640,14 +650,15 @@ func (wrap *usbResponseBodyWrapper) Close() error {

// usbConn implements an USB connection
type usbConn struct {
transport *UsbTransport // Transport that owns the connection
index int // Connection index (for logging)
iface *UsbInterface // Underlying interface
reader *bufio.Reader // For http.ReadResponse
delayUntil time.Time // Delay till this time before next request
delayInterval time.Duration // Pause between requests
cntRecv int // Total bytes received
cntSent int // Total bytes sent
transport *UsbTransport // Transport that owns the connection
index int // Connection index (for logging)
iface *UsbInterface // Underlying interface
reader *bufio.Reader // For http.ReadResponse
rwctx context.Context // For usbConn.Read and usbConn.Write
delayUntil time.Time // Delay till this time before next request
delayInterval time.Duration // Pause between requests
cntRecv int // Total bytes received
cntSent int // Total bytes sent
}

// Open usbConn
Expand Down Expand Up @@ -697,6 +708,11 @@ ERROR:
return nil, err
}

// setRWCtx sets context.Context for subsequent Read and Write operations
func (conn *usbConn) setRWCtx(ctx context.Context) {
conn.rwctx = ctx
}

// Read from USB
func (conn *usbConn) Read(b []byte) (int, error) {
conn.transport.connstate.beginRead(conn)
Expand All @@ -719,16 +735,9 @@ func (conn *usbConn) Read(b []byte) (int, error) {
}

// Setup deadline
ctx := context.Background()
if !conn.transport.deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, conn.transport.deadline)
defer cancel()
}

backoff := time.Millisecond * 10
for {
n, err := conn.iface.Recv(ctx, b)
n, err := conn.iface.Recv(conn.rwctx, b)
conn.cntRecv += n

conn.transport.log.Add(LogTraceHTTP, '<',
Expand All @@ -742,13 +751,15 @@ func (conn *usbConn) Read(b []byte) (int, error) {
"USB[%d]: recv: %s", conn.index, err)

if err == context.DeadlineExceeded {
err = ErrInitTimedOut
atomic.StoreUint32(
&conn.transport.timeoutExpired, 1)
}
}

if n != 0 || err != nil {
return n, err
}

conn.transport.log.Debug(' ',
"USB[%d]: zero-size read", conn.index)

Expand All @@ -766,14 +777,7 @@ func (conn *usbConn) Write(b []byte) (int, error) {
defer conn.transport.connstate.doneWrite(conn)

// Setup deadline
ctx := context.Background()
if !conn.transport.deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, conn.transport.deadline)
defer cancel()
}

n, err := conn.iface.Send(context.Background(), b)
n, err := conn.iface.Send(conn.rwctx, b)
conn.cntSent += n

conn.transport.log.Add(LogTraceHTTP, '>',
Expand All @@ -787,7 +791,8 @@ func (conn *usbConn) Write(b []byte) (int, error) {
"USB[%d]: send: %s", conn.index, err)

if err == context.DeadlineExceeded {
err = ErrInitTimedOut
atomic.StoreUint32(
&conn.transport.timeoutExpired, 1)
}
}

Expand Down

0 comments on commit 332b365

Please sign in to comment.