写点什么

SRS 5.0:如何实现 SRT 协程化

  • 2022-07-04
  • 本文字数:6071 字

    阅读完需:约 20 分钟

SRS 5.0:如何实现 SRT 协程化

协程是现代服务器的核心技术,能极大简化逻辑和提升维护性;SRT 是逐渐在取代 RTMP 推流的新协议,但它有自己的 IO 框架;只有实现了 SRT 的协程化,才能成为 SRS 的核心的成熟的协议,这是 SRS 5.0 迈出的第一步,也是至关重要的一步。


SRS 5.0 从 2022 年初启动以来,经过摸索和探讨,确定了以媒体网关作为核心方向,详细请看SRS 5.0核心问题定义和解法


SRT 作为主播/广播推流领域广泛采用的协议,而 Web 浏览器却不支持播放 SRT 流,这恰恰是媒体网关的核心价值,可以将 SRT 转成 RTMP/HLS/WebRTC 后,实现广播领域的 Web 超低延迟方案,还可以把 SRT 强大的跨国传输能力用起来。


而这些美好愿景的基础,就是这次要介绍的:改造 SRT 支持协程化(Coroutine Native SRT)。这是 SRS 5.0 至关重要的一步,也是具备深远影响的一步,详细代码请参考PR#3010


我们先了解下详细的背景介绍。

Introduction


在直播推流领域,RTMP 是事实上的工业标准,广泛使用,也是直播源站之间兼容性最好的协议。


随着场景的丰富和直播的发展, 几个比较严重的问题逐渐暴露出来:


  1. TCP 推流在长距离传输下,受丢包以及 RTT 抖动影响非常大,效果很差。

  2. RTMP 协议不支持多音轨,以及 H265、AV1 等一系列新的编解码。

  3. Adobe 已经放弃 RTMP 协议,很多年没有更新了,未来也不会更新。


为了解决这些问题,2018 年左右,广播电视领域开始广泛应用 SRT 协议推流,越来越多的推流设备和平台都支持了 SRT 协议。


SRS 在 2019 年底,SRS 4.0 支持了 SRT 推流,目前存在以下的问题:


  1. SRT 在 SRS 上使用多线程+异步实现,某些异常导致的程序 Crash,难以排查。

  2. SRT 在 SRS 实现是异步方式,代码复杂,维护难度高。

  3. HTTP 回调,SRT 播放不生效;SRT 推流依赖转 RTMP 后,RTMP 触发的回调。

  4. SRT 无法直接转 WebRTC,而是先转 RTMP 再转 WebRTC,导致延迟高。


这些问题的核心原因,是由于 SRT 使用了独立的异步 IO 和多线程,无法和 SRS 已有的 ST 协程结合起来。


要彻底解决这个问题,必须将 SRT 协程化,和 SRS 使用同一套 ST 协程框架。SRS 5.0 已经完成,详细代码请参考PR#3010,这是非常重要的一个功能。


在介绍 SRT 协程化之前,先介绍下什么是协程化,我们看下 ST 的 TCP 协程化(Coroutine Native),这是最好的例子。

Coroutine Native TCP

首先,非协程化的代码,也就是 epoll 驱动的异步代码,大概逻辑是这样:


int fd = accept(listen_fd); // Got a TCP connection.
int n = read(fd, buf, sizeof(buf));if (n == -1) { if (errno == EAGAIN) { // Not ready return epoll_ctl(fd, EPOLLIN); // Wait for fd to be ready. } return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


Note: 为了方便表达关键逻辑,我们使用示意代码,具体代码可以参考 epoll 的示例代码。


一般 read 是业务逻辑,读出来的数据也是业务数据,但是这里却需要在 EAGAIN 时调用 epoll 这个底层框架处理 fd。这就是把底层逻辑和业务逻辑混合在一起,导致难以维护。


Note: 尽管 NGINX 包装了一层框架,但是本质上并不能解决这个异步回调问题,当 fd 没有准备好必须返回当前函数,所以导致很多状态需要保存和恢复,在复杂的逻辑中状态机也变得非常复杂。


下面我们看协程化的逻辑会是怎样的,同样以上面代码为例:


st_netfd_t fd = st_accept(listen_fd); // Got a TCP connection
int n = st_read(fd, buf, sizeof(buf));if (n == -1) { return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


简单的看,就是没有 EAGAIN,看起来要么读取到了数据,要么就是出现了错误,不会出现 fd 没有准备好的情况。这就给整个业务逻辑带来了非常好的体验,因为不需要保存状态了,不会反复的尝试读取。


同样用 epoll,为何st_read就没有 EAGAIN 呢?这就是协程化,不是没有,只是在底下处理了这个事件。我们看st_readv这个函数:


ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte) {    while ((n = read(fd->osfd, buf, nbyte)) < 0) {        if (errno == EINTR)            continue;        if (!_IO_NOT_READY_ERROR) // Error, if not EAGAIN.            return -1;
/* Wait until the socket becomes readable */ if (st_netfd_poll(fd, POLLIN) < 0) // EAGAIN return -1; } return n;}
复制代码


很明显在EAGAIN时,会调用st_netfd_poll。在st_netfd_poll函数里,会将当前协程切换出让,调度线程执行下一个协程。并且在未来某个时刻,会因为 IO 事件到达或者超时错误,而将当前协程恢复。


Note: 由于协程切换和恢复,都是在这个函数中实现的,对于上层调用的代码,看起来没有发生什么,所以就不仅没有 EAGAIN 这个错误消息,也不会返回上一层函数,当然也不需要保存状态和恢复状态。


我们可以总结下,如何协程化任何协议的思路:


  1. 直接对 API 进行一次调用,如果成功,那么直接返回。

  2. 如果 API 返回失败,检查错误,非 IO 等待的错误直接返回。

  3. 将当前协程出让,调度器运行其他协程,直到该 FD 上的事件返回或者超时;如果超时,则返回错误;如果事件到达,则重复上面的步骤。


我们可以按照这个思路将 SRT 进行协程化(Coroutine Native)。

Coroutine Native SRT

我们以srt_recvmsg函数的协程化为例,这个函数类似 TCP 的read函数,定义如下:


SRT_API int srt_recvmsg (SRTSOCKET u, char* buf, int len);
复制代码


我们同样,提供一个SrsSrtSocket::recvmsg的函数,类似st_read函数,实现如下:


srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) {  while (true) {    int ret = srt_recvmsg(srt_fd_, (char*)buf, size);    if (ret >= 0) { // Receive message ok.      recv_bytes_ += ret;       *nread = ret;      return err;    }        // Got something error, return immediately.    if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {      return srs_error_new(ERROR_SRT_IO, "srt_recvmsg");    }        // Wait for the fd ready or error, switch to other coroutines.    if ((err = wait_readable()) != srs_success) { // EAGAIN.      return srs_error_wrap(err, "wait readable");    }  }    return err;}
复制代码


可以看到和st_read非常类似,在wait_readable中也会实现协程的切换和恢复,只是我们使用st_cond_t条件变量来实现:


srs_error_t SrsSrtSocket::wait_readable() {  srt_poller_->mod_socket(this, SRT_EPOLL_IN);  srs_cond_timedwait(read_cond_);}
复制代码


Note: 这里先修改了 epoll 侦听的事件SRT_EPOLL_IN,等待 fd 可读后,再等待条件变量触发。


而触发这个条件变量的函数,是在SrsSrtPoller::wait,实现如下:


srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) {  int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size());  for (int i = 0; i < ret; ++i) {    if (event.events & SRT_EPOLL_IN) {      srt_skt->notify_readable();    }  }}
void SrsSrtSocket::notify_readable() { srs_cond_signal(read_cond_);}
复制代码


这样就完全做到了将 SRT API 协程化,其他的 API 比如 srt_sendmsg, srt_connnect, srt_accept 也是类似的操作。


下面我们对比下,协程化(Coroutine Native)之后,和原始的回调(Callback)的区别。

Coroutine Native PK Callback

将 SRT 协程化以后, 业务逻辑和底层代码分离,上层的代码逻辑清晰明了。


先看看 accept 这个逻辑,之前也是由 epoll 触发的事件处理,创建srt_conn这个数据结构:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_connection(status, read_fds[index], "read fd");  }}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd) { if (status == SRTS_LISTENING) { conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); _handle_ptr->add_newconn(conn_fd, SRT_EPOLL_IN); }}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);}
复制代码


Note: 创建的srt_conn本身就是保存在全局数据结构之中,在后续的回调事件中持续修改和变更这个数据结构。


我们对比下协程化之后的业务逻辑,收到会话之后启动处理协程:


srs_error_t SrsSrtListener::cycle() {  while (true) {    srs_srt_t client_srt_fd = srs_srt_socket_invalid();    srt_skt_->accept(&client_srt_fd);        srt_server_->accept_srt_client(srt_fd);  }}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd) { fd_to_resource(srt_fd, &srt_conn); conn_manager_->add(srt_conn); srt_conn->start();}
复制代码


Note: 虽然有全局变量维护srt_conn,但这里不会关注到 epoll 的处理,而是由协程主导的执行逻辑,而不是由回调主导的逻辑。


回调主导的逻辑,维护和了解代码时,必须要从 epoll 回调事件开始看,而且不同事件都在修改srt_conn这个对象的状态,要了解对象生命周期是很有难度的。而协程主导的逻辑,它的生命周期是在协程中,收到srt_conn就启动协程处理它,后续的读写也在协程中。


我们继续看srt_conn的读处理逻辑,之前直接使用原生 SRT 的 read 函数,同样是由 epoll 的事件触发回调:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_data(status, read_fds[index], "read fd");  }}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { auto conn_ptr = get_srt_conn(conn_fd); int mode = conn_ptr->get_mode(); if (mode == PUSH_SRT_MODE && status == SRTS_CONNECTED) { handle_push_data(status, path, subpath, conn_fd); }}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { srt_conn_ptr = get_srt_conn(conn_fd); if (status != SRTS_CONNECTED) { // Error. close_push_conn(conn_fd); return; }
ret = srt_conn_ptr->read_async(data, DEF_DATA_SIZE); if (ret <= 0) { // Error. if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { return; } close_push_conn(conn_fd); return; }
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);}
复制代码


Note: 在回调中我们需要处理各种状态,而这个srt_conn的状态变化,是由各种回调决定的,很难一次了解到这个会话的主要处理逻辑。


我们看看 SRT 协程化之后,这个业务逻辑是怎样写的:


srs_error_t SrsMpegtsSrtConn::do_publishing() {  while (true) {    ssize_t nb = 0;    if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {      return srs_error_wrap(err, "srt: recvmsg");    }        if ((err = on_srt_packet(buf, nb)) != srs_success) {      return srs_error_wrap(err, "srt: process packet");    }  }}
复制代码


Note: 这里srt_conn的生命周期非常明确,它的状态就是直接在这里返回错误,对于这个会话来说,这就是它的主循环,不会因为read而导致进入 SRT 的 epoll 大循环,我们在维护时也不用关注这个异步事件触发和处理。


再次强调一次,维护代码时,我们需要了解的信息量是非常不同的。在基于异步回调的逻辑中,我们在回调函数中,是需要关注目前对象有哪些状态,修改了哪些状态,其他异步事件又有哪些影响。而基于协程的逻辑中,没有这些状态,协程的创建和执行,就是线性的,或者说这些状态就是在协程的函数调用中。


Note: 为何异步回调的状态就不能在函数调用中呢?因为异步回调的堆栈中不能保存srt_conn的状态,它本质上就是一个协程,保存的是 epoll 的循环的状态。而协程是根据每个srt_conn所创建的,它的堆栈中保存的都是这个对应的srt_conn的状态。


这本质上,是由于异步回调的状态,只能保存在全局数据结构之中。而协程的状态,是可以保存在各个局部变量之中,每个函数的局部变量,都是这个协程所独有的,协程没有结束前都可以使用。

What is the Next

SRS 虽然完成了 SRT 协程化,并没有解决所有的问题。后续的计划包括:


  1. SRT 直接转 WebRTC,低延迟直播的另外一种方式。

  2. 某些服务器之间的长链路可以将 TCP 替换为 SRT 传输, 比如跨国的 RTMP 转发。

  3. SRT 工具链的完善,比如srs-bench,支持压测 SRT 流。


欢迎加入 SRS 开源社区,一起做好一个流媒体服务器,让全世界都来白嫖。

One More Thing

有些朋友也很好奇,真正商用的视频云的 SRT,和开源的 SRT 的服务器,有什么区别,都做过哪些优化。


Note: 由于开源服务器侧重标准协议和兼容性,有些优化并不适合在开源项目中实现,所以在云计算的商业化服务器和开源服务器,一定是存在很大差异的。就算是 Linux 系统,其实云计算的 Linux 内核,和开源的 Linux 内核,也有很大的差异。


腾讯云在 SRT 的实战中积累很多经验,请参考之前分享的文章:



其中,最为严重的是 SRT 重传率过高、限带宽下表现不如 TCP/QUIC 等,腾讯云针对这些问题,做了几个优化:


  1. SRT 重传乱序度自适应:当接收到乱序报文,首次发起重传时,会根据当前的乱序度,等待 N 个包之后才发起重传。原生 SRT 这个乱序度是固定值,我们修改成为根据网络乱序情况自适应。

  2. SRT 传输参数优化:通过对参数优化,减少了一半的重传率。

  3. 加入了 BBR 拥塞控制算法:原生 SRT 拥塞控制非常弱,评估的带宽波动也非常大。我们加入了 BBR 拥塞控制算法,针对性的解决了这个问题。

  4. 强化了 SRT 多链路传输,增加了带宽聚合的模式:原生 SRT 只有 backup,broadcast 两种多链路传输模式,我们针对直播场景增加了 auto 模式,能够做到讲多个网卡的带宽聚合后进行直播,并智能动态选择链路。


Note: 腾讯云音视频在音视频领域已有超过 21 年的技术积累,持续支持国内 90%的音视频客户实现云上创新,独家具备 RT-ONE™ 全球网络,在此基础上,构建了业界最完整的 PaaS 产品家族,并通过腾讯云视立方 RT-Cube™ 提供 All in One 的终端 SDK,助力客户一键获取众多腾讯云音视频能力。腾讯云音视频为全真互联时代,提供坚实的数字化助力。


作者介绍:


肖志宏 (hondaxiao) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS TOC。


杨成立 (Winlin) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS 作者,音视频服务器和视频云领域超过 13 年经验。

2022-07-04 16:247276

评论

发布
暂无评论
发现更多内容

基于Flink+ClickHouse打造轻量级点击流实时数仓

Apache Flink

flink

动态代理玩不明白?别紧张,你只是缺少这个demo

小Q

Java 编程 程序员 开发 动态代理

多种方式实现 LazyMan

局外人

大前端 队列 Promise

SpringBoot-技术专题-Caffeine用法

码界西柚

CloudQuery,数据库管理用它就够了!

BinTools图尔兹

数据库 sql 安全 工具软件

工作流引擎,企业运作加速器

Marilyn

敏捷开发 工作流 快速开发

anyRTC直播带货解决方案

anyRTC开发者

音视频 WebRTC 直播 RTC

十年Java开发经验,走了五年弯路,整理了一份Java架构师进阶路线及进阶资料!

Java架构之路

Java 程序员 面试 程序人生 编程语言

如何获得工作成就感

滴滴普惠出行

区块链的浪潮开始涌动了

CECBC

区块链 期货

Pulsar 社区周报|09-19 ~ 09-25

Apache Pulsar

大数据 开源 Apache Pulsar 消息中间件

比MySQL快839倍!揭开分析型数据库JCHDB的神秘面纱

京东科技开发者

数据库 JCHDB

媒介狂想曲

善宝橘

媒介 想象

区块链来了 职业教育这么干

CECBC

区块链 职业教育

C++函数模板的偏特化

Qing Wang

c++

Java之父都需要的一本能够更深入地了解Java编程语言的书

Java架构之路

Java 程序员 面试 编程语言

通过MapReduce降低服务响应时间

万俊峰Kevin

mapreduce Go 语言

spring-boot-route(十六)使用logback生产日志文件

Java旅途

Java Spring Boot logback

一文带你了解文字识别

华为云开发者联盟

技术 识别 文字

干掉PPT!现场编码的职级晋升答辩你参加过么?

华为云开发者联盟

软件 开发者 API

Hive UDF/UDAF 总结

windism

高难度对话读书笔记——表达自我

wo是一棵草

蚂蚁金服架构师分享一套内部Java并发编程进阶笔记,白嫖太香了

Java架构追梦

Java 学习 架构 面试 并发编程

重新学习面向对象设计之开放-封闭原则

IT老兵重开始

面向对象设计 OCP 开闭原则

技术解读丨GaussDB数仓高可用容灾利器之逻辑备份

华为云开发者联盟

数据 容灾 备份

是的,你没看错,自己的APP也能运行微信小程序了

FinClip

小程序flutter, 跨平台 小程序生态 移动开发

血亏!阿里P8轻易把总结了近一年的java高级特性笔记送人了

996小迁

Java 学习 架构 笔记 Java高级特性

软件测试人员的职业发展之路

BY林子

软件测试 QA 职业发展

风雨边城

满天星

美食 旅行

区块链应用众多难题“卡脖子”

CECBC

区块链 金融 供应链融资

SRS 5.0:如何实现 SRT 协程化_架构_肖志宏_InfoQ精选文章