diff --git a/av/av.go b/av/av.go index 18895c94..82b3b294 100755 --- a/av/av.go +++ b/av/av.go @@ -62,6 +62,7 @@ type Packet struct { IsVideo bool IsMetadata bool TimeStamp uint32 // dts + StreamID uint32 Header PacketHeader Data []byte } @@ -148,5 +149,5 @@ type WriteCloser interface { Closer Alive CalcTime - Write(Packet) error + Write(*Packet) error } diff --git a/configure/liveconfig.go b/configure/liveconfig.go new file mode 100644 index 00000000..68305735 --- /dev/null +++ b/configure/liveconfig.go @@ -0,0 +1,74 @@ +package configure + +import ( + "encoding/json" + "io/ioutil" + "log" +) + +/* +{ + [ + { + "application":"live", + "live":"on", + "hls":"on", + "static_push":["rtmp://xx/live"] + } + ] +} +*/ +type Application struct { + Appname string + Liveon string + Hlson string + Static_push []string +} + +type ServerCfg struct { + Server []Application +} + +var RtmpServercfg ServerCfg + +func LoadConfig(configfilename string) error { + log.Printf("starting load configure file(%s)......", configfilename) + data, err := ioutil.ReadFile(configfilename) + if err != nil { + log.Printf("ReadFile %s error:%v", configfilename, err) + return err + } + + log.Printf("loadconfig: \r\n%s", string(data)) + + err = json.Unmarshal(data, &RtmpServercfg) + if err != nil { + log.Printf("json.Unmarshal error:%v", err) + return err + } + log.Printf("get config json data:%v", RtmpServercfg) + return nil +} + +func CheckAppName(appname string) bool { + for _, app := range RtmpServercfg.Server { + if (app.Appname == appname) && (app.Liveon == "on") { + return true + } + } + return false +} + +func GetStaticPushUrlList(appname string) ([]string, bool) { + for _, app := range RtmpServercfg.Server { + if (app.Appname == appname) && (app.Liveon == "on") { + if len(app.Static_push) > 0 { + return app.Static_push, true + } else { + return nil, false + } + } + + } + return nil, false +} diff --git a/container/flv/muxer.go b/container/flv/muxer.go index c3954026..452145cb 100755 --- a/container/flv/muxer.go +++ b/container/flv/muxer.go @@ -72,7 +72,7 @@ func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter { return ret } -func (writer *FLVWriter) Write(p av.Packet) error { +func (writer *FLVWriter) Write(p *av.Packet) error { writer.RWBaser.SetPreTime() h := writer.buf[:headerLen] typeID := av.TAG_VIDEO diff --git a/livego.cfg b/livego.cfg new file mode 100644 index 00000000..b9ddbef0 --- /dev/null +++ b/livego.cfg @@ -0,0 +1,10 @@ +{ + "server": [ + { + "appname":"live", + "liveon":"on", + "hlson":"on" + } + ] +} + diff --git a/main.go b/livego.go similarity index 69% rename from main.go rename to livego.go index 66a627fd..a6e2976f 100755 --- a/main.go +++ b/livego.go @@ -2,21 +2,23 @@ package main import ( "flag" - "net" - "time" - "log" - "github.com/gwuhaolin/livego/protocol/rtmp" + "github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/protocol/hls" "github.com/gwuhaolin/livego/protocol/httpflv" "github.com/gwuhaolin/livego/protocol/httpopera" + "github.com/gwuhaolin/livego/protocol/rtmp" + "log" + "net" + "time" ) var ( - version = "master" - rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") - httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address") - hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address") - operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address") + version = "master" + rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") + httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address") + hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address") + operaAddr = flag.String("manage-addr", ":8090", "HTTP manage interface server listen address") + configfilename = flag.String("cfgfile", "livego.cfg", "live configure filename") ) func init() { @@ -49,7 +51,16 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { log.Fatal(err) } - rtmpServer := rtmp.NewRtmpServer(stream, hlsServer) + var rtmpServer *rtmp.Server + + if hlsServer == nil { + rtmpServer = rtmp.NewRtmpServer(stream, nil) + log.Printf("hls server disable....") + } else { + rtmpServer = rtmp.NewRtmpServer(stream, hlsServer) + log.Printf("hls server enable....") + } + defer func() { if r := recover(); r != nil { log.Println("RTMP server panic: ", r) @@ -83,7 +94,7 @@ func startHTTPOpera(stream *rtmp.RtmpStream) { if err != nil { log.Fatal(err) } - opServer := httpopera.NewServer(stream) + opServer := httpopera.NewServer(stream, *rtmpAddr) go func() { defer func() { if r := recover(); r != nil { @@ -104,9 +115,16 @@ func main() { } }() log.Println("start livego, version", version) + err := configure.LoadConfig(*configfilename) + if err != nil { + return + } + stream := rtmp.NewRtmpStream() hlsServer := startHls() startHTTPFlv(stream) - //startHTTPOpera(stream) + startHTTPOpera(stream) + startRtmp(stream, hlsServer) + //startRtmp(stream, nil) } diff --git a/protocol/hls/source.go b/protocol/hls/source.go index 53a8f5ac..a1fb670e 100644 --- a/protocol/hls/source.go +++ b/protocol/hls/source.go @@ -35,7 +35,7 @@ type Source struct { tsCache *TSCacheItem tsparser *parser.CodecParser closed bool - packetQueue chan av.Packet + packetQueue chan *av.Packet } func NewSource(info av.Info) *Source { @@ -51,7 +51,7 @@ func NewSource(info av.Info) *Source { tsCache: NewTSCacheItem(info.Key), tsparser: parser.NewCodecParser(), bwriter: bytes.NewBuffer(make([]byte, 100*1024)), - packetQueue: make(chan av.Packet, maxQueueNum), + packetQueue: make(chan *av.Packet, maxQueueNum), } go func() { err := s.SendPacket() @@ -67,7 +67,7 @@ func (source *Source) GetCacheInc() *TSCacheItem { return source.tsCache } -func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) { +func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Printf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue @@ -95,8 +95,19 @@ func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) { log.Println("packet queue len: ", len(pktQue)) } -func (source *Source) Write(p av.Packet) error { +func (source *Source) Write(p *av.Packet) (err error) { + err = nil + if source.closed { + err = errors.New("hls source closed") + return + } source.SetPreTime() + defer func() { + if e := recover(); e != nil { + errString := fmt.Sprintf("hls source has already been closed:%v", e) + err = errors.New(errString) + } + }() if len(source.packetQueue) >= maxQueueNum-24 { source.DropPacket(source.packetQueue, source.info) } else { @@ -104,7 +115,7 @@ func (source *Source) Write(p av.Packet) error { source.packetQueue <- p } } - return nil + return } func (source *Source) SendPacket() error { @@ -114,6 +125,7 @@ func (source *Source) SendPacket() error { log.Println("hls SendPacket panic: ", r) } }() + log.Printf("[%v] hls sender start", source.info) for { if source.closed { @@ -126,7 +138,7 @@ func (source *Source) SendPacket() error { continue } - err := source.demuxer.Demux(&p) + err := source.demuxer.Demux(p) if err == flv.ErrAvcEndSEQ { log.Println(err) continue @@ -136,7 +148,7 @@ func (source *Source) SendPacket() error { return err } } - compositionTime, isSeq, err := source.parse(&p) + compositionTime, isSeq, err := source.parse(p) if err != nil { log.Println(err) } @@ -146,7 +158,7 @@ func (source *Source) SendPacket() error { if source.btswriter != nil { source.stat.update(p.IsVideo, p.TimeStamp) source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime)) - source.tsMux(&p) + source.tsMux(p) } } else { return errors.New("closed") diff --git a/protocol/httpflv/writer.go b/protocol/httpflv/writer.go index 9d82ec2d..353f964e 100755 --- a/protocol/httpflv/writer.go +++ b/protocol/httpflv/writer.go @@ -3,6 +3,7 @@ package httpflv import ( "time" "errors" + "fmt" "log" "net/http" "github.com/gwuhaolin/livego/utils/uid" @@ -17,14 +18,14 @@ const ( ) type FLVWriter struct { - Uid string + Uid string av.RWBaser app, title, url string buf []byte closed bool closedChan chan struct{} ctx http.ResponseWriter - packetQueue chan av.Packet + packetQueue chan *av.Packet } func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { @@ -37,7 +38,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { RWBaser: av.NewRWBaser(time.Second * 10), closedChan: make(chan struct{}), buf: make([]byte, headerLen), - packetQueue: make(chan av.Packet, maxQueueNum), + packetQueue: make(chan *av.Packet, maxQueueNum), } ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}) @@ -53,7 +54,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { return ret } -func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) { +func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Printf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue @@ -80,18 +81,25 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) { log.Println("packet queue len: ", len(pktQue)) } -func (flvWriter *FLVWriter) Write(p av.Packet) error { - if !flvWriter.closed { - if len(flvWriter.packetQueue) >= maxQueueNum-24 { - flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info()) - } else { - flvWriter.packetQueue <- p +func (flvWriter *FLVWriter) Write(p *av.Packet) (err error) { + err = nil + if flvWriter.closed { + err = errors.New("flvwrite source closed") + return + } + defer func() { + if e := recover(); e != nil { + errString := fmt.Sprintf("FLVWriter has already been closed:%v", e) + err = errors.New(errString) } - return nil + }() + if len(flvWriter.packetQueue) >= maxQueueNum-24 { + flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info()) } else { - return errors.New("closed") + flvWriter.packetQueue <- p } + return } func (flvWriter *FLVWriter) SendPacket() error { diff --git a/protocol/httpopera/http_opera.go b/protocol/httpopera/http_opera.go index c566163e..294359e2 100755 --- a/protocol/httpopera/http_opera.go +++ b/protocol/httpopera/http_opera.go @@ -2,7 +2,9 @@ package httpopera import ( "encoding/json" - "io/ioutil" + "fmt" + "github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay" + "io" "net" "net/http" "log" @@ -35,86 +37,203 @@ type OperationChange struct { Stop bool `json:"stop"` } +type ClientInfo struct { + url string + rtmpRemoteClient *rtmp.Client + rtmpLocalClient *rtmp.Client +} + type Server struct { - handler av.Handler + handler av.Handler + session map[string]*rtmprelay.RtmpRelay + rtmpAddr string } -func NewServer(h av.Handler) *Server { +func NewServer(h av.Handler, rtmpAddr string) *Server { return &Server{ - handler: h, + handler: h, + session: make(map[string]*rtmprelay.RtmpRelay), + rtmpAddr: rtmpAddr, } } func (s *Server) Serve(l net.Listener) error { mux := http.NewServeMux() - mux.HandleFunc("/rtmp/operation", func(w http.ResponseWriter, r *http.Request) { - s.handleOpera(w, r) + + mux.Handle("/statics", http.FileServer(http.Dir("statics"))) + + mux.HandleFunc("/control/push", func(w http.ResponseWriter, r *http.Request) { + s.handlePush(w, r) + }) + mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) { + s.handlePull(w, r) + }) + mux.HandleFunc("/stat/livestat", func(w http.ResponseWriter, r *http.Request) { + s.GetLiveStatics(w, r) }) http.Serve(l, mux) return nil } -// handleOpera, 拉流和推流的http api -// @Path: /rtmp/operation -// @Method: POST -// @Param: json -// method string, "push" or "pull" -// url string -// stop bool - -// @Example, -// curl -v -H "Content-Type: application/json" -X POST --data \ -// '{"method":"pull","url":"rtmp://127.0.0.1:1935/live/test"}' \ -// http://localhost:8087/rtmp/operation -func (s *Server) handleOpera(w http.ResponseWriter, r *http.Request) { - rep := &Response{ - w: w, +type stream struct { + Key string `json:"key"` + Url string `json:"Url"` + StreamId uint32 `json:"StreamId"` + VideoTotalBytes uint64 `json:123456` + VideoSpeed uint64 `json:123456` + AudioTotalBytes uint64 `json:123456` + AudioSpeed uint64 `json:123456` +} + +type streams struct { + Publishers []stream `json:"publishers"` + Players []stream `json:"players"` +} + +//http://127.0.0.1:8090/stat/livestat +func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { + rtmpStream := server.handler.(*rtmp.RtmpStream) + if rtmpStream == nil { + io.WriteString(w, "

Get rtmp stream information error

") + return } - if r.Method != "POST" { - rep.Status = 14000 - rep.Message = "bad request method" - rep.SendJson() + msgs := new(streams) + for item := range rtmpStream.GetStreams().IterBuffered() { + if s, ok := item.Val.(*rtmp.Stream); ok { + if s.GetReader() != nil { + switch s.GetReader().(type) { + case *rtmp.VirReader: + v := s.GetReader().(*rtmp.VirReader) + msg := stream{item.Key, v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS, + v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS} + msgs.Publishers = append(msgs.Publishers, msg) + } + } + } + } + + for item := range rtmpStream.GetStreams().IterBuffered() { + ws := item.Val.(*rtmp.Stream).GetWs() + for s := range ws.IterBuffered() { + if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok { + if pw.GetWriter() != nil { + switch pw.GetWriter().(type) { + case *rtmp.VirWriter: + v := pw.GetWriter().(*rtmp.VirWriter) + msg := stream{item.Key, v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS, + v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS} + msgs.Players = append(msgs.Players, msg) + } + } + } + } + } + resp, _ := json.Marshal(msgs) + w.Header().Set("Content-Type", "application/json") + w.Write(resp) +} + +//http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456 +func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { + var retString string + var err error + + req.ParseForm() + + oper := req.Form["oper"] + app := req.Form["app"] + name := req.Form["name"] + url := req.Form["url"] + + log.Printf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url) + if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) { + io.WriteString(w, "control push parameter error, please check them.
") return - } else { - result, err := ioutil.ReadAll(r.Body) - if err != nil { - rep.Status = 15000 - rep.Message = "read request body error" - rep.SendJson() + } + + remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0] + localurl := url[0] + + keyString := "pull:" + app[0] + "/" + name[0] + if oper[0] == "stop" { + pullRtmprelay, found := s.session[keyString] + + if !found { + retString = fmt.Sprintf("session key[%s] not exist, please check it again.", keyString) + io.WriteString(w, retString) return } - r.Body.Close() - log.Println("post body", result) + log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl) + pullRtmprelay.Stop() - var op Operation - err = json.Unmarshal(result, &op) + delete(s.session, keyString) + retString = fmt.Sprintf("

push url stop %s ok


", url[0]) + io.WriteString(w, retString) + log.Printf("pull stop return %s", retString) + } else { + pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl) + log.Printf("rtmprelay start push %s from %s", remoteurl, localurl) + err = pullRtmprelay.Start() if err != nil { - rep.Status = 12000 - rep.Message = "parse json body failed" - rep.SendJson() - return + retString = fmt.Sprintf("push error=%v", err) + } else { + s.session[keyString] = pullRtmprelay + retString = fmt.Sprintf("

push url start %s ok


", url[0]) } + io.WriteString(w, retString) + log.Printf("pull start return %s", retString) + } +} - switch op.Method { - case "push": - s.Push(op.URL, op.Stop) - case "pull": - s.Pull(op.URL, op.Stop) - } +//http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456 +func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) { + var retString string + var err error + + req.ParseForm() + + oper := req.Form["oper"] + app := req.Form["app"] + name := req.Form["name"] + url := req.Form["url"] - rep.Status = 10000 - rep.Message = op.Method + " " + op.URL + " success" - rep.SendJson() + log.Printf("control push: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url) + if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) { + io.WriteString(w, "control push parameter error, please check them.
") + return } -} -func (s *Server) Push(uri string, stop bool) error { - rtmpClient := rtmp.NewRtmpClient(s.handler, nil) - return rtmpClient.Dial(uri, av.PUBLISH) -} + localurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0] + remoteurl := url[0] + + keyString := "push:" + app[0] + "/" + name[0] + if oper[0] == "stop" { + pushRtmprelay, found := s.session[keyString] + if !found { + retString = fmt.Sprintf("

session key[%s] not exist, please check it again.

", keyString) + io.WriteString(w, retString) + return + } + log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl) + pushRtmprelay.Stop() -func (s *Server) Pull(uri string, stop bool) error { - rtmpClient := rtmp.NewRtmpClient(s.handler, nil) - return rtmpClient.Dial(uri, av.PLAY) + delete(s.session, keyString) + retString = fmt.Sprintf("

push url stop %s ok


", url[0]) + io.WriteString(w, retString) + log.Printf("push stop return %s", retString) + } else { + pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl) + log.Printf("rtmprelay start push %s from %s", remoteurl, localurl) + err = pushRtmprelay.Start() + if err != nil { + retString = fmt.Sprintf("push error=%v", err) + } else { + retString = fmt.Sprintf("

push url start %s ok


", url[0]) + s.session[keyString] = pushRtmprelay + } + + io.WriteString(w, retString) + log.Printf("push start return %s", retString) + } } diff --git a/protocol/rtmp/cache/cache.go b/protocol/rtmp/cache/cache.go index 444085c0..1dd25cdc 100755 --- a/protocol/rtmp/cache/cache.go +++ b/protocol/rtmp/cache/cache.go @@ -27,7 +27,7 @@ func NewCache() *Cache { func (cache *Cache) Write(p av.Packet) { if p.IsMetadata { - cache.metadata.Write(p) + cache.metadata.Write(&p) return } else { if !p.IsVideo { @@ -35,7 +35,7 @@ func (cache *Cache) Write(p av.Packet) { if ok { if ah.SoundFormat() == av.SOUND_AAC && ah.AACPacketType() == av.AAC_SEQHDR { - cache.audioSeq.Write(p) + cache.audioSeq.Write(&p) return } else { return @@ -46,7 +46,7 @@ func (cache *Cache) Write(p av.Packet) { vh, ok := p.Header.(av.VideoPacketHeader) if ok { if vh.IsSeq() { - cache.videoSeq.Write(p) + cache.videoSeq.Write(&p) return } } else { @@ -55,7 +55,7 @@ func (cache *Cache) Write(p av.Packet) { } } - cache.gop.Write(p) + cache.gop.Write(&p) } func (cache *Cache) Send(w av.WriteCloser) error { diff --git a/protocol/rtmp/cache/gop.go b/protocol/rtmp/cache/gop.go index 33abf72d..1c1a1d0a 100755 --- a/protocol/rtmp/cache/gop.go +++ b/protocol/rtmp/cache/gop.go @@ -12,13 +12,13 @@ var ( type array struct { index int - packets []av.Packet + packets []*av.Packet } func newArray() *array { ret := &array{ index: 0, - packets: make([]av.Packet, 0, maxGOPCap), + packets: make([]*av.Packet, 0, maxGOPCap), } return ret } @@ -28,7 +28,7 @@ func (array *array) reset() { array.packets = array.packets[:0] } -func (array *array) write(packet av.Packet) error { +func (array *array) write(packet *av.Packet) error { if array.index >= maxGOPCap { return ErrGopTooBig } @@ -63,7 +63,7 @@ func NewGopCache(num int) *GopCache { } } -func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error { +func (gopCache *GopCache) writeToArray(chunk *av.Packet, startNew bool) error { var ginc *array if startNew { ginc = gopCache.gops[gopCache.nextindex] @@ -83,7 +83,7 @@ func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error { return nil } -func (gopCache *GopCache) Write(p av.Packet) { +func (gopCache *GopCache) Write(p *av.Packet) { var ok bool if p.IsVideo { vh := p.Header.(av.VideoPacketHeader) diff --git a/protocol/rtmp/cache/special.go b/protocol/rtmp/cache/special.go index 3a56ac7a..b8aefbc4 100755 --- a/protocol/rtmp/cache/special.go +++ b/protocol/rtmp/cache/special.go @@ -26,14 +26,14 @@ func init() { type SpecialCache struct { full bool - p av.Packet + p *av.Packet } func NewSpecialCache() *SpecialCache { return &SpecialCache{} } -func (specialCache *SpecialCache) Write(p av.Packet) { +func (specialCache *SpecialCache) Write(p *av.Packet) { specialCache.p = p specialCache.full = true } diff --git a/protocol/rtmp/core/conn_client.go b/protocol/rtmp/core/conn_client.go index c634670b..4016de0f 100755 --- a/protocol/rtmp/core/conn_client.go +++ b/protocol/rtmp/core/conn_client.go @@ -22,6 +22,7 @@ var ( publishStart = "NetStream.Publish.Start" playStart = "NetStream.Play.Start" connectSuccess = "NetConnection.Connect.Success" + onBWDone = "onBWDone" ) var ( @@ -53,6 +54,11 @@ func NewConnClient() *ConnClient { } } +func (connClient *ConnClient) DecodeBatch(r io.Reader, ver amf.Version) (ret []interface{}, err error) { + vs, err := connClient.decoder.DecodeBatch(r, ver) + return vs, err +} + func (connClient *ConnClient) readRespMsg() error { var err error var rc ChunkStream @@ -60,21 +66,24 @@ func (connClient *ConnClient) readRespMsg() error { if err = connClient.conn.Read(&rc); err != nil { return err } + if err != nil && err != io.EOF { + return err + } switch rc.TypeID { case 20, 17: r := bytes.NewReader(rc.Data) - vs, err := connClient.decoder.DecodeBatch(r, amf.AMF0) - if err != nil && err != io.EOF { - return err - } + vs, _ := connClient.decoder.DecodeBatch(r, amf.AMF0) + + log.Printf("readRespMsg: vs=%v", vs) for k, v := range vs { switch v.(type) { case string: switch connClient.curcmdName { case cmdConnect, cmdCreateStream: if v.(string) != respResult { - return ErrFail + return errors.New(v.(string)) } + case cmdPublish: if v.(string) != onStatus { return ErrFail @@ -84,6 +93,7 @@ func (connClient *ConnClient) readRespMsg() error { switch connClient.curcmdName { case cmdConnect, cmdCreateStream: id := int(v.(float64)) + if k == 1 { if id != connClient.transID { return ErrFail @@ -112,6 +122,7 @@ func (connClient *ConnClient) readRespMsg() error { } } } + return nil } } @@ -146,6 +157,7 @@ func (connClient *ConnClient) writeConnectMsg() error { event["tcUrl"] = connClient.tcurl connClient.curcmdName = cmdConnect + log.Printf("writeConnectMsg: connClient.transID=%d, event=%v", connClient.transID, event) if err := connClient.writeMsg(cmdConnect, connClient.transID, event); err != nil { return err } @@ -155,10 +167,24 @@ func (connClient *ConnClient) writeConnectMsg() error { func (connClient *ConnClient) writeCreateStreamMsg() error { connClient.transID++ connClient.curcmdName = cmdCreateStream + + log.Printf("writeCreateStreamMsg: connClient.transID=%d", connClient.transID) if err := connClient.writeMsg(cmdCreateStream, connClient.transID, nil); err != nil { return err } - return connClient.readRespMsg() + + for { + err := connClient.readRespMsg() + if err == nil { + return err + } + + if err == ErrFail { + log.Println("writeCreateStreamMsg readRespMsg err=%v", err) + return err + } + } + } func (connClient *ConnClient) writePublishMsg() error { @@ -173,6 +199,9 @@ func (connClient *ConnClient) writePublishMsg() error { func (connClient *ConnClient) writePlayMsg() error { connClient.transID++ connClient.curcmdName = cmdPlay + log.Printf("writePlayMsg: connClient.transID=%d, cmdPlay=%v, connClient.title=%v", + connClient.transID, cmdPlay, connClient.title) + if err := connClient.writeMsg(cmdPlay, 0, nil, connClient.title); err != nil { return err } @@ -236,15 +265,23 @@ func (connClient *ConnClient) Start(url string, method string) error { log.Println("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr()) connClient.conn = NewConn(conn, 4*1024) + + log.Println("HandshakeClient....") if err := connClient.conn.HandshakeClient(); err != nil { return err } + + log.Println("writeConnectMsg....") if err := connClient.writeConnectMsg(); err != nil { return err } + log.Println("writeCreateStreamMsg....") if err := connClient.writeCreateStreamMsg(); err != nil { + log.Println("writeCreateStreamMsg error", err) return err } + + log.Println("method control:", method, av.PUBLISH, av.PLAY) if method == av.PUBLISH { if err := connClient.writePublishMsg(); err != nil { return err @@ -281,6 +318,10 @@ func (connClient *ConnClient) GetInfo() (app string, name string, url string) { return } +func (connClient *ConnClient) GetStreamId() uint32 { + return connClient.streamid +} + func (connClient *ConnClient) Close(err error) { connClient.conn.Close() } diff --git a/protocol/rtmp/rtmp.go b/protocol/rtmp/rtmp.go index 4769be2d..ea3722ee 100755 --- a/protocol/rtmp/rtmp.go +++ b/protocol/rtmp/rtmp.go @@ -1,21 +1,25 @@ package rtmp import ( - "net" - "time" - "net/url" - "strings" "errors" "flag" - "log" + "fmt" "github.com/gwuhaolin/livego/av" - "github.com/gwuhaolin/livego/utils/uid" + "github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/container/flv" "github.com/gwuhaolin/livego/protocol/rtmp/core" + "github.com/gwuhaolin/livego/utils/uid" + "log" + "net" + "net/url" + "reflect" + "strings" + "time" ) const ( - maxQueueNum = 1024 + maxQueueNum = 1024 + SAVE_STATICS_INTERVAL = 5000 ) var ( @@ -42,9 +46,11 @@ func (c *Client) Dial(url string, method string) error { } if method == av.PUBLISH { writer := NewVirWriter(connClient) + log.Printf("client Dial call NewVirWriter url=%s, method=%s", url, method) c.handler.HandleWriter(writer) } else if method == av.PLAY { reader := NewVirReader(connClient) + log.Printf("client Dial call NewVirReader url=%s, method=%s", url, method) c.handler.HandleReader(reader) if c.getter != nil { writer := c.getter.GetWriter(reader.Info()) @@ -103,12 +109,28 @@ func (s *Server) handleConn(conn *core.Conn) error { log.Println("handleConn read msg err:", err) return err } + + appname, _, _ := connServer.GetInfo() + + if ret := configure.CheckAppName(appname); !ret { + err := errors.New("application name=%s is not configured") + conn.Close() + log.Println("CheckAppName err:", err) + return err + } + + log.Printf("handleConn: IsPublisher=%v", connServer.IsPublisher()) if connServer.IsPublisher() { + if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) { + log.Printf("GetStaticPushUrlList: %v", pushlist) + } reader := NewVirReader(connServer) s.handler.HandleReader(reader) log.Printf("new publisher: %+v", reader.Info()) if s.getter != nil { + writeType := reflect.TypeOf(s.getter) + log.Printf("handleConn:writeType=%v", writeType) writer := s.getter.GetWriter(reader.Info()) s.handler.HandleWriter(writer) } @@ -132,12 +154,26 @@ type StreamReadWriteCloser interface { Read(c *core.ChunkStream) error } +type StaticsBW struct { + StreamId uint32 + VideoDatainBytes uint64 + LastVideoDatainBytes uint64 + VideoSpeedInBytesperMS uint64 + + AudioDatainBytes uint64 + LastAudioDatainBytes uint64 + AudioSpeedInBytesperMS uint64 + + LastTimestamp int64 +} + type VirWriter struct { - Uid string - closed bool + Uid string + closed bool av.RWBaser conn StreamReadWriteCloser - packetQueue chan av.Packet + packetQueue chan *av.Packet + WriteBWInfo StaticsBW } func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { @@ -145,8 +181,10 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { Uid: uid.NewId(), conn: conn, RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)), - packetQueue: make(chan av.Packet, maxQueueNum), + packetQueue: make(chan *av.Packet, maxQueueNum), + WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, } + go ret.Check() go func() { err := ret.SendPacket() @@ -157,6 +195,30 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { return ret } +func (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { + nowInMS := int64(time.Now().UnixNano() / 1e6) + + v.WriteBWInfo.StreamId = streamid + if isVideoFlag { + v.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length + } else { + v.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length + } + + if v.WriteBWInfo.LastTimestamp == 0 { + v.WriteBWInfo.LastTimestamp = nowInMS + } else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { + diffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000 + + v.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 + v.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000 + + v.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + v.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + v.WriteBWInfo.LastTimestamp = nowInMS + } +} + func (v *VirWriter) Check() { var c core.ChunkStream for { @@ -167,7 +229,7 @@ func (v *VirWriter) Check() { } } -func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { +func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Printf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue @@ -199,17 +261,26 @@ func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { } // -func (v *VirWriter) Write(p av.Packet) error { - if !v.closed { - if len(v.packetQueue) >= maxQueueNum-24 { - v.DropPacket(v.packetQueue, v.Info()) - } else { - v.packetQueue <- p +func (v *VirWriter) Write(p *av.Packet) (err error) { + err = nil + + if v.closed { + err = errors.New("VirWriter closed") + return + } + defer func() { + if e := recover(); e != nil { + errString := fmt.Sprintf("VirWriter has already been closed:%v", e) + err = errors.New(errString) } - return nil + }() + if len(v.packetQueue) >= maxQueueNum-24 { + v.DropPacket(v.packetQueue, v.Info()) } else { - return errors.New("closed") + v.packetQueue <- p } + + return } func (v *VirWriter) SendPacket() error { @@ -219,7 +290,7 @@ func (v *VirWriter) SendPacket() error { if ok { cs.Data = p.Data cs.Length = uint32(len(p.Data)) - cs.StreamID = 1 + cs.StreamID = p.StreamID cs.Timestamp = p.TimeStamp cs.Timestamp += v.BaseTimeStamp() @@ -233,6 +304,7 @@ func (v *VirWriter) SendPacket() error { } } + v.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo) v.SetPreTime() v.RecTimeStamp(cs.Timestamp, cs.TypeID) err := v.conn.Write(cs) @@ -240,6 +312,7 @@ func (v *VirWriter) SendPacket() error { v.closed = true return err } + } else { return errors.New("closed") } @@ -271,18 +344,45 @@ func (v *VirWriter) Close(err error) { } type VirReader struct { - Uid string + Uid string av.RWBaser - demuxer *flv.Demuxer - conn StreamReadWriteCloser + demuxer *flv.Demuxer + conn StreamReadWriteCloser + ReadBWInfo StaticsBW } func NewVirReader(conn StreamReadWriteCloser) *VirReader { return &VirReader{ - Uid: uid.NewId(), - conn: conn, - RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)), - demuxer: flv.NewDemuxer(), + Uid: uid.NewId(), + conn: conn, + RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)), + demuxer: flv.NewDemuxer(), + ReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, + } +} + +func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { + nowInMS := int64(time.Now().UnixNano() / 1e6) + + v.ReadBWInfo.StreamId = streamid + if isVideoFlag { + v.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length + } else { + v.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length + } + + if v.ReadBWInfo.LastTimestamp == 0 { + v.ReadBWInfo.LastTimestamp = nowInMS + } else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { + diffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000 + + //log.Printf("now=%d, last=%d, diff=%d", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp) + v.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 + v.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000 + + v.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + v.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + v.ReadBWInfo.LastTimestamp = nowInMS } } @@ -311,8 +411,11 @@ func (v *VirReader) Read(p *av.Packet) (err error) { p.IsAudio = cs.TypeID == av.TAG_AUDIO p.IsVideo = cs.TypeID == av.TAG_VIDEO p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 + p.StreamID = cs.StreamID p.Data = cs.Data p.TimeStamp = cs.Timestamp + + v.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo) v.demuxer.DemuxH(p) return err } diff --git a/protocol/rtmp/rtmprelay/rtmprelay.go b/protocol/rtmp/rtmprelay/rtmprelay.go new file mode 100644 index 00000000..1bcae28c --- /dev/null +++ b/protocol/rtmp/rtmprelay/rtmprelay.go @@ -0,0 +1,125 @@ +package rtmprelay + +import ( + "bytes" + "errors" + "fmt" + "github.com/gwuhaolin/livego/protocol/amf" + "github.com/gwuhaolin/livego/protocol/rtmp/core" + "io" + "log" +) + +var ( + STOP_CTRL = "RTMPRELAY_STOP" +) + +type RtmpRelay struct { + PlayUrl string + PublishUrl string + cs_chan chan core.ChunkStream + sndctrl_chan chan string + connectPlayClient *core.ConnClient + connectPublishClient *core.ConnClient + startflag bool +} + +func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay { + return &RtmpRelay{ + PlayUrl: *playurl, + PublishUrl: *publishurl, + cs_chan: make(chan core.ChunkStream, 500), + sndctrl_chan: make(chan string), + connectPlayClient: nil, + connectPublishClient: nil, + startflag: false, + } +} + +func (self *RtmpRelay) rcvPlayChunkStream() { + log.Println("rcvPlayRtmpMediaPacket connectClient.Read...") + for { + var rc core.ChunkStream + + if self.startflag == false { + self.connectPlayClient.Close(nil) + log.Printf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) + break + } + err := self.connectPlayClient.Read(&rc) + + if err != nil && err == io.EOF { + break + } + //log.Printf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err) + switch rc.TypeID { + case 20, 17: + r := bytes.NewReader(rc.Data) + vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0) + + log.Printf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err) + case 18: + log.Printf("rcvPlayRtmpMediaPacket: metadata....") + case 8, 9: + self.cs_chan <- rc + } + } +} + +func (self *RtmpRelay) sendPublishChunkStream() { + for { + select { + case rc := <-self.cs_chan: + //log.Printf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data)) + self.connectPublishClient.Write(rc) + case ctrlcmd := <-self.sndctrl_chan: + if ctrlcmd == STOP_CTRL { + self.connectPublishClient.Close(nil) + log.Printf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) + break + } + } + } +} + +func (self *RtmpRelay) Start() error { + if self.startflag { + err := errors.New(fmt.Sprintf("The rtmprelay already started, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)) + return err + } + + self.connectPlayClient = core.NewConnClient() + self.connectPublishClient = core.NewConnClient() + + log.Printf("play server addr:%v starting....", self.PlayUrl) + err := self.connectPlayClient.Start(self.PlayUrl, "play") + if err != nil { + log.Printf("connectPlayClient.Start url=%v error", self.PlayUrl) + return err + } + + log.Printf("publish server addr:%v starting....", self.PublishUrl) + err = self.connectPublishClient.Start(self.PublishUrl, "publish") + if err != nil { + log.Printf("connectPublishClient.Start url=%v error", self.PublishUrl) + self.connectPlayClient.Close(nil) + return err + } + + self.startflag = true + go self.rcvPlayChunkStream() + go self.sendPublishChunkStream() + + return nil +} + +func (self *RtmpRelay) Stop() { + if !self.startflag { + log.Printf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) + return + } + + self.startflag = false + self.sndctrl_chan <- STOP_CTRL + +} diff --git a/protocol/rtmp/rtmprelay/staticrelay.go b/protocol/rtmp/rtmprelay/staticrelay.go new file mode 100644 index 00000000..ab90b43f --- /dev/null +++ b/protocol/rtmp/rtmprelay/staticrelay.go @@ -0,0 +1,180 @@ +package rtmprelay + +import ( + "errors" + "fmt" + "github.com/gwuhaolin/livego/av" + "github.com/gwuhaolin/livego/configure" + "github.com/gwuhaolin/livego/protocol/rtmp/core" + "log" + "sync" +) + +type StaticPush struct { + RtmpUrl string + packet_chan chan *av.Packet + sndctrl_chan chan string + connectClient *core.ConnClient + startflag bool +} + +var G_StaticPushMap = make(map[string](*StaticPush)) +var g_MapLock = new(sync.RWMutex) + +var ( + STATIC_RELAY_STOP_CTRL = "STATIC_RTMPRELAY_STOP" +) + +func GetStaticPushList(appname string) ([]string, error) { + pushurlList, ok := configure.GetStaticPushUrlList(appname) + + if !ok { + return nil, errors.New("no static push url") + } + + return pushurlList, nil +} + +func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush { + g_MapLock.RLock() + staticpush, ok := G_StaticPushMap[rtmpurl] + log.Printf("GetAndCreateStaticPushObject: %s, return %v", rtmpurl, ok) + if !ok { + g_MapLock.RUnlock() + newStaticpush := NewStaticPush(rtmpurl) + + g_MapLock.Lock() + G_StaticPushMap[rtmpurl] = newStaticpush + g_MapLock.Unlock() + + return newStaticpush + } + g_MapLock.RUnlock() + + return staticpush +} + +func GetStaticPushObject(rtmpurl string) (*StaticPush, error) { + g_MapLock.RLock() + if staticpush, ok := G_StaticPushMap[rtmpurl]; ok { + g_MapLock.RUnlock() + return staticpush, nil + } + g_MapLock.RUnlock() + + return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist....")) +} + +func ReleaseStaticPushObject(rtmpurl string) { + g_MapLock.RLock() + if _, ok := G_StaticPushMap[rtmpurl]; ok { + g_MapLock.RUnlock() + + log.Printf("ReleaseStaticPushObject %s ok", rtmpurl) + g_MapLock.Lock() + delete(G_StaticPushMap, rtmpurl) + g_MapLock.Unlock() + } else { + g_MapLock.RUnlock() + log.Printf("ReleaseStaticPushObject: not find %s", rtmpurl) + } +} + +func NewStaticPush(rtmpurl string) *StaticPush { + return &StaticPush{ + RtmpUrl: rtmpurl, + packet_chan: make(chan *av.Packet, 500), + sndctrl_chan: make(chan string), + connectClient: nil, + startflag: false, + } +} + +func (self *StaticPush) Start() error { + if self.startflag { + return errors.New(fmt.Sprintf("StaticPush already start %s", self.RtmpUrl)) + } + + self.connectClient = core.NewConnClient() + + log.Printf("static publish server addr:%v starting....", self.RtmpUrl) + err := self.connectClient.Start(self.RtmpUrl, "publish") + if err != nil { + log.Printf("connectClient.Start url=%v error", self.RtmpUrl) + return err + } + log.Printf("static publish server addr:%v started, streamid=%d", self.RtmpUrl, self.connectClient.GetStreamId()) + go self.HandleAvPacket() + + self.startflag = true + return nil +} + +func (self *StaticPush) Stop() { + if !self.startflag { + return + } + + log.Printf("StaticPush Stop: %s", self.RtmpUrl) + self.sndctrl_chan <- STATIC_RELAY_STOP_CTRL + self.startflag = false +} + +func (self *StaticPush) WriteAvPacket(packet *av.Packet) { + if !self.startflag { + return + } + + self.packet_chan <- packet +} + +func (self *StaticPush) sendPacket(p *av.Packet) { + if !self.startflag { + return + } + var cs core.ChunkStream + + cs.Data = p.Data + cs.Length = uint32(len(p.Data)) + cs.StreamID = self.connectClient.GetStreamId() + cs.Timestamp = p.TimeStamp + //cs.Timestamp += v.BaseTimeStamp() + + //log.Printf("Static sendPacket: rtmpurl=%s, length=%d, streamid=%d", + // self.RtmpUrl, len(p.Data), cs.StreamID) + if p.IsVideo { + cs.TypeID = av.TAG_VIDEO + } else { + if p.IsMetadata { + cs.TypeID = av.TAG_SCRIPTDATAAMF0 + } else { + cs.TypeID = av.TAG_AUDIO + } + } + + self.connectClient.Write(cs) +} + +func (self *StaticPush) HandleAvPacket() { + if !self.IsStart() { + log.Printf("static push %s not started", self.RtmpUrl) + return + } + + for { + select { + case packet := <-self.packet_chan: + self.sendPacket(packet) + case ctrlcmd := <-self.sndctrl_chan: + if ctrlcmd == STATIC_RELAY_STOP_CTRL { + self.connectClient.Close(nil) + log.Printf("Static HandleAvPacket close: publishurl=%s", self.RtmpUrl) + break + } + } + } +} + +func (self *StaticPush) IsStart() bool { + return self.startflag +} diff --git a/protocol/rtmp/stream.go b/protocol/rtmp/stream.go index 09f0ec4d..57805367 100755 --- a/protocol/rtmp/stream.go +++ b/protocol/rtmp/stream.go @@ -2,11 +2,13 @@ package rtmp import ( "errors" - "time" - "log" "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/protocol/rtmp/cache" + "github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay" "github.com/orcaman/concurrent-map" + "log" + "strings" + "time" ) var ( @@ -14,7 +16,7 @@ var ( ) type RtmpStream struct { - streams cmap.ConcurrentMap + streams cmap.ConcurrentMap //key } func NewRtmpStream() *RtmpStream { @@ -27,6 +29,8 @@ func NewRtmpStream() *RtmpStream { func (rs *RtmpStream) HandleReader(r av.ReadCloser) { info := r.Info() + log.Printf("HandleReader: info[%v]", info) + var stream *Stream i, ok := rs.streams.Get(info.Key) if stream, ok = i.(*Stream); ok { @@ -38,9 +42,10 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { stream = ns rs.streams.Set(info.Key, ns) } - }else { + } else { stream = NewStream() rs.streams.Set(info.Key, stream) + stream.info = info } stream.AddReader(r) @@ -48,11 +53,14 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { info := w.Info() + log.Printf("HandleWriter: info[%v]", info) + var s *Stream ok := rs.streams.Has(info.Key) if !ok { s = NewStream() rs.streams.Set(info.Key, s) + s.info = info } else { item, ok := rs.streams.Get(info.Key) if ok { @@ -60,7 +68,6 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { s.AddWriter(w) } } - } func (rs *RtmpStream) GetStreams() cmap.ConcurrentMap { @@ -84,6 +91,7 @@ type Stream struct { cache *cache.Cache r av.ReadCloser ws cmap.ConcurrentMap + info av.Info } type PackWriterCloser struct { @@ -137,9 +145,172 @@ func (s *Stream) AddWriter(w av.WriteCloser) { s.ws.Set(info.UID, pw) } +/*检测本application下是否配置static_push, +如果配置, 启动push远端的连接*/ +func (s *Stream) StartStaticPush() { + key := s.info.Key + + dscr := strings.Split(key, "/") + if len(dscr) < 1 { + return + } + + index := strings.Index(key, "/") + if index < 0 { + return + } + + streamname := key[index+1:] + appname := dscr[0] + + log.Printf("StartStaticPush: current streamname=%s, appname=%s", streamname, appname) + pushurllist, err := rtmprelay.GetStaticPushList(appname) + if err != nil || len(pushurllist) < 1 { + log.Printf("StartStaticPush: GetStaticPushList error=%v", err) + return + } + + for _, pushurl := range pushurllist { + pushurl := pushurl + "/" + streamname + log.Printf("StartStaticPush: static pushurl=%s", pushurl) + + staticpushObj := rtmprelay.GetAndCreateStaticPushObject(pushurl) + if staticpushObj != nil { + if err := staticpushObj.Start(); err != nil { + log.Printf("StartStaticPush: staticpushObj.Start %s error=%v", pushurl, err) + } else { + log.Printf("StartStaticPush: staticpushObj.Start %s ok", pushurl) + } + } else { + log.Printf("StartStaticPush GetStaticPushObject %s error", pushurl) + } + } +} + +func (s *Stream) StopStaticPush() { + key := s.info.Key + + log.Printf("StopStaticPush......%s", key) + dscr := strings.Split(key, "/") + if len(dscr) < 1 { + return + } + + index := strings.Index(key, "/") + if index < 0 { + return + } + + streamname := key[index+1:] + appname := dscr[0] + + log.Printf("StopStaticPush: current streamname=%s, appname=%s", streamname, appname) + pushurllist, err := rtmprelay.GetStaticPushList(appname) + if err != nil || len(pushurllist) < 1 { + log.Printf("StopStaticPush: GetStaticPushList error=%v", err) + return + } + + for _, pushurl := range pushurllist { + pushurl := pushurl + "/" + streamname + log.Printf("StopStaticPush: static pushurl=%s", pushurl) + + staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl) + if (staticpushObj != nil) && (err == nil) { + staticpushObj.Stop() + rtmprelay.ReleaseStaticPushObject(pushurl) + log.Printf("StopStaticPush: staticpushObj.Stop %s ", pushurl) + } else { + log.Printf("StopStaticPush GetStaticPushObject %s error", pushurl) + } + } +} + +func (s *Stream) IsSendStaticPush() bool { + key := s.info.Key + + dscr := strings.Split(key, "/") + if len(dscr) < 1 { + return false + } + + appname := dscr[0] + + //log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname) + pushurllist, err := rtmprelay.GetStaticPushList(appname) + if err != nil || len(pushurllist) < 1 { + //log.Printf("SendStaticPush: GetStaticPushList error=%v", err) + return false + } + + index := strings.Index(key, "/") + if index < 0 { + return false + } + + streamname := key[index+1:] + + for _, pushurl := range pushurllist { + pushurl := pushurl + "/" + streamname + //log.Printf("SendStaticPush: static pushurl=%s", pushurl) + + staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl) + if (staticpushObj != nil) && (err == nil) { + return true + //staticpushObj.WriteAvPacket(&packet) + //log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl) + } else { + log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl) + } + } + return false +} + +func (s *Stream) SendStaticPush(packet av.Packet) { + key := s.info.Key + + dscr := strings.Split(key, "/") + if len(dscr) < 1 { + return + } + + index := strings.Index(key, "/") + if index < 0 { + return + } + + streamname := key[index+1:] + appname := dscr[0] + + //log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname) + pushurllist, err := rtmprelay.GetStaticPushList(appname) + if err != nil || len(pushurllist) < 1 { + //log.Printf("SendStaticPush: GetStaticPushList error=%v", err) + return + } + + for _, pushurl := range pushurllist { + pushurl := pushurl + "/" + streamname + //log.Printf("SendStaticPush: static pushurl=%s", pushurl) + + staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl) + if (staticpushObj != nil) && (err == nil) { + staticpushObj.WriteAvPacket(&packet) + //log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl) + } else { + log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl) + } + } +} + func (s *Stream) TransStart() { s.isStart = true var p av.Packet + + log.Printf("TransStart:%v", s.info) + + s.StartStaticPush() + for { if !s.isStart { s.closeInter() @@ -151,11 +322,17 @@ func (s *Stream) TransStart() { s.isStart = false return } + + if s.IsSendStaticPush() { + s.SendStaticPush(p) + } + s.cache.Write(p) for item := range s.ws.IterBuffered() { v := item.Val.(*PackWriterCloser) if !v.init { + //log.Printf("cache.send: %v", v.w.Info()) if err = s.cache.Send(v.w); err != nil { log.Printf("[%s] send cache packet error: %v, remove", v.w.Info(), err) s.ws.Remove(item.Key) @@ -163,7 +340,10 @@ func (s *Stream) TransStart() { } v.init = true } else { - if err = v.w.Write(p); err != nil { + new_packet := p + //writeType := reflect.TypeOf(v.w) + //log.Printf("w.Write: type=%v, %v", writeType, v.w.Info()) + if err = v.w.Write(&new_packet); err != nil { log.Printf("[%s] write packet error: %v, remove", v.w.Info(), err) s.ws.Remove(item.Key) } @@ -173,9 +353,12 @@ func (s *Stream) TransStart() { } func (s *Stream) TransStop() { + log.Printf("TransStop: %s", s.info.Key) + if s.isStart && s.r != nil { s.r.Close(errors.New("stop old")) } + s.isStart = false } @@ -204,6 +387,7 @@ func (s *Stream) CheckAlive() (n int) { func (s *Stream) closeInter() { if s.r != nil { + s.StopStaticPush() log.Printf("[%v] publisher closed", s.r.Info()) }