From ce1c318527d52407517e8be7ad267a6a234801c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?halwu=28=E5=90=B4=E6=B5=A9=E9=BA=9F=29?= Date: Mon, 29 May 2017 10:37:57 +0800 Subject: [PATCH] fix: rename self --- av/av.go | 8 +- av/rwbase.go | 34 ++-- container/flv/demuxer.go | 4 +- container/flv/muxer.go | 39 ++-- container/flv/tag.go | 58 +++--- container/mp4/muxer.go | 1 - container/ts/crc32.go | 2 +- container/ts/muxer.go | 134 +++++++------ main.go | 8 +- parser/aac/parser.go | 56 +++--- parser/h264/parser.go | 32 +-- parser/mp3/parser.go | 12 +- parser/parser.go | 30 +-- protocol/dash/dash.go | 1 - protocol/hls/align.go | 10 +- protocol/hls/audio_cache.go | 26 +-- protocol/hls/hls.go | 302 +++-------------------------- protocol/hls/source.go | 264 +++++++++++++++++++++++++ protocol/hls/ts_cache.go | 40 ++-- protocol/httpflv/http_flv.go | 2 - protocol/httpopera/http_opera.go | 3 +- protocol/kcpts/kcp_ts.go | 1 - protocol/private/protocol.go | 1 - protocol/rtmp/cache/cache.go | 20 +- protocol/rtmp/cache/gop.go | 57 +++--- protocol/rtmp/cache/special.go | 12 +- protocol/rtmp/core/chunk_stream.go | 178 ++++++++--------- protocol/rtmp/core/conn.go | 89 +++++---- protocol/rtmp/core/conn_client.go | 116 +++++------ protocol/rtmp/core/conn_server.go | 146 +++++++------- protocol/rtmp/core/conn_test.go | 1 - protocol/rtmp/core/handshake.go | 42 ++-- protocol/rtmp/rtmp.go | 111 +++++------ protocol/rtmp/stream.go | 3 +- protocol/rtp/rtp.go | 1 - protocol/rtsp/protocol.go | 1 - protocol/rtsp/rtsp.go | 1 - protocol/webrtc/webrtc.go | 1 - utils/cmap/cmap.go | 22 --- utils/pool/pool.go | 12 +- utils/queue/queue.go | 1 - 41 files changed, 921 insertions(+), 961 deletions(-) delete mode 100755 container/mp4/muxer.go delete mode 100755 protocol/dash/dash.go create mode 100644 protocol/hls/source.go delete mode 100755 protocol/kcpts/kcp_ts.go delete mode 100755 protocol/private/protocol.go delete mode 100755 protocol/rtp/rtp.go delete mode 100755 protocol/rtsp/protocol.go delete mode 100755 protocol/rtsp/rtsp.go delete mode 100755 protocol/webrtc/webrtc.go diff --git a/av/av.go b/av/av.go index faeb9266..18895c94 100755 --- a/av/av.go +++ b/av/av.go @@ -129,13 +129,13 @@ type Info struct { Inter bool } -func (self Info) IsInterval() bool { - return self.Inter +func (info Info) IsInterval() bool { + return info.Inter } -func (i Info) String() string { +func (info Info) String() string { return fmt.Sprintf("", - i.Key, i.URL, i.UID, i.Inter) + info.Key, info.URL, info.UID, info.Inter) } type ReadCloser interface { diff --git a/av/rwbase.go b/av/rwbase.go index 6385dd88..3bd6c457 100755 --- a/av/rwbase.go +++ b/av/rwbase.go @@ -19,35 +19,35 @@ func NewRWBaser(duration time.Duration) RWBaser { } } -func (self *RWBaser) BaseTimeStamp() uint32 { - return self.BaseTimestamp +func (rw *RWBaser) BaseTimeStamp() uint32 { + return rw.BaseTimestamp } -func (self *RWBaser) CalcBaseTimestamp() { - if self.LastAudioTimestamp > self.LastVideoTimestamp { - self.BaseTimestamp = self.LastAudioTimestamp +func (rw *RWBaser) CalcBaseTimestamp() { + if rw.LastAudioTimestamp > rw.LastVideoTimestamp { + rw.BaseTimestamp = rw.LastAudioTimestamp } else { - self.BaseTimestamp = self.LastVideoTimestamp + rw.BaseTimestamp = rw.LastVideoTimestamp } } -func (self *RWBaser) RecTimeStamp(timestamp, typeID uint32) { +func (rw *RWBaser) RecTimeStamp(timestamp, typeID uint32) { if typeID == TAG_VIDEO { - self.LastVideoTimestamp = timestamp + rw.LastVideoTimestamp = timestamp } else if typeID == TAG_AUDIO { - self.LastAudioTimestamp = timestamp + rw.LastAudioTimestamp = timestamp } } -func (self *RWBaser) SetPreTime() { - self.lock.Lock() - self.PreTime = time.Now() - self.lock.Unlock() +func (rw *RWBaser) SetPreTime() { + rw.lock.Lock() + rw.PreTime = time.Now() + rw.lock.Unlock() } -func (self *RWBaser) Alive() bool { - self.lock.Lock() - b := !(time.Now().Sub(self.PreTime) >= self.timeout) - self.lock.Unlock() +func (rw *RWBaser) Alive() bool { + rw.lock.Lock() + b := !(time.Now().Sub(rw.PreTime) >= rw.timeout) + rw.lock.Unlock() return b } diff --git a/container/flv/demuxer.go b/container/flv/demuxer.go index 730ed35f..9843978e 100755 --- a/container/flv/demuxer.go +++ b/container/flv/demuxer.go @@ -16,7 +16,7 @@ func NewDemuxer() *Demuxer { return &Demuxer{} } -func (self *Demuxer) DemuxH(p *av.Packet) error { +func (d *Demuxer) DemuxH(p *av.Packet) error { var tag Tag _, err := tag.ParseMeidaTagHeader(p.Data, p.IsVideo) if err != nil { @@ -27,7 +27,7 @@ func (self *Demuxer) DemuxH(p *av.Packet) error { return nil } -func (self *Demuxer) Demux(p *av.Packet) error { +func (d *Demuxer) Demux(p *av.Packet) error { var tag Tag n, err := tag.ParseMeidaTagHeader(p.Data, p.IsVideo) if err != nil { diff --git a/container/flv/muxer.go b/container/flv/muxer.go index dadb70d8..71064f83 100755 --- a/container/flv/muxer.go +++ b/container/flv/muxer.go @@ -14,10 +14,7 @@ import ( var ( flvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09} -) - -var ( - flvFile = flag.String("filFile", "./out.flv", "output flv file name") + flvFile = flag.String("filFile", "./out.flv", "output flv file name") ) func NewFlv(handler av.Handler, info av.Info) { @@ -75,9 +72,9 @@ func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter { return ret } -func (self *FLVWriter) Write(p av.Packet) error { - self.RWBaser.SetPreTime() - h := self.buf[:headerLen] +func (writer *FLVWriter) Write(p av.Packet) error { + writer.RWBaser.SetPreTime() + h := writer.buf[:headerLen] typeID := av.TAG_VIDEO if !p.IsVideo { if p.IsMetadata { @@ -93,8 +90,8 @@ func (self *FLVWriter) Write(p av.Packet) error { } dataLen := len(p.Data) timestamp := p.TimeStamp - timestamp += self.BaseTimeStamp() - self.RWBaser.RecTimeStamp(timestamp, uint32(typeID)) + timestamp += writer.BaseTimeStamp() + writer.RWBaser.RecTimeStamp(timestamp, uint32(typeID)) preDataLen := dataLen + headerLen timestampbase := timestamp & 0xffffff @@ -105,37 +102,37 @@ func (self *FLVWriter) Write(p av.Packet) error { pio.PutI24BE(h[4:7], int32(timestampbase)) pio.PutU8(h[7:8], uint8(timestampExt)) - if _, err := self.ctx.Write(h); err != nil { + if _, err := writer.ctx.Write(h); err != nil { return err } - if _, err := self.ctx.Write(p.Data); err != nil { + if _, err := writer.ctx.Write(p.Data); err != nil { return err } pio.PutI32BE(h[:4], int32(preDataLen)) - if _, err := self.ctx.Write(h[:4]); err != nil { + if _, err := writer.ctx.Write(h[:4]); err != nil { return err } return nil } -func (self *FLVWriter) Wait() { +func (writer *FLVWriter) Wait() { select { - case <-self.closed: + case <-writer.closed: return } } -func (self *FLVWriter) Close(error) { - self.ctx.Close() - close(self.closed) +func (writer *FLVWriter) Close(error) { + writer.ctx.Close() + close(writer.closed) } -func (self *FLVWriter) Info() (ret av.Info) { - ret.UID = self.Uid - ret.URL = self.url - ret.Key = self.app + "/" + self.title +func (writer *FLVWriter) Info() (ret av.Info) { + ret.UID = writer.Uid + ret.URL = writer.url + ret.Key = writer.app + "/" + writer.title return } diff --git a/container/flv/tag.go b/container/flv/tag.go index 8f5fa76b..550efaaa 100755 --- a/container/flv/tag.go +++ b/container/flv/tag.go @@ -105,74 +105,74 @@ type Tag struct { mediat mediaTag } -func (self *Tag) SoundFormat() uint8 { - return self.mediat.soundFormat +func (tag *Tag) SoundFormat() uint8 { + return tag.mediat.soundFormat } -func (self *Tag) AACPacketType() uint8 { - return self.mediat.aacPacketType +func (tag *Tag) AACPacketType() uint8 { + return tag.mediat.aacPacketType } -func (self *Tag) IsKeyFrame() bool { - return self.mediat.frameType == av.FRAME_KEY +func (tag *Tag) IsKeyFrame() bool { + return tag.mediat.frameType == av.FRAME_KEY } -func (self *Tag) IsSeq() bool { - return self.mediat.frameType == av.FRAME_KEY && - self.mediat.avcPacketType == av.AVC_SEQHDR +func (tag *Tag) IsSeq() bool { + return tag.mediat.frameType == av.FRAME_KEY && + tag.mediat.avcPacketType == av.AVC_SEQHDR } -func (self *Tag) CodecID() uint8 { - return self.mediat.codecID +func (tag *Tag) CodecID() uint8 { + return tag.mediat.codecID } -func (self *Tag) CompositionTime() int32 { - return self.mediat.compositionTime +func (tag *Tag) CompositionTime() int32 { + return tag.mediat.compositionTime } // ParseMeidaTagHeader, parse video, audio, tag header -func (self *Tag) ParseMeidaTagHeader(b []byte, isVideo bool) (n int, err error) { +func (tag *Tag) ParseMeidaTagHeader(b []byte, isVideo bool) (n int, err error) { switch isVideo { case false: - n, err = self.parseAudioHeader(b) + n, err = tag.parseAudioHeader(b) case true: - n, err = self.parseVideoHeader(b) + n, err = tag.parseVideoHeader(b) } return } -func (self *Tag) parseAudioHeader(b []byte) (n int, err error) { +func (tag *Tag) parseAudioHeader(b []byte) (n int, err error) { if len(b) < n+1 { err = fmt.Errorf("invalid audiodata len=%d", len(b)) return } flags := b[0] - self.mediat.soundFormat = flags >> 4 - self.mediat.soundRate = (flags >> 2) & 0x3 - self.mediat.soundSize = (flags >> 1) & 0x1 - self.mediat.soundType = flags & 0x1 + tag.mediat.soundFormat = flags >> 4 + tag.mediat.soundRate = (flags >> 2) & 0x3 + tag.mediat.soundSize = (flags >> 1) & 0x1 + tag.mediat.soundType = flags & 0x1 n++ - switch self.mediat.soundFormat { + switch tag.mediat.soundFormat { case av.SOUND_AAC: - self.mediat.aacPacketType = b[1] + tag.mediat.aacPacketType = b[1] n++ } return } -func (self *Tag) parseVideoHeader(b []byte) (n int, err error) { +func (tag *Tag) parseVideoHeader(b []byte) (n int, err error) { if len(b) < n+5 { err = fmt.Errorf("invalid videodata len=%d", len(b)) return } flags := b[0] - self.mediat.frameType = flags >> 4 - self.mediat.codecID = flags & 0xf + tag.mediat.frameType = flags >> 4 + tag.mediat.codecID = flags & 0xf n++ - if self.mediat.frameType == av.FRAME_INTER || self.mediat.frameType == av.FRAME_KEY { - self.mediat.avcPacketType = b[1] + if tag.mediat.frameType == av.FRAME_INTER || tag.mediat.frameType == av.FRAME_KEY { + tag.mediat.avcPacketType = b[1] for i := 2; i < 5; i++ { - self.mediat.compositionTime = self.mediat.compositionTime<<8 + int32(b[i]) + tag.mediat.compositionTime = tag.mediat.compositionTime<<8 + int32(b[i]) } n += 4 } diff --git a/container/mp4/muxer.go b/container/mp4/muxer.go deleted file mode 100755 index c39d11cf..00000000 --- a/container/mp4/muxer.go +++ /dev/null @@ -1 +0,0 @@ -package mp4 diff --git a/container/ts/crc32.go b/container/ts/crc32.go index ec8c4809..7d214024 100755 --- a/container/ts/crc32.go +++ b/container/ts/crc32.go @@ -70,7 +70,7 @@ func GenCrc32(src []byte) uint32 { j := byte(0) crc32 := uint32(0xFFFFFFFF) for i := 0; i < len(src); i++ { - j = ((byte(crc32>>24) ^ src[i]) & 0xff) + j = (byte(crc32>>24) ^ src[i]) & 0xff crc32 = uint32(uint32(crc32<<8) ^ uint32(crcTable[j])) } diff --git a/container/ts/muxer.go b/container/ts/muxer.go index 30a0e38a..1731a58b 100755 --- a/container/ts/muxer.go +++ b/container/ts/muxer.go @@ -9,9 +9,7 @@ const ( tsDefaultDataLen = 184 tsPacketLen = 188 h264DefaultHZ = 90 -) -const ( videoPID = 0x100 audioPID = 0x101 videoSID = 0xe0 @@ -32,7 +30,7 @@ func NewMuxer() *Muxer { return &Muxer{} } -func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { +func (muxer *Muxer) Mux(p *av.Packet, w io.Writer) error { first := true wBytes := 0 pesIndex := 0 @@ -61,50 +59,50 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { break } if p.IsVideo { - self.videoCc++ - if self.videoCc > 0xf { - self.videoCc = 0 + muxer.videoCc++ + if muxer.videoCc > 0xf { + muxer.videoCc = 0 } } else { - self.audioCc++ - if self.audioCc > 0xf { - self.audioCc = 0 + muxer.audioCc++ + if muxer.audioCc > 0xf { + muxer.audioCc = 0 } } i := byte(0) //sync byte - self.tsPacket[i] = 0x47 + muxer.tsPacket[i] = 0x47 i++ //error indicator, unit start indicator,ts priority,pid - self.tsPacket[i] = byte(pid >> 8) //pid high 5 bits + muxer.tsPacket[i] = byte(pid >> 8) //pid high 5 bits if first { - self.tsPacket[i] = self.tsPacket[i] | 0x40 //unit start indicator + muxer.tsPacket[i] = muxer.tsPacket[i] | 0x40 //unit start indicator } i++ //pid low 8 bits - self.tsPacket[i] = byte(pid) + muxer.tsPacket[i] = byte(pid) i++ //scram control, adaptation control, counter if p.IsVideo { - self.tsPacket[i] = 0x10 | byte(self.videoCc&0x0f) + muxer.tsPacket[i] = 0x10 | byte(muxer.videoCc&0x0f) } else { - self.tsPacket[i] = 0x10 | byte(self.audioCc&0x0f) + muxer.tsPacket[i] = 0x10 | byte(muxer.audioCc&0x0f) } i++ //关键帧需要加pcr if first && p.IsVideo && videoH.IsKeyFrame() { - self.tsPacket[3] |= 0x20 - self.tsPacket[i] = 7 + muxer.tsPacket[3] |= 0x20 + muxer.tsPacket[i] = 7 i++ - self.tsPacket[i] = 0x50 + muxer.tsPacket[i] = 0x50 i++ - self.writePcr(self.tsPacket[0:], i, dts) + muxer.writePcr(muxer.tsPacket[0:], i, dts) i += 6 } @@ -115,7 +113,7 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { dataLen -= (i - 4) } } else { - self.tsPacket[3] |= 0x20 //have adaptation + muxer.tsPacket[3] |= 0x20 //have adaptation remainBytes := byte(0) dataLen = byte(packetBytesLen) if first { @@ -123,7 +121,7 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { } else { remainBytes = tsDefaultDataLen - dataLen } - self.adaptationBufInit(self.tsPacket[i:], byte(remainBytes)) + muxer.adaptationBufInit(muxer.tsPacket[i:], byte(remainBytes)) i += remainBytes } if first && i < tsPacketLen && pesHeaderLen > 0 { @@ -131,7 +129,7 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { if pesHeaderLen <= tmpLen { tmpLen = pesHeaderLen } - copy(self.tsPacket[i:], pes.data[pesIndex:pesIndex+int(tmpLen)]) + copy(muxer.tsPacket[i:], pes.data[pesIndex:pesIndex+int(tmpLen)]) i += tmpLen packetBytesLen -= int(tmpLen) dataLen -= tmpLen @@ -144,12 +142,12 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { if tmpLen <= dataLen { dataLen = tmpLen } - copy(self.tsPacket[i:], p.Data[wBytes:wBytes+int(dataLen)]) + copy(muxer.tsPacket[i:], p.Data[wBytes:wBytes+int(dataLen)]) wBytes += int(dataLen) packetBytesLen -= int(dataLen) } if w != nil { - if _, err := w.Write(self.tsPacket[0:]); err != nil { + if _, err := w.Write(muxer.tsPacket[0:]); err != nil { return err } } @@ -160,44 +158,44 @@ func (self *Muxer) Mux(p *av.Packet, w io.Writer) error { } //PAT return pat data -func (self *Muxer) PAT() []byte { +func (muxer *Muxer) PAT() []byte { i := 0 remainByte := 0 tsHeader := []byte{0x47, 0x40, 0x00, 0x10, 0x00} patHeader := []byte{0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, 0x00, 0x01, 0xf0, 0x01} - if self.patCc > 0xf { - self.patCc = 0 + if muxer.patCc > 0xf { + muxer.patCc = 0 } - tsHeader[3] |= self.patCc & 0x0f - self.patCc++ + tsHeader[3] |= muxer.patCc & 0x0f + muxer.patCc++ - copy(self.pat[i:], tsHeader) + copy(muxer.pat[i:], tsHeader) i += len(tsHeader) - copy(self.pat[i:], patHeader) + copy(muxer.pat[i:], patHeader) i += len(patHeader) crc32Value := GenCrc32(patHeader) - self.pat[i] = byte(crc32Value >> 24) + muxer.pat[i] = byte(crc32Value >> 24) i++ - self.pat[i] = byte(crc32Value >> 16) + muxer.pat[i] = byte(crc32Value >> 16) i++ - self.pat[i] = byte(crc32Value >> 8) + muxer.pat[i] = byte(crc32Value >> 8) i++ - self.pat[i] = byte(crc32Value) + muxer.pat[i] = byte(crc32Value) i++ remainByte = int(tsPacketLen - i) for j := 0; j < remainByte; j++ { - self.pat[i+j] = 0xff + muxer.pat[i+j] = 0xff } - return self.pat[0:] + return muxer.pat[0:] } // PMT return pmt data -func (self *Muxer) PMT(soundFormat byte, hasVideo bool) []byte { +func (muxer *Muxer) PMT(soundFormat byte, hasVideo bool) []byte { i := int(0) j := int(0) var progInfo []byte @@ -214,11 +212,11 @@ func (self *Muxer) PMT(soundFormat byte, hasVideo bool) []byte { } pmtHeader[2] = byte(len(progInfo) + 9 + 4) - if self.pmtCc > 0xf { - self.pmtCc = 0 + if muxer.pmtCc > 0xf { + muxer.pmtCc = 0 } - tsHeader[3] |= self.pmtCc & 0x0f - self.pmtCc++ + tsHeader[3] |= muxer.pmtCc & 0x0f + muxer.pmtCc++ if soundFormat == 2 || soundFormat == 14 { @@ -229,34 +227,34 @@ func (self *Muxer) PMT(soundFormat byte, hasVideo bool) []byte { } } - copy(self.pmt[i:], tsHeader) + copy(muxer.pmt[i:], tsHeader) i += len(tsHeader) - copy(self.pmt[i:], pmtHeader) + copy(muxer.pmt[i:], pmtHeader) i += len(pmtHeader) - copy(self.pmt[i:], progInfo[0:]) + copy(muxer.pmt[i:], progInfo[0:]) i += len(progInfo) - crc32Value := GenCrc32(self.pmt[5: 5+len(pmtHeader)+len(progInfo)]) - self.pmt[i] = byte(crc32Value >> 24) + crc32Value := GenCrc32(muxer.pmt[5: 5+len(pmtHeader)+len(progInfo)]) + muxer.pmt[i] = byte(crc32Value >> 24) i++ - self.pmt[i] = byte(crc32Value >> 16) + muxer.pmt[i] = byte(crc32Value >> 16) i++ - self.pmt[i] = byte(crc32Value >> 8) + muxer.pmt[i] = byte(crc32Value >> 8) i++ - self.pmt[i] = byte(crc32Value) + muxer.pmt[i] = byte(crc32Value) i++ remainBytes = int(tsPacketLen - i) for j = 0; j < remainBytes; j++ { - self.pmt[i+j] = 0xff + muxer.pmt[i+j] = 0xff } - return self.pmt[0:] + return muxer.pmt[0:] } -func (self *Muxer) adaptationBufInit(src []byte, remainBytes byte) { +func (muxer *Muxer) adaptationBufInit(src []byte, remainBytes byte) { src[0] = byte(remainBytes - 1) if remainBytes == 1 { } else { @@ -268,7 +266,7 @@ func (self *Muxer) adaptationBufInit(src []byte, remainBytes byte) { return } -func (self *Muxer) writePcr(b []byte, i byte, pcr int64) error { +func (muxer *Muxer) writePcr(b []byte, i byte, pcr int64) error { b[i] = byte(pcr >> 25) i++ b[i] = byte((pcr >> 17) & 0xff) @@ -290,21 +288,21 @@ type pesHeader struct { } //pesPacket return pes packet -func (self *pesHeader) packet(p *av.Packet, pts, dts int64) error { +func (header *pesHeader) packet(p *av.Packet, pts, dts int64) error { //PES header i := 0 - self.data[i] = 0x00 + header.data[i] = 0x00 i++ - self.data[i] = 0x00 + header.data[i] = 0x00 i++ - self.data[i] = 0x01 + header.data[i] = 0x01 i++ sid := audioSID if p.IsVideo { sid = videoSID } - self.data[i] = byte(sid) + header.data[i] = byte(sid) i++ flag := 0x80 @@ -319,31 +317,31 @@ func (self *pesHeader) packet(p *av.Packet, pts, dts int64) error { if size > 0xffff { size = 0 } - self.data[i] = byte(size >> 8) + header.data[i] = byte(size >> 8) i++ - self.data[i] = byte(size) + header.data[i] = byte(size) i++ - self.data[i] = 0x80 + header.data[i] = 0x80 i++ - self.data[i] = byte(flag) + header.data[i] = byte(flag) i++ - self.data[i] = byte(headerSize) + header.data[i] = byte(headerSize) i++ - self.writeTs(self.data[0:], i, flag>>6, pts) + header.writeTs(header.data[0:], i, flag>>6, pts) i += ptslen if p.IsVideo && pts != dts { - self.writeTs(self.data[0:], i, 1, dts) + header.writeTs(header.data[0:], i, 1, dts) i += dtslen } - self.len = byte(i) + header.len = byte(i) return nil } -func (self *pesHeader) writeTs(src []byte, i int, fb int, ts int64) { +func (header *pesHeader) writeTs(src []byte, i int, fb int, ts int64) { val := uint32(0) if ts > 0x1ffffffff { ts -= 0x1ffffffff diff --git a/main.go b/main.go index 23a259a1..5f3e4e47 100755 --- a/main.go +++ b/main.go @@ -12,10 +12,10 @@ import ( ) var ( - rtmpAddr = flag.String("rtmpAddr", ":1935", "The rtmp server address to bind.") - operaAddr = flag.String("operaAddr", ":8080", "the http operation or config address to bind.") - flvAddr = flag.String("flvAddr", ":8081", "the http-flv server address to bind.") - hlsAddr = flag.String("hlsAddr", ":8082", "the hls server address to bind.") + rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") + operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address") + flvAddr = flag.String("flv-addr", ":8081", "HTTP-FLV server listen address") + hlsAddr = flag.String("hls-addr", ":8082", "HLS server listen address") ) func init() { diff --git a/parser/aac/parser.go b/parser/aac/parser.go index 22fdf8a4..35ffe789 100755 --- a/parser/aac/parser.go +++ b/parser/aac/parser.go @@ -47,45 +47,45 @@ func NewParser() *Parser { } } -func (self *Parser) specificInfo(src []byte) error { +func (parser *Parser) specificInfo(src []byte) error { if len(src) < 2 { return specificBufInvalid } - self.gettedSpecific = true - self.cfgInfo.objectType = (src[0] >> 3) & 0xff - self.cfgInfo.sampleRate = ((src[0] & 0x07) << 1) | src[1]>>7 - self.cfgInfo.channel = (src[1] >> 3) & 0x0f + parser.gettedSpecific = true + parser.cfgInfo.objectType = (src[0] >> 3) & 0xff + parser.cfgInfo.sampleRate = ((src[0] & 0x07) << 1) | src[1]>>7 + parser.cfgInfo.channel = (src[1] >> 3) & 0x0f return nil } -func (self *Parser) adts(src []byte, w io.Writer) error { - if len(src) <= 0 || !self.gettedSpecific { +func (parser *Parser) adts(src []byte, w io.Writer) error { + if len(src) <= 0 || !parser.gettedSpecific { return audioBufInvalid } frameLen := uint16(len(src)) + 7 //first write adts header - self.adtsHeader[0] = 0xff - self.adtsHeader[1] = 0xf1 + parser.adtsHeader[0] = 0xff + parser.adtsHeader[1] = 0xf1 - self.adtsHeader[2] &= 0x00 - self.adtsHeader[2] = self.adtsHeader[2] | (self.cfgInfo.objectType-1)<<6 - self.adtsHeader[2] = self.adtsHeader[2] | (self.cfgInfo.sampleRate)<<2 + parser.adtsHeader[2] &= 0x00 + parser.adtsHeader[2] = parser.adtsHeader[2] | (parser.cfgInfo.objectType-1)<<6 + parser.adtsHeader[2] = parser.adtsHeader[2] | (parser.cfgInfo.sampleRate)<<2 - self.adtsHeader[3] &= 0x00 - self.adtsHeader[3] = self.adtsHeader[3] | (self.cfgInfo.channel<<2)<<4 - self.adtsHeader[3] = self.adtsHeader[3] | byte((frameLen<<3)>>14) + parser.adtsHeader[3] &= 0x00 + parser.adtsHeader[3] = parser.adtsHeader[3] | (parser.cfgInfo.channel<<2)<<4 + parser.adtsHeader[3] = parser.adtsHeader[3] | byte((frameLen<<3)>>14) - self.adtsHeader[4] &= 0x00 - self.adtsHeader[4] = self.adtsHeader[4] | byte((frameLen<<5)>>8) + parser.adtsHeader[4] &= 0x00 + parser.adtsHeader[4] = parser.adtsHeader[4] | byte((frameLen<<5)>>8) - self.adtsHeader[5] &= 0x00 - self.adtsHeader[5] = self.adtsHeader[5] | byte(((frameLen<<13)>>13)<<5) - self.adtsHeader[5] = self.adtsHeader[5] | (0x7C<<1)>>3 - self.adtsHeader[6] = 0xfc + parser.adtsHeader[5] &= 0x00 + parser.adtsHeader[5] = parser.adtsHeader[5] | byte(((frameLen<<13)>>13)<<5) + parser.adtsHeader[5] = parser.adtsHeader[5] | (0x7C<<1)>>3 + parser.adtsHeader[6] = 0xfc - if _, err := w.Write(self.adtsHeader[0:]); err != nil { + if _, err := w.Write(parser.adtsHeader[0:]); err != nil { return err } if _, err := w.Write(src); err != nil { @@ -94,20 +94,20 @@ func (self *Parser) adts(src []byte, w io.Writer) error { return nil } -func (self *Parser) SampleRate() int { +func (parser *Parser) SampleRate() int { rate := 44100 - if self.cfgInfo.sampleRate <= byte(len(aacRates)-1) { - rate = aacRates[self.cfgInfo.sampleRate] + if parser.cfgInfo.sampleRate <= byte(len(aacRates)-1) { + rate = aacRates[parser.cfgInfo.sampleRate] } return rate } -func (self *Parser) Parse(b []byte, packetType uint8, w io.Writer) (err error) { +func (parser *Parser) Parse(b []byte, packetType uint8, w io.Writer) (err error) { switch packetType { case av.AAC_SEQHDR: - err = self.specificInfo(b) + err = parser.specificInfo(b) case av.AAC_RAW: - err = self.adts(b, w) + err = parser.adts(b, w) } return } diff --git a/parser/h264/parser.go b/parser/h264/parser.go index d659f8af..e065622d 100755 --- a/parser/h264/parser.go +++ b/parser/h264/parser.go @@ -74,7 +74,7 @@ func NewParser() *Parser { } //return value 1:sps, value2 :pps -func (self *Parser) parseSpecificInfo(src []byte) error { +func (parser *Parser) parseSpecificInfo(src []byte) error { if len(src) < 9 { return decDataNil } @@ -114,13 +114,13 @@ func (self *Parser) parseSpecificInfo(src []byte) error { pps = append(pps, startCode...) pps = append(pps, tmpBuf[3:]...) - self.specificInfo = append(self.specificInfo, sps...) - self.specificInfo = append(self.specificInfo, pps...) + parser.specificInfo = append(parser.specificInfo, sps...) + parser.specificInfo = append(parser.specificInfo, pps...) return nil } -func (self *Parser) isNaluHeader(src []byte) bool { +func (parser *Parser) isNaluHeader(src []byte) bool { if len(src) < naluBytesLen { return false } @@ -130,7 +130,7 @@ func (self *Parser) isNaluHeader(src []byte) bool { src[3] == 0x01 } -func (self *Parser) naluSize(src []byte) (int, error) { +func (parser *Parser) naluSize(src []byte) (int, error) { if len(src) < naluBytesLen { return 0, errors.New("nalusizedata invalid") } @@ -142,12 +142,12 @@ func (self *Parser) naluSize(src []byte) (int, error) { return size, nil } -func (self *Parser) getAnnexbH264(src []byte, w io.Writer) error { +func (parser *Parser) getAnnexbH264(src []byte, w io.Writer) error { dataSize := len(src) if dataSize < naluBytesLen { return videoDataInvalid } - self.pps.Reset() + parser.pps.Reset() _, err := w.Write(naluAud) if err != nil { return err @@ -159,7 +159,7 @@ func (self *Parser) getAnnexbH264(src []byte, w io.Writer) error { hasWriteSpsPps := false for dataSize > 0 { - nalLen, err = self.naluSize(src[index:]) + nalLen, err = parser.naluSize(src[index:]) if err != nil { return dataSizeNotMatch } @@ -173,11 +173,11 @@ func (self *Parser) getAnnexbH264(src []byte, w io.Writer) error { if !hasWriteSpsPps { hasWriteSpsPps = true if !hasSpsPps { - if _, err := w.Write(self.specificInfo); err != nil { + if _, err := w.Write(parser.specificInfo); err != nil { return err } } else { - if _, err := w.Write(self.pps.Bytes()); err != nil { + if _, err := w.Write(parser.pps.Bytes()); err != nil { return err } } @@ -198,11 +198,11 @@ func (self *Parser) getAnnexbH264(src []byte, w io.Writer) error { fallthrough case nalu_type_pps: hasSpsPps = true - _, err := self.pps.Write(startCode) + _, err := parser.pps.Write(startCode) if err != nil { return err } - _, err = self.pps.Write(src[index: index+nalLen]) + _, err = parser.pps.Write(src[index: index+nalLen]) if err != nil { return err } @@ -216,16 +216,16 @@ func (self *Parser) getAnnexbH264(src []byte, w io.Writer) error { return nil } -func (self *Parser) Parse(b []byte, isSeq bool, w io.Writer) (err error) { +func (parser *Parser) Parse(b []byte, isSeq bool, w io.Writer) (err error) { switch isSeq { case true: - err = self.parseSpecificInfo(b) + err = parser.parseSpecificInfo(b) case false: // is annexb - if self.isNaluHeader(b) { + if parser.isNaluHeader(b) { _, err = w.Write(b) } else { - err = self.getAnnexbH264(b, w) + err = parser.getAnnexbH264(b, w) } } return diff --git a/parser/mp3/parser.go b/parser/mp3/parser.go index d27dc911..2f854b36 100755 --- a/parser/mp3/parser.go +++ b/parser/mp3/parser.go @@ -21,21 +21,21 @@ var ( errIndexInvalid = errors.New("invalid rate index") ) -func (self *Parser) Parse(src []byte) error { +func (parser *Parser) Parse(src []byte) error { if len(src) < 3 { return errMp3DataInvalid } index := (src[2] >> 2) & 0x3 if index <= byte(len(mp3Rates)-1) { - self.samplingFrequency = mp3Rates[index] + parser.samplingFrequency = mp3Rates[index] return nil } return errIndexInvalid } -func (self *Parser) SampleRate() int { - if self.samplingFrequency == 0 { - self.samplingFrequency = 44100 +func (parser *Parser) SampleRate() int { + if parser.samplingFrequency == 0 { + parser.samplingFrequency = 44100 } - return self.samplingFrequency + return parser.samplingFrequency } diff --git a/parser/parser.go b/parser/parser.go index 42fc6026..ba1b1159 100755 --- a/parser/parser.go +++ b/parser/parser.go @@ -23,27 +23,27 @@ func NewCodecParser() *CodecParser { return &CodecParser{} } -func (self *CodecParser) SampleRate() (int, error) { - if self.aac == nil && self.mp3 == nil { +func (codeParser *CodecParser) SampleRate() (int, error) { + if codeParser.aac == nil && codeParser.mp3 == nil { return 0, errNoAudio } - if self.aac != nil { - return self.aac.SampleRate(), nil + if codeParser.aac != nil { + return codeParser.aac.SampleRate(), nil } - return self.mp3.SampleRate(), nil + return codeParser.mp3.SampleRate(), nil } -func (self *CodecParser) Parse(p *av.Packet, w io.Writer) (err error) { +func (codeParser *CodecParser) Parse(p *av.Packet, w io.Writer) (err error) { switch p.IsVideo { case true: f, ok := p.Header.(av.VideoPacketHeader) if ok { if f.CodecID() == av.VIDEO_H264 { - if self.h264 == nil { - self.h264 = h264.NewParser() + if codeParser.h264 == nil { + codeParser.h264 = h264.NewParser() } - err = self.h264.Parse(p.Data, f.IsSeq(), w) + err = codeParser.h264.Parse(p.Data, f.IsSeq(), w) } } case false: @@ -51,15 +51,15 @@ func (self *CodecParser) Parse(p *av.Packet, w io.Writer) (err error) { if ok { switch f.SoundFormat() { case av.SOUND_AAC: - if self.aac == nil { - self.aac = aac.NewParser() + if codeParser.aac == nil { + codeParser.aac = aac.NewParser() } - err = self.aac.Parse(p.Data, f.AACPacketType(), w) + err = codeParser.aac.Parse(p.Data, f.AACPacketType(), w) case av.SOUND_MP3: - if self.mp3 == nil { - self.mp3 = mp3.NewParser() + if codeParser.mp3 == nil { + codeParser.mp3 = mp3.NewParser() } - err = self.mp3.Parse(p.Data) + err = codeParser.mp3.Parse(p.Data) } } diff --git a/protocol/dash/dash.go b/protocol/dash/dash.go deleted file mode 100755 index 3f19715b..00000000 --- a/protocol/dash/dash.go +++ /dev/null @@ -1 +0,0 @@ -package dash diff --git a/protocol/hls/align.go b/protocol/hls/align.go index 26506b24..9eef54d5 100755 --- a/protocol/hls/align.go +++ b/protocol/hls/align.go @@ -9,9 +9,9 @@ type align struct { frameBase uint64 } -func (self *align) align(dts *uint64, inc uint32) { +func (a *align) align(dts *uint64, inc uint32) { aFrameDts := *dts - estPts := self.frameBase + self.frameNum*uint64(inc) + estPts := a.frameBase + a.frameNum*uint64(inc) var dPts uint64 if estPts >= aFrameDts { dPts = estPts - aFrameDts @@ -20,10 +20,10 @@ func (self *align) align(dts *uint64, inc uint32) { } if dPts <= uint64(syncms)*h264_default_hz { - self.frameNum++ + a.frameNum++ *dts = estPts return } - self.frameNum = 1 - self.frameBase = aFrameDts + a.frameNum = 1 + a.frameBase = aFrameDts } diff --git a/protocol/hls/audio_cache.go b/protocol/hls/audio_cache.go index 88dbd2fe..58dcd532 100755 --- a/protocol/hls/audio_cache.go +++ b/protocol/hls/audio_cache.go @@ -21,24 +21,24 @@ func newAudioCache() *audioCache { } } -func (self *audioCache) Cache(src []byte, pts uint64) bool { - if self.num == 0 { - self.offset = 0 - self.pts = pts - self.buf.Reset() +func (a *audioCache) Cache(src []byte, pts uint64) bool { + if a.num == 0 { + a.offset = 0 + a.pts = pts + a.buf.Reset() } - self.buf.Write(src) - self.offset += len(src) - self.num++ + a.buf.Write(src) + a.offset += len(src) + a.num++ return false } -func (self *audioCache) GetFrame() (int, uint64, []byte) { - self.num = 0 - return self.offset, self.pts, self.buf.Bytes() +func (a *audioCache) GetFrame() (int, uint64, []byte) { + a.num = 0 + return a.offset, a.pts, a.buf.Bytes() } -func (self *audioCache) CacheNum() byte { - return self.num +func (a *audioCache) CacheNum() byte { + return a.num } diff --git a/protocol/hls/hls.go b/protocol/hls/hls.go index 8deeceb8..3ea623ec 100755 --- a/protocol/hls/hls.go +++ b/protocol/hls/hls.go @@ -1,7 +1,6 @@ package hls import ( - "bytes" "errors" "fmt" "net" @@ -10,13 +9,9 @@ import ( "strconv" "strings" "time" - + "log" "github.com/gwuhaolin/livego/utils/cmap" "github.com/gwuhaolin/livego/av" - "github.com/gwuhaolin/livego/container/flv" - "github.com/gwuhaolin/livego/container/ts" - "github.com/gwuhaolin/livego/parser" - "log" ) const ( @@ -37,8 +32,8 @@ var crossdomainxml = []byte(` `) type Server struct { - l net.Listener - conns cmap.ConcurrentMap + listener net.Listener + conns cmap.ConcurrentMap } func NewServer() *Server { @@ -49,52 +44,52 @@ func NewServer() *Server { return ret } -func (self *Server) Serve(l net.Listener) error { +func (server *Server) Serve(listener net.Listener) error { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - self.handle(w, r) + server.handle(w, r) }) - self.l = l - http.Serve(l, mux) + server.listener = listener + http.Serve(listener, mux) return nil } -func (self *Server) GetWriter(info av.Info) av.WriteCloser { +func (server *Server) GetWriter(info av.Info) av.WriteCloser { var s *Source - ok := self.conns.Has(info.Key) + ok := server.conns.Has(info.Key) if !ok { log.Println("new hls source") s = NewSource(info) - self.conns.Set(info.Key, s) + server.conns.Set(info.Key, s) } else { - v, _ := self.conns.Get(info.Key) + v, _ := server.conns.Get(info.Key) s = v.(*Source) } return s } -func (self *Server) getConn(key string) *Source { - v, ok := self.conns.Get(key) +func (server *Server) getConn(key string) *Source { + v, ok := server.conns.Get(key) if !ok { return nil } return v.(*Source) } -func (self *Server) checkStop() { +func (server *Server) checkStop() { for { <-time.After(5 * time.Second) - for item := range self.conns.IterBuffered() { + for item := range server.conns.IterBuffered() { v := item.Val.(*Source) if !v.Alive() { log.Println("check stop and remove: ", v.Info()) - self.conns.Remove(item.Key) + server.conns.Remove(item.Key) } } } } -func (self *Server) handle(w http.ResponseWriter, r *http.Request) { +func (server *Server) handle(w http.ResponseWriter, r *http.Request) { if path.Base(r.URL.Path) == "crossdomain.xml" { w.Header().Set("Content-Type", "application/xml") w.Write(crossdomainxml) @@ -102,8 +97,8 @@ func (self *Server) handle(w http.ResponseWriter, r *http.Request) { } switch path.Ext(r.URL.Path) { case ".m3u8": - key, _ := self.parseM3u8(r.URL.Path) - conn := self.getConn(key) + key, _ := server.parseM3u8(r.URL.Path) + conn := server.getConn(key) if conn == nil { http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden) return @@ -122,8 +117,8 @@ func (self *Server) handle(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Length", strconv.Itoa(len(body))) w.Write(body) case ".ts": - key, _ := self.parseTs(r.URL.Path) - conn := self.getConn(key) + key, _ := server.parseTs(r.URL.Path) + conn := server.getConn(key) if conn == nil { http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden) return @@ -142,13 +137,13 @@ func (self *Server) handle(w http.ResponseWriter, r *http.Request) { } } -func (self *Server) parseM3u8(pathstr string) (key string, err error) { +func (server *Server) parseM3u8(pathstr string) (key string, err error) { pathstr = strings.TrimLeft(pathstr, "/") key = strings.TrimRight(pathstr, path.Ext(pathstr)) return } -func (self *Server) parseTs(pathstr string) (key string, err error) { +func (server *Server) parseTs(pathstr string) (key string, err error) { pathstr = strings.TrimLeft(pathstr, "/") paths := strings.SplitN(pathstr, "/", 3) if len(paths) != 3 { @@ -159,254 +154,3 @@ func (self *Server) parseTs(pathstr string) (key string, err error) { return } - -const ( - videoHZ = 90000 - aacSampleLen = 1024 - maxQueueNum = 512 - - h264_default_hz uint64 = 90 -) - -type Source struct { - av.RWBaser - seq int - info av.Info - bwriter *bytes.Buffer - btswriter *bytes.Buffer - demuxer *flv.Demuxer - muxer *ts.Muxer - pts, dts uint64 - stat *status - align *align - cache *audioCache - tsCache *TSCacheItem - tsparser *parser.CodecParser - closed bool - packetQueue chan av.Packet -} - -func NewSource(info av.Info) *Source { - info.Inter = true - s := &Source{ - info: info, - align: &align{}, - stat: newStatus(), - RWBaser: av.NewRWBaser(time.Second * 10), - cache: newAudioCache(), - demuxer: flv.NewDemuxer(), - muxer: ts.NewMuxer(), - tsCache: NewTSCacheItem(info.Key), - tsparser: parser.NewCodecParser(), - bwriter: bytes.NewBuffer(make([]byte, 100*1024)), - packetQueue: make(chan av.Packet, maxQueueNum), - } - go func() { - err := s.SendPacket() - if err != nil { - log.Println("send packet error: ", err) - s.closed = true - } - }() - return s -} - -func (self *Source) GetCacheInc() *TSCacheItem { - return self.tsCache -} - -func (self *Source) DropPacket(pktQue chan av.Packet, info av.Info) { - log.Printf("[%v] packet queue max!!!", info) - for i := 0; i < maxQueueNum-84; i++ { - tmpPkt, ok := <-pktQue - // try to don't drop audio - if ok && tmpPkt.IsAudio { - if len(pktQue) > maxQueueNum-2 { - <-pktQue - } else { - pktQue <- tmpPkt - } - } - - if ok && tmpPkt.IsVideo { - videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) - // dont't drop sps config and dont't drop key frame - if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { - pktQue <- tmpPkt - } - if len(pktQue) > maxQueueNum-10 { - <-pktQue - } - } - - } - log.Println("packet queue len: ", len(pktQue)) -} - -func (self *Source) Write(p av.Packet) error { - self.SetPreTime() - if len(self.packetQueue) >= maxQueueNum-24 { - self.DropPacket(self.packetQueue, self.info) - } else { - self.packetQueue <- p - } - return nil -} - -func (self *Source) SendPacket() error { - defer func() { - log.Printf("[%v] hls sender stop", self.info) - if r := recover(); r != nil { - log.Println("hls SendPacket panic: ", r) - } - }() - log.Printf("[%v] hls sender start", self.info) - for { - if self.closed { - return errors.New("closed") - } - - p, ok := <-self.packetQueue - if ok { - if p.IsMetadata { - continue - } - - err := self.demuxer.Demux(&p) - if err == flv.ErrAvcEndSEQ { - log.Println(err) - continue - } else { - if err != nil { - log.Println(err) - return err - } - } - compositionTime, isSeq, err := self.parse(&p) - if err != nil { - log.Println(err) - } - if err != nil || isSeq { - continue - } - if self.btswriter != nil { - self.stat.update(p.IsVideo, p.TimeStamp) - self.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime)) - self.tsMux(&p) - } - } else { - return errors.New("closed") - } - } -} - -func (self *Source) Info() (ret av.Info) { - return self.info -} - -func (self *Source) cleanup() { - close(self.packetQueue) - self.bwriter = nil - self.btswriter = nil - self.cache = nil - self.tsCache = nil -} - -func (self *Source) Close(err error) { - log.Println("hls source closed: ", self.info) - if !self.closed { - self.cleanup() - } - self.closed = true -} - -func (self *Source) cut() { - newf := true - if self.btswriter == nil { - self.btswriter = bytes.NewBuffer(nil) - } else if self.btswriter != nil && self.stat.durationMs() >= duration { - self.flushAudio() - - self.seq++ - filename := fmt.Sprintf("/%s/%d.ts", self.info.Key, time.Now().Unix()) - item := NewTSItem(filename, int(self.stat.durationMs()), self.seq, self.btswriter.Bytes()) - self.tsCache.SetItem(filename, item) - - self.btswriter.Reset() - self.stat.resetAndNew() - } else { - newf = false - } - if newf { - self.btswriter.Write(self.muxer.PAT()) - self.btswriter.Write(self.muxer.PMT(av.SOUND_AAC, true)) - } -} - -func (self *Source) parse(p *av.Packet) (int32, bool, error) { - var compositionTime int32 - var ah av.AudioPacketHeader - var vh av.VideoPacketHeader - if p.IsVideo { - vh = p.Header.(av.VideoPacketHeader) - if vh.CodecID() != av.VIDEO_H264 { - return compositionTime, false, ErrNoSupportVideoCodec - } - compositionTime = vh.CompositionTime() - if vh.IsKeyFrame() && vh.IsSeq() { - return compositionTime, true, self.tsparser.Parse(p, self.bwriter) - } - } else { - ah = p.Header.(av.AudioPacketHeader) - if ah.SoundFormat() != av.SOUND_AAC { - return compositionTime, false, ErrNoSupportAudioCodec - } - if ah.AACPacketType() == av.AAC_SEQHDR { - return compositionTime, true, self.tsparser.Parse(p, self.bwriter) - } - } - self.bwriter.Reset() - if err := self.tsparser.Parse(p, self.bwriter); err != nil { - return compositionTime, false, err - } - p.Data = self.bwriter.Bytes() - - if p.IsVideo && vh.IsKeyFrame() { - self.cut() - } - return compositionTime, false, nil -} - -func (self *Source) calcPtsDts(isVideo bool, ts, compositionTs uint32) { - self.dts = uint64(ts) * h264_default_hz - if isVideo { - self.pts = self.dts + uint64(compositionTs)*h264_default_hz - } else { - sampleRate, _ := self.tsparser.SampleRate() - self.align.align(&self.dts, uint32(videoHZ*aacSampleLen/sampleRate)) - self.pts = self.dts - } -} -func (self *Source) flushAudio() error { - return self.muxAudio(1) -} - -func (self *Source) muxAudio(limit byte) error { - if self.cache.CacheNum() < limit { - return nil - } - var p av.Packet - _, pts, buf := self.cache.GetFrame() - p.Data = buf - p.TimeStamp = uint32(pts / h264_default_hz) - return self.muxer.Mux(&p, self.btswriter) -} - -func (self *Source) tsMux(p *av.Packet) error { - if p.IsVideo { - return self.muxer.Mux(p, self.btswriter) - } else { - self.cache.Cache(p.Data, self.pts) - return self.muxAudio(cache_max_frames) - } -} diff --git a/protocol/hls/source.go b/protocol/hls/source.go new file mode 100644 index 00000000..2b98069f --- /dev/null +++ b/protocol/hls/source.go @@ -0,0 +1,264 @@ +package hls + +import ( + "fmt" + "time" + "bytes" + "log" + "errors" + "github.com/gwuhaolin/livego/parser" + "github.com/gwuhaolin/livego/av" + "github.com/gwuhaolin/livego/container/flv" + "github.com/gwuhaolin/livego/container/ts" +) + +const ( + videoHZ = 90000 + aacSampleLen = 1024 + maxQueueNum = 512 + + h264_default_hz uint64 = 90 +) + +type Source struct { + av.RWBaser + seq int + info av.Info + bwriter *bytes.Buffer + btswriter *bytes.Buffer + demuxer *flv.Demuxer + muxer *ts.Muxer + pts, dts uint64 + stat *status + align *align + cache *audioCache + tsCache *TSCacheItem + tsparser *parser.CodecParser + closed bool + packetQueue chan av.Packet +} + +func NewSource(info av.Info) *Source { + info.Inter = true + s := &Source{ + info: info, + align: &align{}, + stat: newStatus(), + RWBaser: av.NewRWBaser(time.Second * 10), + cache: newAudioCache(), + demuxer: flv.NewDemuxer(), + muxer: ts.NewMuxer(), + tsCache: NewTSCacheItem(info.Key), + tsparser: parser.NewCodecParser(), + bwriter: bytes.NewBuffer(make([]byte, 100*1024)), + packetQueue: make(chan av.Packet, maxQueueNum), + } + go func() { + err := s.SendPacket() + if err != nil { + log.Println("send packet error: ", err) + s.closed = true + } + }() + return s +} + +func (source *Source) GetCacheInc() *TSCacheItem { + return source.tsCache +} + +func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) { + log.Printf("[%v] packet queue max!!!", info) + for i := 0; i < maxQueueNum-84; i++ { + tmpPkt, ok := <-pktQue + // try to don't drop audio + if ok && tmpPkt.IsAudio { + if len(pktQue) > maxQueueNum-2 { + <-pktQue + } else { + pktQue <- tmpPkt + } + } + + if ok && tmpPkt.IsVideo { + videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) + // dont't drop sps config and dont't drop key frame + if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { + pktQue <- tmpPkt + } + if len(pktQue) > maxQueueNum-10 { + <-pktQue + } + } + + } + log.Println("packet queue len: ", len(pktQue)) +} + +func (source *Source) Write(p av.Packet) error { + source.SetPreTime() + if len(source.packetQueue) >= maxQueueNum-24 { + source.DropPacket(source.packetQueue, source.info) + } else { + source.packetQueue <- p + } + return nil +} + +func (source *Source) SendPacket() error { + defer func() { + log.Printf("[%v] hls sender stop", source.info) + if r := recover(); r != nil { + log.Println("hls SendPacket panic: ", r) + } + }() + log.Printf("[%v] hls sender start", source.info) + for { + if source.closed { + return errors.New("closed") + } + + p, ok := <-source.packetQueue + if ok { + if p.IsMetadata { + continue + } + + err := source.demuxer.Demux(&p) + if err == flv.ErrAvcEndSEQ { + log.Println(err) + continue + } else { + if err != nil { + log.Println(err) + return err + } + } + compositionTime, isSeq, err := source.parse(&p) + if err != nil { + log.Println(err) + } + if err != nil || isSeq { + continue + } + if source.btswriter != nil { + source.stat.update(p.IsVideo, p.TimeStamp) + source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime)) + source.tsMux(&p) + } + } else { + return errors.New("closed") + } + } +} + +func (source *Source) Info() (ret av.Info) { + return source.info +} + +func (source *Source) cleanup() { + close(source.packetQueue) + source.bwriter = nil + source.btswriter = nil + source.cache = nil + source.tsCache = nil +} + +func (source *Source) Close(err error) { + log.Println("hls source closed: ", source.info) + if !source.closed { + source.cleanup() + } + source.closed = true +} + +func (source *Source) cut() { + newf := true + if source.btswriter == nil { + source.btswriter = bytes.NewBuffer(nil) + } else if source.btswriter != nil && source.stat.durationMs() >= duration { + source.flushAudio() + + source.seq++ + filename := fmt.Sprintf("/%s/%d.ts", source.info.Key, time.Now().Unix()) + item := NewTSItem(filename, int(source.stat.durationMs()), source.seq, source.btswriter.Bytes()) + source.tsCache.SetItem(filename, item) + + source.btswriter.Reset() + source.stat.resetAndNew() + } else { + newf = false + } + if newf { + source.btswriter.Write(source.muxer.PAT()) + source.btswriter.Write(source.muxer.PMT(av.SOUND_AAC, true)) + } +} + +func (source *Source) parse(p *av.Packet) (int32, bool, error) { + var compositionTime int32 + var ah av.AudioPacketHeader + var vh av.VideoPacketHeader + if p.IsVideo { + vh = p.Header.(av.VideoPacketHeader) + if vh.CodecID() != av.VIDEO_H264 { + return compositionTime, false, ErrNoSupportVideoCodec + } + compositionTime = vh.CompositionTime() + if vh.IsKeyFrame() && vh.IsSeq() { + return compositionTime, true, source.tsparser.Parse(p, source.bwriter) + } + } else { + ah = p.Header.(av.AudioPacketHeader) + if ah.SoundFormat() != av.SOUND_AAC { + return compositionTime, false, ErrNoSupportAudioCodec + } + if ah.AACPacketType() == av.AAC_SEQHDR { + return compositionTime, true, source.tsparser.Parse(p, source.bwriter) + } + } + source.bwriter.Reset() + if err := source.tsparser.Parse(p, source.bwriter); err != nil { + return compositionTime, false, err + } + p.Data = source.bwriter.Bytes() + + if p.IsVideo && vh.IsKeyFrame() { + source.cut() + } + return compositionTime, false, nil +} + +func (source *Source) calcPtsDts(isVideo bool, ts, compositionTs uint32) { + source.dts = uint64(ts) * h264_default_hz + if isVideo { + source.pts = source.dts + uint64(compositionTs)*h264_default_hz + } else { + sampleRate, _ := source.tsparser.SampleRate() + source.align.align(&source.dts, uint32(videoHZ*aacSampleLen/sampleRate)) + source.pts = source.dts + } +} +func (source *Source) flushAudio() error { + return source.muxAudio(1) +} + +func (source *Source) muxAudio(limit byte) error { + if source.cache.CacheNum() < limit { + return nil + } + var p av.Packet + _, pts, buf := source.cache.GetFrame() + p.Data = buf + p.TimeStamp = uint32(pts / h264_default_hz) + return source.muxer.Mux(&p, source.btswriter) +} + +func (source *Source) tsMux(p *av.Packet) error { + if p.IsVideo { + return source.muxer.Mux(p, source.btswriter) + } else { + source.cache.Cache(p.Data, source.pts) + return source.muxAudio(cache_max_frames) + } +} diff --git a/protocol/hls/ts_cache.go b/protocol/hls/ts_cache.go index 99ccd61c..e8c38d50 100755 --- a/protocol/hls/ts_cache.go +++ b/protocol/hls/ts_cache.go @@ -18,18 +18,18 @@ func NewTSCache() *TSCache { } } -func (self *TSCache) Set(key string, e *TSCacheItem) { - v, ok := self.entrys[key] +func (cache *TSCache) Set(key string, e *TSCacheItem) { + v, ok := cache.entrys[key] if !ok { - self.entrys[key] = e + cache.entrys[key] = e } if v.ID() != e.ID() { - self.entrys[key] = e + cache.entrys[key] = e } } -func (self *TSCache) Get(key string) *TSCacheItem { - v := self.entrys[key] +func (cache *TSCache) Get(key string) *TSCacheItem { + v := cache.entrys[key] return v } @@ -58,19 +58,19 @@ func NewTSCacheItem(id string) *TSCacheItem { } } -func (self *TSCacheItem) ID() string { - return self.id +func (tcCacheItem *TSCacheItem) ID() string { + return tcCacheItem.id } // TODO: found data race, fix it -func (self *TSCacheItem) GenM3U8PlayList() ([]byte, error) { +func (tcCacheItem *TSCacheItem) GenM3U8PlayList() ([]byte, error) { var seq int var getSeq bool var maxDuration int m3u8body := bytes.NewBuffer(nil) - for e := self.ll.Front(); e != nil; e = e.Next() { + for e := tcCacheItem.ll.Front(); e != nil; e = e.Next() { key := e.Value.(string) - v, ok := self.lm[key] + v, ok := tcCacheItem.lm[key] if ok { if v.Duration > maxDuration { maxDuration = v.Duration @@ -90,19 +90,19 @@ func (self *TSCacheItem) GenM3U8PlayList() ([]byte, error) { return w.Bytes(), nil } -func (self *TSCacheItem) SetItem(key string, item TSItem) { - if self.ll.Len() == self.num { - e := self.ll.Front() - self.ll.Remove(e) +func (tcCacheItem *TSCacheItem) SetItem(key string, item TSItem) { + if tcCacheItem.ll.Len() == tcCacheItem.num { + e := tcCacheItem.ll.Front() + tcCacheItem.ll.Remove(e) k := e.Value.(string) - delete(self.lm, k) + delete(tcCacheItem.lm, k) } - self.lm[key] = item - self.ll.PushBack(key) + tcCacheItem.lm[key] = item + tcCacheItem.ll.PushBack(key) } -func (self *TSCacheItem) GetItem(key string) (TSItem, error) { - item, ok := self.lm[key] +func (tcCacheItem *TSCacheItem) GetItem(key string) (TSItem, error) { + item, ok := tcCacheItem.lm[key] if !ok { return item, ErrNoKey } diff --git a/protocol/httpflv/http_flv.go b/protocol/httpflv/http_flv.go index 91a2fab3..8816f3de 100755 --- a/protocol/httpflv/http_flv.go +++ b/protocol/httpflv/http_flv.go @@ -6,9 +6,7 @@ import ( "net/http" "strings" "time" - "errors" - "github.com/gwuhaolin/livego/utils/uid" "github.com/gwuhaolin/livego/protocol/amf" "github.com/gwuhaolin/livego/av" diff --git a/protocol/httpopera/http_opera.go b/protocol/httpopera/http_opera.go index 2e19c759..c566163e 100755 --- a/protocol/httpopera/http_opera.go +++ b/protocol/httpopera/http_opera.go @@ -5,9 +5,8 @@ import ( "io/ioutil" "net" "net/http" - - "github.com/gwuhaolin/livego/av" "log" + "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/protocol/rtmp" ) diff --git a/protocol/kcpts/kcp_ts.go b/protocol/kcpts/kcp_ts.go deleted file mode 100755 index 7a87ef8d..00000000 --- a/protocol/kcpts/kcp_ts.go +++ /dev/null @@ -1 +0,0 @@ -package kcpts diff --git a/protocol/private/protocol.go b/protocol/private/protocol.go deleted file mode 100755 index 735e4dc8..00000000 --- a/protocol/private/protocol.go +++ /dev/null @@ -1 +0,0 @@ -package private diff --git a/protocol/rtmp/cache/cache.go b/protocol/rtmp/cache/cache.go index bbe97318..444085c0 100755 --- a/protocol/rtmp/cache/cache.go +++ b/protocol/rtmp/cache/cache.go @@ -25,9 +25,9 @@ func NewCache() *Cache { } } -func (self *Cache) Write(p av.Packet) { +func (cache *Cache) Write(p av.Packet) { if p.IsMetadata { - self.metadata.Write(p) + cache.metadata.Write(p) return } else { if !p.IsVideo { @@ -35,7 +35,7 @@ func (self *Cache) Write(p av.Packet) { if ok { if ah.SoundFormat() == av.SOUND_AAC && ah.AACPacketType() == av.AAC_SEQHDR { - self.audioSeq.Write(p) + cache.audioSeq.Write(p) return } else { return @@ -46,7 +46,7 @@ func (self *Cache) Write(p av.Packet) { vh, ok := p.Header.(av.VideoPacketHeader) if ok { if vh.IsSeq() { - self.videoSeq.Write(p) + cache.videoSeq.Write(p) return } } else { @@ -55,23 +55,23 @@ func (self *Cache) Write(p av.Packet) { } } - self.gop.Write(p) + cache.gop.Write(p) } -func (self *Cache) Send(w av.WriteCloser) error { - if err := self.metadata.Send(w); err != nil { +func (cache *Cache) Send(w av.WriteCloser) error { + if err := cache.metadata.Send(w); err != nil { return err } - if err := self.videoSeq.Send(w); err != nil { + if err := cache.videoSeq.Send(w); err != nil { return err } - if err := self.audioSeq.Send(w); err != nil { + if err := cache.audioSeq.Send(w); err != nil { return err } - if err := self.gop.Send(w); err != nil { + if err := cache.gop.Send(w); err != nil { return err } diff --git a/protocol/rtmp/cache/gop.go b/protocol/rtmp/cache/gop.go index 9526eaa3..33abf72d 100755 --- a/protocol/rtmp/cache/gop.go +++ b/protocol/rtmp/cache/gop.go @@ -2,7 +2,6 @@ package cache import ( "errors" - "github.com/gwuhaolin/livego/av" ) @@ -24,24 +23,24 @@ func newArray() *array { return ret } -func (self *array) reset() { - self.index = 0 - self.packets = self.packets[:0] +func (array *array) reset() { + array.index = 0 + array.packets = array.packets[:0] } -func (self *array) write(packet av.Packet) error { - if self.index >= maxGOPCap { +func (array *array) write(packet av.Packet) error { + if array.index >= maxGOPCap { return ErrGopTooBig } - self.packets = append(self.packets, packet) - self.index++ + array.packets = append(array.packets, packet) + array.index++ return nil } -func (self *array) send(w av.WriteCloser) error { +func (array *array) send(w av.WriteCloser) error { var err error - for i := 0; i < self.index; i++ { - packet := self.packets[i] + for i := 0; i < array.index; i++ { + packet := array.packets[i] if err = w.Write(packet); err != nil { return err } @@ -64,27 +63,27 @@ func NewGopCache(num int) *GopCache { } } -func (self *GopCache) writeToArray(chunk av.Packet, startNew bool) error { +func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error { var ginc *array if startNew { - ginc = self.gops[self.nextindex] + ginc = gopCache.gops[gopCache.nextindex] if ginc == nil { ginc = newArray() - self.num++ - self.gops[self.nextindex] = ginc + gopCache.num++ + gopCache.gops[gopCache.nextindex] = ginc } else { ginc.reset() } - self.nextindex = (self.nextindex + 1) % self.count + gopCache.nextindex = (gopCache.nextindex + 1) % gopCache.count } else { - ginc = self.gops[(self.nextindex+1)%self.count] + ginc = gopCache.gops[(gopCache.nextindex+1)%gopCache.count] } ginc.write(chunk) return nil } -func (self *GopCache) Write(p av.Packet) { +func (gopCache *GopCache) Write(p av.Packet) { var ok bool if p.IsVideo { vh := p.Header.(av.VideoPacketHeader) @@ -92,21 +91,21 @@ func (self *GopCache) Write(p av.Packet) { ok = true } } - if ok || self.start { - self.start = true - self.writeToArray(p, ok) + if ok || gopCache.start { + gopCache.start = true + gopCache.writeToArray(p, ok) } } -func (self *GopCache) sendTo(w av.WriteCloser) error { +func (gopCache *GopCache) sendTo(w av.WriteCloser) error { var err error - pos := (self.nextindex + 1) % self.count - for i := 0; i < self.num; i++ { - index := (pos - self.num + 1) + i + pos := (gopCache.nextindex + 1) % gopCache.count + for i := 0; i < gopCache.num; i++ { + index := (pos - gopCache.num + 1) + i if index < 0 { - index += self.count + index += gopCache.count } - g := self.gops[index] + g := gopCache.gops[index] err = g.send(w) if err != nil { return err @@ -115,6 +114,6 @@ func (self *GopCache) sendTo(w av.WriteCloser) error { return nil } -func (self *GopCache) Send(w av.WriteCloser) error { - return self.sendTo(w) +func (gopCache *GopCache) Send(w av.WriteCloser) error { + return gopCache.sendTo(w) } diff --git a/protocol/rtmp/cache/special.go b/protocol/rtmp/cache/special.go index 13c1fa22..3a56ac7a 100755 --- a/protocol/rtmp/cache/special.go +++ b/protocol/rtmp/cache/special.go @@ -33,14 +33,14 @@ func NewSpecialCache() *SpecialCache { return &SpecialCache{} } -func (self *SpecialCache) Write(p av.Packet) { - self.p = p - self.full = true +func (specialCache *SpecialCache) Write(p av.Packet) { + specialCache.p = p + specialCache.full = true } -func (self *SpecialCache) Send(w av.WriteCloser) error { - if !self.full { +func (specialCache *SpecialCache) Send(w av.WriteCloser) error { + if !specialCache.full { return nil } - return w.Write(self.p) + return w.Write(specialCache.p) } diff --git a/protocol/rtmp/core/chunk_stream.go b/protocol/rtmp/core/chunk_stream.go index b1355315..fab2e6b7 100755 --- a/protocol/rtmp/core/chunk_stream.go +++ b/protocol/rtmp/core/chunk_stream.go @@ -24,93 +24,93 @@ type ChunkStream struct { Data []byte } -func (self *ChunkStream) full() bool { - return self.got +func (chunkStream *ChunkStream) full() bool { + return chunkStream.got } -func (self *ChunkStream) new(pool *pool.Pool) { - self.got = false - self.index = 0 - self.remain = self.Length - self.Data = pool.Get(int(self.Length)) +func (chunkStream *ChunkStream) new(pool *pool.Pool) { + chunkStream.got = false + chunkStream.index = 0 + chunkStream.remain = chunkStream.Length + chunkStream.Data = pool.Get(int(chunkStream.Length)) } -func (self *ChunkStream) writeHeader(w *ReadWriter) error { +func (chunkStream *ChunkStream) writeHeader(w *ReadWriter) error { //Chunk Basic Header - h := self.Format << 6 + h := chunkStream.Format << 6 switch { - case self.CSID < 64: - h |= self.CSID + case chunkStream.CSID < 64: + h |= chunkStream.CSID w.WriteUintBE(h, 1) - case self.CSID-64 < 256: + case chunkStream.CSID-64 < 256: h |= 0 w.WriteUintBE(h, 1) - w.WriteUintLE(self.CSID-64, 1) - case self.CSID-64 < 65536: + w.WriteUintLE(chunkStream.CSID-64, 1) + case chunkStream.CSID-64 < 65536: h |= 1 w.WriteUintBE(h, 1) - w.WriteUintLE(self.CSID-64, 2) + w.WriteUintLE(chunkStream.CSID-64, 2) } //Chunk Message Header - ts := self.Timestamp - if self.Format == 3 { + ts := chunkStream.Timestamp + if chunkStream.Format == 3 { goto END } - if self.Timestamp > 0xffffff { + if chunkStream.Timestamp > 0xffffff { ts = 0xffffff } w.WriteUintBE(ts, 3) - if self.Format == 2 { + if chunkStream.Format == 2 { goto END } - if self.Length > 0xffffff { - return fmt.Errorf("length=%d", self.Length) + if chunkStream.Length > 0xffffff { + return fmt.Errorf("length=%d", chunkStream.Length) } - w.WriteUintBE(self.Length, 3) - w.WriteUintBE(self.TypeID, 1) - if self.Format == 1 { + w.WriteUintBE(chunkStream.Length, 3) + w.WriteUintBE(chunkStream.TypeID, 1) + if chunkStream.Format == 1 { goto END } - w.WriteUintLE(self.StreamID, 4) + w.WriteUintLE(chunkStream.StreamID, 4) END: //Extended Timestamp if ts >= 0xffffff { - w.WriteUintBE(self.Timestamp, 4) + w.WriteUintBE(chunkStream.Timestamp, 4) } return w.WriteError() } -func (self *ChunkStream) writeChunk(w *ReadWriter, chunkSize int) error { - if self.TypeID == av.TAG_AUDIO { - self.CSID = 4 - } else if self.TypeID == av.TAG_VIDEO || - self.TypeID == av.TAG_SCRIPTDATAAMF0 || - self.TypeID == av.TAG_SCRIPTDATAAMF3 { - self.CSID = 6 +func (chunkStream *ChunkStream) writeChunk(w *ReadWriter, chunkSize int) error { + if chunkStream.TypeID == av.TAG_AUDIO { + chunkStream.CSID = 4 + } else if chunkStream.TypeID == av.TAG_VIDEO || + chunkStream.TypeID == av.TAG_SCRIPTDATAAMF0 || + chunkStream.TypeID == av.TAG_SCRIPTDATAAMF3 { + chunkStream.CSID = 6 } totalLen := uint32(0) - numChunks := (self.Length / uint32(chunkSize)) + numChunks := (chunkStream.Length / uint32(chunkSize)) for i := uint32(0); i <= numChunks; i++ { - if totalLen == self.Length { + if totalLen == chunkStream.Length { break } if i == 0 { - self.Format = uint32(0) + chunkStream.Format = uint32(0) } else { - self.Format = uint32(3) + chunkStream.Format = uint32(3) } - if err := self.writeHeader(w); err != nil { + if err := chunkStream.writeHeader(w); err != nil { return err } inc := uint32(chunkSize) start := uint32(i) * uint32(chunkSize) - if uint32(len(self.Data))-start <= inc { - inc = uint32(len(self.Data)) - start + if uint32(len(chunkStream.Data))-start <= inc { + inc = uint32(len(chunkStream.Data)) - start } totalLen += inc end := start + inc - buf := self.Data[start:end] + buf := chunkStream.Data[start:end] if _, err := w.Write(buf); err != nil { return err } @@ -120,105 +120,105 @@ func (self *ChunkStream) writeChunk(w *ReadWriter, chunkSize int) error { } -func (self *ChunkStream) readChunk(r *ReadWriter, chunkSize uint32, pool *pool.Pool) error { - if self.remain != 0 && self.tmpFromat != 3 { - return fmt.Errorf("inlaid remin = %d", self.remain) +func (chunkStream *ChunkStream) readChunk(r *ReadWriter, chunkSize uint32, pool *pool.Pool) error { + if chunkStream.remain != 0 && chunkStream.tmpFromat != 3 { + return fmt.Errorf("inlaid remin = %d", chunkStream.remain) } - switch self.CSID { + switch chunkStream.CSID { case 0: id, _ := r.ReadUintLE(1) - self.CSID = id + 64 + chunkStream.CSID = id + 64 case 1: id, _ := r.ReadUintLE(2) - self.CSID = id + 64 + chunkStream.CSID = id + 64 } - switch self.tmpFromat { + switch chunkStream.tmpFromat { case 0: - self.Format = self.tmpFromat - self.Timestamp, _ = r.ReadUintBE(3) - self.Length, _ = r.ReadUintBE(3) - self.TypeID, _ = r.ReadUintBE(1) - self.StreamID, _ = r.ReadUintLE(4) - if self.Timestamp == 0xffffff { - self.Timestamp, _ = r.ReadUintBE(4) - self.exted = true + chunkStream.Format = chunkStream.tmpFromat + chunkStream.Timestamp, _ = r.ReadUintBE(3) + chunkStream.Length, _ = r.ReadUintBE(3) + chunkStream.TypeID, _ = r.ReadUintBE(1) + chunkStream.StreamID, _ = r.ReadUintLE(4) + if chunkStream.Timestamp == 0xffffff { + chunkStream.Timestamp, _ = r.ReadUintBE(4) + chunkStream.exted = true } else { - self.exted = false + chunkStream.exted = false } - self.new(pool) + chunkStream.new(pool) case 1: - self.Format = self.tmpFromat + chunkStream.Format = chunkStream.tmpFromat timeStamp, _ := r.ReadUintBE(3) - self.Length, _ = r.ReadUintBE(3) - self.TypeID, _ = r.ReadUintBE(1) + chunkStream.Length, _ = r.ReadUintBE(3) + chunkStream.TypeID, _ = r.ReadUintBE(1) if timeStamp == 0xffffff { timeStamp, _ = r.ReadUintBE(4) - self.exted = true + chunkStream.exted = true } else { - self.exted = false + chunkStream.exted = false } - self.timeDelta = timeStamp - self.Timestamp += timeStamp - self.new(pool) + chunkStream.timeDelta = timeStamp + chunkStream.Timestamp += timeStamp + chunkStream.new(pool) case 2: - self.Format = self.tmpFromat + chunkStream.Format = chunkStream.tmpFromat timeStamp, _ := r.ReadUintBE(3) if timeStamp == 0xffffff { timeStamp, _ = r.ReadUintBE(4) - self.exted = true + chunkStream.exted = true } else { - self.exted = false + chunkStream.exted = false } - self.timeDelta = timeStamp - self.Timestamp += timeStamp - self.new(pool) + chunkStream.timeDelta = timeStamp + chunkStream.Timestamp += timeStamp + chunkStream.new(pool) case 3: - if self.remain == 0 { - switch self.Format { + if chunkStream.remain == 0 { + switch chunkStream.Format { case 0: - if self.exted { + if chunkStream.exted { timestamp, _ := r.ReadUintBE(4) - self.Timestamp = timestamp + chunkStream.Timestamp = timestamp } case 1, 2: var timedet uint32 - if self.exted { + if chunkStream.exted { timedet, _ = r.ReadUintBE(4) } else { - timedet = self.timeDelta + timedet = chunkStream.timeDelta } - self.Timestamp += timedet + chunkStream.Timestamp += timedet } - self.new(pool) + chunkStream.new(pool) } else { - if self.exted { + if chunkStream.exted { b, err := r.Peek(4) if err != nil { return err } tmpts := binary.BigEndian.Uint32(b) - if tmpts == self.Timestamp { + if tmpts == chunkStream.Timestamp { r.Discard(4) } } } default: - return fmt.Errorf("invalid format=%d", self.Format) + return fmt.Errorf("invalid format=%d", chunkStream.Format) } - size := int(self.remain) + size := int(chunkStream.remain) if size > int(chunkSize) { size = int(chunkSize) } - buf := self.Data[self.index: self.index+uint32(size)] + buf := chunkStream.Data[chunkStream.index: chunkStream.index+uint32(size)] if _, err := r.Read(buf); err != nil { return err } - self.index += uint32(size) - self.remain -= uint32(size) - if self.remain == 0 { - self.got = true + chunkStream.index += uint32(size) + chunkStream.remain -= uint32(size) + if chunkStream.remain == 0 { + chunkStream.got = true } return r.readError diff --git a/protocol/rtmp/core/conn.go b/protocol/rtmp/core/conn.go index 9c436839..8d6d3ccd 100755 --- a/protocol/rtmp/core/conn.go +++ b/protocol/rtmp/core/conn.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "net" "time" - "github.com/gwuhaolin/livego/utils/pool" "github.com/gwuhaolin/livego/utils/pio" ) @@ -45,103 +44,103 @@ func NewConn(c net.Conn, bufferSize int) *Conn { } } -func (self *Conn) Read(c *ChunkStream) error { +func (conn *Conn) Read(c *ChunkStream) error { for { - h, _ := self.rw.ReadUintBE(1) + h, _ := conn.rw.ReadUintBE(1) // if err != nil { // log.Println("read from conn error: ", err) // return err // } format := h >> 6 csid := h & 0x3f - cs, ok := self.chunks[csid] + cs, ok := conn.chunks[csid] if !ok { cs = ChunkStream{} - self.chunks[csid] = cs + conn.chunks[csid] = cs } cs.tmpFromat = format cs.CSID = csid - err := cs.readChunk(self.rw, self.remoteChunkSize, self.pool) + err := cs.readChunk(conn.rw, conn.remoteChunkSize, conn.pool) if err != nil { return err } - self.chunks[csid] = cs + conn.chunks[csid] = cs if cs.full() { *c = cs break } } - self.handleControlMsg(c) + conn.handleControlMsg(c) - self.ack(c.Length) + conn.ack(c.Length) return nil } -func (self *Conn) Write(c *ChunkStream) error { +func (conn *Conn) Write(c *ChunkStream) error { if c.TypeID == idSetChunkSize { - self.chunkSize = binary.BigEndian.Uint32(c.Data) + conn.chunkSize = binary.BigEndian.Uint32(c.Data) } - return c.writeChunk(self.rw, int(self.chunkSize)) + return c.writeChunk(conn.rw, int(conn.chunkSize)) } -func (self *Conn) Flush() error { - return self.rw.Flush() +func (conn *Conn) Flush() error { + return conn.rw.Flush() } -func (self *Conn) Close() error { - return self.Conn.Close() +func (conn *Conn) Close() error { + return conn.Conn.Close() } -func (self *Conn) RemoteAddr() net.Addr { - return self.Conn.RemoteAddr() +func (conn *Conn) RemoteAddr() net.Addr { + return conn.Conn.RemoteAddr() } -func (self *Conn) LocalAddr() net.Addr { - return self.Conn.LocalAddr() +func (conn *Conn) LocalAddr() net.Addr { + return conn.Conn.LocalAddr() } -func (self *Conn) SetDeadline(t time.Time) error { - return self.Conn.SetDeadline(t) +func (conn *Conn) SetDeadline(t time.Time) error { + return conn.Conn.SetDeadline(t) } -func (self *Conn) NewAck(size uint32) ChunkStream { +func (conn *Conn) NewAck(size uint32) ChunkStream { return initControlMsg(idAck, 4, size) } -func (self *Conn) NewSetChunkSize(size uint32) ChunkStream { +func (conn *Conn) NewSetChunkSize(size uint32) ChunkStream { return initControlMsg(idSetChunkSize, 4, size) } -func (self *Conn) NewWindowAckSize(size uint32) ChunkStream { +func (conn *Conn) NewWindowAckSize(size uint32) ChunkStream { return initControlMsg(idWindowAckSize, 4, size) } -func (self *Conn) NewSetPeerBandwidth(size uint32) ChunkStream { +func (conn *Conn) NewSetPeerBandwidth(size uint32) ChunkStream { ret := initControlMsg(idSetPeerBandwidth, 5, size) ret.Data[4] = 2 return ret } -func (self *Conn) handleControlMsg(c *ChunkStream) { +func (conn *Conn) handleControlMsg(c *ChunkStream) { if c.TypeID == idSetChunkSize { - self.remoteChunkSize = binary.BigEndian.Uint32(c.Data) + conn.remoteChunkSize = binary.BigEndian.Uint32(c.Data) } else if c.TypeID == idWindowAckSize { - self.remoteWindowAckSize = binary.BigEndian.Uint32(c.Data) + conn.remoteWindowAckSize = binary.BigEndian.Uint32(c.Data) } } -func (self *Conn) ack(size uint32) { - self.received += uint32(size) - self.ackReceived += uint32(size) - if self.received >= 0xf0000000 { - self.received = 0 +func (conn *Conn) ack(size uint32) { + conn.received += uint32(size) + conn.ackReceived += uint32(size) + if conn.received >= 0xf0000000 { + conn.received = 0 } - if self.ackReceived >= self.remoteWindowAckSize { - cs := self.NewAck(self.ackReceived) - cs.writeChunk(self.rw, int(self.chunkSize)) - self.ackReceived = 0 + if conn.ackReceived >= conn.remoteWindowAckSize { + cs := conn.NewAck(conn.ackReceived) + cs.writeChunk(conn.rw, int(conn.chunkSize)) + conn.ackReceived = 0 } } @@ -174,7 +173,7 @@ const ( +------------------------------+------------------------- Pay load for the ‘User Control Message’. */ -func (self *Conn) userControlMsg(eventType, buflen uint32) ChunkStream { +func (conn *Conn) userControlMsg(eventType, buflen uint32) ChunkStream { var ret ChunkStream buflen += 2 ret = ChunkStream{ @@ -190,18 +189,18 @@ func (self *Conn) userControlMsg(eventType, buflen uint32) ChunkStream { return ret } -func (self *Conn) SetBegin() { - ret := self.userControlMsg(streamBegin, 4) +func (conn *Conn) SetBegin() { + ret := conn.userControlMsg(streamBegin, 4) for i := 0; i < 4; i++ { ret.Data[2+i] = byte(1 >> uint32((3-i)*8) & 0xff) } - self.Write(&ret) + conn.Write(&ret) } -func (self *Conn) SetRecorded() { - ret := self.userControlMsg(streamIsRecorded, 4) +func (conn *Conn) SetRecorded() { + ret := conn.userControlMsg(streamIsRecorded, 4) for i := 0; i < 4; i++ { ret.Data[2+i] = byte(1 >> uint32((3-i)*8) & 0xff) } - self.Write(&ret) + conn.Write(&ret) } diff --git a/protocol/rtmp/core/conn_client.go b/protocol/rtmp/core/conn_client.go index 2c4889cc..c634670b 100755 --- a/protocol/rtmp/core/conn_client.go +++ b/protocol/rtmp/core/conn_client.go @@ -53,24 +53,24 @@ func NewConnClient() *ConnClient { } } -func (self *ConnClient) readRespMsg() error { +func (connClient *ConnClient) readRespMsg() error { var err error var rc ChunkStream for { - if err = self.conn.Read(&rc); err != nil { + if err = connClient.conn.Read(&rc); err != nil { return err } switch rc.TypeID { case 20, 17: r := bytes.NewReader(rc.Data) - vs, err := self.decoder.DecodeBatch(r, amf.AMF0) + vs, err := connClient.decoder.DecodeBatch(r, amf.AMF0) if err != nil && err != io.EOF { return err } for k, v := range vs { switch v.(type) { case string: - switch self.curcmdName { + switch connClient.curcmdName { case cmdConnect, cmdCreateStream: if v.(string) != respResult { return ErrFail @@ -81,15 +81,15 @@ func (self *ConnClient) readRespMsg() error { } } case float64: - switch self.curcmdName { + switch connClient.curcmdName { case cmdConnect, cmdCreateStream: id := int(v.(float64)) if k == 1 { - if id != self.transID { + if id != connClient.transID { return ErrFail } } else if k == 3 { - self.streamid = uint32(id) + connClient.streamid = uint32(id) } case cmdPublish: if int(v.(float64)) != 0 { @@ -98,7 +98,7 @@ func (self *ConnClient) readRespMsg() error { } case amf.Object: objmap := v.(amf.Object) - switch self.curcmdName { + switch connClient.curcmdName { case cmdConnect: code, ok := objmap["code"] if ok && code.(string) != connectSuccess { @@ -117,83 +117,83 @@ func (self *ConnClient) readRespMsg() error { } } -func (self *ConnClient) writeMsg(args ...interface{}) error { - self.bytesw.Reset() +func (connClient *ConnClient) writeMsg(args ...interface{}) error { + connClient.bytesw.Reset() for _, v := range args { - if _, err := self.encoder.Encode(self.bytesw, v, amf.AMF0); err != nil { + if _, err := connClient.encoder.Encode(connClient.bytesw, v, amf.AMF0); err != nil { return err } } - msg := self.bytesw.Bytes() + msg := connClient.bytesw.Bytes() c := ChunkStream{ Format: 0, CSID: 3, Timestamp: 0, TypeID: 20, - StreamID: self.streamid, + StreamID: connClient.streamid, Length: uint32(len(msg)), Data: msg, } - self.conn.Write(&c) - return self.conn.Flush() + connClient.conn.Write(&c) + return connClient.conn.Flush() } -func (self *ConnClient) writeConnectMsg() error { +func (connClient *ConnClient) writeConnectMsg() error { event := make(amf.Object) - event["app"] = self.app + event["app"] = connClient.app event["type"] = "nonprivate" event["flashVer"] = "FMS.3.1" - event["tcUrl"] = self.tcurl - self.curcmdName = cmdConnect + event["tcUrl"] = connClient.tcurl + connClient.curcmdName = cmdConnect - if err := self.writeMsg(cmdConnect, self.transID, event); err != nil { + if err := connClient.writeMsg(cmdConnect, connClient.transID, event); err != nil { return err } - return self.readRespMsg() + return connClient.readRespMsg() } -func (self *ConnClient) writeCreateStreamMsg() error { - self.transID++ - self.curcmdName = cmdCreateStream - if err := self.writeMsg(cmdCreateStream, self.transID, nil); err != nil { +func (connClient *ConnClient) writeCreateStreamMsg() error { + connClient.transID++ + connClient.curcmdName = cmdCreateStream + if err := connClient.writeMsg(cmdCreateStream, connClient.transID, nil); err != nil { return err } - return self.readRespMsg() + return connClient.readRespMsg() } -func (self *ConnClient) writePublishMsg() error { - self.transID++ - self.curcmdName = cmdPublish - if err := self.writeMsg(cmdPublish, self.transID, nil, self.title, publishLive); err != nil { +func (connClient *ConnClient) writePublishMsg() error { + connClient.transID++ + connClient.curcmdName = cmdPublish + if err := connClient.writeMsg(cmdPublish, connClient.transID, nil, connClient.title, publishLive); err != nil { return err } - return self.readRespMsg() + return connClient.readRespMsg() } -func (self *ConnClient) writePlayMsg() error { - self.transID++ - self.curcmdName = cmdPlay - if err := self.writeMsg(cmdPlay, 0, nil, self.title); err != nil { +func (connClient *ConnClient) writePlayMsg() error { + connClient.transID++ + connClient.curcmdName = cmdPlay + if err := connClient.writeMsg(cmdPlay, 0, nil, connClient.title); err != nil { return err } - return self.readRespMsg() + return connClient.readRespMsg() } -func (self *ConnClient) Start(url string, method string) error { +func (connClient *ConnClient) Start(url string, method string) error { u, err := neturl.Parse(url) if err != nil { return err } - self.url = url + connClient.url = url path := strings.TrimLeft(u.Path, "/") ps := strings.SplitN(path, "/", 2) if len(ps) != 2 { return fmt.Errorf("u path err: %s", path) } - self.app = ps[0] - self.title = ps[1] - self.query = u.RawQuery - self.tcurl = "rtmp://" + u.Host + "/" + self.app + connClient.app = ps[0] + connClient.title = ps[1] + connClient.query = u.RawQuery + connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app port := ":1935" host := u.Host localIP := ":0" @@ -235,22 +235,22 @@ func (self *ConnClient) Start(url string, method string) error { log.Println("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr()) - self.conn = NewConn(conn, 4*1024) - if err := self.conn.HandshakeClient(); err != nil { + connClient.conn = NewConn(conn, 4*1024) + if err := connClient.conn.HandshakeClient(); err != nil { return err } - if err := self.writeConnectMsg(); err != nil { + if err := connClient.writeConnectMsg(); err != nil { return err } - if err := self.writeCreateStreamMsg(); err != nil { + if err := connClient.writeCreateStreamMsg(); err != nil { return err } if method == av.PUBLISH { - if err := self.writePublishMsg(); err != nil { + if err := connClient.writePublishMsg(); err != nil { return err } } else if method == av.PLAY { - if err := self.writePlayMsg(); err != nil { + if err := connClient.writePlayMsg(); err != nil { return err } } @@ -258,7 +258,7 @@ func (self *ConnClient) Start(url string, method string) error { return nil } -func (self *ConnClient) Write(c ChunkStream) error { +func (connClient *ConnClient) Write(c ChunkStream) error { if c.TypeID == av.TAG_SCRIPTDATAAMF0 || c.TypeID == av.TAG_SCRIPTDATAAMF3 { var err error @@ -267,20 +267,20 @@ func (self *ConnClient) Write(c ChunkStream) error { } c.Length = uint32(len(c.Data)) } - return self.conn.Write(&c) + return connClient.conn.Write(&c) } -func (self *ConnClient) Read(c *ChunkStream) (err error) { - return self.conn.Read(c) +func (connClient *ConnClient) Read(c *ChunkStream) (err error) { + return connClient.conn.Read(c) } -func (self *ConnClient) GetInfo() (app string, name string, url string) { - app = self.app - name = self.title - url = self.url +func (connClient *ConnClient) GetInfo() (app string, name string, url string) { + app = connClient.app + name = connClient.title + url = connClient.url return } -func (self *ConnClient) Close(err error) { - self.conn.Close() +func (connClient *ConnClient) Close(err error) { + connClient.conn.Close() } diff --git a/protocol/rtmp/core/conn_server.go b/protocol/rtmp/core/conn_server.go index 75a69bdf..5858f5fa 100755 --- a/protocol/rtmp/core/conn_server.go +++ b/protocol/rtmp/core/conn_server.go @@ -84,14 +84,14 @@ func NewConnServer(conn *Conn) *ConnServer { } } -func (self *ConnServer) writeMsg(csid, streamID uint32, args ...interface{}) error { - self.bytesw.Reset() +func (connServer *ConnServer) writeMsg(csid, streamID uint32, args ...interface{}) error { + connServer.bytesw.Reset() for _, v := range args { - if _, err := self.encoder.Encode(self.bytesw, v, amf.AMF0); err != nil { + if _, err := connServer.encoder.Encode(connServer.bytesw, v, amf.AMF0); err != nil { return err } } - msg := self.bytesw.Bytes() + msg := connServer.bytesw.Bytes() c := ChunkStream{ Format: 0, CSID: csid, @@ -101,11 +101,11 @@ func (self *ConnServer) writeMsg(csid, streamID uint32, args ...interface{}) err Length: uint32(len(msg)), Data: msg, } - self.conn.Write(&c) - return self.conn.Flush() + connServer.conn.Write(&c) + return connServer.conn.Flush() } -func (self *ConnServer) connect(vs []interface{}) error { +func (connServer *ConnServer) connect(vs []interface{}) error { for _, v := range vs { switch v.(type) { case string: @@ -114,41 +114,41 @@ func (self *ConnServer) connect(vs []interface{}) error { if id != 1 { return ErrReq } - self.transactionID = id + connServer.transactionID = id case amf.Object: obimap := v.(amf.Object) if app, ok := obimap["app"]; ok { - self.ConnInfo.App = app.(string) + connServer.ConnInfo.App = app.(string) } if flashVer, ok := obimap["flashVer"]; ok { - self.ConnInfo.Flashver = flashVer.(string) + connServer.ConnInfo.Flashver = flashVer.(string) } if tcurl, ok := obimap["tcUrl"]; ok { - self.ConnInfo.TcUrl = tcurl.(string) + connServer.ConnInfo.TcUrl = tcurl.(string) } if encoding, ok := obimap["objectEncoding"]; ok { - self.ConnInfo.ObjectEncoding = int(encoding.(float64)) + connServer.ConnInfo.ObjectEncoding = int(encoding.(float64)) } } } return nil } -func (self *ConnServer) releaseStream(vs []interface{}) error { +func (connServer *ConnServer) releaseStream(vs []interface{}) error { return nil } -func (self *ConnServer) fcPublish(vs []interface{}) error { +func (connServer *ConnServer) fcPublish(vs []interface{}) error { return nil } -func (self *ConnServer) connectResp(cur *ChunkStream) error { - c := self.conn.NewWindowAckSize(2500000) - self.conn.Write(&c) - c = self.conn.NewSetPeerBandwidth(2500000) - self.conn.Write(&c) - c = self.conn.NewSetChunkSize(uint32(1024)) - self.conn.Write(&c) +func (connServer *ConnServer) connectResp(cur *ChunkStream) error { + c := connServer.conn.NewWindowAckSize(2500000) + connServer.conn.Write(&c) + c = connServer.conn.NewSetPeerBandwidth(2500000) + connServer.conn.Write(&c) + c = connServer.conn.NewSetChunkSize(uint32(1024)) + connServer.conn.Write(&c) resp := make(amf.Object) resp["fmsVer"] = "FMS/3,0,1,123" @@ -158,38 +158,38 @@ func (self *ConnServer) connectResp(cur *ChunkStream) error { event["level"] = "status" event["code"] = "NetConnection.Connect.Success" event["description"] = "Connection succeeded." - event["objectEncoding"] = self.ConnInfo.ObjectEncoding - return self.writeMsg(cur.CSID, cur.StreamID, "_result", self.transactionID, resp, event) + event["objectEncoding"] = connServer.ConnInfo.ObjectEncoding + return connServer.writeMsg(cur.CSID, cur.StreamID, "_result", connServer.transactionID, resp, event) } -func (self *ConnServer) createStream(vs []interface{}) error { +func (connServer *ConnServer) createStream(vs []interface{}) error { for _, v := range vs { switch v.(type) { case string: case float64: - self.transactionID = int(v.(float64)) + connServer.transactionID = int(v.(float64)) case amf.Object: } } return nil } -func (self *ConnServer) createStreamResp(cur *ChunkStream) error { - return self.writeMsg(cur.CSID, cur.StreamID, "_result", self.transactionID, nil, self.streamID) +func (connServer *ConnServer) createStreamResp(cur *ChunkStream) error { + return connServer.writeMsg(cur.CSID, cur.StreamID, "_result", connServer.transactionID, nil, connServer.streamID) } -func (self *ConnServer) publishOrPlay(vs []interface{}) error { +func (connServer *ConnServer) publishOrPlay(vs []interface{}) error { for k, v := range vs { switch v.(type) { case string: if k == 2 { - self.PublishInfo.Name = v.(string) + connServer.PublishInfo.Name = v.(string) } else if k == 3 { - self.PublishInfo.Type = v.(string) + connServer.PublishInfo.Type = v.(string) } case float64: id := int(v.(float64)) - self.transactionID = id + connServer.transactionID = id case amf.Object: } } @@ -197,56 +197,56 @@ func (self *ConnServer) publishOrPlay(vs []interface{}) error { return nil } -func (self *ConnServer) publishResp(cur *ChunkStream) error { +func (connServer *ConnServer) publishResp(cur *ChunkStream) error { event := make(amf.Object) event["level"] = "status" event["code"] = "NetStream.Publish.Start" event["description"] = "Start publising." - return self.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event) + return connServer.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event) } -func (self *ConnServer) playResp(cur *ChunkStream) error { - self.conn.SetRecorded() - self.conn.SetBegin() +func (connServer *ConnServer) playResp(cur *ChunkStream) error { + connServer.conn.SetRecorded() + connServer.conn.SetBegin() event := make(amf.Object) event["level"] = "status" event["code"] = "NetStream.Play.Reset" event["description"] = "Playing and resetting stream." - if err := self.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { + if err := connServer.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { return err } event["level"] = "status" event["code"] = "NetStream.Play.Start" event["description"] = "Started playing stream." - if err := self.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { + if err := connServer.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { return err } event["level"] = "status" event["code"] = "NetStream.Data.Start" event["description"] = "Started playing stream." - if err := self.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { + if err := connServer.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { return err } event["level"] = "status" event["code"] = "NetStream.Play.PublishNotify" event["description"] = "Started playing notify." - if err := self.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { + if err := connServer.writeMsg(cur.CSID, cur.StreamID, "onStatus", 0, nil, event); err != nil { return err } - return self.conn.Flush() + return connServer.conn.Flush() } -func (self *ConnServer) handleCmdMsg(c *ChunkStream) error { +func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error { amfType := amf.AMF0 if c.TypeID == 17 { c.Data = c.Data[1:] } r := bytes.NewReader(c.Data) - vs, err := self.decoder.DecodeBatch(r, amf.Version(amfType)) + vs, err := connServer.decoder.DecodeBatch(r, amf.Version(amfType)) if err != nil && err != io.EOF { return err } @@ -255,43 +255,43 @@ func (self *ConnServer) handleCmdMsg(c *ChunkStream) error { case string: switch vs[0].(string) { case cmdConnect: - if err = self.connect(vs[1:]); err != nil { + if err = connServer.connect(vs[1:]); err != nil { return err } - if err = self.connectResp(c); err != nil { + if err = connServer.connectResp(c); err != nil { return err } case cmdCreateStream: - if err = self.createStream(vs[1:]); err != nil { + if err = connServer.createStream(vs[1:]); err != nil { return err } - if err = self.createStreamResp(c); err != nil { + if err = connServer.createStreamResp(c); err != nil { return err } case cmdPublish: - if err = self.publishOrPlay(vs[1:]); err != nil { + if err = connServer.publishOrPlay(vs[1:]); err != nil { return err } - if err = self.publishResp(c); err != nil { + if err = connServer.publishResp(c); err != nil { return err } - self.done = true - self.isPublisher = true + connServer.done = true + connServer.isPublisher = true log.Println("handle publish req done") case cmdPlay: - if err = self.publishOrPlay(vs[1:]); err != nil { + if err = connServer.publishOrPlay(vs[1:]); err != nil { return err } - if err = self.playResp(c); err != nil { + if err = connServer.playResp(c); err != nil { return err } - self.done = true - self.isPublisher = false + connServer.done = true + connServer.isPublisher = false log.Println("handle play req done") case cmdFcpublish: - self.fcPublish(vs) + connServer.fcPublish(vs) case cmdReleaseStream: - self.releaseStream(vs) + connServer.releaseStream(vs) case cmdFCUnpublish: case cmdDeleteStream: default: @@ -302,30 +302,30 @@ func (self *ConnServer) handleCmdMsg(c *ChunkStream) error { return nil } -func (self *ConnServer) ReadMsg() error { +func (connServer *ConnServer) ReadMsg() error { var c ChunkStream for { - if err := self.conn.Read(&c); err != nil { + if err := connServer.conn.Read(&c); err != nil { return err } switch c.TypeID { case 20, 17: - if err := self.handleCmdMsg(&c); err != nil { + if err := connServer.handleCmdMsg(&c); err != nil { return err } } - if self.done { + if connServer.done { break } } return nil } -func (self *ConnServer) IsPublisher() bool { - return self.isPublisher +func (connServer *ConnServer) IsPublisher() bool { + return connServer.isPublisher } -func (self *ConnServer) Write(c ChunkStream) error { +func (connServer *ConnServer) Write(c ChunkStream) error { if c.TypeID == av.TAG_SCRIPTDATAAMF0 || c.TypeID == av.TAG_SCRIPTDATAAMF3 { var err error @@ -334,20 +334,20 @@ func (self *ConnServer) Write(c ChunkStream) error { } c.Length = uint32(len(c.Data)) } - return self.conn.Write(&c) + return connServer.conn.Write(&c) } -func (self *ConnServer) Read(c *ChunkStream) (err error) { - return self.conn.Read(c) +func (connServer *ConnServer) Read(c *ChunkStream) (err error) { + return connServer.conn.Read(c) } -func (self *ConnServer) GetInfo() (app string, name string, url string) { - app = self.ConnInfo.App - name = self.PublishInfo.Name - url = self.ConnInfo.TcUrl + "/" + self.PublishInfo.Name +func (connServer *ConnServer) GetInfo() (app string, name string, url string) { + app = connServer.ConnInfo.App + name = connServer.PublishInfo.Name + url = connServer.ConnInfo.TcUrl + "/" + connServer.PublishInfo.Name return } -func (self *ConnServer) Close(err error) { - self.conn.Close() +func (connServer *ConnServer) Close(err error) { + connServer.conn.Close() } diff --git a/protocol/rtmp/core/conn_test.go b/protocol/rtmp/core/conn_test.go index df12260e..b3ff97ba 100755 --- a/protocol/rtmp/core/conn_test.go +++ b/protocol/rtmp/core/conn_test.go @@ -4,7 +4,6 @@ import ( "bytes" "io" "testing" - "github.com/stretchr/testify/assert" "github.com/gwuhaolin/livego/utils/pool" ) diff --git a/protocol/rtmp/core/handshake.go b/protocol/rtmp/core/handshake.go index fa90b6b4..4e22ea73 100755 --- a/protocol/rtmp/core/handshake.go +++ b/protocol/rtmp/core/handshake.go @@ -97,7 +97,7 @@ func hsCreate2(p []byte, key []byte) { copy(p[gap:], digest) } -func (self *Conn) HandshakeClient() (err error) { +func (conn *Conn) HandshakeClient() (err error) { var random [(1 + 1536*2) * 2]byte C0C1C2 := random[:1536*2+1] @@ -109,18 +109,18 @@ func (self *Conn) HandshakeClient() (err error) { C0[0] = 3 // > C0C1 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = self.rw.Write(C0C1); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = conn.rw.Write(C0C1); err != nil { return } - self.Conn.SetDeadline(time.Now().Add(timeout)) - if err = self.rw.Flush(); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if err = conn.rw.Flush(); err != nil { return } // < S0S1S2 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = io.ReadFull(self.rw, S0S1S2); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = io.ReadFull(conn.rw, S0S1S2); err != nil { return } @@ -132,15 +132,15 @@ func (self *Conn) HandshakeClient() (err error) { } // > C2 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = self.rw.Write(C2); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = conn.rw.Write(C2); err != nil { return } - self.Conn.SetDeadline(time.Time{}) + conn.Conn.SetDeadline(time.Time{}) return } -func (self *Conn) HandshakeServer() (err error) { +func (conn *Conn) HandshakeServer() (err error) { var random [(1 + 1536*2) * 2]byte C0C1C2 := random[:1536*2+1] @@ -156,11 +156,11 @@ func (self *Conn) HandshakeServer() (err error) { S2 := S0S1S2[1536+1:] // < C0C1 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = io.ReadFull(self.rw, C0C1); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = io.ReadFull(conn.rw, C0C1); err != nil { return } - self.Conn.SetDeadline(time.Now().Add(timeout)) + conn.Conn.SetDeadline(time.Now().Add(timeout)) if C0[0] != 3 { err = fmt.Errorf("rtmp: handshake version=%d invalid", C0[0]) return @@ -188,20 +188,20 @@ func (self *Conn) HandshakeServer() (err error) { } // > S0S1S2 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = self.rw.Write(S0S1S2); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = conn.rw.Write(S0S1S2); err != nil { return } - self.Conn.SetDeadline(time.Now().Add(timeout)) - if err = self.rw.Flush(); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if err = conn.rw.Flush(); err != nil { return } // < C2 - self.Conn.SetDeadline(time.Now().Add(timeout)) - if _, err = io.ReadFull(self.rw, C2); err != nil { + conn.Conn.SetDeadline(time.Now().Add(timeout)) + if _, err = io.ReadFull(conn.rw, C2); err != nil { return } - self.Conn.SetDeadline(time.Time{}) + conn.Conn.SetDeadline(time.Time{}) return } diff --git a/protocol/rtmp/rtmp.go b/protocol/rtmp/rtmp.go index 32153f06..1f791fe8 100755 --- a/protocol/rtmp/rtmp.go +++ b/protocol/rtmp/rtmp.go @@ -3,20 +3,15 @@ package rtmp import ( "net" "time" - "net/url" - "strings" - "errors" - "flag" - + "log" "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/utils/uid" "github.com/gwuhaolin/livego/container/flv" "github.com/gwuhaolin/livego/protocol/rtmp/core" - "log" ) const ( @@ -40,27 +35,27 @@ func NewRtmpClient(h av.Handler, getter av.GetWriter) *Client { } } -func (self *Client) Dial(url string, method string) error { +func (c *Client) Dial(url string, method string) error { connClient := core.NewConnClient() if err := connClient.Start(url, method); err != nil { return err } if method == av.PUBLISH { writer := NewVirWriter(connClient) - self.handler.HandleWriter(writer) + c.handler.HandleWriter(writer) } else if method == av.PLAY { reader := NewVirReader(connClient) - self.handler.HandleReader(reader) - if self.getter != nil { - writer := self.getter.GetWriter(reader.Info()) - self.handler.HandleWriter(writer) + c.handler.HandleReader(reader) + if c.getter != nil { + writer := c.getter.GetWriter(reader.Info()) + c.handler.HandleWriter(writer) } } return nil } -func (self *Client) GetHandle() av.Handler { - return self.handler +func (c *Client) GetHandle() av.Handler { + return c.handler } type Server struct { @@ -75,7 +70,7 @@ func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server { } } -func (self *Server) Serve(listener net.Listener) (err error) { +func (s *Server) Serve(listener net.Listener) (err error) { defer func() { if r := recover(); r != nil { log.Println("rtmp serve panic: ", r) @@ -91,11 +86,11 @@ func (self *Server) Serve(listener net.Listener) (err error) { conn := core.NewConn(netconn, 4*1024) log.Println("new client, connect remote:", conn.RemoteAddr().String(), "local:", conn.LocalAddr().String()) - go self.handleConn(conn) + go s.handleConn(conn) } } -func (self *Server) handleConn(conn *core.Conn) error { +func (s *Server) handleConn(conn *core.Conn) error { if err := conn.HandshakeServer(); err != nil { conn.Close() log.Println("handleConn HandshakeServer err:", err) @@ -110,17 +105,17 @@ func (self *Server) handleConn(conn *core.Conn) error { } if connServer.IsPublisher() { reader := NewVirReader(connServer) - self.handler.HandleReader(reader) + s.handler.HandleReader(reader) log.Printf("new publisher: %+v", reader.Info()) - if self.getter != nil { - writer := self.getter.GetWriter(reader.Info()) - self.handler.HandleWriter(writer) + if s.getter != nil { + writer := s.getter.GetWriter(reader.Info()) + s.handler.HandleWriter(writer) } } else { writer := NewVirWriter(connServer) log.Printf("new player: %+v", writer.Info()) - self.handler.HandleWriter(writer) + s.handler.HandleWriter(writer) } return nil @@ -162,17 +157,17 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { return ret } -func (self *VirWriter) Check() { +func (v *VirWriter) Check() { var c core.ChunkStream for { - if err := self.conn.Read(&c); err != nil { - self.Close(err) + if err := v.conn.Read(&c); err != nil { + v.Close(err) return } } } -func (self *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { +func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { log.Printf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue @@ -204,12 +199,12 @@ func (self *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { } // -func (self *VirWriter) Write(p av.Packet) error { - if !self.closed { - if len(self.packetQueue) >= maxQueueNum-24 { - self.DropPacket(self.packetQueue, self.Info()) +func (v *VirWriter) Write(p av.Packet) error { + if !v.closed { + if len(v.packetQueue) >= maxQueueNum-24 { + v.DropPacket(v.packetQueue, v.Info()) } else { - self.packetQueue <- p + v.packetQueue <- p } return nil } else { @@ -217,16 +212,16 @@ func (self *VirWriter) Write(p av.Packet) error { } } -func (self *VirWriter) SendPacket() error { +func (v *VirWriter) SendPacket() error { var cs core.ChunkStream for { - p, ok := <-self.packetQueue + p, ok := <-v.packetQueue if ok { cs.Data = p.Data cs.Length = uint32(len(p.Data)) cs.StreamID = 1 cs.Timestamp = p.TimeStamp - cs.Timestamp += self.BaseTimeStamp() + cs.Timestamp += v.BaseTimeStamp() if p.IsVideo { cs.TypeID = av.TAG_VIDEO @@ -238,11 +233,11 @@ func (self *VirWriter) SendPacket() error { } } - self.SetPreTime() - self.RecTimeStamp(cs.Timestamp, cs.TypeID) - err := self.conn.Write(cs) + v.SetPreTime() + v.RecTimeStamp(cs.Timestamp, cs.TypeID) + err := v.conn.Write(cs) if err != nil { - self.closed = true + v.closed = true return err } } else { @@ -253,9 +248,9 @@ func (self *VirWriter) SendPacket() error { return nil } -func (self *VirWriter) Info() (ret av.Info) { - ret.UID = self.Uid - _, _, URL := self.conn.GetInfo() +func (v *VirWriter) Info() (ret av.Info) { + ret.UID = v.Uid + _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { @@ -266,13 +261,13 @@ func (self *VirWriter) Info() (ret av.Info) { return } -func (self *VirWriter) Close(err error) { - log.Println("player ", self.Info(), "closed: "+err.Error()) - if !self.closed { - close(self.packetQueue) +func (v *VirWriter) Close(err error) { + log.Println("player ", v.Info(), "closed: "+err.Error()) + if !v.closed { + close(v.packetQueue) } - self.closed = true - self.conn.Close(err) + v.closed = true + v.conn.Close(err) } type VirReader struct { @@ -291,17 +286,17 @@ func NewVirReader(conn StreamReadWriteCloser) *VirReader { } } -func (self *VirReader) Read(p *av.Packet) (err error) { +func (v *VirReader) Read(p *av.Packet) (err error) { defer func() { if r := recover(); r != nil { log.Println("rtmp read packet panic: ", r) } }() - self.SetPreTime() + v.SetPreTime() var cs core.ChunkStream for { - err = self.conn.Read(&cs) + err = v.conn.Read(&cs) if err != nil { return err } @@ -315,16 +310,16 @@ func (self *VirReader) Read(p *av.Packet) (err error) { p.IsAudio = cs.TypeID == av.TAG_AUDIO p.IsVideo = cs.TypeID == av.TAG_VIDEO - p.IsMetadata = (cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3) + p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 p.Data = cs.Data p.TimeStamp = cs.Timestamp - self.demuxer.DemuxH(p) + v.demuxer.DemuxH(p) return err } -func (self *VirReader) Info() (ret av.Info) { - ret.UID = self.Uid - _, _, URL := self.conn.GetInfo() +func (v *VirReader) Info() (ret av.Info) { + ret.UID = v.Uid + _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { @@ -334,7 +329,7 @@ func (self *VirReader) Info() (ret av.Info) { return } -func (self *VirReader) Close(err error) { - log.Println("publisher ", self.Info(), "closed: "+err.Error()) - self.conn.Close(err) +func (v *VirReader) Close(err error) { + log.Println("publisher ", v.Info(), "closed: "+err.Error()) + v.conn.Close(err) } diff --git a/protocol/rtmp/stream.go b/protocol/rtmp/stream.go index 3f1541fd..234a8b2b 100755 --- a/protocol/rtmp/stream.go +++ b/protocol/rtmp/stream.go @@ -3,11 +3,10 @@ package rtmp import ( "errors" "time" - + "log" "github.com/gwuhaolin/livego/utils/cmap" "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/protocol/rtmp/cache" - "log" ) var ( diff --git a/protocol/rtp/rtp.go b/protocol/rtp/rtp.go deleted file mode 100755 index 4b249520..00000000 --- a/protocol/rtp/rtp.go +++ /dev/null @@ -1 +0,0 @@ -package rtp diff --git a/protocol/rtsp/protocol.go b/protocol/rtsp/protocol.go deleted file mode 100755 index 185db25a..00000000 --- a/protocol/rtsp/protocol.go +++ /dev/null @@ -1 +0,0 @@ -package rtsp diff --git a/protocol/rtsp/rtsp.go b/protocol/rtsp/rtsp.go deleted file mode 100755 index 185db25a..00000000 --- a/protocol/rtsp/rtsp.go +++ /dev/null @@ -1 +0,0 @@ -package rtsp diff --git a/protocol/webrtc/webrtc.go b/protocol/webrtc/webrtc.go deleted file mode 100755 index efb4a82d..00000000 --- a/protocol/webrtc/webrtc.go +++ /dev/null @@ -1 +0,0 @@ -package webrtc diff --git a/utils/cmap/cmap.go b/utils/cmap/cmap.go index 764a1dfd..bb382bd7 100755 --- a/utils/cmap/cmap.go +++ b/utils/cmap/cmap.go @@ -277,25 +277,3 @@ func fnv32(key string) uint32 { } return hash } - -// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal -// will probably won't know which to type to unmarshal into, in such case -// we'll end up with a value of type map[string]interface{}, In most cases this isn't -// out value type, this is why we've decided to remove this functionality. - -// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) { -// // Reverse process of Marshal. - -// tmp := make(map[string]interface{}) - -// // Unmarshal into a single map. -// if err := json.Unmarshal(b, &tmp); err != nil { -// return nil -// } - -// // foreach key,value pair in temporary map insert into our concurrent map. -// for key, val := range tmp { -// m.Set(key, val) -// } -// return nil -// } diff --git a/utils/pool/pool.go b/utils/pool/pool.go index cd5c16dc..54cadefb 100755 --- a/utils/pool/pool.go +++ b/utils/pool/pool.go @@ -7,13 +7,13 @@ type Pool struct { const maxpoolsize = 500 * 1024 -func (self *Pool) Get(size int) []byte { - if maxpoolsize-self.pos < size { - self.pos = 0 - self.buf = make([]byte, maxpoolsize) +func (pool *Pool) Get(size int) []byte { + if maxpoolsize-pool.pos < size { + pool.pos = 0 + pool.buf = make([]byte, maxpoolsize) } - b := self.buf[self.pos: self.pos+size] - self.pos += size + b := pool.buf[pool.pos: pool.pos+size] + pool.pos += size return b } diff --git a/utils/queue/queue.go b/utils/queue/queue.go index fc82ff9b..e601133b 100755 --- a/utils/queue/queue.go +++ b/utils/queue/queue.go @@ -2,7 +2,6 @@ package queue import ( "sync" - "github.com/gwuhaolin/livego/av" )