Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Remove pending buffer when stream closed" #255

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ type Association struct {
delayedAckTriggered bool
immediateAckTriggered bool

name string
log logging.LeveledLogger
streamVersion uint32
name string
log logging.LeveledLogger
}

// Config collects the arguments to createAssociation construction into
Expand Down Expand Up @@ -1369,7 +1368,6 @@ func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream
streamIdentifier: streamIdentifier,
reassemblyQueue: newReassemblyQueue(streamIdentifier),
log: a.log,
version: atomic.AddUint32(&a.streamVersion, 1),
name: fmt.Sprintf("%d:%s", streamIdentifier, a.name),
}

Expand Down Expand Up @@ -2090,14 +2088,10 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
dataLen := uint32(len(c.userData))
if dataLen == 0 {
sisToReset = append(sisToReset, c.streamIdentifier)
a.popPendingDataChunksToDrop(c)
continue
}

s, ok := a.streams[c.streamIdentifier]

if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion {
a.popPendingDataChunksToDrop(c)
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
continue
}

Expand Down Expand Up @@ -2129,13 +2123,6 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
return chunks, sisToReset
}

func (a *Association) popPendingDataChunksToDrop(c *chunkPayloadData) {
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
}

// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
// DATA chunks into a packet so long as the resulting packet size does not exceed
// the path MTU.
Expand Down
3 changes: 1 addition & 2 deletions chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type chunkPayloadData struct {
// chunk is still in the inflight queue
retransmit bool

head *chunkPayloadData // link to the head of the fragment
streamVersion uint32
head *chunkPayloadData // link to the head of the fragment
}

const (
Expand Down
51 changes: 20 additions & 31 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ const (

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int32
type StreamState int

// StreamState enums
const (
StreamStateOpen StreamState = iota // Stream object starts with StreamStateOpen
StreamStateClosing // Stream is closed by remote
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

Expand Down Expand Up @@ -71,7 +71,6 @@ type Stream struct {
state StreamState
log logging.LeveledLogger
name string
version uint32
}

// StreamIdentifier returns the Stream identifier associated to the stream.
Expand Down Expand Up @@ -297,7 +296,6 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
copy(userData, raw[i:i+fragmentSize])

chunk := &chunkPayloadData{
streamVersion: s.version,
streamIdentifier: s.streamIdentifier,
userData: userData,
unordered: unordered,
Expand Down Expand Up @@ -340,22 +338,16 @@ func (s *Stream) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

state := s.State()
s.log.Debugf("[%s] Close: state=%s", s.name, state.String())
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

switch state {
case StreamStateOpen:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: open => closed", s.name)
s.readErr = io.EOF
s.readNotifier.Broadcast()
return s.streamIdentifier, true
case StreamStateClosing:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: closing => closed", s.name)
if s.state == StreamStateOpen {
if s.readErr == nil {
s.state = StreamStateClosing
} else {
s.state = StreamStateClosed
}
s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
return s.streamIdentifier, true
case StreamStateClosed:
return s.streamIdentifier, false
}
return s.streamIdentifier, false
}(); resetOutbound {
Expand Down Expand Up @@ -442,8 +434,7 @@ func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

state := s.State()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, state.String())
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
Expand All @@ -454,21 +445,19 @@ func (s *Stream) onInboundStreamReset() {
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if state == StreamStateOpen {
s.log.Debugf("[%s] state change: open => closing", s.name)
s.SetState(StreamStateClosing)
}

s.readErr = io.EOF
s.readNotifier.Broadcast()
}

// State atomically returns the stream state.
func (s *Stream) State() StreamState {
return StreamState(atomic.LoadInt32((*int32)(&s.state)))
if s.state == StreamStateClosing {
s.log.Debugf("[%s] state change: closing => closed", s.name)
s.state = StreamStateClosed
}
}

// SetState atomically sets the stream state.
func (s *Stream) SetState(newState StreamState) {
atomic.StoreInt32((*int32)(&s.state), int32(newState))
// State return the stream state.
func (s *Stream) State() StreamState {
s.lock.RLock()
defer s.lock.RUnlock()
return s.state
}
Loading