diff --git a/README.md b/README.md index 70ad4712c..a2210f6cb 100644 --- a/README.md +++ b/README.md @@ -679,6 +679,19 @@ Supports connection to [Wyze](https://www.wyze.com/) cameras, using WebRTC proto Supports [Amazon Kinesis Video Streams](https://aws.amazon.com/kinesis/video-streams/), using WebRTC protocol. You need to specify signalling WebSocket URL with all credentials in query params, `client_id` and `ice_servers` list in [JSON format](https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer). + +**tuya** + +Supports [Tuya IPC cameras](https://developer.tuya.com/en/docs/iot/webrtc?id=Kacsd4x2hl0se), using WebRTC protocol. To enable, you need to specify following in query parameters: +- `format` Set to `tuya` to use Tuya +- `client_id` Tuya Cloud Project Access ID/Client ID +- `client_secret` Tuya Cloud Project Access Secret/Client Secret +- `uid` Tuya linked app UID (referet to [Tuya Procedure step 3](https://developer.tuya.com/en/docs/iot/webrtc?id=Kacsd4x2hl0se#title-4-Prerequisites) for more info) +- `device_id` Tuya device ID (you can retreive if from Cloud Project devices page) + +URL should be `https://openapi.tuyaeu.com` or other one ([see Tuya docs](https://developer.tuya.com/en/docs/iot/api-request?id=Ka4a8uuo1j4t4#title-1-Endpoints)) depending on your region. + + ```yaml streams: webrtc-whep: webrtc:http://192.168.1.123:1984/api/webrtc?src=camera1 @@ -686,6 +699,7 @@ streams: webrtc-openipc: webrtc:ws://192.168.1.123/webrtc_ws#format=openipc#ice_servers=[{"urls":"stun:stun.kinesisvideo.eu-north-1.amazonaws.com:443"}] webrtc-wyze: webrtc:http://192.168.1.123:5000/signaling/camera1?kvs#format=wyze webrtc-kinesis: webrtc:wss://...amazonaws.com/?...#format=kinesis#client_id=...#ice_servers=[{...},{...}] + webrtc-tuya: webrtc:https://openapi.tuyaeu.com#format=tuya#client_id=...#client_secret=...#uid=...#device_id=... ``` **PS.** For `kinesis` sources you can use [echo](#source-echo) to get connection params using `bash`/`python` or any other script language. diff --git a/go.mod b/go.mod index b038b110d..c3841889b 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,15 @@ go 1.20 require ( github.com/asticode/go-astits v1.13.0 + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/expr-lang/expr v1.16.9 - github.com/gorilla/websocket v1.5.1 + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/mattn/go-isatty v0.0.20 github.com/miekg/dns v1.1.59 github.com/pion/ice/v2 v2.3.24 github.com/pion/interceptor v0.1.29 + github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.6 github.com/pion/sdp/v3 v3.0.9 @@ -21,28 +24,29 @@ require ( github.com/sigurn/crc8 v0.0.0-20220107193325-2243fe600f9f github.com/stretchr/testify v1.9.0 github.com/tadglines/go-pkgs v0.0.0-20210623144937-b983b20f54f9 - golang.org/x/crypto v0.24.0 + github.com/tidwall/gjson v1.17.3 + golang.org/x/crypto v0.25.0 gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/asticode/go-astikit v0.30.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.16 // indirect github.com/pion/transport/v2 v2.2.5 // indirect github.com/pion/turn/v2 v2.1.6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/mod v0.18.0 // indirect - golang.org/x/net v0.26.0 // indirect + golang.org/x/net v0.27.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.21.0 // indirect + golang.org/x/sys v0.22.0 // indirect golang.org/x/tools v0.22.0 // indirect ) diff --git a/go.sum b/go.sum index 727787ac2..c1e6433e9 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/expr-lang/expr v1.16.5 h1:m2hvtguFeVaVNTHj8L7BoAyt7O0PAIBaSVbjdHgRXMs= github.com/expr-lang/expr v1.16.5/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= @@ -16,6 +18,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -106,6 +110,12 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tadglines/go-pkgs v0.0.0-20210623144937-b983b20f54f9 h1:aeN+ghOV0b2VCmKKO3gqnDQ8mLbpABZgRR2FVYx4ouI= github.com/tadglines/go-pkgs v0.0.0-20210623144937-b983b20f54f9/go.mod h1:roo6cZ/uqpwKMuvPG0YmzI5+AmUiMWfjCBZpGXqbTxE= +github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= +github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -121,6 +131,8 @@ golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= @@ -144,6 +156,8 @@ golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -172,6 +186,8 @@ golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index d42c51dda..bdea0fc81 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -12,6 +12,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/internal/webrtc/tuya" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/gorilla/websocket" @@ -54,6 +55,8 @@ func streamsHandler(rawURL string) (core.Producer, error) { } else if format == "wyze" { // https://github.com/mrlt8/docker-wyze-bridge return wyzeClient(rawURL) + } else if format == "tuya" { + return tuya.TuyaClient(rawURL, query) } else { return whepClient(rawURL) } diff --git a/internal/webrtc/tuya/http.go b/internal/webrtc/tuya/http.go new file mode 100644 index 000000000..e0d48e3a3 --- /dev/null +++ b/internal/webrtc/tuya/http.go @@ -0,0 +1,197 @@ +package tuya + +import ( + "bytes" + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/tidwall/gjson" +) + +func (t *tuyaSession) makeHttpSign(ts int64) string { + // If httpAccessToken is "" then this is un-authed request, so no need to do 'if' here + data := fmt.Sprintf("%s%s%s%d", t.config.ClientID, t.httpAccessToken, t.config.Secret, ts) + val := md5.Sum([]byte(data)) + res := fmt.Sprintf("%X", val) + return res +} + +func (t *tuyaSession) httpRequest(method string, path string, body io.Reader) (res []byte, err error) { + client := &http.Client{ + Timeout: time.Second * 5, + } + + url := fmt.Sprintf("%s%s", t.config.OpenAPIURL, path) + + request, err := http.NewRequest(method, url, body) + if err != nil { + log.Printf("create http request fail: %s", err.Error()) + + return + } + + ts := time.Now().UnixNano() / 1000000 + sign := t.makeHttpSign(ts) + + // TODO: do we need all this headers? + + request.Header.Set("Accept", "*") + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Access-Control-Allow-Origin", "*") + request.Header.Set("Access-Control-Allow-Methods", "*") + request.Header.Set("Access-Control-Allow-Headers", "*") + request.Header.Set("mode", "no-cors") + request.Header.Set("client_id", t.config.ClientID) + request.Header.Set("access_token", t.httpAccessToken) + request.Header.Set("sign", sign) + request.Header.Set("t", strconv.FormatInt(ts, 10)) + + response, err := client.Do(request) + if err != nil { + log.Printf("http request fail: %s", err.Error()) + + return + } + defer response.Body.Close() + + res, err = io.ReadAll(response.Body) + if err != nil { + log.Printf("read http response fail: %s", err.Error()) + + return + } + + return +} + +func (t *tuyaSession) Authorize() (err error) { + t.httpAccessToken = "" // Clear all access token if present + + body, err := t.httpRequest("GET", "/v1.0/token?grant_type=1", nil) + if err != nil { + log.Printf("sync OpenAPI ressponse to config fail: %s", err.Error()) + return + } + + accessTokenValue := gjson.GetBytes(body, "result.access_token") + if !accessTokenValue.Exists() { + log.Printf("access_token not exits in body: %s", string(body)) + return errors.New("access_token not exist") + } + + t.httpAccessToken = accessTokenValue.String() + + return +} + +func (t *tuyaSession) GetMotoIDAndAuth() (motoID, auth, iceServers string, err error) { + path := fmt.Sprintf("/v1.0/users/%s/devices/%s/webrtc-configs", t.config.UID, t.config.DeviceID) + + body, err := t.httpRequest("GET", path, nil) + if err != nil { + log.Printf("GET webrtc-configs fail: %s, body: %s", err.Error(), string((body))) + + return + } + + motoIDValue := gjson.GetBytes(body, "result.moto_id") + if !motoIDValue.Exists() { + log.Printf("moto_id not exist in webrtc-configs, body: %s", string(body)) + + return "", "", "", errors.New("moto_id not exist") + } + + authValue := gjson.GetBytes(body, "result.auth") + if !authValue.Exists() { + log.Printf("auth not exist in webrtc-configs, body: %s", string(body)) + + return "", "", "", errors.New("auth not exist") + } + + iceServersValue := gjson.GetBytes(body, "result.p2p_config.ices") + if !iceServersValue.Exists() { + log.Printf("iceServers not exist in webrtc-configs, body: %s", string(body)) + + return "", "", "", errors.New("p2p_config.ices not exist") + } + + var tokens []Token + err = json.Unmarshal([]byte(iceServersValue.String()), &tokens) + if err != nil { + log.Printf("unmarshal to tokens fail: %s", err.Error()) + return "", "", "", err + } + + ices := make([]WebToken, 0) + for _, token := range tokens { + if strings.HasPrefix(token.Urls, "stun") { + ices = append(ices, WebToken{ + Urls: token.Urls, + }) + } else if strings.HasPrefix(token.Urls, "turn") { + ices = append(ices, WebToken{ + Urls: token.Urls, + Username: token.Username, + Credential: token.Credential, + }) + } + } + + iceServersBytes, err := json.Marshal(&ices) + if err != nil { + log.Printf("marshal token to web tokens fail: %s", err.Error()) + return "", "", "", err + } + + motoID = motoIDValue.String() + auth = authValue.String() + iceServers = string(iceServersBytes) + + return +} + +func (t *tuyaSession) GetHubConfig() (config *OpenIoTHubConfig, err error) { + request := &OpenIoTHubConfigRequest{ + UID: t.config.UID, + UniqueID: uuid.New().String(), + LinkType: "mqtt", + Topics: "ipc", + } + + payload, err := json.Marshal(request) + if err != nil { + log.Printf("marshal OpenIoTHubConfig Request fail: %s", err.Error()) + return nil, err + } + + body, err := t.httpRequest("POST", "/v2.0/open-iot-hub/access/config", bytes.NewReader(payload)) + + if err != nil { + log.Printf("get OpenIoTHub config from http fail: %s", err.Error()) + return + } + + if !gjson.GetBytes(body, "success").Bool() { + log.Printf("request OpenIoTHub Config fail, body: %s", string(body)) + return nil, errors.New("request hub config fail") + } + + config = &OpenIoTHubConfig{} + + err = json.Unmarshal([]byte(gjson.GetBytes(body, "result").String()), config) + if err != nil { + log.Printf("unmarshal OpenIoTHub config to object fail: %s, body: %s", err.Error(), string(body)) + return + } + + return +} diff --git a/internal/webrtc/tuya/mqtt.go b/internal/webrtc/tuya/mqtt.go new file mode 100644 index 000000000..d6b42159b --- /dev/null +++ b/internal/webrtc/tuya/mqtt.go @@ -0,0 +1,274 @@ +package tuya + +import ( + "encoding/json" + "log" + "strings" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type TuyaMqtt struct { + client mqtt.Client + motoID string + auth string + iceServers string + publishTopic string + subscribeTopic string + MQTTUID string + + mqttReady core.Waiter + + handleAnswerFrame func(answerFrame AnswerFrame) + handleCandidateFrame func(candidateFrame CandidateFrame) +} + +func (t *tuyaSession) StartMqtt() (err error) { + t.mqtt.motoID, t.mqtt.auth, t.mqtt.iceServers, err = t.GetMotoIDAndAuth() + if err != nil { + log.Printf("GetMotoIDAndAuth fail: %s", err.Error()) + t.mqtt.mqttReady.Done(err) + return + } + + hubConfig, err := t.GetHubConfig() + if err != nil { + log.Printf("GetHubConfig fail: %s", err.Error()) + t.mqtt.mqttReady.Done(err) + return + } + + t.mqtt.publishTopic = hubConfig.SinkTopic.IPC + t.mqtt.subscribeTopic = hubConfig.SourceSink.IPC + + t.mqtt.publishTopic = strings.Replace(t.mqtt.publishTopic, "moto_id", t.mqtt.motoID, 1) + t.mqtt.publishTopic = strings.Replace(t.mqtt.publishTopic, "{device_id}", t.config.DeviceID, 1) + + log.Printf("publish topic: %s", t.mqtt.publishTopic) + log.Printf("subscribe topic: %s", t.mqtt.subscribeTopic) + + parts := strings.Split(t.mqtt.subscribeTopic, "/") + t.mqtt.MQTTUID = parts[3] + + opts := mqtt.NewClientOptions().AddBroker(hubConfig.Url). + SetClientID(hubConfig.ClientID). + SetUsername(hubConfig.Username). + SetPassword(hubConfig.Password). + SetOnConnectHandler((func(c mqtt.Client) { + t.mqttOnConnect(c) + })). + SetConnectTimeout(10 * time.Second) + + t.mqtt.client = mqtt.NewClient(opts) + if token := t.mqtt.client.Connect(); token.Wait() && token.Error() != nil { + log.Printf("create mqtt client fail: %s", token.Error().Error()) + + err = token.Error() + t.mqtt.mqttReady.Done(err) + return + } + + return +} + +func (t *tuyaSession) mqttOnConnect(client mqtt.Client) { + options := client.OptionsReader() + + log.Printf("%s connect to mqtt success", options.ClientID()) + + if token := client.Subscribe(t.mqtt.subscribeTopic, 1, func(c mqtt.Client, m mqtt.Message) { + t.mqttConsume(m) + }); token.Wait() && token.Error() != nil { + log.Printf("subcribe fail: %s, topic: %s", token.Error().Error(), t.mqtt.subscribeTopic) + t.mqtt.mqttReady.Done(token.Error()) + return + } + + t.mqtt.mqttReady.Done(nil) + + log.Print("subscribe mqtt topic success") +} + +func (t *tuyaSession) StopMqtt() { + t.mqtt.client.Disconnect(1000) +} + +func (t *tuyaSession) sendOffer(sessionID string, sdp string) { + offerFrame := struct { + Mode string `json:"mode"` + Sdp string `json:"sdp"` + StreamType uint32 `json:"stream_type"` + Auth string `json:"auth"` + }{ + Mode: "webrtc", + Sdp: sdp, + StreamType: 1, + Auth: t.mqtt.auth, + } + + offerMqtt := &MqttMessage{ + Protocol: 302, + Pv: "2.2", + T: time.Now().Unix(), + Data: MqttFrame{ + Header: MqttFrameHeader{ + Type: "offer", + From: t.mqtt.MQTTUID, + To: t.config.DeviceID, + SubDevID: "", + SessionID: sessionID, + MotoID: t.mqtt.motoID, + }, + Message: offerFrame, + }, + } + + sendBytes, err := json.Marshal(offerMqtt) + if err != nil { + log.Printf("marshal offer mqtt to bytes fail: %s", err.Error()) + + return + } + + t.mqttPublish(sendBytes) +} + +func (t *tuyaSession) sendCandidate(sessionID string, candidate string) { + candidateFrame := struct { + Mode string `json:"mode"` + Candidate string `json:"candidate"` + }{ + Mode: "webrtc", + Candidate: candidate, + } + + candidateMqtt := &MqttMessage{ + Protocol: 302, + Pv: "2.2", + T: time.Now().Unix(), + Data: MqttFrame{ + Header: MqttFrameHeader{ + Type: "candidate", + From: t.mqtt.MQTTUID, + To: t.config.DeviceID, + SubDevID: "", + SessionID: sessionID, + MotoID: t.mqtt.motoID, + }, + Message: candidateFrame, + }, + } + + sendBytes, err := json.Marshal(candidateMqtt) + if err != nil { + log.Printf("marshal candidate mqtt to bytes fail: %s", err.Error()) + + return + } + + t.mqttPublish(sendBytes) +} + +// 发布mqtt消息 +func (t *tuyaSession) mqttPublish(payload []byte) { + token := t.mqtt.client.Publish(t.mqtt.publishTopic, 1, false, payload) + if token.Error() != nil { + log.Printf("mqtt publish fail: %s, topic: %s", token.Error().Error(), + t.mqtt.publishTopic) + } +} + +func (t *tuyaSession) mqttConsume(msg mqtt.Message) { + tmp := struct { + Protocol int `json:"protocol"` + Pv string `json:"pv"` + T int64 `json:"t"` + Data struct { + Header MqttFrameHeader `json:"header"` + Message json.RawMessage `json:"msg"` + } `json:"data"` + }{} + + if err := json.Unmarshal(msg.Payload(), &tmp); err != nil { + log.Printf("unmarshal received mqtt fail: %s, payload: %s", err.Error(), string(msg.Payload())) + + return + } + + rmqtt := &MqttMessage{ + Protocol: tmp.Protocol, + Pv: tmp.Pv, + T: tmp.T, + Data: MqttFrame{ + Header: tmp.Data.Header, + Message: tmp.Data.Message, + }, + } + + log.Printf("mqtt recv message, session: %s, type: %s, from: %s, to: %s", + rmqtt.Data.Header.SessionID, + rmqtt.Data.Header.Type, + rmqtt.Data.Header.From, + rmqtt.Data.Header.To) + + t.mqttDispatch(rmqtt) +} + +// 分发从mqtt服务器接受到的消息 +func (t *tuyaSession) mqttDispatch(msg *MqttMessage) { + + switch msg.Data.Header.Type { + case "answer": + t.mqttHandleAnswer(msg) + case "candidate": + t.mqttHandleCandidate(msg) + } +} + +func (t *tuyaSession) mqttHandleAnswer(msg *MqttMessage) { + frame, ok := msg.Data.Message.(json.RawMessage) + if !ok { + log.Printf("convert interface{} to []byte fail, session: %s", msg.Data.Header.SessionID) + + return + } + + answerFrame := AnswerFrame{} + + if err := json.Unmarshal(frame, &answerFrame); err != nil { + log.Printf("unmarshal mqtt answer frame fail: %s, session: %s, frame: %s", + err.Error(), + msg.Data.Header.SessionID, + string(msg.Data.Message.([]byte))) + + return + } + + t.mqtt.handleAnswerFrame(answerFrame) +} + +func (t *tuyaSession) mqttHandleCandidate(msg *MqttMessage) { + frame, ok := msg.Data.Message.(json.RawMessage) + if !ok { + log.Printf("convert interface{} to []byte fail, session: %s", msg.Data.Header.SessionID) + return + } + + candidateFrame := CandidateFrame{} + + if err := json.Unmarshal(frame, &candidateFrame); err != nil { + log.Printf("unmarshal mqtt candidate frame fail: %s, session: %s, frame: %s", + err.Error(), + msg.Data.Header.SessionID, + string(msg.Data.Message.([]byte))) + return + } + + // candidate from device start with "a=", end with "\r\n", which are not needed by Pion webRTC + candidateFrame.Candidate = strings.TrimPrefix(candidateFrame.Candidate, "a=") + candidateFrame.Candidate = strings.TrimSuffix(candidateFrame.Candidate, "\r\n") + + t.mqtt.handleCandidateFrame(candidateFrame) +} diff --git a/internal/webrtc/tuya/mqtt_types.go b/internal/webrtc/tuya/mqtt_types.go new file mode 100644 index 000000000..335911007 --- /dev/null +++ b/internal/webrtc/tuya/mqtt_types.go @@ -0,0 +1,33 @@ +package tuya + +type MqttFrameHeader struct { + Type string `json:"type"` + From string `json:"from"` + To string `json:"to"` + SubDevID string `json:"sub_dev_id"` + SessionID string `json:"sessionid"` + MotoID string `json:"moto_id"` + TransactionID string `json:"tid"` +} + +type MqttFrame struct { + Header MqttFrameHeader `json:"header"` + Message interface{} `json:"msg"` +} + +type MqttMessage struct { + Protocol int `json:"protocol"` + Pv string `json:"pv"` + T int64 `json:"t"` + Data MqttFrame `json:"data"` +} + +type AnswerFrame struct { + Mode string `json:"mode"` + Sdp string `json:"sdp"` +} + +type CandidateFrame struct { + Mode string `json:"mode"` + Candidate string `json:"candidate"` +} diff --git a/internal/webrtc/tuya/tuya.go b/internal/webrtc/tuya/tuya.go new file mode 100644 index 000000000..21efe1557 --- /dev/null +++ b/internal/webrtc/tuya/tuya.go @@ -0,0 +1,163 @@ +package tuya + +import ( + "errors" + "log" + "net/url" + "regexp" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + pion "github.com/pion/webrtc/v3" +) + +type TuyaConfig struct { + // Set by user + OpenAPIURL string + ClientID string + Secret string + UID string + DeviceID string + + // Set by code + MQTTUID string +} + +type tuyaSession struct { + config TuyaConfig + httpAccessToken string + sessionId string + mqtt TuyaMqtt + offerSent core.Waiter + connected core.Waiter +} + +func MakeTuyaSession(rawURL string, query url.Values) *tuyaSession { + tc := &tuyaSession{} + tc.sessionId = core.RandString(6, 62) + tc.config.OpenAPIURL = rawURL + tc.config.ClientID = query.Get("client_id") + tc.config.Secret = query.Get("client_secret") + tc.config.UID = query.Get("uid") + tc.config.DeviceID = query.Get("device_id") + return tc +} + +func TuyaClient(rawURL string, query url.Values) (core.Producer, error) { + tc := MakeTuyaSession(rawURL, query) + + // 1. Get Tuya Auth token + if err := tc.Authorize(); err != nil { + return nil, err + } + + // 2. Get iceServers + _, _, iceServers, err := tc.GetMotoIDAndAuth() + if err != nil { + return nil, err + } + + // 3. Create Peer Connection + + api, err := webrtc.NewAPIWithLogs() + if err != nil { + return nil, err + } + + conf := pion.Configuration{} + + conf.ICEServers, err = webrtc.UnmarshalICEServers([]byte(iceServers)) + if err != nil { + return nil, err + } + + pc, err := api.NewPeerConnection(conf) + + prod := webrtc.NewConn(pc) + prod.FormatName = "webrtc/tuya" + prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" + prod.URL = rawURL + + // 4. Open Mqtt connection to device + + if err := tc.StartMqtt(); err != nil { + return nil, err + } + + if err := tc.mqtt.mqttReady.Wait(); err != nil { + return nil, err + } + + tc.mqtt.handleAnswerFrame = func(answerFrame AnswerFrame) { + // 6. Get answer + + // HACK TO force ICERoleControlled - for some reason Tuya wants to control ICE + desc := pion.SessionDescription{ + Type: pion.SDPTypePranswer, + SDP: answerFrame.Sdp, + } + if err = pc.SetRemoteDescription(desc); err != nil { + return + } + prod.SetAnswer(answerFrame.Sdp) + if err != nil { + log.Printf("tuya: Failed to set answer %s", err.Error()) + } + } + tc.mqtt.handleCandidateFrame = func(candidateFrame CandidateFrame) { + // 7. Continue to receiving candidates + if candidateFrame.Candidate != "" { + prod.AddCandidate(candidateFrame.Candidate) + if err != nil { + log.Printf("tuya: Failed to add candidate %s", err.Error()) + } + } + } + + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = tc.offerSent.Wait() + tc.sendCandidate(tc.sessionId, "a="+msg.ToJSON().Candidate) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateConnecting: + break + case pion.PeerConnectionStateConnected: + tc.connected.Done(nil) + default: + tc.connected.Done(errors.New("webrtc: " + msg.String())) + } + } + }) + + // Order is important here, if audio comes after video, tuya sends broken SDP + medias := []*core.Media{ + {Kind: core.KindAudio, Direction: core.DirectionSendRecv}, + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + } + + // 5. Create and send offer + offer, err := prod.CreateOffer(medias) + if err != nil { + return nil, err + } + + // shorter sdp, remove a=extmap... line, device ONLY allow 8KB json payload + re := regexp.MustCompile(`\r\na=extmap[^\r\n]*`) + offer = re.ReplaceAllString(offer, "") + + tc.sendOffer(tc.sessionId, offer) + tc.offerSent.Done(nil) + + // Final: Wait for connection + if err = tc.connected.Wait(); err != nil { + return nil, err + } + + tc.StopMqtt() + + return prod, nil +} diff --git a/internal/webrtc/tuya/types.go b/internal/webrtc/tuya/types.go new file mode 100644 index 000000000..22919f7f0 --- /dev/null +++ b/internal/webrtc/tuya/types.go @@ -0,0 +1,43 @@ +package tuya + +type OpenIoTHubConfig struct { + Url string `json:"url"` // mqtt连接地址(包括protocol、ip、port) + ClientID string `json:"client_id"` // mqtt连接client_id(用户账号及unique_id 生成的一个唯一不变的映射)一个clientId 即可以用于发布也可以订阅 + Username string `json:"username"` // mqtt连接用户名(用户账号生成的一个唯一不变的映射) + Password string `json:"password"` // mqtt连接密码 ,失效期内该字段不变 + + // 发布topic,控制设备可通过该topic完成 + SinkTopic struct { + IPC string `json:"ipc"` + } `json:"sink_topic"` + + // 订阅topic,设备事件、设备状态同步,可以订阅该topic + SourceSink struct { + IPC string `json:"ipc"` + } `json:"source_topic"` + + ExpireTime int `json:"expire_time"` // 当前配置有效时长,当前配置失效后所有的连接都将断开 +} + +// OpenIoTHubConfigRequest 向开放平台申请mqtt连接的http请求体 +type OpenIoTHubConfigRequest struct { + UID string `json:"uid"` // 涂鸦用户id + UniqueID string `json:"unique_id"` // 连接端按unique_id隔离,当同一用户需要在多端登录时,调用方需要保证unique_id不同 + LinkType string `json:"link_type"` // 连接类型,暂只支持mqtt + Topics string `json:"topics"` // 关注的mqtt topic,本Sample只关注ipc topic +} + +// Token ICE Token from OpenAPI +type Token struct { + Urls string `json:"urls"` + Username string `json:"username"` + Credential string `json:"credential"` + TTL int `json:"ttl"` +} + +// WebToken ICE Token to Chrome +type WebToken struct { + Urls string `json:"urls,omitempty"` + Username string `json:"username,omitempty"` + Credential string `json:"credential,omitempty"` +} diff --git a/pkg/webrtc/api.go b/pkg/webrtc/api.go index 0361e6b40..819e1456b 100644 --- a/pkg/webrtc/api.go +++ b/pkg/webrtc/api.go @@ -2,9 +2,11 @@ package webrtc import ( "net" + "os" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/interceptor" + "github.com/pion/logging" "github.com/pion/webrtc/v3" ) @@ -16,6 +18,10 @@ func NewAPI() (*webrtc.API, error) { return NewServerAPI("", "", nil) } +func NewAPIWithLogs() (*webrtc.API, error) { + return NewServerAPIWithLogs("", "", nil) +} + type Filters struct { Candidates []string `yaml:"candidates"` Interfaces []string `yaml:"interfaces"` @@ -114,6 +120,103 @@ func NewServerAPI(network, address string, filters *Filters) (*webrtc.API, error ), nil } +func NewServerAPIWithLogs(network, address string, filters *Filters) (*webrtc.API, error) { + // for debug logs add to env: `PION_LOG_DEBUG=all` + m := &webrtc.MediaEngine{} + //if err := m.RegisterDefaultCodecs(); err != nil { + // return nil, err + //} + if err := RegisterDefaultCodecs(m); err != nil { + return nil, err + } + + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + return nil, err + } + + s := webrtc.SettingEngine{} + + factory := logging.DefaultLoggerFactory{} + factory.DefaultLogLevel = logging.LogLevelDebug + factory.ScopeLevels = make(map[string]logging.LogLevel) + factory.Writer = os.Stdout + + s.LoggerFactory = &factory + + // fix https://github.com/pion/webrtc/pull/2407 + s.SetDTLSInsecureSkipHelloVerify(true) + + if filters != nil && filters.Interfaces != nil { + s.SetIncludeLoopbackCandidate(true) + s.SetInterfaceFilter(func(name string) bool { + return core.Contains(filters.Interfaces, name) + }) + } else { + // disable listen on Hassio docker interfaces + s.SetInterfaceFilter(func(name string) bool { + return name != "hassio" && name != "docker0" + }) + } + + if filters != nil && filters.IPs != nil { + s.SetIncludeLoopbackCandidate(true) + s.SetIPFilter(func(ip net.IP) bool { + return core.Contains(filters.IPs, ip.String()) + }) + } + + if filters != nil && filters.Networks != nil { + var networkTypes []webrtc.NetworkType + for _, s := range filters.Networks { + if networkType, err := webrtc.NewNetworkType(s); err == nil { + networkTypes = append(networkTypes, networkType) + } + } + s.SetNetworkTypes(networkTypes) + } else { + s.SetNetworkTypes([]webrtc.NetworkType{ + webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6, + webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6, + }) + } + + if filters != nil && len(filters.UDPPorts) == 2 { + _ = s.SetEphemeralUDPPortRange(filters.UDPPorts[0], filters.UDPPorts[1]) + } + + //if len(hosts) != 0 { + // // support only: host, srflx + // if candidateType, err := webrtc.NewICECandidateType(hosts[0]); err == nil { + // s.SetNAT1To1IPs(hosts[1:], candidateType) + // } else { + // s.SetNAT1To1IPs(hosts, 0) // 0 = host + // } + //} + + if address != "" { + if network == "" || network == "tcp" { + if ln, err := net.Listen("tcp", address); err == nil { + tcpMux := webrtc.NewICETCPMux(nil, ln, 8) + s.SetICETCPMux(tcpMux) + } + } + + if network == "" || network == "udp" { + if ln, err := net.ListenPacket("udp", address); err == nil { + udpMux := webrtc.NewICEUDPMux(nil, ln) + s.SetICEUDPMux(udpMux) + } + } + } + + return webrtc.NewAPI( + webrtc.WithMediaEngine(m), + webrtc.WithInterceptorRegistry(i), + webrtc.WithSettingEngine(s), + ), nil +} + func RegisterDefaultCodecs(m *webrtc.MediaEngine) error { for _, codec := range []webrtc.RTPCodecParameters{ {