From be0d0328d24f0ee0d6e75a976f8d5876455cdbd9 Mon Sep 17 00:00:00 2001 From: Yutaka Takeda Date: Sat, 1 Oct 2022 18:21:17 -0700 Subject: [PATCH] Reset outgoing stream on inbound is reset Added state to Stream to reset outbound only when the state is open Relates to #187 --- association.go | 40 ++++------------------- stream.go | 89 ++++++++++++++++++++++++++++++-------------------- 2 files changed, 61 insertions(+), 68 deletions(-) diff --git a/association.go b/association.go index feed5a2c..c5c355b9 100644 --- a/association.go +++ b/association.go @@ -2014,6 +2014,9 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet { if !ok { continue } + a.lock.Unlock() + s.onInboundStreamReset() + a.lock.Lock() a.unregisterStream(s, io.EOF) } delete(a.reconfigRequests, p.reconfigRequestSequenceNumber) @@ -2023,41 +2026,12 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet { result = reconfigResultInProgress } - // From draft-ietf-rtcweb-data-channel-13 section-6.7: - // if one side decides to close the data channel, it resets the corresponding - // outgoing stream. When the peer sees that an incoming stream was - // reset, it also resets its corresponding outgoing stream. Once this - // is completed, the data channel is closed. - - rsn := a.generateNextRSN() - tsn := a.myNextTSN - 1 - - c := &chunkReconfig{ - paramA: ¶mOutgoingResetRequest{ - reconfigRequestSequenceNumber: rsn, + return a.createPacket([]chunk{&chunkReconfig{ + paramA: ¶mReconfigResponse{ reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber, - senderLastTSN: tsn, - streamIdentifiers: p.streamIdentifiers, - }, - } - a.reconfigs[rsn] = c // store in the map for retransmission - a.log.Debugf("[%s] sending RECONFIG : rsn=%d tsn=%d", - a.name, rsn, a.myNextTSN-1) - - if len(a.reconfigs) > 0 { - a.tReconfig.start(a.rtoMgr.getRTO()) - } - - return a.createPacket([]chunk{ - &chunkReconfig{ - paramA: ¶mReconfigResponse{ - reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber, - result: result, - }, + result: result, }, - c, - }) - + }}) } // Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue. diff --git a/stream.go b/stream.go index 58041b1f..05f1545f 100644 --- a/stream.go +++ b/stream.go @@ -20,6 +20,24 @@ const ( ReliabilityTypeTimed byte = 2 ) +type StreamState int + +const ( + StreamStateOpen StreamState = iota // Stream object starts with "open" + StreamStateClosing // Outgoing stream is being reset + StreamStateClosed // Stream has been closed +) + +func (ss StreamState) String() string { + switch ss { + case StreamStateOpen: + return "open" + case StreamStateClosing: + return "closing" + } + return "closed" +} + var ( errOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size") errStreamClosed = errors.New("Stream closed") @@ -35,13 +53,13 @@ type Stream struct { sequenceNumber uint16 readNotifier *sync.Cond readErr error - writeErr error unordered bool reliabilityType byte reliabilityValue uint32 bufferedAmount uint64 bufferedAmountLow uint64 onBufferedAmountLow func() + state StreamState log logging.LeveledLogger name string } @@ -178,32 +196,19 @@ func (s *Stream) Write(p []byte) (n int, err error) { } // WriteSCTP writes len(p) bytes from p to the DTLS connection -func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) { +func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) { maxMessageSize := s.association.MaxMessageSize() if len(p) > int(maxMessageSize) { return 0, fmt.Errorf("%w: %v", errOutboundPacketTooLarge, math.MaxUint16) } - switch s.association.getState() { - case shutdownSent, shutdownAckSent, shutdownPending, shutdownReceived: - s.lock.Lock() - if s.writeErr == nil { - s.writeErr = errStreamClosed - } - s.lock.Unlock() - default: - } - - s.lock.RLock() - err = s.writeErr - s.lock.RUnlock() + chunks := s.packetize(p, ppi) + n := len(p) + err := s.association.sendPayloadData(chunks) if err != nil { - return 0, err + return n, errStreamClosed } - - chunks := s.packetize(p, ppi) - - return len(p), s.association.sendPayloadData(chunks) + return n, nil } func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData { @@ -267,26 +272,18 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa // Close closes the write-direction of the stream. // Future calls to Write are not permitted after calling Close. func (s *Stream) Close() error { - if sid, isOpen := func() (uint16, bool) { + if sid, resetOutbound := func() (uint16, bool) { s.lock.Lock() defer s.lock.Unlock() - isOpen := true - if s.writeErr == nil { - s.writeErr = errStreamClosed - } else { - isOpen = false - } + s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String()) - if s.readErr == nil { - s.readErr = io.EOF - } else { - isOpen = false + if s.state == StreamStateOpen { + s.state = StreamStateClosing + return s.streamIdentifier, true } - s.readNotifier.Broadcast() // broadcast regardless - - return s.streamIdentifier, isOpen - }(); isOpen { + return s.streamIdentifier, false + }(); resetOutbound { // Reset the outgoing stream // https://tools.ietf.org/html/rfc6525 return s.association.sendResetRequest(sid) @@ -365,3 +362,25 @@ func (s *Stream) getNumBytesInReassemblyQueue() int { // No lock is required as it reads the size with atomic load function. return s.reassemblyQueue.getNumBytes() } + +func (s *Stream) onInboundStreamReset() { + var sid uint16 + var resetOutbound bool + s.lock.Lock() + s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String()) + if s.state == StreamStateOpen { + s.state = StreamStateClosing + sid = s.streamIdentifier + resetOutbound = true + } + s.lock.Unlock() + + // From draft-ietf-rtcweb-data-channel-13 section-6.7: + // if one side decides to close the data channel, it resets the corresponding + // outgoing stream. When the peer sees that an incoming stream was + // reset, it also resets its corresponding outgoing stream. Once this + // is completed, the data channel is closed. + if resetOutbound { + s.association.sendResetRequest(sid) + } +}