Skip to content

Commit

Permalink
Reset outgoing stream on inbound is reset
Browse files Browse the repository at this point in the history
Added state to Stream to reset outbound only when the state is open
Relates to #187
  • Loading branch information
enobufs committed Oct 2, 2022
1 parent ed0e83d commit be0d032
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 68 deletions.
40 changes: 7 additions & 33 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,9 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
if !ok {
continue
}
a.lock.Unlock()
s.onInboundStreamReset()
a.lock.Lock()
a.unregisterStream(s, io.EOF)
}
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
Expand All @@ -2023,41 +2026,12 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
result = reconfigResultInProgress
}

// From draft-ietf-rtcweb-data-channel-13 section-6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.

rsn := a.generateNextRSN()
tsn := a.myNextTSN - 1

c := &chunkReconfig{
paramA: &paramOutgoingResetRequest{
reconfigRequestSequenceNumber: rsn,
return a.createPacket([]chunk{&chunkReconfig{
paramA: &paramReconfigResponse{
reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
senderLastTSN: tsn,
streamIdentifiers: p.streamIdentifiers,
},
}
a.reconfigs[rsn] = c // store in the map for retransmission
a.log.Debugf("[%s] sending RECONFIG : rsn=%d tsn=%d",
a.name, rsn, a.myNextTSN-1)

if len(a.reconfigs) > 0 {
a.tReconfig.start(a.rtoMgr.getRTO())
}

return a.createPacket([]chunk{
&chunkReconfig{
paramA: &paramReconfigResponse{
reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
result: result,
},
result: result,
},
c,
})

}})
}

// Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue.
Expand Down
89 changes: 54 additions & 35 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ const (
ReliabilityTypeTimed byte = 2
)

type StreamState int

const (
StreamStateOpen StreamState = iota // Stream object starts with "open"
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

func (ss StreamState) String() string {
switch ss {
case StreamStateOpen:
return "open"
case StreamStateClosing:
return "closing"
}
return "closed"
}

var (
errOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")
errStreamClosed = errors.New("Stream closed")
Expand All @@ -35,13 +53,13 @@ type Stream struct {
sequenceNumber uint16
readNotifier *sync.Cond
readErr error
writeErr error
unordered bool
reliabilityType byte
reliabilityValue uint32
bufferedAmount uint64
bufferedAmountLow uint64
onBufferedAmountLow func()
state StreamState
log logging.LeveledLogger
name string
}
Expand Down Expand Up @@ -178,32 +196,19 @@ func (s *Stream) Write(p []byte) (n int, err error) {
}

// WriteSCTP writes len(p) bytes from p to the DTLS connection
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) {
maxMessageSize := s.association.MaxMessageSize()
if len(p) > int(maxMessageSize) {
return 0, fmt.Errorf("%w: %v", errOutboundPacketTooLarge, math.MaxUint16)
}

switch s.association.getState() {
case shutdownSent, shutdownAckSent, shutdownPending, shutdownReceived:
s.lock.Lock()
if s.writeErr == nil {
s.writeErr = errStreamClosed
}
s.lock.Unlock()
default:
}

s.lock.RLock()
err = s.writeErr
s.lock.RUnlock()
chunks := s.packetize(p, ppi)
n := len(p)
err := s.association.sendPayloadData(chunks)
if err != nil {
return 0, err
return n, errStreamClosed
}

chunks := s.packetize(p, ppi)

return len(p), s.association.sendPayloadData(chunks)
return n, nil
}

func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
Expand Down Expand Up @@ -267,26 +272,18 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
func (s *Stream) Close() error {
if sid, isOpen := func() (uint16, bool) {
if sid, resetOutbound := func() (uint16, bool) {
s.lock.Lock()
defer s.lock.Unlock()

isOpen := true
if s.writeErr == nil {
s.writeErr = errStreamClosed
} else {
isOpen = false
}
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

if s.readErr == nil {
s.readErr = io.EOF
} else {
isOpen = false
if s.state == StreamStateOpen {
s.state = StreamStateClosing
return s.streamIdentifier, true
}
s.readNotifier.Broadcast() // broadcast regardless

return s.streamIdentifier, isOpen
}(); isOpen {
return s.streamIdentifier, false
}(); resetOutbound {
// Reset the outgoing stream
// https://tools.ietf.org/html/rfc6525
return s.association.sendResetRequest(sid)
Expand Down Expand Up @@ -365,3 +362,25 @@ func (s *Stream) getNumBytesInReassemblyQueue() int {
// No lock is required as it reads the size with atomic load function.
return s.reassemblyQueue.getNumBytes()
}

func (s *Stream) onInboundStreamReset() {
var sid uint16
var resetOutbound bool
s.lock.Lock()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())
if s.state == StreamStateOpen {
s.state = StreamStateClosing
sid = s.streamIdentifier
resetOutbound = true
}
s.lock.Unlock()

// From draft-ietf-rtcweb-data-channel-13 section-6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if resetOutbound {
s.association.sendResetRequest(sid)
}
}

0 comments on commit be0d032

Please sign in to comment.