Skip to content

Commit

Permalink
1.mysql数据库配置未配置时,默认使用sqlite
Browse files Browse the repository at this point in the history
2.当前该流有事件录像时,重复请求该流的事件录像,录像结束时间顺延,仅实现flv
  • Loading branch information
pggiroro committed Sep 19, 2024
1 parent d31d4a3 commit b15c776
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 33 deletions.
65 changes: 64 additions & 1 deletion flv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package record

import (
"fmt"
"go.uber.org/zap/zapcore"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -21,10 +23,71 @@ type FLVRecorder struct {
times []float64
Offset int64
duration int64
timer *time.Timer
stopCh chan struct{}
mu sync.Mutex
}

// Goroutine 等待定时器停止录像
func (r *FLVRecorder) waitForStop(streamPath string) {
select {
case <-r.timer.C: // 定时器到期
r.StopTimerRecord(zap.String("reason", "timer expired"))
case <-r.stopCh: // 手动停止
return
}
}

// 停止定时录像
func (r *FLVRecorder) StopTimerRecord(reason ...zapcore.Field) {
r.mu.Lock()
defer r.mu.Unlock()

// 停止录像
r.Stop(reason...)

// 关闭 stop 通道,停止 Goroutine
close(r.stopCh)
}

// 重置定时器
func (r *FLVRecorder) resetTimer(timeout time.Duration) {
if r.timer != nil {
r.Info("事件录像", zap.String("timeout seconeds is reset to", fmt.Sprintf("%.0f", timeout.Seconds())))
r.timer.Reset(timeout)
} else {
r.Info("事件录像", zap.String("timeout seconeds is first set to", fmt.Sprintf("%.0f", timeout.Seconds())))
r.timer = time.NewTimer(timeout)
}
}

func (r *FLVRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
// 启动录像
if err := r.StartWithFileName(streamPath, fileName); err != nil {
return err
}

// 创建定时器
r.resetTimer(timeout)

// 启动 Goroutine 监听定时器
go r.waitForStop(streamPath)

return nil
}

func (r *FLVRecorder) UpdateTimeout(timeout time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()

// 停止旧的定时器并重置
r.resetTimer(timeout)
}

func NewFLVRecorder() (r *FLVRecorder) {
r = &FLVRecorder{}
r = &FLVRecorder{
stopCh: make(chan struct{}),
}
r.Record = RecordPluginConfig.Flv
return r
}
Expand Down
11 changes: 11 additions & 0 deletions fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/Eyevinn/mp4ff/mp4"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"time"
)

type mediaContext struct {
Expand Down Expand Up @@ -43,6 +44,16 @@ type FMP4Recorder struct {
ftyp *mp4.FtypBox
}

func (r *FMP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}

func (r *FMP4Recorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}

func NewFMP4Recorder() *FMP4Recorder {
r := &FMP4Recorder{}
r.Record = RecordPluginConfig.Fmp4
Expand Down
12 changes: 11 additions & 1 deletion hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ type HLSRecorder struct {
MemoryTs
}

func (h *HLSRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}

func (h *HLSRecorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}

func NewHLSRecorder() (r *HLSRecorder) {
r = &HLSRecorder{}
r.Record = RecordPluginConfig.Hls
Expand Down Expand Up @@ -81,7 +91,7 @@ func (h *HLSRecorder) OnEvent(event any) {
case AudioFrame:
if h.tsStartTime == 0 {
h.tsStartTime = v.AbsTime
}
}
h.tsLastTime = v.AbsTime
h.Recorder.OnEvent(event)
pes := &mpegts.MpegtsPESFrame{
Expand Down
16 changes: 10 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,22 @@ var exceptionChannel = make(chan *Exception)
func (conf *RecordConfig) OnEvent(event any) {
switch v := event.(type) {
case FirstConfig, config.Config:
if conf.MysqlDSN == "" {
plugin.Error("mysqlDSN 数据库连接配置为空,无法运行,请在config.yaml里配置")
}
plugin.Info("mysqlDSN is" + conf.MysqlDSN)
//if conf.MysqlDSN == "" {
// plugin.Error("mysqlDSN 数据库连接配置为空,无法运行,请在config.yaml里配置")
//}

go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常
for exception := range exceptionChannel {
SendToThirdPartyAPI(exception)
}
}()
initMysqlDB(conf.MysqlDSN)
initSqliteDB(conf.SqliteDbPath)
if conf.MysqlDSN == "" {
plugin.Info("sqliteDb filepath is" + conf.SqliteDbPath)
initSqliteDB(conf.SqliteDbPath)
} else {
plugin.Info("mysqlDSN is" + conf.MysqlDSN)
initMysqlDB(conf.MysqlDSN)
}
conf.Flv.Init()
conf.Mp4.Init()
conf.Fmp4.Init()
Expand Down
11 changes: 11 additions & 0 deletions mp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package record

import (
"net"
"time"

"github.com/yapingcat/gomedia/go-mp4"
"go.uber.org/zap"
Expand All @@ -17,6 +18,16 @@ type MP4Recorder struct {
audioId uint32
}

func (r *MP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}

func (r *MP4Recorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}

func NewMP4Recorder() *MP4Recorder {
r := &MP4Recorder{}
r.Record = RecordPluginConfig.Mp4
Expand Down
11 changes: 11 additions & 0 deletions raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import (
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track"
"time"
)

type RawRecorder struct {
Recorder
IsAudio bool
}

func (r *RawRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}

func (r *RawRecorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}

func NewRawRecorder() (r *RawRecorder) {
r = &RawRecorder{}
r.Record = RecordPluginConfig.Raw
Expand Down
50 changes: 27 additions & 23 deletions restful_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package record
import (
"encoding/json"
"fmt"
"go.uber.org/zap"
"io"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"

"m7s.live/engine/v4/util"
)

var mu sync.Mutex

func errorJsonString(args map[string]interface{}) string {
resultJsonData := make(map[string]interface{})
for field, value := range args {
Expand Down Expand Up @@ -178,7 +180,8 @@ func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request)

// 事件录像
func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request) {

mu.Lock()
defer mu.Unlock()
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1
Expand Down Expand Up @@ -215,20 +218,6 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//var streamExist = false
//conf.recordings.Range(func(key, value any) bool {
// existStreamPath := value.(IRecorder).GetSubscriber().Stream.Path
// if existStreamPath == streamPath {
// resultJsonData["msg"] = "streamPath is exist"
// util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
// streamExist = true
// return !streamExist
// }
// return !streamExist
//})
//if streamExist {
// return
//}
eventId := eventRecordModel.EventId
if eventId == "" {
resultJsonData["msg"] = "no eventId"
Expand Down Expand Up @@ -263,6 +252,16 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
fragment := eventRecordModel.Fragment
//var id string
irecorder := NewFLVRecorder()
found := false
conf.recordings.Range(func(key, value any) bool {
tmpIRecorder := value.(*FLVRecorder)
existStreamPath := tmpIRecorder.GetSubscriber().Stream.Path
if existStreamPath == streamPath {
irecorder = tmpIRecorder
found = true
}
return found
})
recorder := irecorder.GetRecorder()
recorder.FileName = fileName
recorder.append = false
Expand All @@ -273,14 +272,19 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
recorder.Fragment = f
}
}
err = irecorder.StartWithFileName(streamPath, fileName)
go func() {
timer := time.NewTimer(30 * time.Second)

// 等待计时器到期
<-timer.C
irecorder.Stop(zap.String("reason", "api"))
}()
if found {
irecorder.UpdateTimeout(30 * time.Second)
} else {
err = irecorder.StartWithDynamicTimeout(streamPath, fileName, 30*time.Second)
}
//go func() {
// timer := time.NewTimer(30 * time.Second)
//
// // 等待计时器到期
// <-timer.C
// irecorder.Stop(zap.String("reason", "api"))
//}()
//id = recorder.ID
if err != nil {
exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath}
Expand Down
2 changes: 2 additions & 0 deletions sqlitedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func initSqliteDB(sqliteDbPath string) {
log.Fatal(err)
}
err = sqlitedb.AutoMigrate(&FLVKeyframe{})
err = sqlitedb.AutoMigrate(&EventRecord{})
err = sqlitedb.AutoMigrate(&Exception{})
if err != nil {
log.Fatal(err)
}
Expand Down
16 changes: 14 additions & 2 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bufio"
"io"
"path/filepath"
"strconv"
"strings"
"time"

"go.uber.org/zap"
Expand All @@ -18,6 +18,8 @@ type IRecorder interface {
StartWithFileName(streamPath string, fileName string) error
io.Closer
CreateFile() (FileWr, error)
StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error
UpdateTimeout(timeout time.Duration)
}

type Recorder struct {
Expand Down Expand Up @@ -61,7 +63,7 @@ func (r *Recorder) getFileName(streamPath string) (filename string) {
filename = filepath.Join(filename, r.FileName)
}
} else {
filename = filepath.Join(filename, strconv.FormatInt(time.Now().Unix(), 10))
filename = filepath.Join(filename, strings.ReplaceAll(streamPath, "/", "-")+"-"+time.Now().Format("2006-01-02-15-04-05"))
}
return
}
Expand Down Expand Up @@ -114,6 +116,16 @@ func (r *Recorder) OnEvent(event any) {
r.cut(v.AbsTime)
}
case VideoFrame:
if v.IFrame {
//go func() { //将视频关键帧的数据存入sqlite数据库中
// var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.VideoReader, FrameAbstime: v.AbsTime}
// sqlitedb.Create(flvKeyfram)
//}()
r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath)
//r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(v.FrameAbstime), 10))
//r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(v.FrameOffset)))
//r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path)
}
if r.Fragment > 0 && v.IFrame {
r.cut(v.AbsTime)
}
Expand Down

0 comments on commit b15c776

Please sign in to comment.