写点什么

SRS 5.0:如何实现 SRT 协程化

  • 2022-07-04
  • 本文字数:6071 字

    阅读完需:约 20 分钟

SRS 5.0:如何实现 SRT 协程化

协程是现代服务器的核心技术,能极大简化逻辑和提升维护性;SRT 是逐渐在取代 RTMP 推流的新协议,但它有自己的 IO 框架;只有实现了 SRT 的协程化,才能成为 SRS 的核心的成熟的协议,这是 SRS 5.0 迈出的第一步,也是至关重要的一步。


SRS 5.0 从 2022 年初启动以来,经过摸索和探讨,确定了以媒体网关作为核心方向,详细请看SRS 5.0核心问题定义和解法


SRT 作为主播/广播推流领域广泛采用的协议,而 Web 浏览器却不支持播放 SRT 流,这恰恰是媒体网关的核心价值,可以将 SRT 转成 RTMP/HLS/WebRTC 后,实现广播领域的 Web 超低延迟方案,还可以把 SRT 强大的跨国传输能力用起来。


而这些美好愿景的基础,就是这次要介绍的:改造 SRT 支持协程化(Coroutine Native SRT)。这是 SRS 5.0 至关重要的一步,也是具备深远影响的一步,详细代码请参考PR#3010


我们先了解下详细的背景介绍。

Introduction


在直播推流领域,RTMP 是事实上的工业标准,广泛使用,也是直播源站之间兼容性最好的协议。


随着场景的丰富和直播的发展, 几个比较严重的问题逐渐暴露出来:


  1. TCP 推流在长距离传输下,受丢包以及 RTT 抖动影响非常大,效果很差。

  2. RTMP 协议不支持多音轨,以及 H265、AV1 等一系列新的编解码。

  3. Adobe 已经放弃 RTMP 协议,很多年没有更新了,未来也不会更新。


为了解决这些问题,2018 年左右,广播电视领域开始广泛应用 SRT 协议推流,越来越多的推流设备和平台都支持了 SRT 协议。


SRS 在 2019 年底,SRS 4.0 支持了 SRT 推流,目前存在以下的问题:


  1. SRT 在 SRS 上使用多线程+异步实现,某些异常导致的程序 Crash,难以排查。

  2. SRT 在 SRS 实现是异步方式,代码复杂,维护难度高。

  3. HTTP 回调,SRT 播放不生效;SRT 推流依赖转 RTMP 后,RTMP 触发的回调。

  4. SRT 无法直接转 WebRTC,而是先转 RTMP 再转 WebRTC,导致延迟高。


这些问题的核心原因,是由于 SRT 使用了独立的异步 IO 和多线程,无法和 SRS 已有的 ST 协程结合起来。


要彻底解决这个问题,必须将 SRT 协程化,和 SRS 使用同一套 ST 协程框架。SRS 5.0 已经完成,详细代码请参考PR#3010,这是非常重要的一个功能。


在介绍 SRT 协程化之前,先介绍下什么是协程化,我们看下 ST 的 TCP 协程化(Coroutine Native),这是最好的例子。

Coroutine Native TCP

首先,非协程化的代码,也就是 epoll 驱动的异步代码,大概逻辑是这样:


int fd = accept(listen_fd); // Got a TCP connection.
int n = read(fd, buf, sizeof(buf));if (n == -1) { if (errno == EAGAIN) { // Not ready return epoll_ctl(fd, EPOLLIN); // Wait for fd to be ready. } return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


Note: 为了方便表达关键逻辑,我们使用示意代码,具体代码可以参考 epoll 的示例代码。


一般 read 是业务逻辑,读出来的数据也是业务数据,但是这里却需要在 EAGAIN 时调用 epoll 这个底层框架处理 fd。这就是把底层逻辑和业务逻辑混合在一起,导致难以维护。


Note: 尽管 NGINX 包装了一层框架,但是本质上并不能解决这个异步回调问题,当 fd 没有准备好必须返回当前函数,所以导致很多状态需要保存和恢复,在复杂的逻辑中状态机也变得非常复杂。


下面我们看协程化的逻辑会是怎样的,同样以上面代码为例:


st_netfd_t fd = st_accept(listen_fd); // Got a TCP connection
int n = st_read(fd, buf, sizeof(buf));if (n == -1) { return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


简单的看,就是没有 EAGAIN,看起来要么读取到了数据,要么就是出现了错误,不会出现 fd 没有准备好的情况。这就给整个业务逻辑带来了非常好的体验,因为不需要保存状态了,不会反复的尝试读取。


同样用 epoll,为何st_read就没有 EAGAIN 呢?这就是协程化,不是没有,只是在底下处理了这个事件。我们看st_readv这个函数:


ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte) {    while ((n = read(fd->osfd, buf, nbyte)) < 0) {        if (errno == EINTR)            continue;        if (!_IO_NOT_READY_ERROR) // Error, if not EAGAIN.            return -1;
/* Wait until the socket becomes readable */ if (st_netfd_poll(fd, POLLIN) < 0) // EAGAIN return -1; } return n;}
复制代码


很明显在EAGAIN时,会调用st_netfd_poll。在st_netfd_poll函数里,会将当前协程切换出让,调度线程执行下一个协程。并且在未来某个时刻,会因为 IO 事件到达或者超时错误,而将当前协程恢复。


Note: 由于协程切换和恢复,都是在这个函数中实现的,对于上层调用的代码,看起来没有发生什么,所以就不仅没有 EAGAIN 这个错误消息,也不会返回上一层函数,当然也不需要保存状态和恢复状态。


我们可以总结下,如何协程化任何协议的思路:


  1. 直接对 API 进行一次调用,如果成功,那么直接返回。

  2. 如果 API 返回失败,检查错误,非 IO 等待的错误直接返回。

  3. 将当前协程出让,调度器运行其他协程,直到该 FD 上的事件返回或者超时;如果超时,则返回错误;如果事件到达,则重复上面的步骤。


我们可以按照这个思路将 SRT 进行协程化(Coroutine Native)。

Coroutine Native SRT

我们以srt_recvmsg函数的协程化为例,这个函数类似 TCP 的read函数,定义如下:


SRT_API int srt_recvmsg (SRTSOCKET u, char* buf, int len);
复制代码


我们同样,提供一个SrsSrtSocket::recvmsg的函数,类似st_read函数,实现如下:


srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) {  while (true) {    int ret = srt_recvmsg(srt_fd_, (char*)buf, size);    if (ret >= 0) { // Receive message ok.      recv_bytes_ += ret;       *nread = ret;      return err;    }        // Got something error, return immediately.    if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {      return srs_error_new(ERROR_SRT_IO, "srt_recvmsg");    }        // Wait for the fd ready or error, switch to other coroutines.    if ((err = wait_readable()) != srs_success) { // EAGAIN.      return srs_error_wrap(err, "wait readable");    }  }    return err;}
复制代码


可以看到和st_read非常类似,在wait_readable中也会实现协程的切换和恢复,只是我们使用st_cond_t条件变量来实现:


srs_error_t SrsSrtSocket::wait_readable() {  srt_poller_->mod_socket(this, SRT_EPOLL_IN);  srs_cond_timedwait(read_cond_);}
复制代码


Note: 这里先修改了 epoll 侦听的事件SRT_EPOLL_IN,等待 fd 可读后,再等待条件变量触发。


而触发这个条件变量的函数,是在SrsSrtPoller::wait,实现如下:


srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) {  int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size());  for (int i = 0; i < ret; ++i) {    if (event.events & SRT_EPOLL_IN) {      srt_skt->notify_readable();    }  }}
void SrsSrtSocket::notify_readable() { srs_cond_signal(read_cond_);}
复制代码


这样就完全做到了将 SRT API 协程化,其他的 API 比如 srt_sendmsg, srt_connnect, srt_accept 也是类似的操作。


下面我们对比下,协程化(Coroutine Native)之后,和原始的回调(Callback)的区别。

Coroutine Native PK Callback

将 SRT 协程化以后, 业务逻辑和底层代码分离,上层的代码逻辑清晰明了。


先看看 accept 这个逻辑,之前也是由 epoll 触发的事件处理,创建srt_conn这个数据结构:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_connection(status, read_fds[index], "read fd");  }}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd) { if (status == SRTS_LISTENING) { conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); _handle_ptr->add_newconn(conn_fd, SRT_EPOLL_IN); }}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);}
复制代码


Note: 创建的srt_conn本身就是保存在全局数据结构之中,在后续的回调事件中持续修改和变更这个数据结构。


我们对比下协程化之后的业务逻辑,收到会话之后启动处理协程:


srs_error_t SrsSrtListener::cycle() {  while (true) {    srs_srt_t client_srt_fd = srs_srt_socket_invalid();    srt_skt_->accept(&client_srt_fd);        srt_server_->accept_srt_client(srt_fd);  }}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd) { fd_to_resource(srt_fd, &srt_conn); conn_manager_->add(srt_conn); srt_conn->start();}
复制代码


Note: 虽然有全局变量维护srt_conn,但这里不会关注到 epoll 的处理,而是由协程主导的执行逻辑,而不是由回调主导的逻辑。


回调主导的逻辑,维护和了解代码时,必须要从 epoll 回调事件开始看,而且不同事件都在修改srt_conn这个对象的状态,要了解对象生命周期是很有难度的。而协程主导的逻辑,它的生命周期是在协程中,收到srt_conn就启动协程处理它,后续的读写也在协程中。


我们继续看srt_conn的读处理逻辑,之前直接使用原生 SRT 的 read 函数,同样是由 epoll 的事件触发回调:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_data(status, read_fds[index], "read fd");  }}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { auto conn_ptr = get_srt_conn(conn_fd); int mode = conn_ptr->get_mode(); if (mode == PUSH_SRT_MODE && status == SRTS_CONNECTED) { handle_push_data(status, path, subpath, conn_fd); }}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { srt_conn_ptr = get_srt_conn(conn_fd); if (status != SRTS_CONNECTED) { // Error. close_push_conn(conn_fd); return; }
ret = srt_conn_ptr->read_async(data, DEF_DATA_SIZE); if (ret <= 0) { // Error. if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { return; } close_push_conn(conn_fd); return; }
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);}
复制代码


Note: 在回调中我们需要处理各种状态,而这个srt_conn的状态变化,是由各种回调决定的,很难一次了解到这个会话的主要处理逻辑。


我们看看 SRT 协程化之后,这个业务逻辑是怎样写的:


srs_error_t SrsMpegtsSrtConn::do_publishing() {  while (true) {    ssize_t nb = 0;    if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {      return srs_error_wrap(err, "srt: recvmsg");    }        if ((err = on_srt_packet(buf, nb)) != srs_success) {      return srs_error_wrap(err, "srt: process packet");    }  }}
复制代码


Note: 这里srt_conn的生命周期非常明确,它的状态就是直接在这里返回错误,对于这个会话来说,这就是它的主循环,不会因为read而导致进入 SRT 的 epoll 大循环,我们在维护时也不用关注这个异步事件触发和处理。


再次强调一次,维护代码时,我们需要了解的信息量是非常不同的。在基于异步回调的逻辑中,我们在回调函数中,是需要关注目前对象有哪些状态,修改了哪些状态,其他异步事件又有哪些影响。而基于协程的逻辑中,没有这些状态,协程的创建和执行,就是线性的,或者说这些状态就是在协程的函数调用中。


Note: 为何异步回调的状态就不能在函数调用中呢?因为异步回调的堆栈中不能保存srt_conn的状态,它本质上就是一个协程,保存的是 epoll 的循环的状态。而协程是根据每个srt_conn所创建的,它的堆栈中保存的都是这个对应的srt_conn的状态。


这本质上,是由于异步回调的状态,只能保存在全局数据结构之中。而协程的状态,是可以保存在各个局部变量之中,每个函数的局部变量,都是这个协程所独有的,协程没有结束前都可以使用。

What is the Next

SRS 虽然完成了 SRT 协程化,并没有解决所有的问题。后续的计划包括:


  1. SRT 直接转 WebRTC,低延迟直播的另外一种方式。

  2. 某些服务器之间的长链路可以将 TCP 替换为 SRT 传输, 比如跨国的 RTMP 转发。

  3. SRT 工具链的完善,比如srs-bench,支持压测 SRT 流。


欢迎加入 SRS 开源社区,一起做好一个流媒体服务器,让全世界都来白嫖。

One More Thing

有些朋友也很好奇,真正商用的视频云的 SRT,和开源的 SRT 的服务器,有什么区别,都做过哪些优化。


Note: 由于开源服务器侧重标准协议和兼容性,有些优化并不适合在开源项目中实现,所以在云计算的商业化服务器和开源服务器,一定是存在很大差异的。就算是 Linux 系统,其实云计算的 Linux 内核,和开源的 Linux 内核,也有很大的差异。


腾讯云在 SRT 的实战中积累很多经验,请参考之前分享的文章:



其中,最为严重的是 SRT 重传率过高、限带宽下表现不如 TCP/QUIC 等,腾讯云针对这些问题,做了几个优化:


  1. SRT 重传乱序度自适应:当接收到乱序报文,首次发起重传时,会根据当前的乱序度,等待 N 个包之后才发起重传。原生 SRT 这个乱序度是固定值,我们修改成为根据网络乱序情况自适应。

  2. SRT 传输参数优化:通过对参数优化,减少了一半的重传率。

  3. 加入了 BBR 拥塞控制算法:原生 SRT 拥塞控制非常弱,评估的带宽波动也非常大。我们加入了 BBR 拥塞控制算法,针对性的解决了这个问题。

  4. 强化了 SRT 多链路传输,增加了带宽聚合的模式:原生 SRT 只有 backup,broadcast 两种多链路传输模式,我们针对直播场景增加了 auto 模式,能够做到讲多个网卡的带宽聚合后进行直播,并智能动态选择链路。


Note: 腾讯云音视频在音视频领域已有超过 21 年的技术积累,持续支持国内 90%的音视频客户实现云上创新,独家具备 RT-ONE™ 全球网络,在此基础上,构建了业界最完整的 PaaS 产品家族,并通过腾讯云视立方 RT-Cube™ 提供 All in One 的终端 SDK,助力客户一键获取众多腾讯云音视频能力。腾讯云音视频为全真互联时代,提供坚实的数字化助力。


作者介绍:


肖志宏 (hondaxiao) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS TOC。


杨成立 (Winlin) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS 作者,音视频服务器和视频云领域超过 13 年经验。

2022-07-04 16:246978

评论

发布
暂无评论
发现更多内容

优化 20% 资源成本,新东方的 Serverless 实践之路

阿里巴巴云原生

阿里云 Serverless 云原生

商密SIG月度动态:文件加密支持SM4算法、Anolis 8.8将默认集成 | 龙蜥 SIG

OpenAnolis小助手

开源 算法 龙蜥社区 商密 SM4

互联网医疗领域月度观察——二十大报告明确提出健康中国建设目标,互联网医疗是建设重点

易观分析

互联网医疗

React Streaming SSR 原理解析

字节跳动终端技术

React

游戏引擎中的实时渲染和在V-Ray中渲染有什么区别?

3DCAT实时渲染

渲染引擎 游戏引擎 渲染服务 游戏开发引擎

如何提升工作效率--工具篇(60/100)

hackstoic

工具 工作效率

AI与低代码的结合及应用

力软低代码开发平台

为什么数字化时代需要 BizDevOps?

阿里云云效

DevOps 数字化转型 数字化 BizDevOps

Transformer:让ChatGPT站在肩膀上的巨人?

Baihai IDP

AI Transformer

【深入浅出Spring原理及实战】「源码原理实战」从底层角度去分析研究PropertySourcesPlaceholderConfigurer的原理及实战注入机制

洛神灬殇

spring 12月日更 12 月 PK 榜 Spring 配置解析

YonBuilder移动开发平台 AVM框架 封装省市区级联选择弹框

YonBuilder低代码开发平台

开发者 AVM

Spring中11个最常用的扩展点,你知道几个?

JAVA旭阳

Java spring

WhaleDI数据治理利器之“低成本数据质量管理”

鲸品堂

12 月 PK 榜

自动化测试技术笔记(一):前期调研怎么做

老张

自动化测试

正向拆解mlops

mlops

MLOps

GBase 8s安装

@下一站

国产数据库 12月日更 12月月更 Gbase8s

第五届“强网”拟态防御国际精英挑战赛精彩落幕!——网络空间安全大赛再立新标杆

科技热闻

flutter系列之:移动端手势的具体使用

程序那些事

flutter 大前端 程序那些事

2022-12-19:大的国家。如果一个国家满足下述两个条件之一,则认为该国是 大国 : 面积至少为 300 万平方公里(即,3000000 km2),或者 人口至少为 2500 万(即 250000

福大大架构师每日一题

数据库 福大大

Osx10.14升级watchman踩坑记

Geek_pwdeic

macos homebrew

调试3D渲染和3D可视化的五个好处

3DCAT实时渲染

可视化 3D渲染 云渲染 实时渲染

在霍格沃兹测试开发学社学习是种怎样的体验?

测吧(北京)科技有限公司

测试

一文读懂于Zebec生态中的潜在收益方式

股市老人

Colossal-AI助力,摩尔线程预训练语言模型MusaBert荣登CLUE榜单TOP10

科技热闻

Verilog语言的循环语句

芯动大师

Verilog Verilog语法 Verilog循环

2022年中国汽车智能网联云服务分析

易观分析

5G 汽车 智联网

建筑、工程和施工产业中的3D可视化

3DCAT实时渲染

可视化 云渲染 实时云渲染 云渲染平台

数据治理体系建设与数据资产路线图规划

用友BIP

HTTP协议基础知识

穿过生命散发芬芳

HTTP 12月月更

华为时习知,让企业培训更简单!

科技怪授

华为

SRS 5.0:如何实现 SRT 协程化_架构_肖志宏_InfoQ精选文章