From 904654b6ee5da585452e41db0d7fd0dad9ed5b22 Mon Sep 17 00:00:00 2001 From: Atsushi Watanabe Date: Tue, 8 Nov 2022 10:45:12 +0900 Subject: [PATCH] Fix timout on BlockWriter.Shutdown (#322) - Close connection on PERSISTED event to reduce duration of `Shutdown()`. Connection to KVS endpoint is closed by the server a few seconds after receiving PERSISTED event. - Avoid creating duplicated connection when there is a prepared one and timecode has over int16 jump which caused timeout on `Shutdown()`. --- closer.go | 27 --------------------------- provider.go | 42 +++++++++++++++++++++++++++++------------- putresp.go | 20 ++++++++++++++++++-- 3 files changed, 47 insertions(+), 42 deletions(-) delete mode 100644 closer.go diff --git a/closer.go b/closer.go deleted file mode 100644 index 10a9322..0000000 --- a/closer.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2021 SEQSENSE, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kinesisvideomanager - -import ( - "io" -) - -type nopCloser struct { - io.Reader -} - -func (nopCloser) Close() error { - return nil -} diff --git a/provider.go b/provider.go index ec7bb26..6dd2a56 100644 --- a/provider.go +++ b/provider.go @@ -295,7 +295,9 @@ func (p *Provider) PutMedia(opts ...PutMediaOption) (BlockWriter, error) { options.logger.Debugf(`Forcing next connection: { StreamID: "%s", AbsTime: %d, LastAbsTime: %d, Diff: %d }`, p.streamID, bt.AbsTimecode(), lastAbsTime, diff, ) - prepareNextConn() + if nextConn == nil { + prepareNextConn() + } switchToNextConn(absTime) } } @@ -353,7 +355,9 @@ func (p *Provider) putSegments(ctx context.Context, ch chan *connection, chResp wg.Add(1) go func() { defer wg.Done() + opts.logger.Debugf("New conn: %p", conn) err := p.putMedia(ctx, conn, chResp, opts) + opts.logger.Debugf("Finished conn: %p", conn) if err != nil { opts.onError(err) return @@ -431,6 +435,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * chMarshalDone := make(chan struct{}) go func() { defer func() { + opts.logger.Debug("Finished EBML marshalling") close(chMarshalDone) wOutRaw.CloseWithError(io.EOF) }() @@ -447,6 +452,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * var wgResp sync.WaitGroup defer func() { + opts.logger.Debug("Flushing responses") close(chRespRaw) wgResp.Wait() }() @@ -463,9 +469,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * }() errPutMedia := p.putMediaRaw(ctx, r, chRespRaw, opts) - if errPutMedia != nil { - _ = r.Close() - } + _ = r.Close() <-chMarshalDone if errMarshal != nil { @@ -479,6 +483,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * err := newMultiError(errPutMedia, errFlush, writeErr()) if err != nil && opts.retryCount > 0 { + opts.logger.Debug("Retrying PutMedia") interval := opts.retryIntervalBase L_RETRY: for i := 0; i < opts.retryCount; i++ { @@ -493,7 +498,7 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * p.streamID, i, string(regexAmzCredHeader.ReplaceAll([]byte(strconv.Quote(err.Error())), []byte("X-Amz-$1=***"))), ) - if err = p.putMediaRaw(ctx, &nopCloser{bytes.NewReader(backup.Bytes())}, chRespRaw, opts); err == nil { + if err = p.putMediaRaw(ctx, bytes.NewReader(backup.Bytes()), chRespRaw, opts); err == nil { break } if fe, ok := err.(*FragmentEventError); ok && opts.fragmentHeadDumpLen > 0 { @@ -510,10 +515,12 @@ func (p *Provider) putMedia(ctx context.Context, conn *connection, chResp chan * return err } -func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp chan *FragmentEvent, opts *PutMediaOptions) error { - req, err := http.NewRequestWithContext(ctx, "POST", p.endpoint, rc) +func (p *Provider) putMediaRaw(ctx context.Context, r io.Reader, chResp chan *FragmentEvent, opts *PutMediaOptions) error { + ctx2, cancel := context.WithCancel(ctx) + defer cancel() + + req, err := http.NewRequestWithContext(ctx2, "POST", p.endpoint, r) if err != nil { - _ = rc.Close() return fmt.Errorf("creating http request: %w", err) } if p.streamID.StreamName() != nil { @@ -531,15 +538,16 @@ func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp cha 10*time.Minute, time.Now(), ) if err != nil { - _ = rc.Close() return fmt.Errorf("presigning request: %w", err) } res, err := opts.httpClient.Do(req) if err != nil { return fmt.Errorf("sending http request: %w", err) } + defer func() { _ = res.Body.Close() + opts.logger.Debug("API connection closed") }() if res.StatusCode != 200 { @@ -553,18 +561,26 @@ func (p *Provider) putMediaRaw(ctx context.Context, rc io.ReadCloser, chResp cha chErr := make(chan error, 1) go func() { for fe := range chFE { - if fe.IsError() && err == nil { + switch fe.EventType { + case FRAGMENT_EVENT_ERROR: chErr <- fe.AsError() + cancel() + case FRAGMENT_EVENT_PERSISTED: + cancel() } chResp <- fe } close(chErr) }() - if err := parseFragmentEvent(res.Body, chFE); err != nil { + if err := parseFragmentEvent( + res.Body, chFE, + ); err != nil && err != context.Canceled { return err } - err = <-chErr - return err + if err := ctx.Err(); err != nil { + return err + } + return <-chErr } func generateRandomUUID() ([]byte, error) { diff --git a/putresp.go b/putresp.go index a6b9288..146ff22 100644 --- a/putresp.go +++ b/putresp.go @@ -21,6 +21,8 @@ import ( "io" ) +// ErrorID represents ErrorId enum of PutMedia API. +// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details. type ErrorID int const ( @@ -47,8 +49,22 @@ const ( ARCHIVAL_ERROR ErrorID = 5001 ) +// FragmentEventType represents AckEventType enum of PutMedia API. +// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details. +type FragmentEventType string + +const ( + FRAGMENT_EVENT_BUFFERING FragmentEventType = "BUFFERING" + FRAGMENT_EVENT_RECEIVED FragmentEventType = "RECEIVED" + FRAGMENT_EVENT_PERSISTED FragmentEventType = "PERSISTED" + FRAGMENT_EVENT_ERROR FragmentEventType = "ERROR" + FRAGMENT_EVENT_IDLE FragmentEventType = "IDLE" +) + +// FragmentEvent represents Acknowledgement object of PutMedia API. +// See https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_dataplane_PutMedia.html for details. type FragmentEvent struct { - EventType string + EventType FragmentEventType FragmentTimecode uint64 FragmentNumber string // 158-bit number, handle as string ErrorId ErrorID @@ -58,7 +74,7 @@ type FragmentEvent struct { } func (e *FragmentEvent) IsError() bool { - return e.EventType == "ERROR" + return e.EventType == FRAGMENT_EVENT_ERROR } func (e *FragmentEvent) AsError() error {