Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication with webrtc native using data channel is slow #2288

Closed
FH0 opened this issue Jul 28, 2022 · 2 comments
Closed

Communication with webrtc native using data channel is slow #2288

FH0 opened this issue Jul 28, 2022 · 2 comments

Comments

@FH0
Copy link

FH0 commented Jul 28, 2022

We use WebRTC native on the iOS and Android platforms. Below is the issue we found. Because of it, we had to use WebRTC native in our Go project as well.

pion/webrtc/v3 v3.1.43
webrtc native latest (I don't know how to get the version)
WSL2 (Ubuntu 20.04)

speed unstable

iperf3 client -> pion/webrtc (go) -> webrtc native (c++) -> iperf3 server

image

go code
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net"

	"github.com/pion/webrtc/v3"
)

/*
iperf3 -s
go run ./build/ -listen=:5555
iperf3 -c 127.0.0.1 -p 5555 -b 500M
*/
// main bridges local TCP connections to a remote WebRTC peer.
//
// It creates one PeerConnection, prints a base64-encoded JSON offer once ICE
// gathering has finished, reads the base64-encoded JSON answer from stdin, and
// then opens one data channel per accepted TCP connection, copying bytes in
// both directions between the TCP connection and the data channel.
func main() {
	listenAddress := flag.String("listen", ":0", "address to listen on")
	flag.Parse()

	// Flow-control watermarks for outgoing data: once more than
	// maxBufferedAmount bytes are queued on a data channel, the forwarding
	// loop pauses until the queue drains below bufferedAmountLowThreshold.
	const (
		bufferedAmountLowThreshold uint64 = 512 * 1024
		maxBufferedAmount          uint64 = 1024 * 1024
	)

	// Create the peer connection.
	peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}

	// Register the peer connection callbacks.
	peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
		fmt.Println("peerConnection.OnICEConnectionStateChange: ", is.String())
	})
	peerConnection.OnNegotiationNeeded(func() {
		fmt.Println("peerConnection.OnNegotiationNeeded")

		offer, err := peerConnection.CreateOffer(nil)
		if err != nil {
			panic(err)
		}
		if err := peerConnection.SetLocalDescription(offer); err != nil {
			panic(err)
		}
	})
	peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
		// Skip individual candidates; only act on the final nil callback so
		// the offer that gets printed is read after gathering has finished.
		if i != nil {
			return
		}

		// Send the offer (printed for manual copy/paste signaling).
		offerBytes, err := json.Marshal(peerConnection.LocalDescription())
		if err != nil {
			panic(err)
		}
		fmt.Println(base64.StdEncoding.EncodeToString(offerBytes))

		// Receive the answer.
		fmt.Println("please enter answer")
		var answerBase64 string
		if _, err := fmt.Scanln(&answerBase64); err != nil {
			panic(err)
		}
		answerBytes, err := base64.StdEncoding.DecodeString(answerBase64)
		if err != nil {
			panic(err)
		}
		var answer webrtc.SessionDescription
		if err := json.Unmarshal(answerBytes, &answer); err != nil {
			panic(err)
		}
		if err := peerConnection.SetRemoteDescription(answer); err != nil {
			panic(err)
		}
	})

	// Listen for incoming TCP connections.
	listener, err := net.Listen("tcp", *listenAddress)
	if err != nil {
		panic(err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			panic(err)
		}

		// Create a data channel dedicated to this TCP connection.
		dataChan, err := peerConnection.CreateDataChannel(conn.RemoteAddr().String(), nil)
		if err != nil {
			panic(err)
		}

		// sendMore carries at most one wake-up token for the forwarding loop.
		sendMore := make(chan struct{}, 1)
		wake := func() {
			select {
			case sendMore <- struct{}{}:
			default:
			}
		}
		dataChan.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
		dataChan.OnBufferedAmountLow(wake)

		dataChan.OnClose(func() {
			fmt.Println("dataChan.OnClose")
			// The tunnel is gone: release the TCP side and unblock the
			// forwarding loop if it is parked waiting for the buffer to drain.
			_ = conn.Close()
			wake()
		})
		dataChan.OnOpen(func() {
			fmt.Printf("Data channel '%s'-'%d' open.\n", dataChan.Label(), dataChan.ID())

			// Forward TCP -> data channel.
			buf := make([]byte, 8*1024)
			for {
				nread, readErr := conn.Read(buf)
				if nread > 0 {
					// Keep the send error separate from readErr so that a
					// successful Send cannot hide a read failure.
					if sendErr := dataChan.Send(buf[:nread]); sendErr != nil {
						fmt.Println("dataChan.Send: ", sendErr)
						_ = dataChan.Close()
						return
					}
					if dataChan.BufferedAmount() > maxBufferedAmount {
						// Back-pressure: wait for OnBufferedAmountLow (or
						// OnClose) instead of queueing without bound.
						<-sendMore
					}
				}
				if readErr != nil {
					if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) {
						// EOF leaves the channel open, as before, so data
						// flowing in the other direction can still arrive.
						return
					}
					fmt.Println("conn.Read: ", readErr)
					_ = dataChan.Close()
					return
				}
			}
		})
		dataChan.OnMessage(func(msg webrtc.DataChannelMessage) {
			// Forward data channel -> TCP. The error is scoped to this
			// callback rather than written to a variable shared with the
			// accept loop.
			if _, err := conn.Write(msg.Data); err != nil {
				fmt.Println("conn.Write: ", err)
				_ = dataChan.Close()
			}
		})
	}
}
c++ code
#include <iostream>

#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;

class DummySetSessionDescriptionObserver
    : public SetSessionDescriptionObserver {
  public:
    static DummySetSessionDescriptionObserver *Create() {
        return new RefCountedObject<DummySetSessionDescriptionObserver>();
    }
    virtual void OnSuccess() {}
    virtual void OnFailure(RTCError error) { assert(false); }
};

class DataChannelObserver : public webrtc::DataChannelObserver {
  public:
    DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
                        io_context &ioc)
        : dataChannel(dataChannel), ioc(ioc) {}

  protected:
    void OnStateChange() {
        cout << "OnStateChange: " << dataChannel->state() << endl;

        if (dataChannel->state() == DataChannelInterface::kOpen) {
            tcp::socket tcpSocket(ioc);
            tcpSocket.async_connect(
                tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
                [this](boost::system::error_code ec) {
                    if (ec) {
                        cout << ec.message() << endl;
                        return;
                    }
                    doRead();
                });
            this->tcpSocket =
                shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
        } else if (dataChannel->state() == DataChannelInterface::kClosed) {
            delete (this);
        }
    }

    void OnMessage(const DataBuffer &buffer) {
        async_write(*tcpSocket,
                           boost::asio::buffer(buffer.data.data(), buffer.size()),
                           [this](boost::system::error_code ec, size_t bytesTransferred) {
                               if (ec) {
                                   cout << ec.message() << endl;
                                   return;
                               }
                           });
    }

  private:
    scoped_refptr<DataChannelInterface> dataChannel;
    io_context &ioc;
    shared_ptr<tcp::socket> tcpSocket;
    vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);

    void doRead() {
        tcpSocket->async_read_some(
            buffer(tcpSocketReadBuf.data(),
                   tcpSocketReadBuf.size()),
            [this](boost::system::error_code ec, size_t nread) {
                if (ec) {
                    cout << ec.message() << endl;
                    return;
                }
                dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
                doRead();
            });
    }
};

class Client : public PeerConnectionObserver,
               public CreateSessionDescriptionObserver {
  public:
    scoped_refptr<PeerConnectionInterface> peerConnection;
    io_context &ioc;
    unique_ptr<Thread> signalingThread;

    Client(io_context &ioc) : ioc(ioc) {
        // 创建 peerConnection
        signalingThread = Thread::Create();
        signalingThread->Start();
        PeerConnectionFactoryDependencies dependencies;
        dependencies.signaling_thread = signalingThread.get();
        auto peerConnectionFactory =
            CreateModularPeerConnectionFactory(move(dependencies));
        PeerConnectionInterface::RTCConfiguration configuration;
        PeerConnectionDependencies connectionDependencies(this);
        auto peerConnectionOrError =
            peerConnectionFactory->CreatePeerConnectionOrError(
                configuration, move(connectionDependencies));
        if (!peerConnectionOrError.ok()) {
            cout << "!peerConnectionOrError.ok()" << endl;
            exit(1);
        }
        peerConnection = peerConnectionOrError.MoveValue();
    }

    void start() {
        // 读取 offer
        cout << "please enter offer" << endl;
        string offerBase64;
        getline(cin, offerBase64);
        string offerStr(base64::decoded_size(offerBase64.size()), ' ');
        auto decodePair = base64::decode((void *)offerStr.data(),
                                         offerBase64.c_str(), offerBase64.size());
        offerStr.resize(decodePair.first);
        stringstream offerStream;
        offerStream << offerStr;
        ptree pt;
        read_json(offerStream, pt);
        auto sdpStr = pt.get<string>("sdp");
        auto sdp = CreateSessionDescription(SdpType::kOffer, sdpStr);
        peerConnection->SetRemoteDescription(
            DummySetSessionDescriptionObserver::Create(), sdp.release());
        PeerConnectionInterface::RTCOfferAnswerOptions options;
        peerConnection->CreateAnswer(this, options);
    }

  protected:
    /*
    PeerConnectionObserver
    */

    void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
        cout << "OnSignalingChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
        cout << "OnDataChannel: " << data_channel->label() << endl;
        auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc);
        data_channel->RegisterObserver(dataChannelObserver);
    }

    void OnNegotiationNeededEvent(uint32_t event_id) {
        cout << "OnNegotiationNeededEvent: " << event_id << endl;
    }

    void
    OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
        cout << "OnIceConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
        cout << "OnConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
        cout << "OnIceGatheringChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnIceCandidate(const IceCandidateInterface *candidate) {
        cout << "OnIceCandidate" << endl;
    }

    /*
    CreateSessionDescriptionObserver
    */

    void OnSuccess(SessionDescriptionInterface *desc) {
        cout << "OnSuccess" << endl;

        if (desc->GetType() == SdpType::kAnswer) {
            peerConnection->SetLocalDescription(
                DummySetSessionDescriptionObserver::Create(), desc);
            desc = const_cast<SessionDescriptionInterface *>(
                peerConnection->local_description());
            string sdpStr;
            desc->ToString(&sdpStr);
            stringstream answerStream;
            ptree pt;
            pt.add("type", "answer");
            pt.add("sdp", sdpStr);
            write_json(answerStream, pt, false);
            string answerBase64(base64::encoded_size(answerStream.str().size()), ' ');
            auto nwrite = base64::encode(answerBase64.data(),
                                         (void *)answerStream.str().c_str(),
                                         answerStream.str().size());
            answerBase64.resize(nwrite);
            cout << answerBase64 << endl;
        }
    }

    void OnFailure(RTCError error) {
        cout << "OnFailure: " << error.message() << endl;
    }
};

/*
clear
g++ -o server -g -ggdb \
    server.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
g++ -o server -O2 -s \
    server.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
./server

*/
int main(int argc, char const *argv[]) {
    io_context ioc(4);
    scoped_refptr<Client> client{new RefCountedObject<Client>(ioc)};
    client->start();

    boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
    ioc.run();

    return 0;
}

speed fast

iperf3 client -> pion/webrtc (go) -> pion/webrtc (go) -> iperf3 server

I tested it but the code is missing.

speed fast

iperf3 client -> webrtc native (c++) -> webrtc native (c++) -> iperf3 server

image

c++ code
#include <iostream>

#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;

class DummySetSessionDescriptionObserver
    : public SetSessionDescriptionObserver {
  public:
    static DummySetSessionDescriptionObserver *Create() {
        return new RefCountedObject<DummySetSessionDescriptionObserver>();
    }
    virtual void OnSuccess() {}
    virtual void OnFailure(RTCError error) { assert(false); }
};

class DataChannelObserver : public webrtc::DataChannelObserver {
  public:
    string label;
    DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
                        io_context &ioc,
                        shared_ptr<tcp::socket> tcpSocket,
                        string label)
        : dataChannel(dataChannel), ioc(ioc), tcpSocket(tcpSocket), label(label) {}

  protected:
    void OnStateChange() {
        cout << "OnStateChange: " << label << ": " << DataChannelInterface::DataStateString(dataChannel->state()) << endl;

        if (dataChannel->state() == DataChannelInterface::kOpen) {
            if (tcpSocket == nullptr) {
                tcp::socket tcpSocket(ioc);
                tcpSocket.async_connect(
                    tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
                    [this](boost::system::error_code ec) {
                        if (ec) {
                            cout << ec.message() << endl;
                            return;
                        }
                        doRead();
                    });
                this->tcpSocket =
                    shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
            } else {
                doRead();
            }
        } else if (dataChannel->state() == DataChannelInterface::kClosed) {
            delete (this);
        }
    }

    void OnMessage(const DataBuffer &buffer) {
        async_write(*tcpSocket,
                    boost::asio::buffer(buffer.data.data(), buffer.size()),
                    [this](boost::system::error_code ec, size_t bytesTransferred) {
                        if (ec) {
                            cout << ec.message() << endl;
                            return;
                        }
                    });
    }

  private:
    scoped_refptr<DataChannelInterface> dataChannel;
    io_context &ioc;
    shared_ptr<tcp::socket> tcpSocket;
    vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);

    void doRead() {
        tcpSocket->async_read_some(
            buffer(tcpSocketReadBuf.data(),
                   tcpSocketReadBuf.size()),
            [this](boost::system::error_code ec, size_t nread) {
                if (ec) {
                    cout << ec.message() << endl;
                    return;
                }
                dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
                doRead();
            });
    }
};

class Client : public PeerConnectionObserver,
               public CreateSessionDescriptionObserver {
  public:
    scoped_refptr<PeerConnectionInterface> peerConnection;
    io_context &ioc;
    unique_ptr<Thread> signalingThread;
    Client *otherClient = nullptr;
    tcp::acceptor *acceptor;

    Client(io_context &ioc) : ioc(ioc) {
        // 创建 peerConnection
        signalingThread = Thread::Create();
        signalingThread->Start();
        PeerConnectionFactoryDependencies dependencies;
        dependencies.signaling_thread = signalingThread.get();
        auto peerConnectionFactory =
            CreateModularPeerConnectionFactory(move(dependencies));
        PeerConnectionInterface::RTCConfiguration configuration;
        PeerConnectionDependencies connectionDependencies(this);
        auto peerConnectionOrError =
            peerConnectionFactory->CreatePeerConnectionOrError(
                configuration, move(connectionDependencies));
        if (!peerConnectionOrError.ok()) {
            cout << "!peerConnectionOrError.ok()" << endl;
            exit(1);
        }
        peerConnection = peerConnectionOrError.MoveValue();
    }

    void doListen() {
        acceptor = new tcp::acceptor(ioc, tcp::endpoint(tcp::v4(), 5555));
        doAccept();
    }

    void doAccept() {
        acceptor->async_accept([this](boost::system::error_code ec, tcp::socket socket) {
            cout << "acceptor->async_accept" << endl;

            if (ec) {
                cout << ec.message() << endl;
                return;
            }

            signalingThread->Invoke<void>(RTC_FROM_HERE, [this, &socket]() {
                stringstream srcAddr;
                srcAddr << socket.remote_endpoint();
                auto dataChannelOrError = peerConnection->CreateDataChannelOrError(srcAddr.str(), nullptr);
                if (!dataChannelOrError.ok()) {
                    cout << "!dataChannelOrError.ok()" << endl;
                    exit(1);
                }
                auto dataChannel = dataChannelOrError.MoveValue();
                auto dataChannelObserver = new ::DataChannelObserver(dataChannel, ioc, shared_ptr<tcp::socket>(new tcp::socket(move(socket))), "doAccept");
                dataChannel->RegisterObserver(dataChannelObserver);
            });

            doAccept();
        });
    }

    void SetOtherClient(Client *client) {
        otherClient = client;
    }

  protected:
    /*
    PeerConnectionObserver
    */

    void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
        cout << "OnSignalingChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
        cout << "OnDataChannel: " << data_channel->label() << endl;
        auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc, nullptr, "OnDataChannel");
        data_channel->RegisterObserver(dataChannelObserver);
    }

    void OnNegotiationNeededEvent(uint32_t event_id) {
        cout << "OnNegotiationNeededEvent: " << event_id << endl;

        PeerConnectionInterface::RTCOfferAnswerOptions options;
        peerConnection->CreateOffer(this, options);
    }

    void
    OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
        cout << "OnIceConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
        cout << "OnConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
        cout << "OnIceGatheringChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnIceCandidate(const IceCandidateInterface *candidate) {
        cout << "OnIceCandidate" << endl;
    }

    /*
    CreateSessionDescriptionObserver
    */

    void OnSuccess(SessionDescriptionInterface *desc) {
        cout << "OnSuccess" << endl;

        peerConnection->SetLocalDescription(DummySetSessionDescriptionObserver::Create(), desc);
        auto localSDP = const_cast<SessionDescriptionInterface *>(peerConnection->local_description());
        otherClient->peerConnection->SetRemoteDescription(DummySetSessionDescriptionObserver::Create(), localSDP);

        if (desc->GetType() == SdpType::kOffer) {
            PeerConnectionInterface::RTCOfferAnswerOptions options;
            otherClient->peerConnection->CreateAnswer(otherClient, options);
        }
    }

    void OnFailure(RTCError error) {
        cout << "OnFailure: " << error.message() << endl;
    }
};

/*
clear
g++ -o double_side -g -ggdb \
    double_side.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Debug-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
g++ -o double_side -O2 -s \
    double_side.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
./double_side

*/
int main(int argc, char const *argv[]) {
    io_context ioc(1);
    scoped_refptr<Client> client1{new RefCountedObject<Client>(ioc)};
    scoped_refptr<Client> client2{new RefCountedObject<Client>(ioc)};
    client1->SetOtherClient(client2.get());
    client2->SetOtherClient(client1.get());
    client2->doListen();

    boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
    ioc.run();

    return 0;
}
@FH0
Copy link
Author

FH0 commented Sep 3, 2022

I did further tests. You can find the code used here.

Pure pion/webrtc took less time while transferring the same amount of data, and it used fewer packets (11808 vs 29838).

image

image

image

@Sean-Der
Copy link
Member

Sean-Der commented May 9, 2024

Hi @FH0

Sorry I didn't respond to this sooner. Improving the performance of Pion's SCTP implementation has been something we have focused on.

See this thread for work/exploration that has been done on it.

@Sean-Der Sean-Der closed this as not planned Won't fix, can't repro, duplicate, stale May 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants