Skip to content

Commit

Permalink
Fix race and lint errors
Browse files Browse the repository at this point in the history
Relates to pion#239
  • Loading branch information
enobufs authored and jerry-tao committed Nov 14, 2022
1 parent 477ca3e commit 6410218
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
19 changes: 10 additions & 9 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -2090,20 +2090,14 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
dataLen := uint32(len(c.userData))
if dataLen == 0 {
sisToReset = append(sisToReset, c.streamIdentifier)
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
a.popPendingDataChunksToDrop(c)
continue
}

s, ok := a.streams[c.streamIdentifier]

if !ok || s.state > StreamStateOpen || s.version != c.streamVersion {
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion {
a.popPendingDataChunksToDrop(c)
continue
}

Expand Down Expand Up @@ -2135,6 +2129,13 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
return chunks, sisToReset
}

func (a *Association) popPendingDataChunksToDrop(c *chunkPayloadData) {
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
}

// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
// DATA chunks into a packet so long as the resulting packet size does not exceed
// the path MTU.
Expand Down
30 changes: 17 additions & 13 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int
type StreamState int32

// StreamState enums
const (
Expand Down Expand Up @@ -340,17 +340,18 @@ func (s *Stream) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())
state := s.State()
s.log.Debugf("[%s] Close: state=%s", s.name, state.String())

switch s.state {
switch state {
case StreamStateOpen:
s.state = StreamStateClosed
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: open => closed", s.name)
s.readErr = io.EOF
s.readNotifier.Broadcast()
return s.streamIdentifier, true
case StreamStateClosing:
s.state = StreamStateClosed
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: closing => closed", s.name)
return s.streamIdentifier, true
case StreamStateClosed:
Expand Down Expand Up @@ -441,7 +442,8 @@ func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())
state := s.State()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
Expand All @@ -452,19 +454,21 @@ func (s *Stream) onInboundStreamReset() {
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if s.state == StreamStateOpen {
if state == StreamStateOpen {
s.log.Debugf("[%s] state change: open => closing", s.name)
s.state = StreamStateClosing
s.SetState(StreamStateClosing)
}

s.readErr = io.EOF
s.readNotifier.Broadcast()

}

// State return the stream state.
// State atomically returns the stream state.
func (s *Stream) State() StreamState {
s.lock.RLock()
defer s.lock.RUnlock()
return s.state
return StreamState(atomic.LoadInt32((*int32)(&s.state)))
}

// SetState atomically sets the stream state.
func (s *Stream) SetState(newState StreamState) {
atomic.StoreInt32((*int32)(&s.state), int32(newState))
}

0 comments on commit 6410218

Please sign in to comment.