Skip to content

Commit

Permalink
Fix timout on BlockWriter.Shutdown (#322)
Browse files Browse the repository at this point in the history
- Close connection on PERSISTED event to reduce duration of `Shutdown()`.
  Connection to KVS endpoint is closed by the server a few seconds after receiving
  PERSISTED event.
- Avoid creating duplicated connection when there is a prepared one and timecode
  has over int16 jump which caused timeout on `Shutdown()`.
  • Loading branch information
at-wat authored Nov 8, 2022
1 parent 6edfeb2 commit 904654b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 42 deletions.
27 changes: 0 additions & 27 deletions closer.go

This file was deleted.

42 changes: 29 additions & 13 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ func (p *Provider) PutMedia(opts ...PutMediaOption) (BlockWriter, error) {
options.logger.Debugf(`Forcing next connection: { StreamID: "%s", AbsTime: %d, LastAbsTime: %d, Diff: %d }`,
p.streamID, bt.AbsTimecode(), lastAbsTime, diff,
)
prepareNextConn()
if nextConn == nil {
prepareNextConn()
}
switchToNextConn(absTime)
}
}
Expand Down Expand Up @@ -353,7 +355,9 @@ func (p *Provider) putSegments(ctx context.Context, ch chan *connection, chResp
wg.Add(1)
go func() {
defer wg.Done()
opts.logger.Debugf("New conn: %p", conn)
err := p.putMedia(ctx, conn, chResp, opts)
opts.logger.Debugf("Finished conn: %p", conn)
if err != nil {
opts.onError(err)
return
Expand Down Expand Up @@ -431,6 +435,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *
chMarshalDone := make(chan struct{})
go func() {
defer func() {
opts.logger.Debug("Finished EBML marshalling")
close(chMarshalDone)
wOutRaw.CloseWithError(io.EOF)
}()
Expand All @@ -447,6 +452,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *

var wgResp sync.WaitGroup
defer func() {
opts.logger.Debug("Flushing responses")
close(chRespRaw)
wgResp.Wait()
}()
Expand All @@ -463,9 +469,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *
}()

errPutMedia := p.putMediaRaw(ctx, r, chRespRaw, opts)
if errPutMedia != nil {
_ = r.Close()
}
_ = r.Close()

<-chMarshalDone
if errMarshal != nil {
Expand All @@ -479,6 +483,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *

err := newMultiError(errPutMedia, errFlush, writeErr())
if err != nil && opts.retryCount > 0 {
opts.logger.Debug("Retrying PutMedia")
interval := opts.retryIntervalBase
L_RETRY:
for i := 0; i < opts.retryCount; i++ {
Expand All @@ -493,7 +498,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *
p.streamID, i,
string(regexAmzCredHeader.ReplaceAll([]byte(strconv.Quote(err.Error())), []byte("X-Amz-$1=***"))),
)
if err = p.putMediaRaw(ctx, &nopCloser{bytes.NewReader(backup.Bytes())}, chRespRaw, opts); err == nil {
if err = p.putMediaRaw(ctx, bytes.NewReader(backup.Bytes()), chRespRaw, opts); err == nil {
break
}
if fe, ok := err.(*FragmentEventError); ok && opts.fragmentHeadDumpLen > 0 {
Expand All @@ -510,10 +515,12 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan *
return err
}

func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp chan *FragmentEvent, opts *PutMediaOptions) error {
req, err := http.NewRequestWithContext(ctx, "POST", p.endpoint, rc)
func (p *Provider) putMediaRaw(ctx context.Context, r io.Reader, chResp chan *FragmentEvent, opts *PutMediaOptions) error {
ctx2, cancel := context.WithCancel(ctx)
defer cancel()

req, err := http.NewRequestWithContext(ctx2, "POST", p.endpoint, r)
if err != nil {
_ = rc.Close()
return fmt.Errorf("creating http request: %w", err)
}
if p.streamID.StreamName() != nil {
Expand All @@ -531,15 +538,16 @@ func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp cha
10*time.Minute, time.Now(),
)
if err != nil {
_ = rc.Close()
return fmt.Errorf("presigning request: %w", err)
}
res, err := opts.httpClient.Do(req)
if err != nil {
return fmt.Errorf("sending http request: %w", err)
}

defer func() {
_ = res.Body.Close()
opts.logger.Debug("API connection closed")
}()

if res.StatusCode != 200 {
Expand All @@ -553,18 +561,26 @@ func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp cha
chErr := make(chan error, 1)
go func() {
for fe := range chFE {
if fe.IsError() && err == nil {
switch fe.EventType {
case FRAGMENT_EVENT_ERROR:
chErr <- fe.AsError()
cancel()
case FRAGMENT_EVENT_PERSISTED:
cancel()
}
chResp <- fe
}
close(chErr)
}()
if err := parseFragmentEvent(res.Body, chFE); err != nil {
if err := parseFragmentEvent(
res.Body, chFE,
); err != nil && err != context.Canceled {
return err
}
err = <-chErr
return err
if err := ctx.Err(); err != nil {
return err
}
return <-chErr
}

func generateRandomUUID() ([]byte, error) {
Expand Down
20 changes: 18 additions & 2 deletions putresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"
)

// ErrorID represents ErrorId enum of PutMedia API.
// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details.
type ErrorID int

const (
Expand All @@ -47,8 +49,22 @@ const (
ARCHIVAL_ERROR ErrorID = 5001
)

// FragmentEventType represents AckEventType enum of PutMedia API.
// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details.
type FragmentEventType string

const (
FRAGMENT_EVENT_BUFFERING FragmentEventType = "BUFFERING"
FRAGMENT_EVENT_RECEIVED FragmentEventType = "RECEIVED"
FRAGMENT_EVENT_PERSISTED FragmentEventType = "PERSISTED"
FRAGMENT_EVENT_ERROR FragmentEventType = "ERROR"
FRAGMENT_EVENT_IDLE FragmentEventType = "IDLE"
)

// FragmentEvent represents Acknowledgement object of PutMedia API.
// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details.
type FragmentEvent struct {
EventType string
EventType FragmentEventType
FragmentTimecode uint64
FragmentNumber string // 158-bit number, handle as string
ErrorId ErrorID
Expand All @@ -58,7 +74,7 @@ type FragmentEvent struct {
}

func (e *FragmentEvent) IsError() bool {
return e.EventType == "ERROR"
return e.EventType == FRAGMENT_EVENT_ERROR
}

func (e *FragmentEvent) AsError() error {
Expand Down

0 comments on commit 904654b

Please sign in to comment.