-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathsubscriber.go
162 lines (147 loc) · 4.64 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package record
import (
"bufio"
"io"
"path/filepath"
"strings"
"time"
"go.uber.org/zap"
. "m7s.live/engine/v4"
)
// 录像类型
type RecordMode int
// 使用常量块和 iota 来定义枚举值
const (
OrdinaryMode RecordMode = iota // iota 初始值为 0,表示普通录像(连续录像),包括自动录像和手动录像
EventMode // 1,表示事件录像
)
// 判断是否有写入帧,用于解决pullonstart时拉取的流为空的情况下,生成空文件的问题
var isWrifeFrame = false
type IRecorder interface {
ISubscriber
GetRecorder() *Recorder
Start(streamPath string) error
StartWithFileName(streamPath string, fileName string) error
io.Closer
CreateFile() (FileWr, error)
StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error
UpdateTimeout(timeout time.Duration)
GetRecordModeString(mode RecordMode) string
SetId(streamPath string)
}
type Recorder struct {
Subscriber
SkipTS uint32
Record `json:"-" yaml:"-"`
File FileWr `json:"-" yaml:"-"`
FileName string // 自定义文件名,分段录像无效
filePath string // 文件路径
append bool // 是否追加模式
}
func (r *Recorder) GetRecorder() *Recorder {
return r
}
func (r *Recorder) CreateFile() (f FileWr, err error) {
r.filePath = r.getFileName(r.Stream.Path) + r.Ext
f, err = r.CreateFileFn(r.filePath, r.append)
logFields := []zap.Field{zap.String("path", r.filePath)}
if fw, ok := f.(*FileWriter); ok && r.Config != nil {
if r.Config.WriteBufferSize > 0 {
logFields = append(logFields, zap.Int("bufferSize", r.Config.WriteBufferSize))
fw.bufw = bufio.NewWriterSize(fw.Writer, r.Config.WriteBufferSize)
fw.Writer = fw.bufw
}
}
if err == nil {
r.Info("create file", logFields...)
} else {
logFields = append(logFields, zap.Error(err))
r.Error("create file", logFields...)
}
return
}
// transform 函数处理字符串并返回格式化的结果
func transform(input string) string {
parts := strings.Split(input, "/")
if len(parts) > 1 {
return strings.Join(parts[1:], "-") // 返回除第一个索引外的所有部分
}
return input // 默认返回原始输入
}
func (r *Recorder) getFileName(streamPath string) (filename string) {
if RecordPluginConfig.RecordPathNotShowStreamPath {
filename = streamPath
}
if r.Fragment == 0 {
if r.FileName != "" {
filename = filepath.Join(filename, r.FileName)
}
} else {
filename = filepath.Join(filename, transform(streamPath)+"_"+time.Now().Format("2006-01-02-15-04-05"))
}
return
}
func (r *Recorder) start(re IRecorder, streamPath string, subType byte) (err error) {
err = plugin.Subscribe(streamPath, re)
if err == nil {
if _, loaded := RecordPluginConfig.recordings.LoadOrStore(r.ID, re); loaded {
return ErrRecordExist
}
r.Closer = re
go func() {
r.PlayBlock(subType)
RecordPluginConfig.recordings.Delete(r.ID)
}()
}
return
}
func (r *Recorder) cut(absTime uint32) {
if ts := absTime - r.SkipTS; (time.Duration(ts)*time.Millisecond <= r.Fragment && r.Fragment-time.Duration(ts)*time.Millisecond <= time.Second) || time.Duration(ts)*time.Millisecond >= r.Fragment {
r.SkipTS = absTime
r.Close()
if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil {
r.File = file
r.Spesific.OnEvent(file)
} else {
r.Stop(zap.Error(err))
}
}
}
// func (r *Recorder) Stop(reason ...zap.Field) {
// r.Close()
// r.Subscriber.Stop(reason...)
// }
func (r *Recorder) OnEvent(event any) {
switch v := event.(type) {
case IRecorder:
if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil {
r.File = file
r.Spesific.OnEvent(file)
} else {
r.Stop(zap.Error(err))
}
case AudioFrame:
// 纯音频流的情况下需要切割文件
if r.Fragment > 0 && r.VideoReader == nil {
r.cut(v.AbsTime)
}
case VideoFrame:
isWrifeFrame = true
if v.IFrame {
//plugin.Error("this is keyframe and absTime is " + strconv.FormatUint(uint64(v.AbsTime), 10))
//go func() { //将视频关键帧的数据存入sqlite数据库中
// var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.VideoReader, FrameAbstime: v.AbsTime}
// sqlitedb.Create(flvKeyfram)
//}()
//r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath)
//r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(v.FrameAbstime), 10))
//r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(v.FrameOffset)))
//r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path)
}
if r.Fragment > 0 && v.IFrame {
r.cut(v.AbsTime)
}
default:
r.Subscriber.OnEvent(event)
}
}