Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset outgoing stream on inbound is reset #1

Merged
merged 1 commit into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,9 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
if !ok {
continue
}
a.lock.Unlock()
s.onInboundStreamReset()
a.lock.Lock()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reporting the event to the corresponding stream and let it decide what to do.

a.unregisterStream(s, io.EOF)
}
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
Expand All @@ -2023,41 +2026,12 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
result = reconfigResultInProgress
}

// From draft-ietf-rtcweb-data-channel-13 section-6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.

rsn := a.generateNextRSN()
tsn := a.myNextTSN - 1

c := &chunkReconfig{
paramA: &paramOutgoingResetRequest{
reconfigRequestSequenceNumber: rsn,
return a.createPacket([]chunk{&chunkReconfig{
paramA: &paramReconfigResponse{
reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
senderLastTSN: tsn,
streamIdentifiers: p.streamIdentifiers,
},
}
a.reconfigs[rsn] = c // store in the map for retransmission
a.log.Debugf("[%s] sending RECONFIG : rsn=%d tsn=%d",
a.name, rsn, a.myNextTSN-1)

if len(a.reconfigs) > 0 {
a.tReconfig.start(a.rtoMgr.getRTO())
}

return a.createPacket([]chunk{
&chunkReconfig{
paramA: &paramReconfigResponse{
reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
result: result,
},
result: result,
},
c,
})

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I am simply reverting the change.

}})
}

// Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue.
Expand Down
89 changes: 54 additions & 35 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ const (
ReliabilityTypeTimed byte = 2
)

type StreamState int

const (
StreamStateOpen StreamState = iota // Stream object starts with "open"
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

func (ss StreamState) String() string {
switch ss {
case StreamStateOpen:
return "open"
case StreamStateClosing:
return "closing"
}
return "closed"
}

var (
errOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")
errStreamClosed = errors.New("Stream closed")
Expand All @@ -35,13 +53,13 @@ type Stream struct {
sequenceNumber uint16
readNotifier *sync.Cond
readErr error
writeErr error
unordered bool
reliabilityType byte
reliabilityValue uint32
bufferedAmount uint64
bufferedAmountLow uint64
onBufferedAmountLow func()
state StreamState
log logging.LeveledLogger
name string
}
Expand Down Expand Up @@ -178,32 +196,19 @@ func (s *Stream) Write(p []byte) (n int, err error) {
}

// WriteSCTP writes len(p) bytes from p to the DTLS connection
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) {
maxMessageSize := s.association.MaxMessageSize()
if len(p) > int(maxMessageSize) {
return 0, fmt.Errorf("%w: %v", errOutboundPacketTooLarge, math.MaxUint16)
}

switch s.association.getState() {
case shutdownSent, shutdownAckSent, shutdownPending, shutdownReceived:
s.lock.Lock()
if s.writeErr == nil {
s.writeErr = errStreamClosed
}
s.lock.Unlock()
default:
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This state checks are done inside sendPayloadData()


s.lock.RLock()
err = s.writeErr
s.lock.RUnlock()
chunks := s.packetize(p, ppi)
n := len(p)
err := s.association.sendPayloadData(chunks)
if err != nil {
return 0, err
return n, errStreamClosed
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to use the same error object to be compatible.

}

chunks := s.packetize(p, ppi)

return len(p), s.association.sendPayloadData(chunks)
return n, nil
}

func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
Expand Down Expand Up @@ -267,26 +272,18 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
func (s *Stream) Close() error {
if sid, isOpen := func() (uint16, bool) {
if sid, resetOutbound := func() (uint16, bool) {
s.lock.Lock()
defer s.lock.Unlock()

isOpen := true
if s.writeErr == nil {
s.writeErr = errStreamClosed
} else {
isOpen = false
}
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

if s.readErr == nil {
s.readErr = io.EOF
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what RFC 8831 implies. s.Close() should only close the outgoing stream. (fixing the existing bug)

} else {
isOpen = false
if s.state == StreamStateOpen {
s.state = StreamStateClosing
return s.streamIdentifier, true
}
s.readNotifier.Broadcast() // broadcast regardless

return s.streamIdentifier, isOpen
}(); isOpen {
return s.streamIdentifier, false
}(); resetOutbound {
// Reset the outgoing stream
// https://tools.ietf.org/html/rfc6525
return s.association.sendResetRequest(sid)
Expand Down Expand Up @@ -365,3 +362,25 @@ func (s *Stream) getNumBytesInReassemblyQueue() int {
// No lock is required as it reads the size with atomic load function.
return s.reassemblyQueue.getNumBytes()
}

func (s *Stream) onInboundStreamReset() {
var sid uint16
var resetOutbound bool
s.lock.Lock()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())
if s.state == StreamStateOpen {
s.state = StreamStateClosing
sid = s.streamIdentifier
resetOutbound = true
}
s.lock.Unlock()

// From draft-ietf-rtcweb-data-channel-13 section-6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if resetOutbound {
s.association.sendResetRequest(sid)
}
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally, this is what we wanted in the PR.