From 9b33cbd5dbe9e34efbe4a0515414c34277f8c167 Mon Sep 17 00:00:00 2001 From: pg Date: Wed, 2 Oct 2024 10:45:21 +0800 Subject: [PATCH] =?UTF-8?q?1.=E8=87=AA=E5=8A=A8=E5=88=A0=E9=99=A4=E6=9C=80?= =?UTF-8?q?=E6=97=A7=E7=9A=84=E5=BD=95=E5=83=8F=E6=96=87=E4=BB=B6=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=202.=E5=A2=9E=E5=8A=A0=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E7=94=A8=E4=BA=8E=E5=8C=BA=E5=88=86record=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E6=98=AF=E8=87=AA=E5=8A=A8=E7=9A=84=E6=99=AE=E9=80=9A=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E8=BF=98=E6=98=AF=E4=BA=8B=E4=BB=B6=E5=BD=95=E5=83=8F?= =?UTF-8?q?=203.=E4=BF=AE=E5=A4=8D=E8=87=AA=E5=8A=A8=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E8=AE=A2=E9=98=85=E5=8D=B4=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E6=97=B6=E4=BC=9A=E7=94=9F=E6=88=90=E5=B0=8F?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- entity.go | 34 +++++++++-------- exception.go | 2 +- flv.go | 59 ++++++++++++++++++++++------- fmp4.go | 10 +++++ hls.go | 10 +++++ main.go | 97 ++++++++++++++++++++++++++++++++++-------------- mp4.go | 22 +++++++++++ mysqldb.go | 9 +++-- raw.go | 10 +++++ restful.go | 2 +- restful_event.go | 67 ++++++++++++++++++--------------- sqlitedb.go | 7 ++-- subscriber.go | 35 +++++++++++++++-- 13 files changed, 264 insertions(+), 100 deletions(-) diff --git a/entity.go b/entity.go index 3521e7d..78567e6 100644 --- a/entity.go +++ b/entity.go @@ -5,22 +5,24 @@ import "time" // mysql数据库eventrecord表 type EventRecord struct { Id uint `json:"id" desc:"自增长id" gorm:"primaryKey;autoIncrement"` - StreamPath string `json:"streamPath" desc:"流路径" gorm:"type:varchar(255)"` - EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255)"` - RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式" gorm:"type:varchar(255)"` - EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255)"` - BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:varchar(255);"` - AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:varchar(255)"` - CreateTime string `json:"createTime" desc:"录像时间" gorm:"type:varchar(255)"` - StartTime string `json:"startTime" desc:"录像开始时间" gorm:"type:varchar(255)"` - EndTime string `json:"endTime" desc:"录像结束时间" gorm:"type:varchar(255)"` - Filepath string `json:"filePath" desc:"录像文件物理路径" gorm:"type:varchar(255)"` - Urlpath string `json:"urlPath" desc:"录像文件下载URL路径" gorm:"type:varchar(255)"` - IsDelete string `json:"isDelete" desc:"是否删除,0表示正常,1表示删除,默认0" gorm:"type:varchar(255);default:'0'"` - UserId string `json:"useId" desc:"用户id" gorm:"type:varchar(255)"` - Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255)"` - Fragment string `json:"fragment" desc:"切片大小" gorm:"type:varchar(255)"` - EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255)"` + StreamPath string `json:"streamPath" desc:"流路径" gorm:"type:varchar(255);comment:流路径"` + EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"` + RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式" gorm:"type:varchar(255);comment:事件类型,0=连续录像模式,1=事件录像模式"` + EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"` + BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:varchar(255);comment:事件前缓存时长"` + AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:varchar(255);comment:事件后缓存时长"` + CreateTime string `json:"createTime" desc:"录像时间" gorm:"type:varchar(255);comment:录像时间"` + StartTime string `json:"startTime" desc:"录像开始时间" gorm:"type:varchar(255);comment:录像开始时间"` + EndTime string `json:"endTime" desc:"录像结束时间" gorm:"type:varchar(255);comment:录像结束时间"` + Filepath string `json:"filePath" desc:"录像文件物理路径" gorm:"type:varchar(255);comment:录像文件物理路径"` + Urlpath string `json:"urlPath" desc:"录像文件下载URL路径" gorm:"type:varchar(255);comment:录像文件下载URL路径"` + IsDelete string `json:"isDelete" desc:"是否删除,0表示正常,1表示删除,默认0" gorm:"type:varchar(255);default:'0';comment:是否删除,0表示正常,1表示删除,默认0"` + UserId string `json:"useId" desc:"用户id" gorm:"type:varchar(255);comment:用户id"` + Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"` + Fragment string `json:"fragment" desc:"切片大小" gorm:"type:varchar(255);comment:切片大小;default:'0'"` + EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"` + Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"` + EventLevel string `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,0表示重要事件,无法删除且表示无需自动删除,1表示非重要事件,达到自动删除时间后,自动删除;default:'1'"` } //// TableName 返回自定义的表名 diff --git a/exception.go b/exception.go index bc49fbb..b475ec4 100644 --- a/exception.go +++ b/exception.go @@ -18,7 +18,7 @@ func SendToThirdPartyAPI(exception *Exception) { fmt.Println("Error marshalling exception:", err) return } - err = mysqldb.Create(&exception).Error + err = db.Create(&exception).Error if err != nil { fmt.Println("异常数据插入数据库失败:", err) return diff --git a/flv.go b/flv.go index 8ff569d..8534c83 100644 --- a/flv.go +++ b/flv.go @@ -6,7 +6,6 @@ import ( "io" "net" "os" - "strconv" "strings" "sync" "time" @@ -26,6 +25,22 @@ type FLVRecorder struct { timer *time.Timer stopCh chan struct{} mu sync.Mutex + RecordMode +} + +func (r *FLVRecorder) SetId(streamPath string) { + r.ID = fmt.Sprintf("%s/flv/%s", streamPath, r.GetRecordModeString(r.RecordMode)) +} + +func (r *FLVRecorder) GetRecordModeString(mode RecordMode) string { + switch mode { + case EventMode: + return "eventmode" + case OrdinaryMode: + return "ordinarymode" + default: + return "" + } } // Goroutine 等待定时器停止录像 @@ -84,21 +99,22 @@ func (r *FLVRecorder) UpdateTimeout(timeout time.Duration) { r.resetTimer(timeout) } -func NewFLVRecorder() (r *FLVRecorder) { +func NewFLVRecorder(mode RecordMode) (r *FLVRecorder) { r = &FLVRecorder{ - stopCh: make(chan struct{}), + stopCh: make(chan struct{}), + RecordMode: mode, } r.Record = RecordPluginConfig.Flv return r } func (r *FLVRecorder) Start(streamPath string) (err error) { - r.ID = streamPath + "/flv" + r.ID = fmt.Sprintf("%s/flv/%s", streamPath, r.GetRecordModeString(r.RecordMode)) return r.start(r, streamPath, SUBTYPE_FLV) } func (r *FLVRecorder) StartWithFileName(streamPath string, fileName string) error { - r.ID = fmt.Sprintf("%s/flv/%s", streamPath, fileName) + r.ID = fmt.Sprintf("%s/flv/%s", streamPath, r.GetRecordModeString(r.RecordMode)) return r.start(r, streamPath, SUBTYPE_FLV) } @@ -223,14 +239,14 @@ func (r *FLVRecorder) OnEvent(event any) { } case VideoFrame: if r.VideoReader.Value.IFrame { - go func() { //将视频关键帧的数据存入sqlite数据库中 - var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.Offset, FrameAbstime: r.VideoReader.AbsTime} - sqlitedb.Create(flvKeyfram) - }() - r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath) - r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(r.VideoReader.AbsTime), 10)) - r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(r.Offset))) - r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path) + //go func() { //将视频关键帧的数据存入sqlite数据库中 + // var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.Offset, FrameAbstime: r.VideoReader.AbsTime} + // db.Create(flvKeyfram) + //}() + //r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath) + //r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(r.VideoReader.AbsTime), 10)) + //r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(r.Offset))) + //r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path) } case FLVFrame: check := false @@ -283,8 +299,25 @@ func (r *FLVRecorder) OnEvent(event any) { func (r *FLVRecorder) Close() error { if r.File != nil { if !r.append { + go func() { + if r.RecordMode == OrdinaryMode { + startTime := time.Now().Add(-time.Duration(r.duration) * time.Millisecond).Format("2006-01-02 15:04:05") + endTime := time.Now().Format("2006-01-02 15:04:05") + fileName := r.FileName + if r.FileName == "" { + fileName = strings.ReplaceAll(r.Stream.Path, "/", "-") + "-" + time.Now().Format("2006-01-02-15-04-05") + } + filepath := RecordPluginConfig.Flv.Path + "/" + r.Stream.Path + "/" + fileName + r.Ext //录像文件存入的完整路径(相对路径) + eventRecord := EventRecord{StreamPath: r.Stream.Path, RecordMode: "0", BeforeDuration: "0", + AfterDuration: fmt.Sprintf("%.0f", r.Fragment.Seconds()), CreateTime: startTime, StartTime: startTime, + EndTime: endTime, Filepath: filepath, Filename: fileName + r.Ext, Urlpath: "record/" + strings.ReplaceAll(r.filePath, "\\", "/"), Fragment: fmt.Sprintf("%.0f", r.Fragment.Seconds()), Type: "flv"} + err = db.Omit("id", "isDelete").Create(&eventRecord).Error + } + }() + plugin.Info("====into close append false===recordid is===" + r.ID + "====record type is " + r.GetRecordModeString(r.RecordMode) + "====starttime is " + time.Now().Add(-time.Duration(r.duration)*time.Millisecond).Format("2006-01-02 15:04:05")) go r.writeMetaData(r.File, r.duration) } else { + plugin.Info("====into close append true===recordid is===" + r.ID + "====record type is " + r.GetRecordModeString(r.RecordMode)) return r.File.Close() } } diff --git a/fmp4.go b/fmp4.go index a9c222f..80d5db1 100644 --- a/fmp4.go +++ b/fmp4.go @@ -44,6 +44,16 @@ type FMP4Recorder struct { ftyp *mp4.FtypBox } +func (r *FMP4Recorder) SetId(string) { + //TODO implement me + panic("implement me") +} + +func (r *FMP4Recorder) GetRecordModeString(mode RecordMode) string { + //TODO implement me + panic("implement me") +} + func (r *FMP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { //TODO implement me panic("implement me") diff --git a/hls.go b/hls.go index aed4e9d..f4bd4b9 100644 --- a/hls.go +++ b/hls.go @@ -24,6 +24,16 @@ type HLSRecorder struct { MemoryTs } +func (h *HLSRecorder) SetId(string) { + //TODO implement me + panic("implement me") +} + +func (h *HLSRecorder) GetRecordModeString(mode RecordMode) string { + //TODO implement me + panic("implement me") +} + func (h *HLSRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { //TODO implement me panic("implement me") diff --git a/main.go b/main.go index 7ad7dc1..d08ed7f 100644 --- a/main.go +++ b/main.go @@ -3,33 +3,38 @@ package record import ( _ "embed" "errors" + "fmt" + "gorm.io/gorm" "io" - "net" - "sync" - . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" + "net" + "os" + "sync" + "time" ) type RecordConfig struct { config.Subscribe config.HTTP - Flv Record `desc:"flv录制配置"` - Mp4 Record `desc:"mp4录制配置"` - Fmp4 Record `desc:"fmp4录制配置"` - Hls Record `desc:"hls录制配置"` - Raw Record `desc:"视频裸流录制配置"` - RawAudio Record `desc:"音频裸流录制配置"` - recordings sync.Map - beforeDuration int `desc:"事件前缓存时长"` - afterDuration int `desc:"事件后缓存时长"` - MysqlDSN string `desc:"mysql数据库连接字符串"` - ExceptionPostUrl string `desc:"第三方异常上报地址"` - SqliteDbPath string `desc:"sqlite数据库路径"` - DiskMaxPercent float64 `desc:"硬盘使用百分之上限值,超过后报警"` - LocalIp string `desc:"本机IP"` + Flv Record `desc:"flv录制配置"` + Mp4 Record `desc:"mp4录制配置"` + Fmp4 Record `desc:"fmp4录制配置"` + Hls Record `desc:"hls录制配置"` + Raw Record `desc:"视频裸流录制配置"` + RawAudio Record `desc:"音频裸流录制配置"` + recordings sync.Map + beforeDuration int `desc:"事件前缓存时长"` + afterDuration int `desc:"事件后缓存时长"` + MysqlDSN string `desc:"mysql数据库连接字符串"` + ExceptionPostUrl string `desc:"第三方异常上报地址"` + SqliteDbPath string `desc:"sqlite数据库路径"` + DiskMaxPercent float64 `desc:"硬盘使用百分之上限值,超过后报警"` + LocalIp string `desc:"本机IP"` + RecordFileExpireDays int `desc:"录像自动删除的天数,0或未设置表示不自动删除"` + RecordPathNotShowStreamPath bool `desc:"录像路径中是否包含streamPath,默认true"` } //go:embed default.yaml @@ -61,17 +66,20 @@ var RecordPluginConfig = &RecordConfig{ Path: "record/raw", Ext: ".", // 默认aac扩展名为.aac,pcma扩展名为.pcma,pcmu扩展名为.pcmu }, - beforeDuration: 30, - afterDuration: 30, - MysqlDSN: "", - ExceptionPostUrl: "http://www.163.com", - SqliteDbPath: "./sqlite.db", - DiskMaxPercent: 80.00, - LocalIp: getLocalIP(), + beforeDuration: 30, + afterDuration: 30, + MysqlDSN: "", + ExceptionPostUrl: "http://www.163.com", + SqliteDbPath: "./sqlite.db", + DiskMaxPercent: 80.00, + LocalIp: getLocalIP(), + RecordFileExpireDays: 0, + RecordPathNotShowStreamPath: true, } var plugin = InstallPlugin(RecordPluginConfig, defaultYaml) var exceptionChannel = make(chan *Exception) +var db *gorm.DB func (conf *RecordConfig) OnEvent(event any) { switch v := event.(type) { @@ -87,10 +95,45 @@ func (conf *RecordConfig) OnEvent(event any) { }() if conf.MysqlDSN == "" { plugin.Info("sqliteDb filepath is" + conf.SqliteDbPath) - initSqliteDB(conf.SqliteDbPath) + db = initSqliteDB(conf.SqliteDbPath) } else { plugin.Info("mysqlDSN is" + conf.MysqlDSN) - initMysqlDB(conf.MysqlDSN) + db = initMysqlDB(conf.MysqlDSN) + } + + if conf.RecordFileExpireDays > 0 { //当有设置录像文件自动删除时间时,则开始运行录像自动删除的进程 + //主要逻辑为 + //搜索event_records表中event_level值为1的(非重要)数据,并将其create_time与当前时间比对,大于RecordFileExpireDays则进行删除,数据库标记is_delete为1,磁盘上删除录像文件 + go func() { + for { + var eventRecords []EventRecord + expireTime := time.Now().AddDate(0, 0, -conf.RecordFileExpireDays) + // 创建包含查询条件的 EventRecord 对象 + queryRecord := EventRecord{ + EventLevel: "1", // 查询条件:event_level = 1 + } + fmt.Printf(" Create Time: %s\n", expireTime.Format("2006-01-02 15:04:05")) + err = db.Where(&queryRecord).Where("create_time < ?", expireTime).Find(&eventRecords).Error + if err == nil { + if len(eventRecords) > 0 { + for _, record := range eventRecords { + fmt.Printf("ID: %d, Create Time: %s,filepath is %s\n", record.Id, record.CreateTime, record.Filepath) + err = os.Remove(record.Filepath) + if err != nil { + fmt.Println("error is " + err.Error()) + } + err = db.Delete(record).Error + if err != nil { + fmt.Println("error is " + err.Error()) + } + } + } + } + + // 等待 1 分钟后继续执行 + <-time.After(1 * time.Minute) + } + }() } conf.Flv.Init() conf.Mp4.Init() @@ -101,7 +144,7 @@ func (conf *RecordConfig) OnEvent(event any) { case SEpublish: streamPath := v.Target.Path if conf.Flv.NeedRecord(streamPath) { - go NewFLVRecorder().Start(streamPath) + go NewFLVRecorder(OrdinaryMode).Start(streamPath) } if conf.Mp4.NeedRecord(streamPath) { go NewMP4Recorder().Start(streamPath) diff --git a/mp4.go b/mp4.go index 3d75186..71cb232 100644 --- a/mp4.go +++ b/mp4.go @@ -2,6 +2,8 @@ package record import ( "net" + "os" + "path/filepath" "time" "github.com/yapingcat/gomedia/go-mp4" @@ -18,6 +20,16 @@ type MP4Recorder struct { audioId uint32 } +func (r *MP4Recorder) SetId(string) { + //TODO implement me + panic("implement me") +} + +func (r *MP4Recorder) GetRecordModeString(mode RecordMode) string { + //TODO implement me + panic("implement me") +} + func (r *MP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { //TODO implement me panic("implement me") @@ -54,7 +66,17 @@ func (r *MP4Recorder) Close() (err error) { r.Info("mp4 write trailer", zap.Error(err)) } err = r.File.Close() + if !isWrifeFrame { + fullPath := filepath.Join(r.Path, "/", r.filePath) + go func() { + err = os.Remove(fullPath) + if err != nil { + r.Info("未写入帧,文件为空,直接删除,删除结果为=======" + err.Error()) + } + }() + } } + isWrifeFrame = false return } func (r *MP4Recorder) setTracks() { diff --git a/mysqldb.go b/mysqldb.go index 965eeba..52e2034 100644 --- a/mysqldb.go +++ b/mysqldb.go @@ -8,15 +8,15 @@ import ( "reflect" ) -var mysqldb *gorm.DB +// var mysqldb *gorm.DB var err error var createDataBaseSql = `CREATE DATABASE IF NOT EXISTS m7srecord;` var useDataBaseSql = `USE m7srecord;` -func initMysqlDB(MysqlDSN string) { - mysqldb, err = gorm.Open(mysql.Open(MysqlDSN), &gorm.Config{}) +func initMysqlDB(MysqlDSN string) *gorm.DB { + mysqldb, err := gorm.Open(mysql.Open(MysqlDSN), &gorm.Config{}) if err != nil { log.Fatal(err) } @@ -24,9 +24,10 @@ func initMysqlDB(MysqlDSN string) { mysqldb.Exec(useDataBaseSql) mysqldb.AutoMigrate(&EventRecord{}) mysqldb.AutoMigrate(&Exception{}) + return mysqldb } -func paginate[T any](model T, pageNum, pageSize int, filters map[string]interface{}) ([]T, int64, error) { +func paginate[T any](mysqldb *gorm.DB, model T, pageNum, pageSize int, filters map[string]interface{}) ([]T, int64, error) { var results []T var totalCount int64 diff --git a/raw.go b/raw.go index 2ee9179..885c6d7 100644 --- a/raw.go +++ b/raw.go @@ -13,6 +13,16 @@ type RawRecorder struct { IsAudio bool } +func (r *RawRecorder) SetId(string) { + //TODO implement me + panic("implement me") +} + +func (r *RawRecorder) GetRecordModeString(mode RecordMode) string { + //TODO implement me + panic("implement me") +} + func (r *RawRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { //TODO implement me panic("implement me") diff --git a/restful.go b/restful.go index dc54ae6..7949f35 100644 --- a/restful.go +++ b/restful.go @@ -61,7 +61,7 @@ func (conf *RecordConfig) API_start(w http.ResponseWriter, r *http.Request) { t = "flv" fallthrough case "flv": - irecorder = NewFLVRecorder() + irecorder = NewFLVRecorder(OrdinaryMode) case "mp4": irecorder = NewMP4Recorder() case "fmp4": diff --git a/restful_event.go b/restful_event.go index d56af01..0c58bea 100644 --- a/restful_event.go +++ b/restful_event.go @@ -47,7 +47,7 @@ func (conf *RecordConfig) API_event_pull(w http.ResponseWriter, r *http.Request) id := int(postData["id"].(float64)) if id > 0 { var eventRecord EventRecord - result := mysqldb.First(&eventRecord, id) // 根据主键查询 + result := db.First(&eventRecord, id) // 根据主键查询 if result.Error != nil { log.Println("Error finding eventrecord:", result.Error) } @@ -117,7 +117,7 @@ func (conf *RecordConfig) API_alarm_list(w http.ResponseWriter, r *http.Request) util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } - exceptions, totalCount, err := paginate(Exception{}, int(pageNum), int(pageSize), postData) + exceptions, totalCount, err := paginate(db, Exception{}, int(pageNum), int(pageSize), postData) if err != nil { resultJsonData["msg"] = err.Error() util.ReturnError(-1, errorJsonString(resultJsonData), w, r) @@ -163,7 +163,7 @@ func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request) util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } - eventRecords, totalCount, err := paginate(EventRecord{}, int(pageNum), int(pageSize), postData) + eventRecords, totalCount, err := paginate(db, EventRecord{}, int(pageNum), int(pageSize), postData) if err != nil { resultJsonData["msg"] = err.Error() util.ReturnError(-1, errorJsonString(resultJsonData), w, r) @@ -224,6 +224,10 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } + t := eventRecordModel.Type + if t == "" { + t = "flv" + } //recordMode := eventRecordModel.RecordMode //if recordMode == "" { // resultJsonData["msg"] = "no recordMode" @@ -245,62 +249,65 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request afterDuration = strconv.Itoa(conf.afterDuration) } recordTime := time.Now().Format("2006-01-02 15:04:05") - fileName := time.Now().Format("20060102150405") + fileName := strings.ReplaceAll(streamPath, "/", "-") + "-" + time.Now().Format("2006-01-02-15-04-05") startTime := time.Now().Add(-30 * time.Second).Format("2006-01-02 15:04:05") endTime := time.Now().Add(30 * time.Second).Format("2006-01-02 15:04:05") //切片大小 fragment := eventRecordModel.Fragment //var id string - irecorder := NewFLVRecorder() - found := false - conf.recordings.Range(func(key, value any) bool { - tmpIRecorder := value.(*FLVRecorder) - existStreamPath := tmpIRecorder.GetSubscriber().Stream.Path - if existStreamPath == streamPath { - irecorder = tmpIRecorder - found = true - } - return found - }) + irecorder := NewFLVRecorder(EventMode) recorder := irecorder.GetRecorder() recorder.FileName = fileName recorder.append = false - filepath := conf.Flv.Path + "/" + streamPath + "/" + fileName + recorder.Ext - urlpath := "record/" + streamPath + "/" + fileName + recorder.Ext + irecorder.SetId(streamPath) + filepath := conf.Flv.Path + "/" + streamPath + "/" + fileName + recorder.Ext //录像文件存入的完整路径(相对路径) + urlpath := "record/" + streamPath + "/" + fileName + recorder.Ext //网络拉流的地址 if fragment != "" { if f, err := time.ParseDuration(fragment); err == nil { recorder.Fragment = f } + } else { + recorder.Fragment = 0 + } + found := false + if recordtmp, ok := conf.recordings.Load(recorder.ID); ok { + found = true + irecorder = recordtmp.(*FLVRecorder) } - if found { irecorder.UpdateTimeout(30 * time.Second) } else { err = irecorder.StartWithDynamicTimeout(streamPath, fileName, 30*time.Second) } - //go func() { - // timer := time.NewTimer(30 * time.Second) - // - // // 等待计时器到期 - // <-timer.C - // irecorder.Stop(zap.String("reason", "api")) - //}() - //id = recorder.ID if err != nil { exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath} resultJsonData["msg"] = err.Error() util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } - eventRecord := EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, - AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext, EventDesc: eventRecordModel.EventDesc, Urlpath: urlpath} - err = mysqldb.Omit("id", "fragment", "isDelete").Create(&eventRecord).Error + var outid uint + var eventRecord EventRecord + if found { + var oldeventRecord EventRecord + // 定义 User 结构体作为查询条件 + queryRecord := EventRecord{StreamPath: streamPath} + db.Where(&queryRecord).Order("id DESC").First(&oldeventRecord) + eventRecord = EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, + AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: oldeventRecord.Filepath, Filename: oldeventRecord.Filename, + EventDesc: eventRecordModel.EventDesc, Urlpath: oldeventRecord.Urlpath, Type: t} + } else { + eventRecord = EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, + AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext, EventDesc: eventRecordModel.EventDesc, Urlpath: urlpath, Type: t} + } + err = db.Omit("id", "fragment", "isDelete").Create(&eventRecord).Error + outid = eventRecord.Id if err != nil { + exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath} + resultJsonData["msg"] = err.Error() util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } - outid := eventRecord.Id resultJsonData["id"] = outid resultJsonData["code"] = 0 resultJsonData["msg"] = "" diff --git a/sqlitedb.go b/sqlitedb.go index 6f087c6..84bbaa5 100644 --- a/sqlitedb.go +++ b/sqlitedb.go @@ -6,12 +6,10 @@ import ( "log" ) -var sqlitedb *gorm.DB - // sqlite数据库初始化,用来存放视频的关键帧等信息 -func initSqliteDB(sqliteDbPath string) { +func initSqliteDB(sqliteDbPath string) *gorm.DB { // 打开数据库连接 - sqlitedb, err = gorm.Open(sqlite.Open(sqliteDbPath), &gorm.Config{}) + sqlitedb, err := gorm.Open(sqlite.Open(sqliteDbPath), &gorm.Config{}) if err != nil { log.Fatal(err) } @@ -21,4 +19,5 @@ func initSqliteDB(sqliteDbPath string) { if err != nil { log.Fatal(err) } + return sqlitedb } diff --git a/subscriber.go b/subscriber.go index b68a076..2e86bcd 100644 --- a/subscriber.go +++ b/subscriber.go @@ -11,6 +11,18 @@ import ( . "m7s.live/engine/v4" ) +// 录像类型 +type RecordMode int + +// 使用常量块和 iota 来定义枚举值 +const ( + OrdinaryMode RecordMode = iota // iota 初始值为 0,表示普通录像(连续录像),包括自动录像和手动录像 + EventMode // 1,表示事件录像 +) + +// 判断是否有写入帧,用于解决pullonstart时拉取的流为空的情况下,生成空文件的问题 +var isWrifeFrame = false + type IRecorder interface { ISubscriber GetRecorder() *Recorder @@ -20,6 +32,8 @@ type IRecorder interface { CreateFile() (FileWr, error) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error UpdateTimeout(timeout time.Duration) + GetRecordModeString(mode RecordMode) string + SetId(streamPath string) } type Recorder struct { @@ -56,14 +70,25 @@ func (r *Recorder) CreateFile() (f FileWr, err error) { return } +// transform 函数处理字符串并返回格式化的结果 +func transform(input string) string { + parts := strings.Split(input, "/") + if len(parts) > 1 { + return strings.Join(parts[1:], "-") // 返回除第一个索引外的所有部分 + } + return input // 默认返回原始输入 +} + func (r *Recorder) getFileName(streamPath string) (filename string) { - filename = streamPath + if RecordPluginConfig.RecordPathNotShowStreamPath { + filename = streamPath + } if r.Fragment == 0 { if r.FileName != "" { filename = filepath.Join(filename, r.FileName) } } else { - filename = filepath.Join(filename, strings.ReplaceAll(streamPath, "/", "-")+"-"+time.Now().Format("2006-01-02-15-04-05")) + filename = filepath.Join(filename, transform(streamPath)+"_"+time.Now().Format("2006-01-02-15-04-05")) } return } @@ -84,7 +109,7 @@ func (r *Recorder) start(re IRecorder, streamPath string, subType byte) (err err } func (r *Recorder) cut(absTime uint32) { - if ts := absTime - r.SkipTS; time.Duration(ts)*time.Millisecond >= r.Fragment { + if ts := absTime - r.SkipTS; (time.Duration(ts)*time.Millisecond <= r.Fragment && r.Fragment-time.Duration(ts)*time.Millisecond <= time.Second) || time.Duration(ts)*time.Millisecond >= r.Fragment { r.SkipTS = absTime r.Close() if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil { @@ -116,12 +141,14 @@ func (r *Recorder) OnEvent(event any) { r.cut(v.AbsTime) } case VideoFrame: + isWrifeFrame = true if v.IFrame { + //plugin.Error("this is keyframe and absTime is " + strconv.FormatUint(uint64(v.AbsTime), 10)) //go func() { //将视频关键帧的数据存入sqlite数据库中 // var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.VideoReader, FrameAbstime: v.AbsTime} // sqlitedb.Create(flvKeyfram) //}() - r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath) + //r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath) //r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(v.FrameAbstime), 10)) //r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(v.FrameOffset))) //r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path)