写点什么

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

  • 2021-12-06
  • 本文字数:5645 字

    阅读完需:约 19 分钟

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

作者 | 袁小栋、程君杰

审核校对 | 杜恒、岁月、白玙、不周

 

随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。


RocketMQ Streams 适合大数据量->高过滤->轻窗口计算的场景,核心打造轻资源,高性能优势,在资源敏感场景中有很大优势,最低 1core,1g 可部署,建议的应用场景(安全,风控,边缘计算,消息队列流计算)。


RocketMQ Streams 兼容 Blink(Flink 的阿里内部版本) 的 SQL,UDF/UDTF/UDAF,多数 Blink 任务可以直接迁移成 RocketMQ Streams 任务。将来还会发布和 Flink 的融合版本,RocketMQ Streams 可以直接发布成 Flink 任务,既可以享有 RocketMQ Streams 带来的高性能,轻资源,还可以和现有的 Flink 任务统一运维和管理。


本篇文章主要从五个方面来介绍 RocketMQ Streams 实时计算平台:

首先简单先介绍一下什么是 RocketMQ Streams;

第二部分,基于 RocketMQ Streams 的 SDK,来了解下它是怎么去使用的;

第三部分,RocketMQ Streams 整体的架构以及它的原理实现;

第四部分,在云安全的场景下该怎么使用 RocketMQ Streams;

第五部分,RocketMQ Streams 的未来规划。

一、什么是 RocketMQ Streams?


本章节从基础简介、设计思路和特点三方面对 RocketMQ streams 进行整体介绍。

1.1 RocketMQ Streams 简介

1)首先,它是一个 Lib 包,启动即运行,和业务直接集成;

2)然后,它具备 SQL 引擎能力,兼容 Blink SQL 语法,兼容 Blink UDF/UDTF/UDAF;

3)其次,它包含 ETL 引擎,可以无编码实现数据的 ETL,过滤和转存;

4)最后,它基于数据开发 SDK,大量实用组件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的场景。



1.2 RocketMQ Streams 的特点


RocketMQ streams 基于上述的实现思路,可以看到它有以下几个特点:


1.2.1 轻量


1 核 1g 就可以部署,依赖较轻,在测试场景下用 Jar 包直接写个 main 方法就可以运行,在正式环境下最多依赖消息队列和存储(其中存储是可选的,主要是为了分片切换时的容错)。

1.2.2 高性能


实现高过滤优化器,包括前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹等,比优化前性能提升 3-5 倍,资源节省 50%以上。

1.2.3 维表 JOIN(千万数据量维表支持)


设计高压缩内存存储数据,无 java 头部和对齐的开销,存储接近原始数据大小,纯内存操作,性能最大化,同时对于 Mysql 提供了多线程并发加载,提高加载维表的速度。

1.2.4 高扩展的能力

  • Source 可按需扩展,已实现:RocketMQ,File,Kafka;

  • Sink 可按需扩展,已实现:RocketMQ,File,Kafka,Mysql,ES;

  • 可按 Blink 规范扩展 UDF/UDTF/UDAF;

  • 提供了更轻的 UDF/UDTF 扩展能力,不需要任何依赖就可以完成函数的扩展。

1.2.5 提供了丰富的大数据的能力

包括精确计算一次灵活的窗口,双流 join,统计,开窗,各种转换过滤,满足大数据开发的各种场景,支持弹性容错的能力。

 

二、RocketMQ Streams 的使用


RocketMQ Streams 对外提供两种 SDK,一种是 DSL SDK,一种是 SQL SDK,用户可以按需选择; DSL SDK 支持实时场景 DSL 语义; SQL SDK 兼容 Blink(Flink 的阿里内部版本) SQL 的语法,多数 Blink SQL 可以通过 RocketMQ Streams 运行;


接下来,我们详细的介绍一下这两种 SDK。

2.1 环境要求

  • JDK1.8 版本以上;

  • Maven 3.2 版本以上。

2.2 DSL SDK

利用 DSL SDK 开发实时任务时,需要做如下的一些准备工作:

2.2.1 依赖准备

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-streams-clients</artifactId>    <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


准备工作完成后,就可以直接开发自己的实时程序。

2.2.2 代码开发

DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false) .map(message->message + "--") .toPrint(1) .start();
复制代码


其中:

1)Namespace 是业务隔离的,相同的业务可以写成相同的 Namespace。相同的 Namespace 在任务调度里可以跑在进程里,也可以共享一些配置;

2)pipelineName 可以理解成就是 job name ,唯一区分 job;

3)DataStreamSource 主要是创建 Source,然后这个程序运行起来,最终的结果就是在原始的消息里面会加"--",然后把它打印出来。

2.2.3 丰富的算子

RocketMQ streams 提供了丰富的算子, 包括:

  • source 算子:包括 fromFile, fromRocketMQ, fromKafka 以及可以自定义 source 来源的 from 算子;

  • sink 算子: 包括 toFile, toRocketMQ, toKafka,toDB,toPrint, toES 以及可以自定义 sink 的 to 算子;

  • action 算子:包括 Filter,Expression,Script,selectFields,Union,forEach,Split,Select,Join,Window 等多个算子。

2.2.4 部署执行

基于 DSL SDK 完成开发,通过下面命令打成 jar 包,执行 jar,或直接执行任务的 main 方法。


mvn -Prelease-all -DskipTests clean install -Ujava -jar jarName mainClass &
复制代码


2.3 SQL SDK

2.3.1 依赖准备

 

  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>rsqldb-clients</artifactId>      <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


2.3.2 代码开发

 

首先开发业务逻辑代码, 可以保存为文件也可以直接使用文本;


CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` ( `scan_time` VARCHAR, `file_name` VARCHAR, `cmdline` VARCHAR,) WITH ( type='file', filePath='/tmp/file.txt', isJsonData='true', msgIsJsonArray='false');

-- 数据标准化
create view data_filter asselect *from ( select scan_time as logtime , lower(cmdline) as lower_cmdline , file_name as proc_name from table_name)xwhere ( lower(proc_name) like '%.xxxxxx' or lower_cmdline like 'xxxxx%' or lower_cmdline like 'xxxxxxx%' or lower_cmdline like 'xxxx' or lower_cmdline like 'xxxxxx' );
CREATE TABLE `output` ( `logtime` VARCHAR , `lower_cmdline` VARCHAR , `proc_name` VARCHAR) WITH ( type = 'print');
insert into outputselect *from aegis_log_proc_format_raw;
复制代码


其中

  • CREATE FUNCTION:引入外部的函数来支持业务逻辑, 包括 flink 以及系统函数;

  • CREATE Table:创建 source/sink;

  • CREATE VIEW:执行字段转化,拆分,过滤;

  • INSERT INTO:数据写入 sink;

  • 函数:内置函数,udf 函数。

2.3.3 SQL 扩展

RocketMQ streams 支持三种 SQL 扩展能力,具体实现细节请看:https://github.com/alibaba/rsqldb

1)通过 Blink UDF/UDTF/UDAF 扩展 SQL 能力;

2)通过 RocketMQ streams 扩展 SQL 能力,只要实现函数名是 eval 的 java bean 即可;

3)通过现有 java 代码扩展 SQL 能力,create function 函数名就是 java 类的方法名。

2.3.4 SQL 执行

你可以从这里下载最新的 Rocketmq Streams 代码并构建。


cd rsqldb/mvn -Prelease-all -DskipTests clean install -Ucp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录
复制代码


解压 tar.gz 包, 进入目录结构


tar -xvf rocketmq-streams-{版本号}-distribution.tar.gzcd rocketmq-streams-{版本号}
复制代码


其目录结构如下 

  • bin 指令目录,包括启动和停止指令

  • conf 配置目录,包括日志配置以及应用的相关配置文件

  • jobs 存放 sql,可以两级目录存储

  • ext 存放扩展的 UDF/UDTF/UDAF/Source/Sink

  • lib 依赖包目录

  • log 日志目录


2.3.4.1 执行 SQL


#指定sql的路径,启动实时任务bin/start-sql.sh sql_file_path
复制代码


2.3.4.2 执行多个 SQL


如果想批量执行一批 SQL,可以把 SQL 放到 jobs 目录,最多可以有两层,把 sql 放到对应目录中,通过 start 指定子目录或 sql 执行任务。


2.3.4.3 任务停止


# 停止过程不加任何参数,则会将目前所有运行的任务同时停止bin/stop.sh
# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止bin/stop.sh sqlname
复制代码


2.3.4.4 日志查看


目前所有的运行日志都会存储在 log/catalina.out 文件中。


三、架构设计及原理分析

3.1 RocketMQ Streams 设计思路

在了解完 RocketMQ streams 的基本简介,接下来,我们看下 RocketMQ streams 的设计思路,设计思路主要从设计目标和策略两个方面来介绍:

3.1.1 设计目标

  • 依赖少,部署简单,1 核 1g 单实例可部署,可随意扩展规模;

  • 打造场景优势,重点打造大数据量->高过滤->轻窗口计算的场景,功能覆盖度要全,实现需要的大数据特性:Exactly-ONCE、灵活的窗口(滚动、滑动、会话窗口);

  • 要在保持低资源的前提下,对高过滤有性能突破,打造性能优势;

  • 兼容 Blink SQL,UDF/UDTF/UDAF,让非技术人员更容易上手。


3.1.2 策略(适配场景:大数据量>高过滤/ETL>低窗口计算)

  • 采用 shared-nothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展,并发能力取决于分片数;

  • 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错;

  • 利用存储实现状态备份,实现 Exactly-ONCE 的语义。用结构化远程存储实现快速启动,不等本地存储恢复。

  • 重力打造过滤优化器,通过前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹提高过滤性能



3.2 RocketMQ Streams Source 的实现

1)Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。

2)Source 支持分片的自动负载和容错

  • 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作;

  • 当有新分片时,发送新增分片消息,让算子完成分片初始化。

3)数据源通过 start 方法,启动 consuemr 获取消息;

4)原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。



3.3 RocketMQ Streams Sink 的实现

1)Sink 是实时性和吞吐的一个结合;

2)实现一个 sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐;

3)常规的使用方式是写 message->cache->flush->存储的方式,系统会严格保证每次批次写入存储的量不超过 batchsize 的量,如果超过了,会拆分成多批写入;



4)Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐(一个分片一个 cache);

5)可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask;

6)通过调用 flush 方法刷新 cache 到存储;

7)Sink 的 cache 会有内存保护,当 cache 的消息条数>batchSize,会强制刷新,释放内存。


3.4 RocketMQ Streams Exactly-ONCE 实现

1)Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作,消息至少消费一次;

2)每条消息会有消息头部,里面封装了 queueld 和 offset;

2)组件在存储数据时,会把 queueld 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重;

3)内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。



3.5 RocketMQ Streams Window

实现方式:

1)支持滚动、滑动和会话窗口,支持事件时间和自然时间(消息进入算子的时间);

2)支持 Emit 语法,可以在触发前或触发后,每隔 n 段时间,更新一次数据;比如 1 小时窗口,窗口触发前希望每分钟看到最新结果,窗口触发后希望不丢失迟到一天内的数据,且每 10 分钟更新数据。

3)支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时,有丢失窗数据的风险;

4)快速启动,无需等待本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算;

5)利用消息队列负载均衡,实现扩容缩容容,每个 queue 是一份组,一个分组同一刻只被一台机器消费;

6)正常计算依赖本地存储,具备 flink 相似的计算性能。



四、RocketMQ Streams 在安全场景的最佳实践

4.1 背景

从公共云转战专有云,遇到了新的问题。因为专有云像大数据这种 saas 服务是非必须输出的,且最小输出规模也比较大,用户成本会增加很多,难落地,导致安全能力无法快速同步到专有云。



4.2 解决办法

RocketMQ Streams 在云安全的应用-流计算

  • 基于安全场景打造轻量级计算引擎,基于安全高过滤的场景特点,可以针对高过滤场景优化,然后再做较重的统计、窗口、join 操作,因为过滤率比较高,可以用更轻的方案实现统计和 join 操作;

  • SQL 和引擎都可热升级



业务结果

1)规则覆盖:自建引擎,覆盖 100%规则(正则,join,统计);

2)轻资源,内存是公共云引擎的 1/24,cpu 是 1/6,依赖过滤优化器,资源不随规则线性增加,新增规则无资源压力,通过高压缩表,支持千万情报;

3)SQL 发布,通过 c/s 部署模式,SQL 引擎热发布,尤其护网场景,可快速上线规则;

4)性能优化,对核心组件进行专题性能优化,保持高性能,每实例(2g,4 核,41 规则)5000qps 以上。

 

五、RocketMQ Streams 的未来规划

5.1 打造 RocketMQ 一体化计算能力

1)和 RocketMQ 整合,去除 DB 依赖,融合 RocketMQ KV;

2)和 RocketMQ 混部,支持本地计算,利用本地特点,打造高性能;

3)打造边缘计算最佳实践

5.2 Connector 增强

1)支持 pull 消费方式,checkpoint 异步刷新;

2)兼容 blink/flink connector。

5.3 ETL 能力建设

1)增加文件,syslog 的数据接入能力

2)兼容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳实践

5.4 稳定性和易用性打造

1)Window 多场景测试,提升稳定性,性能优化;

2)补充测试用例,文档,应用场景。


六、开源地址

RocketMQ-Streams: https://github.com/apache/rocketmq-streams

RocketMQ-Streams-SQL:https://github.com/alibaba/rsqldb


以上是本次对 RocketMQ stream 的整体介绍,希望对大家有所帮助和启发。

 

2021-12-06 12:156159

评论 1 条评论

发布
用户头像
双流join怎么用的,源码中的例子跑不起来啊?
2021-12-21 15:51
回复
没有更多了
发现更多内容

俄罗斯通过加密货币税法:重新定义数字货币规则

区块链软件开发推广运营

交易所开发 dapp开发 链游开发 公链开发 链游开发代币开发

重塑用户体验!快手电商智能巡检平台的实践与探索

快手技术

前端

TDengine vs InfluxDB:谁的“流式计算”功能是真的?

TDengine

数据库 tdengine 时序数据库

用例图如何在线制作?10个用例图模板案例推荐!

职场工具箱

效率工具 UML 用例图 在线白板 绘图软件

法行宝爱企查AI形象上线,AI版“职场搭子”度律度秘替你打工

科技热闻

电力数据驱动的节能创新:TDengine Cloud 在智慧楼宇中的深度应用

TDengine

数据库 tdengine 时序数据库

模型输出可保存为数据集、支持配置社区活动作为课程作业|ModelWhale 版本更新

ModelWhale

Python 人工智能 数据分析 元数据

MPC2024明道云伙伴大会圆满结束

明道云

进军东南亚!Coremail泰国分公司启航

科技热闻

揭秘1688阿里巴巴API接口:解锁商品评论与描述详情图的深度探索之旅

代码忍者

API 接口 pinduoduo API

InfluxDB vs TDengine :2025 年了,谁家用的数据库还不能高效读缓存?

TDengine

数据库 tdengine 时序数据库

TikTok直播网络要求是什么?

Ogcloud

TikTok tiktok直播 tiktok直播专线 tiktok直播网络 tiktok直播加速

从微软 SSAS 到国产替代,这家企业终于松了一口气

Kyligence

阿里巴巴热卖商品推荐API接口的获取与应用

科普小能手

阿里巴巴 电商 API API 接口 阿里巴巴数据采集

解锁电商数据宝藏:淘宝天猫API接口深度探索——商品评论与描述详情图获取指南

代码忍者

API 接口 pinduoduo API

Kubernetes为什么要从docker切换ContainerD

虚实的星空

Docker Containerd

Web端软件测试工具

测试人

软件测试

TextIn文档解析表格处理模型优化,显著提升表格解析性能

合合技术团队

人工智能 表格 AIGC 文档图像

明道云正式发布国际品牌Nocoly

明道云

TDengine 签约深圳综合粒子,赋能粒子研究新突破

TDengine

数据库 tdengine 时序数据库

ERP系统实施的难点不是系统本身,而是企业的人与管理

积木链小链

企业管理 ERP 中小企业

夜莺短信告警教程

巴辉特

夜莺监控 夜莺Nightingale 夜莺短信告警

项目经理如何向客户更好地汇报项目情况

Hi-CodeCaptain

项目管理 软件测试 精准测试 代码覆盖率 质量内建

集团总部与分公司组网:选择MPLS还是SD-WAN?

Ogcloud

SD-WAN 企业组网 企业网络 SD-WAN组网 SD-WAN服务商

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统_大数据_袁小栋_InfoQ精选文章