-
Notifications
You must be signed in to change notification settings - Fork 205
/
Copy pathmemory-ts.go
192 lines (171 loc) · 5.78 KB
/
memory-ts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package engine
import (
"errors"
"fmt"
"io"
"net"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/util"
)
// MemoryTs accumulates MPEG-TS output in memory. WriteTo emits the default
// PAT packet, the cached PMT packet and then everything queued in the
// embedded BLL.
type MemoryTs struct {
// Embedded buffer pool; the methods of MemoryTs obtain their scratch and
// packet buffers through ts.Get(size).
// NOTE(review): ts.Get is assumed to be promoted from BytesPool — confirm.
util.BytesPool
// PMT holds the PMT packet produced by WritePMTPacket.
PMT util.Buffer
// Embedded list onto which WritePESPacket pushes the buffers that contain
// the generated TS packets.
util.BLL
}
// WritePMTPacket rebuilds the cached PMT packet for the given codecs.
//
// Any previous PMT content is discarded before the new packet is written.
// Note that mpegts.WritePMTPacket takes the video codec before the audio
// codec, the reverse of this method's parameter order.
func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID) {
	pmt := &ts.PMT
	pmt.Reset()
	mpegts.WritePMTPacket(pmt, video, audio)
}
// WriteTo writes the default PAT packet, the cached PMT packet and then all
// queued TS data (ts.BLL) to w.
//
// It returns the total number of bytes written across all three parts and
// the first error encountered. Writing stops at the first failure, so a
// broken writer no longer receives the remaining parts and the error is no
// longer silently dropped.
func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error) {
	var total int64

	n, err := w.Write(mpegts.DefaultPATPacket)
	total += int64(n)
	if err != nil {
		return total, err
	}

	n, err = w.Write(ts.PMT)
	total += int64(n)
	if err != nil {
		return total, err
	}

	written, err := ts.BLL.WriteTo(w)
	return total + written, err
}
// WritePESPacket serialises one PES packet (header plus packet.Buffers) into
// consecutive TS packets of mpegts.TS_PACKET_SIZE bytes and queues the
// resulting buffer(s) on ts.BLL.
//
// frame supplies the PID, the key-frame flag and the PCR base, and its
// ContinuityCounter is advanced (modulo 16) once per TS packet written.
//
// An error is returned when the PES start code prefix is not 0x000001, when
// writing the PES header, a TS header, the stuffing bytes or the payload
// fails, or when a generated TS packet does not have the expected size.
// Buffers already pushed to ts.BLL are left in place on error.
func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error) {
	if packet.Header.PacketStartCodePrefix != 0x000001 {
		return errors.New("packetStartCodePrefix != 0x000001")
	}

	// Scratch buffer for the serialised PES header.
	pesHeadItem := ts.Get(32)
	pesHeadItem.Value.Reset()
	if _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header); err != nil {
		// Hand the scratch buffer back on the failure path as well; the
		// deferred Recycle below is only registered after this point.
		pesHeadItem.Recycle()
		return err
	}
	pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...)
	defer pesHeadItem.Recycle()

	// Request one buffer sized for the whole PES packet plus 6 bytes of TS
	// header overhead per TS packet.
	pesPktLength := util.SizeOfBuffers(pesBuffers)
	buffer := ts.Get((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength)
	bwTsHeader := &buffer.Value
	bigLen := bwTsHeader.Len()
	bwTsHeader.Reset()
	ts.BLL.Push(buffer)

	var tsHeaderLength int
	for i := 0; len(pesBuffers) > 0; i++ {
		// When the remaining room in the large buffer is below one TS packet,
		// switch to a dedicated buffer per TS packet.
		if bigLen < mpegts.TS_PACKET_SIZE {
			if i == 0 {
				// NOTE(review): Recycle is promoted from an embedded field;
				// confirm it only releases what is intended here and does not
				// drop TS data queued by earlier calls.
				ts.Recycle()
			}
			headerItem := ts.Get(mpegts.TS_PACKET_SIZE)
			ts.BLL.Push(headerItem)
			bwTsHeader = &headerItem.Value
			bwTsHeader.Reset()
		}
		bigLen -= mpegts.TS_PACKET_SIZE

		// Bytes of PES data still to be packetised.
		pesPktLength = util.SizeOfBuffers(pesBuffers)

		tsHeader := mpegts.MpegTsHeader{
			SyncByte:                   0x47,
			TransportErrorIndicator:    0,
			PayloadUnitStartIndicator:  0,
			TransportPriority:          0,
			Pid:                        frame.Pid,
			TransportScramblingControl: 0,
			AdaptionFieldControl:       1,
			ContinuityCounter:          frame.ContinuityCounter,
		}
		frame.ContinuityCounter++
		frame.ContinuityCounter = frame.ContinuityCounter % 16

		// Start of a frame: the first TS packet carries the payload unit start
		// indicator, and for key frames an adaptation field with the PCR.
		if i == 0 {
			tsHeader.PayloadUnitStartIndicator = 1
			if frame.IsKeyFrame {
				tsHeader.AdaptionFieldControl = 0x03
				tsHeader.AdaptationFieldLength = 7
				tsHeader.PCRFlag = 1
				tsHeader.RandomAccessIndicator = 1
				tsHeader.ProgramClockReferenceBase = frame.ProgramClockReferenceBase
			}
		}

		// End of a frame: when the remaining data does not fill a whole TS
		// packet, an adaptation field with stuffing pads it to full size.
		var tsStuffingLength uint8
		if pesPktLength < mpegts.TS_PACKET_SIZE-4 {
			tsHeader.AdaptionFieldControl = 0x03
			tsHeader.AdaptationFieldLength = uint8(mpegts.TS_PACKET_SIZE - 4 - 1 - pesPktLength)
			// TODO: if the first TS packet is also the last one (key frame
			// with PCR and a short payload), does that case need special
			// handling? The adaptation field length set above overrides the
			// one chosen for the PCR.
			// The TS header takes at least 6 bytes here: 4 fixed bytes +
			// AdaptationFieldLength (1 byte) + indicators/flags (1 byte), so
			// one byte of the adaptation field is not stuffing.
			if tsHeader.AdaptationFieldLength >= 1 {
				tsStuffingLength = tsHeader.AdaptationFieldLength - 1
			}
		}

		tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
		if err != nil {
			return err
		}
		if tsStuffingLength > 0 {
			if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
				return err
			}
			tsHeaderLength += int(tsStuffingLength)
		}

		// Fill the rest of the TS packet with PES data; reading from
		// pesBuffers consumes it, which is what eventually ends the loop.
		tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength
		if _, err = io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength)); err != nil {
			return fmt.Errorf("copying PES payload into TS packet: %w", err)
		}

		// The buffer must now hold either i+1 whole TS packets (shared large
		// buffer) or exactly one (dedicated per-packet buffer).
		tsPktByteLen := bwTsHeader.Len()
		if tsPktByteLen != (i+1)*mpegts.TS_PACKET_SIZE && tsPktByteLen != mpegts.TS_PACKET_SIZE {
			return fmt.Errorf("TS_PACKET_SIZE != 188,, packet size=%d", tsPktByteLen)
		}
	}
	return nil
}
// WriteAudioFrame wraps a single audio frame in a PES packet and passes it
// to WritePESPacket.
//
// The PES header carries a PTS only (PtsDtsFlags 0x80, 5 bytes of header
// data). pes.ProgramClockReferenceBase is set to the frame's PTS. The error
// from WritePESPacket is returned unchanged.
func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) {
	var pkt mpegts.MpegTsPESPacket
	hdr := &pkt.Header

	// PES packet length = raw audio length (+ 7 bytes of ADTS for AAC)
	// + 8 bytes of optional PES header, since only a PTS is carried.
	switch frame.CodecID {
	case codec.CodecID_AAC:
		hdr.PesPacketLength = uint16(7 + frame.AUList.ByteLength + 8)
		pkt.Buffers = frame.GetADTS()
	default:
		hdr.PesPacketLength = uint16(frame.AUList.ByteLength + 8)
		pkt.Buffers = frame.AUList.ToBuffers()
	}

	hdr.PacketStartCodePrefix = 0x000001
	hdr.ConstTen = 0x80
	hdr.StreamID = mpegts.STREAM_ID_AUDIO
	hdr.Pts = uint64(frame.PTS)
	hdr.PtsDtsFlags = 0x80
	hdr.PesHeaderDataLength = 5

	pes.ProgramClockReferenceBase = hdr.Pts
	return ts.WritePESPacket(pes, pkt)
}
// WriteVideoFrame wraps a single video frame in a PES packet and passes it
// to WritePESPacket.
//
// The elementary stream is pre-processed first: an access unit delimiter is
// placed in front of the frame's Annex B data. The PES header carries both
// PTS and DTS (PtsDtsFlags 0xC0, 10 bytes of header data), and
// pes.ProgramClockReferenceBase is set to the frame's PTS. The error from
// WritePESPacket is returned unchanged.
func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) {
	// Pick the access unit delimiter from the number of parameter sets.
	// NOTE(review): two sets presumably means H.264 (SPS+PPS) and anything
	// else H.265 — confirm against the codec package.
	var aud []byte
	if len(frame.ParamaterSets) == 2 {
		aud = codec.NALU_AUD_BYTE
	} else {
		aud = codec.AudNalu
	}
	payload := append(net.Buffers{aud}, frame.GetAnnexB()...)

	// A length that does not fit the 16-bit PES length field is written as 0.
	pesLen := util.SizeOfBuffers(payload) + 10 + 3
	if pesLen > 0xffff {
		pesLen = 0
	}

	var pkt mpegts.MpegTsPESPacket
	hdr := &pkt.Header
	hdr.PacketStartCodePrefix = 0x000001
	hdr.ConstTen = 0x80
	hdr.StreamID = mpegts.STREAM_ID_VIDEO
	hdr.PesPacketLength = uint16(pesLen)
	hdr.Pts = uint64(frame.PTS)
	hdr.Dts = uint64(frame.DTS)
	hdr.PtsDtsFlags = 0xC0
	hdr.PesHeaderDataLength = 10
	pkt.Buffers = payload

	pes.ProgramClockReferenceBase = hdr.Pts
	return ts.WritePESPacket(pes, pkt)
}