写点什么

SRS 5.0:如何实现 SRT 协程化

  • 2022-07-04
  • 本文字数:6071 字

    阅读完需:约 20 分钟

SRS 5.0:如何实现 SRT 协程化

协程是现代服务器的核心技术,能极大简化逻辑和提升维护性;SRT 是逐渐在取代 RTMP 推流的新协议,但它有自己的 IO 框架;只有实现了 SRT 的协程化,才能成为 SRS 的核心的成熟的协议,这是 SRS 5.0 迈出的第一步,也是至关重要的一步。


SRS 5.0 从 2022 年初启动以来,经过摸索和探讨,确定了以媒体网关作为核心方向,详细请看SRS 5.0核心问题定义和解法


SRT 作为主播/广播推流领域广泛采用的协议,而 Web 浏览器却不支持播放 SRT 流,这恰恰是媒体网关的核心价值,可以将 SRT 转成 RTMP/HLS/WebRTC 后,实现广播领域的 Web 超低延迟方案,还可以把 SRT 强大的跨国传输能力用起来。


而这些美好愿景的基础,就是这次要介绍的:改造 SRT 支持协程化(Coroutine Native SRT)。这是 SRS 5.0 至关重要的一步,也是具备深远影响的一步,详细代码请参考PR#3010


我们先了解下详细的背景介绍。

Introduction


在直播推流领域,RTMP 是事实上的工业标准,广泛使用,也是直播源站之间兼容性最好的协议。


随着场景的丰富和直播的发展, 几个比较严重的问题逐渐暴露出来:


  1. TCP 推流在长距离传输下,受丢包以及 RTT 抖动影响非常大,效果很差。

  2. RTMP 协议不支持多音轨,以及 H265、AV1 等一系列新的编解码。

  3. Adobe 已经放弃 RTMP 协议,很多年没有更新了,未来也不会更新。


为了解决这些问题,2018 年左右,广播电视领域开始广泛应用 SRT 协议推流,越来越多的推流设备和平台都支持了 SRT 协议。


SRS 在 2019 年底,SRS 4.0 支持了 SRT 推流,目前存在以下的问题:


  1. SRT 在 SRS 上使用多线程+异步实现,某些异常导致的程序 Crash,难以排查。

  2. SRT 在 SRS 实现是异步方式,代码复杂,维护难度高。

  3. HTTP 回调,SRT 播放不生效;SRT 推流依赖转 RTMP 后,RTMP 触发的回调。

  4. SRT 无法直接转 WebRTC,而是先转 RTMP 再转 WebRTC,导致延迟高。


这些问题的核心原因,是由于 SRT 使用了独立的异步 IO 和多线程,无法和 SRS 已有的 ST 协程结合起来。


要彻底解决这个问题,必须将 SRT 协程化,和 SRS 使用同一套 ST 协程框架。SRS 5.0 已经完成,详细代码请参考PR#3010,这是非常重要的一个功能。


在介绍 SRT 协程化之前,先介绍下什么是协程化,我们看下 ST 的 TCP 协程化(Coroutine Native),这是最好的例子。

Coroutine Native TCP

首先,非协程化的代码,也就是 epoll 驱动的异步代码,大概逻辑是这样:


int fd = accept(listen_fd); // Got a TCP connection.
int n = read(fd, buf, sizeof(buf));if (n == -1) { if (errno == EAGAIN) { // Not ready return epoll_ctl(fd, EPOLLIN); // Wait for fd to be ready. } return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


Note: 为了方便表达关键逻辑,我们使用示意代码,具体代码可以参考 epoll 的示例代码。


一般 read 是业务逻辑,读出来的数据也是业务数据,但是这里却需要在 EAGAIN 时调用 epoll 这个底层框架处理 fd。这就是把底层逻辑和业务逻辑混合在一起,导致难以维护。


Note: 尽管 NGINX 包装了一层框架,但是本质上并不能解决这个异步回调问题,当 fd 没有准备好必须返回当前函数,所以导致很多状态需要保存和恢复,在复杂的逻辑中状态机也变得非常复杂。


下面我们看协程化的逻辑会是怎样的,同样以上面代码为例:


st_netfd_t fd = st_accept(listen_fd); // Got a TCP connection
int n = st_read(fd, buf, sizeof(buf));if (n == -1) { return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


简单的看,就是没有 EAGAIN,看起来要么读取到了数据,要么就是出现了错误,不会出现 fd 没有准备好的情况。这就给整个业务逻辑带来了非常好的体验,因为不需要保存状态了,不会反复的尝试读取。


同样用 epoll,为何st_read就没有 EAGAIN 呢?这就是协程化,不是没有,只是在底下处理了这个事件。我们看st_readv这个函数:


ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte) {    while ((n = read(fd->osfd, buf, nbyte)) < 0) {        if (errno == EINTR)            continue;        if (!_IO_NOT_READY_ERROR) // Error, if not EAGAIN.            return -1;
/* Wait until the socket becomes readable */ if (st_netfd_poll(fd, POLLIN) < 0) // EAGAIN return -1; } return n;}
复制代码


很明显在EAGAIN时,会调用st_netfd_poll。在st_netfd_poll函数里,会将当前协程切换出让,调度线程执行下一个协程。并且在未来某个时刻,会因为 IO 事件到达或者超时错误,而将当前协程恢复。


Note: 由于协程切换和恢复,都是在这个函数中实现的,对于上层调用的代码,看起来没有发生什么,所以就不仅没有 EAGAIN 这个错误消息,也不会返回上一层函数,当然也不需要保存状态和恢复状态。


我们可以总结下,如何协程化任何协议的思路:


  1. 直接对 API 进行一次调用,如果成功,那么直接返回。

  2. 如果 API 返回失败,检查错误,非 IO 等待的错误直接返回。

  3. 将当前协程出让,调度器运行其他协程,直到该 FD 上的事件返回或者超时;如果超时,则返回错误;如果事件到达,则重复上面的步骤。


我们可以按照这个思路将 SRT 进行协程化(Coroutine Native)。

Coroutine Native SRT

我们以srt_recvmsg函数的协程化为例,这个函数类似 TCP 的read函数,定义如下:


SRT_API int srt_recvmsg (SRTSOCKET u, char* buf, int len);
复制代码


我们同样,提供一个SrsSrtSocket::recvmsg的函数,类似st_read函数,实现如下:


srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) {  while (true) {    int ret = srt_recvmsg(srt_fd_, (char*)buf, size);    if (ret >= 0) { // Receive message ok.      recv_bytes_ += ret;       *nread = ret;      return err;    }        // Got something error, return immediately.    if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {      return srs_error_new(ERROR_SRT_IO, "srt_recvmsg");    }        // Wait for the fd ready or error, switch to other coroutines.    if ((err = wait_readable()) != srs_success) { // EAGAIN.      return srs_error_wrap(err, "wait readable");    }  }    return err;}
复制代码


可以看到和st_read非常类似,在wait_readable中也会实现协程的切换和恢复,只是我们使用st_cond_t条件变量来实现:


srs_error_t SrsSrtSocket::wait_readable() {  srt_poller_->mod_socket(this, SRT_EPOLL_IN);  srs_cond_timedwait(read_cond_);}
复制代码


Note: 这里先修改了 epoll 侦听的事件SRT_EPOLL_IN,等待 fd 可读后,再等待条件变量触发。


而触发这个条件变量的函数,是在SrsSrtPoller::wait,实现如下:


srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) {  int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size());  for (int i = 0; i < ret; ++i) {    if (event.events & SRT_EPOLL_IN) {      srt_skt->notify_readable();    }  }}
void SrsSrtSocket::notify_readable() { srs_cond_signal(read_cond_);}
复制代码


这样就完全做到了将 SRT API 协程化,其他的 API 比如 srt_sendmsg, srt_connnect, srt_accept 也是类似的操作。


下面我们对比下,协程化(Coroutine Native)之后,和原始的回调(Callback)的区别。

Coroutine Native PK Callback

将 SRT 协程化以后, 业务逻辑和底层代码分离,上层的代码逻辑清晰明了。


先看看 accept 这个逻辑,之前也是由 epoll 触发的事件处理,创建srt_conn这个数据结构:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_connection(status, read_fds[index], "read fd");  }}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd) { if (status == SRTS_LISTENING) { conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); _handle_ptr->add_newconn(conn_fd, SRT_EPOLL_IN); }}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);}
复制代码


Note: 创建的srt_conn本身就是保存在全局数据结构之中,在后续的回调事件中持续修改和变更这个数据结构。


我们对比下协程化之后的业务逻辑,收到会话之后启动处理协程:


srs_error_t SrsSrtListener::cycle() {  while (true) {    srs_srt_t client_srt_fd = srs_srt_socket_invalid();    srt_skt_->accept(&client_srt_fd);        srt_server_->accept_srt_client(srt_fd);  }}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd) { fd_to_resource(srt_fd, &srt_conn); conn_manager_->add(srt_conn); srt_conn->start();}
复制代码


Note: 虽然有全局变量维护srt_conn,但这里不会关注到 epoll 的处理,而是由协程主导的执行逻辑,而不是由回调主导的逻辑。


回调主导的逻辑,维护和了解代码时,必须要从 epoll 回调事件开始看,而且不同事件都在修改srt_conn这个对象的状态,要了解对象生命周期是很有难度的。而协程主导的逻辑,它的生命周期是在协程中,收到srt_conn就启动协程处理它,后续的读写也在协程中。


我们继续看srt_conn的读处理逻辑,之前直接使用原生 SRT 的 read 函数,同样是由 epoll 的事件触发回调:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_data(status, read_fds[index], "read fd");  }}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { auto conn_ptr = get_srt_conn(conn_fd); int mode = conn_ptr->get_mode(); if (mode == PUSH_SRT_MODE && status == SRTS_CONNECTED) { handle_push_data(status, path, subpath, conn_fd); }}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { srt_conn_ptr = get_srt_conn(conn_fd); if (status != SRTS_CONNECTED) { // Error. close_push_conn(conn_fd); return; }
ret = srt_conn_ptr->read_async(data, DEF_DATA_SIZE); if (ret <= 0) { // Error. if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { return; } close_push_conn(conn_fd); return; }
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);}
复制代码


Note: 在回调中我们需要处理各种状态,而这个srt_conn的状态变化,是由各种回调决定的,很难一次了解到这个会话的主要处理逻辑。


我们看看 SRT 协程化之后,这个业务逻辑是怎样写的:


srs_error_t SrsMpegtsSrtConn::do_publishing() {  while (true) {    ssize_t nb = 0;    if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {      return srs_error_wrap(err, "srt: recvmsg");    }        if ((err = on_srt_packet(buf, nb)) != srs_success) {      return srs_error_wrap(err, "srt: process packet");    }  }}
复制代码


Note: 这里srt_conn的生命周期非常明确,它的状态就是直接在这里返回错误,对于这个会话来说,这就是它的主循环,不会因为read而导致进入 SRT 的 epoll 大循环,我们在维护时也不用关注这个异步事件触发和处理。


再次强调一次,维护代码时,我们需要了解的信息量是非常不同的。在基于异步回调的逻辑中,我们在回调函数中,是需要关注目前对象有哪些状态,修改了哪些状态,其他异步事件又有哪些影响。而基于协程的逻辑中,没有这些状态,协程的创建和执行,就是线性的,或者说这些状态就是在协程的函数调用中。


Note: 为何异步回调的状态就不能在函数调用中呢?因为异步回调的堆栈中不能保存srt_conn的状态,它本质上就是一个协程,保存的是 epoll 的循环的状态。而协程是根据每个srt_conn所创建的,它的堆栈中保存的都是这个对应的srt_conn的状态。


这本质上,是由于异步回调的状态,只能保存在全局数据结构之中。而协程的状态,是可以保存在各个局部变量之中,每个函数的局部变量,都是这个协程所独有的,协程没有结束前都可以使用。

What is the Next

SRS 虽然完成了 SRT 协程化,并没有解决所有的问题。后续的计划包括:


  1. SRT 直接转 WebRTC,低延迟直播的另外一种方式。

  2. 某些服务器之间的长链路可以将 TCP 替换为 SRT 传输, 比如跨国的 RTMP 转发。

  3. SRT 工具链的完善,比如srs-bench,支持压测 SRT 流。


欢迎加入 SRS 开源社区,一起做好一个流媒体服务器,让全世界都来白嫖。

One More Thing

有些朋友也很好奇,真正商用的视频云的 SRT,和开源的 SRT 的服务器,有什么区别,都做过哪些优化。


Note: 由于开源服务器侧重标准协议和兼容性,有些优化并不适合在开源项目中实现,所以在云计算的商业化服务器和开源服务器,一定是存在很大差异的。就算是 Linux 系统,其实云计算的 Linux 内核,和开源的 Linux 内核,也有很大的差异。


腾讯云在 SRT 的实战中积累很多经验,请参考之前分享的文章:



其中,最为严重的是 SRT 重传率过高、限带宽下表现不如 TCP/QUIC 等,腾讯云针对这些问题,做了几个优化:


  1. SRT 重传乱序度自适应:当接收到乱序报文,首次发起重传时,会根据当前的乱序度,等待 N 个包之后才发起重传。原生 SRT 这个乱序度是固定值,我们修改成为根据网络乱序情况自适应。

  2. SRT 传输参数优化:通过对参数优化,减少了一半的重传率。

  3. 加入了 BBR 拥塞控制算法:原生 SRT 拥塞控制非常弱,评估的带宽波动也非常大。我们加入了 BBR 拥塞控制算法,针对性的解决了这个问题。

  4. 强化了 SRT 多链路传输,增加了带宽聚合的模式:原生 SRT 只有 backup,broadcast 两种多链路传输模式,我们针对直播场景增加了 auto 模式,能够做到讲多个网卡的带宽聚合后进行直播,并智能动态选择链路。


Note: 腾讯云音视频在音视频领域已有超过 21 年的技术积累,持续支持国内 90%的音视频客户实现云上创新,独家具备 RT-ONE™ 全球网络,在此基础上,构建了业界最完整的 PaaS 产品家族,并通过腾讯云视立方 RT-Cube™ 提供 All in One 的终端 SDK,助力客户一键获取众多腾讯云音视频能力。腾讯云音视频为全真互联时代,提供坚实的数字化助力。


作者介绍:


肖志宏 (hondaxiao) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS TOC。


杨成立 (Winlin) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS 作者,音视频服务器和视频云领域超过 13 年经验。

2022-07-04 16:246579

评论

发布
暂无评论
发现更多内容

YYDS!浪潮云蝉联中国政务云服务运营市场占有率第一

云计算

沙场秋点兵——MySQL容器化性能测试对比

焱融科技

MySQL 云计算 容器 高性能 分布式存储

netty系列之:netty架构概述

程序那些事

Java Netty nio 程序那些事

十大排序算法--归并排序

Ayue、

排序算法 8月日更

oeasy教您玩转vim - 2 - # 使用帮助

o

vim

为什么将网络虚拟化与实现服务器虚拟化不同?

九河云安全

团队对质量负责,“我”可以不负责?

BY林子

敏捷测试 责任流程模型

Zilliz 陈室余:音视频相似性检索的技术实现丨ECUG Meetup 回顾

七牛云

AI 音视频 ECUG 七牛云

iOS官方瘦身方案ODR(一):初见On-Demand Resources

LabLawliet

ios 独立开发者 优化技巧 Apple Developer 8月日更

在 Dubbo3.0 上服务治理的实践

阿里巴巴中间件

云计算 Serverless 云原生 dubbo 中间件

Vue进阶(七):走近 package.json

No Silver Bullet

Vue npm 8月日更

架构实战营 学习总结

👈

架构实战营

聊聊Go语言中的数组与切片

架构精进之路

8月日更

一文带你认识LPWA通信技术

华为云开发者联盟

物联网 通信 NB-IoT LPWA SigFox

针对于香港服务器快速威胁检测是加强安全的关键

九河云安全

互联网寒冬!大厂Android开发面试解答

欢喜学安卓

android 程序员 面试 移动开发

大佬分享开发经验!2021年华为Android面试真题解析

欢喜学安卓

android 程序员 面试 移动开发

手把手教你在Windows和Linux下安装Redis及了解Redis基本操作

Regan Yue

redis Linux windows 8月日更

跟我学AI建模:分子动力学仿真模拟之DeepMD-kit框架

华为云开发者联盟

AI 仿真 分子动力学 分子 建模

架构实战营 毕业设计

👈

架构实战营

oeasy教您玩转vim - 3 - # 打开文件

o

基于香港云服务器的解决方案可以增强金融服务公司在降低成本的同时降低风险

九河云安全

手撸二叉树之平衡二叉树

HelloWorld杰少

数据结构与算法 8月日更

秒杀系统设计

Vincent

架构训练营

中国大学 MOOC Android 性能优化:冷启动优化总结

有道技术团队

大前端 安卓 网易有道

测试开发之系统篇-按需创建测试虚拟机

禅道项目管理

虚拟机 自动化测试 测试开发

kafka日志写入logstash

Rubble

Logstash Kafk 8月日更

oeasy教您玩转vim - 3 - # 打开文件

o

云计算重塑生命科学行业,北鲲云加速生物制药企业转型

北鲲云

Python代码阅读(第3篇):列表的最小公倍数

Felix

Python 编程 Code Programing 阅读代码

Nginx的常用功能总结

程序员阿杜

Java nginx 8月日更

SRS 5.0:如何实现 SRT 协程化_架构_肖志宏_InfoQ精选文章