HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具

  • 2022-07-04
  • 本文字数:6243 字

    阅读完需:约 20 分钟

基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具

实时数据采集(又称为“变化数据捕获”Change Data Capture,下文又称 CDC、实时抽数)在实时计算、跨库实时同步等领域具有极高的应用价值。相较于其他数据库,Oracle 由于其闭源特性,目前缺乏免费且好用的解决方案。尽管 Oracle 提供了 LogMiner 接口且被以 Debezium 为代表的优秀开源方案所使用,但应用于生产实践时,Debezium 相较于 Oracle 官方付费工具 OGG,具有如下硬伤:


  • 只能对主库 CDC,无法用于 ADG 备库(参见DBZ-3866)。由于 LogMiner 的设计初衷为对 Redo logs 的诊断工具而非专门的 CDC 工具,故并未对持续运行进行任何效率、开销等优化,直接运用于主库势必会对主库业务正常运行造成影响。这对于应用 Oracle 的典型企业(银行、电信等要求高可靠性的公司)实则不可接受。

  • 抽取性能受限。按照Oracle官方的说法,在每秒钟几百个事务时就会遇到较大的延迟和内存问题。Debezium 框架由于其实现,实际解析速度比这个数值更低。


为此,笔者通过深入研究 LogMiner 原理和 Debezium 源码,在其基础上进行改进,使其真正可用于生产实践。


说明:笔者的实验环境为 Oracle 11g 和 Debezium 1.5.4。但通过核对 Oracle 最新官方文档(21c)和 Debezium 最新源码(截至 2022 年 6 月 21 日最新版 1.9.4),本文所涉及的方案对 11g-21c 的 Oracle,以及 Debezium1.5.4 以上版本均适用(文中涉及到的 Debezium 源码均为 1.5.4 版本,非 1.5.4 版本的具体代码细节需根据文中原理自行定位)。限于篇幅本文对 Debezium 基础使用不做介绍,读者可参考官方文档了解其基本配置和用法。

一、LogMiner 抽数的基本过程

 

限于篇幅仅简要介绍其五个基本步骤,详细说明可参阅Oracle官方文档

1)构造字典文件:EXEC DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);

2)添加 Redo Logs:EXEC DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/oracle/logs/log2.f', OPTIONS=> DBMS_LOGMNR.ADDFILE);

3)开启 LogMiner:EXEC DBMS_LOGMNR.START_LOGMNR(startScn => '..', endScn => '..');

4)查询 V$LOGMNR_CONTENTS 视图获取 Redo 信息并处理

5)查询当前的活跃 Online Redo Log 是否有变化,如果无变化则跳转到第 3)步,如果有变化则关闭 LogMiner(EXEC SYS.DBMS_LOGMNR.END_LOGMNR)并跳转回第 1)步。

二、ADG 备库抽数原理


为了介绍从 ADG 备库 CDC 方案,在此先对Oracle对Redo Logs的管理ADG的原理简介如下:


Online Redo Logs 记录 Oracle 中的一切变化信息,由至少 2 个(组)预分配的文件构成。LGWR(LoG WriteR)进程循环写入这些文件,每次仅往其中一个(组)Online Redo Log 写入变化信息,每一条信息对应一个单调递增的 SCN(System Change Number)。当该 Online Redo Log 写满,或者被显式调用 ALTER SYSTEM SWITCH LOGFILE(强制切换 Online Redo Log),或者写入时间超过 ARCHIVE_LAG_TARGET 参数值,则 LGWR 切换到下一个(组)Online Redo Log 写入变化信息。同时,上一个(组)Online Redo Log 会由归档进程 ARCn 将其内容写入新的 Archived Redo Log。


图源:Oracle 官方文档


图源:Oracle 官方文档


ADG 为了保证同步效率,选择了仅仅同步恢复 Oracle 事务所需的信息(即 Redo 记录)。为了免受磁盘 I/O 状态影响,Redo 记录在尚未落地至主库 Online Redo Log 文件时,即直接从内存中实时传送至备库,备库收到这些 Redo 信息后便将其实际应用于物理库中。备库中 RFS(Remote File Server)进程负责将收到的 Redo 信息同步写入 Standby Redo Logs。当主库发生 Online Redo Log 切换时,RFS 进程同步进行 Standby Redo Log 切换,将新的 Redo 信息写入下一个(组)Standby Redo Log,并由归档进程 ARCn 将上一个(组)Standby Redo Log 内容写入 Archived Redo Log。


图源:《Oracle Active Data Guard versus Storage Remote Mirroring


由上可知:

  • 数据抽取时 Online Redo Logs 和 Archived Redo Logs 缺一不可。前者用于获取最新的实时数据,后者用于在抽取速度跟不上数据库变化速度时,保证能够无遗漏地获取所有变化数据。

  • ADG 备库中,Standby Redo Logs + Archived Redo Logs,具有和主库 Online Redo Logs + Archived Redo Logs 相同的内容。

三、ADG 备库抽数实践


根据上述原理,我们可对 LogMiner 抽数步骤进行调整如下:

1. 备库新建单独 Oracle 实例


使用 DBCA 工具(推荐)或者 CREATE DATABASE 语句(参数复杂,不推荐)在 ADG 备库所在物理机新建单独的 Oracle 实例。假设新建的数据库服务名为 logmnr,原备库的服务名为 datadg。(后文将详述为什么需要新建 logmnr 库进行抽数而不是直接在 datadg 上抽数,以及为什么要在备库所在物理机上新建 logmnr 库)。后续的过程需关注每一步是在 datadg 实例上做,还是在 logmnr 实例上做。

2. 创建平面(Flat)字典文件


在 datadg 实例上,使用 DBMS_LOGMNR_D.BUILD 命令创建字典文件:

EXECUTE DBMS_LOGMNR_D.BUILD(DICTIONARY_LOCATION=>'/home/oracle/logmnr', DICTIONARY_FILENAME=>'dictionary.ora',OPTIONS=> DBMS_LOGMNR_D.STORE_IN_FLAT_FILE);

注意自 Oracle 12c 起往后的版本,dictionary_location 需替换为目录对象,具体参见官方文档说明。

3. 筛选 Standby Redo Logs 和 Archived Redo Logs。


在 datadg 实例上筛选。Archived Redo Logs 的筛选逻辑和 Debezium 框架内置实现相同,读者可直接参考 io.debezium.connector.oracle.logminer.SqlUtils 类中 archiveLogsQuery 方法实现。Standby Redo Logs 的筛选逻辑如下(可参考V$STANDBY_LOG的字段说明理解下述逻辑):

SELECT MIN(F.MEMBER) AS FILE_NAME, L.LAST_CHANGE# AS LAST_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, 'CURRENT' as STATUS FROM V$STANDBY_LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# AND (L.ARCHIVED='NO' or L.STATUS='ACTIVE') GROUP BY F.GROUP#, L.LAST_CHANGE#, L.FIRST_CHANGE#, L.STATUS ORDER BY 3;

4. DBMS_LOGMNR 添加 Redo Logs


在 logmnr 实例上,使用 DBMS_LOGMNR.ADD_LOGFILE 添加第 3)步所筛选出的 Redo Logs。(注意:我们第 1 步中选择在 ADG 备库所在的物理机上创建新的 Oracle 实例,就是为了能使该实例能正常读取这些 Redo Logs)

5. DBMS_LOGMNR 开启 LogMiner


在 logmnr 实例上,使用 DBMS_LOGMNR.START_LOGMNR 开启 LogMiner,并查询 V$LOGMNR_CONTENTS 视图获取 Redo 信息并处理。

6. 判断跳转步骤


在 datadg 实例上,查询当前的活跃 Standby Redo Logs 是否有变化。无变化则循环执行第 5 步,否则关闭 LogMiner(EXEC SYS.DBMS_LOGMNR.END_LOGMNR)并跳转到第 2 步。

四、ADG 备库抽数相关问题

1. 为什么需要新建单独的 Oracle 实例 logmnr 进行抽数?


笔者尝试过直接在 datadg 上进行抽数(即上一章节的所有步骤均在 datadg 上执行),在执行到查询 V$LOGMNR_CONTENTS 步骤时,发现查询会卡死无法继续。

为此,笔者按照如下步骤开启 Oracle 10046 Trace(SQL_TRACE):

alter session set tracefile_identifier='10046';

alter session set timed_statistics = true;

alter session set statistics_level=all;

alter session set max_dump_file_size = unlimited;

alter session set events '10046 trace name context forever,level 12';


并重新将所有步骤在 datadg 中执行,执行到查询 V$LOGMNR_CONTENTS 步骤时,观察在 Trace 目录下(select value from v$diag_info where name='Diag Trace'),10046 文件中会持续输出以下内容:


*** 2022-06-27 15:06:04.528

WAIT #139654617260408: nam='enq: WL - contention' ela= 30000095 name|mode=1464598531 log # / thread id #=2001421497 sequence #=972043434 obj#=-1 tim=1656313564528578

WAIT #139654617260408: nam='ARCH wait for archivelog lock' ela= 140 p1=0 p2=0 p3=0 obj#=-1 tim=1656313564528712

*** 2022-06-27 15:06:34.537

WAIT #139654617260408: nam='enq: WL - contention' ela= 30008500 name|mode=1464598531 log # / thread id #=2001421497 sequence #=972043434 obj#=-1 tim=1656313594537263

WAIT #139654617260408: nam='ARCH wait for archivelog lock' ela= 149 p1=0 p2=0 p3=0 obj#=-1 tim=1656313594537402


由于 ADG 备库中的相关进程对这些 Redo Logs 文件加锁处理,使得 LogMiner 进程无法同时对其进行操作。但由于这种加锁仅限于这些进程所在的 Oracle 实例(即 datadg)而非该实例所在的操作系统(物理机),故在同一物理机上创建新的 Oracle 实例可解决该问题。

2. 字典文件是做什么用的?为什么需要采用 Flat File?


字典文件用于将表名、字段名等元信息的物理表示转换为逻辑表示。如果在 DBMS_LOGMNR.START_LOGMNR 步骤中不指定字典文件,则得到的 Redo SQL 将会类似于下面这样:


update "UNKNOWN"."OBJ# 1456235" set "COL 64" = HEXTORAW('3330383339') where "COL 1" = HEXTORAW('313330323230313038333835383436') and ...... and ROWID = 'AAFjhrAA6AANcSoAAF';


Oracle 提供了三种访问字典文件的方式:直接使用 online catalog,将字典文件导出至 Redo Log,将字典文件导出至 Flat File(即单独的字典文件)。由于我们选择在单独的 Oracle 实例中抽数,无法获取 online catalog,故只能选择第二种或第三种方式。尽管 Oracle 官方不推荐第三种方式(所以 Debezium 中并未内置支持),但该方式的效率实测比第二种高出不少(经笔者实测,采用第二种方法会将数据延迟由几秒钟放大至 3-5 分钟),且在我们的生产实践中并未观察到 Oracle 官方声称的不一致现象(因为生产系统鲜少有对存量表和字段的删除、插入等 DDL 操作),故推荐将字典文件存储于 Flat File。

五、提升 Debezium 框架抽数速度


按照上文的原理和基本步骤,笔者对 Debezium 框架进行了修改,成功实现了对 Oracle ADG 备库的数据抽取。但又出现了新的问题:


  • 抽取速度过低,实际发送至 Kafka 的速度仅为约 60 条/秒。

  • 对于大事务(譬如一个更新了几十万条记录的 update 语句)会报内存溢出,无法继续抽数。


形成该问题的原因有二:

l  LogMiner 的设计初衷为对 Redo logs 的诊断工具而非专门的 CDC 工具,故对抽取效率并未做优化。

l  Debezium 框架仅考虑到功能的正确实现,并未考虑到对抽取效率的调优。

据此,笔者通过排查程序运行堵点,采用分发器模式对源代码进行重构,并对部分逻辑进行优化,最终使大事务时的解析速度从原先的 60 条/秒提升至 3000 条/秒以上。

1. Debezium 解析 V$LOGMNR_CONTENTS 的过程


V$LOGMNR_CONTENTS 中记录了按照 SCN 顺序排列的变化信息。需要注意的是,对于 DML 类操作,变化信息是基于行的(譬如一个 UPDATE 语句更新了 100 条记录,那么 LGWR 进程会实际上往 Redo Log 中写入 100 条 UPDATE 语句,每条语句仅更新一行,V$LOGMNR_CONTENTS 中也会实际上存储 100 条 DML 记录)。


Debezium 在调用完毕 DBMS_LOGMNR.START_LOGMNR 后,依次进行如下操作:(详见 io.debezium.connector.oracle.logminer 包下 LogMinerStreamingChangeEventSource 和 LogMinerQueryResultProcessor 类):


1)读取 V$LOGMNR_CONTENTS 并将结果记于 ResultSet。

2)若 ResultSet 中仍有内容,读取新的记录并判断类型。

3)如果是 DML 操作,那么将操作内容暂存于 Transaction 中;如果是 COMMIT 操作,那么将 Transaction 暂存内容组装为 Kafka Connect 支持的格式并发送;如果是 ROLLBACK 操作,那么将 Transaction 暂存内容丢弃。

4)跳回第 2)步继续执行,直到 ResultSet 全部内容读取结束。


需要注意的是,在 Debezium 中,以上过程全部为主线程串行执行。这种方式具有如下问题:


  • 所有的操作均为单线程顺序执行,一方面单线程执行没有充分挖掘并发效率,另一方面,顺序执行将本地 CPU 密集操作(步骤 2)-3))和数据库密集操作(步骤 1))分开,造成资源的严重浪费。

  • 所有 Redo 记录顺序处理,无法预知当前处理的 DML 操作所对应的 Transaction 将要被 COMMIT 还是 ROLLBACK,故无法将 Redo 信息直接发送至 Kafka Connect,需多一步组装暂存的过程,不仅耗时还可能出现如下文所说的内存溢出的问题。

2. 架构优化


根据上节分析,笔者采用分发器模式,对 Debezium 整体框架进行了优化,下图为架构设计:



对该架构说明如下:


1)Oracle 对 LogMiner 做了非常严格的资源限制,每个 Logminer 进程资源消耗都不能超过 1 个 CPU 核心。因而当我们开启多个 LogMiner 进程同时挖掘时,可以观察到挖掘速度会有明显提升。但为了保证开启多进程后不影响 Redo 信息的处理顺序,我们需要维护一个 SCN 队列,代表每个 START_LOGMNR 操作的起始 SCN。Redo 信息分发器需要按照该顺序进行 Redo 信息分发。


每个 LogMiner 进程需要将查询 V$LOGMNR_CONTENTS 结果进行存储的原因,一是查询该视图的时间开销较大,不能直接让 Redo 信息分发器查询;二是暂存后 LogMiner 进程可以继续向后挖掘,不需要等待 Redo 信息分发器的后续处理。


2)Redo 信息分发器按照 SCN 顺序,对每一条 Redo 信息,拿到 Transaction ID 后直接将其派发到 Transaction 处理线程池的对应 Transaction 线程。特别地,如果这条 Redo 信息操作类型为 COMMIT,那么将对应 Transaction 线程的 commitSCN 置为该条信息的 SCN,并且将该 SCN 加入 CommitSCN 队列;如果这条 Redo 信息操作类型为 ROLLBACK,那么将对应 Transaction 线程的 rollbackSignal 置为 1。


每个 Transaction 线程在处理信息时,如果 rollbackSignal 为 1,那么直接结束处理过程;如果 CommitSCN 不为空且等于 CommitSCN 队头元素,则将当前解析完毕的信息直接发送至 Kafka Connect,否则通过对象流方式暂存,稍后再实际发送。

3. 需要关注的问题(一)——通过对象流暂存解析后 Redo 信息


Debezium 截至目前最新版本,均在内存中暂存操作类型为 DML 的 Redo 解析信息(具体可见 Io.debezium.connector.oracle.logminer.TransactionalBuffer 类中的内部类 Transaction,其使用 List<DmlEvent>直接存储解析后 DML 信息)。对于尚未提交的大事务,直接将 DML 信息存储于内存中极易发生内存溢出(笔者实测当一个 update 语句更新 30 万条记录时即可让开辟 2G 内存的 Debezium 进程因内存溢出而崩溃)。


解决方式是使用对象流方式,将待提交信息序列化存储于文件中。需要注意:


  • 为了防止内存泄露,不要将所有待提交事务转储于同一个文件中,可以譬如每 1000 条记录切换一个文件。

  • 对象流读取/写入需要 I/O 开销,在处理 DML 记录和 COMMIT 事件阶段,应分别启动专门的写入线程和读取线程,并采用生产者/消费者模式,构造缓冲区队列,对待写入或已读取的 DmlEvent 进行管理。

 

4. 需要关注的问题(二)——修正 Debezium 中过于繁冗的解析


Io.debezium.relational.RelationalChangeRecordEmitter 类中的 emitCreateRecord/emitReadRecord/emitUpdateRecord/emitDeleteRecord 中,均执行了 tableSchema.valueFromColumnData 方法。在笔者的排查中,该方法是除 LogMiner 本身执行效率外,实际发送至 Kafka 的速度过慢的最大性能瓶颈。


该方法所做的是将 LogMiner 解析后的记录转换为 Kafka Connect 能够识别的记录。通过阅读 LogMiner 的解析器 io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser 源码可知,该解析器事实上将所有字段都转换成了 String 类型。由于 String 类型一定可被 Kafka Connect 正确识别,笔者选择了在 RelationalChangeRecordEmitter 的子类 io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter 中,对相关的方法进行重写,省去调用 tableSchema.valueFromColumnData 的过程,直接将已解析的字段传递给 Kafka Connect。

六、结束语


本文阐述了针对 Debezium 做适应性优化和改造的方法,使其能够对主库无影响的情况下针对 Oracle 备库进行抽数,并将抽取速度提升至生产实际可用的程度。对于一般应用系统可作为替代 Oracle OGG 的“免费”且“好用”的方案选择。

作者介绍:

丁杨,现任职于中国农业银行研发中心,在大数据领域具有多年研发经验,现主要负责流批一体化计算在银行业务领域的应用研究。

2022-07-04 15:3514161

评论 8 条评论

发布
用户头像
能给个作者的联系方式吗?
2023-07-25 21:03 · 上海
回复
用户头像
你好,debezium在抽取adg备库的时候会调用createSchemaChangeEventsForTables
该操作需要写的权限,备库需要打开写权限吗?
2023-06-15 22:29 · 湖南
回复
你好,抽取adg备库的问题解决了吗?
2024-06-26 17:08 · 广东
回复
用户头像
想问一下,logminer长时间启动,PGA内存占用不断上升这个问题,你这边是怎么解决的呢?参考debezium的文档,设置了sleep时间 以及 调整了oracle的日志切换频率,都不能解决PGA内存问题,oracle11g, debezium版本 1.9.7
2023-04-03 16:22 · 北京
回复
可以看看log.mining.session.max.ms参数,文档链接:https://debezium.io/documentation/reference/2.2/connectors/oracle.html#oracle-property-log-mining-session-max-ms
2023-04-06 11:58 · 浙江
回复
你好,这个参数我也设置了,使用的是debezium1.9.7的版本,oracle11.0.2.4版本,PGA内存依然不断上升,请问你这块的oracle版本是多少啊?我现在怀疑和oracle的版本有关
2023-04-18 16:27 · 北京
回复
用户头像
求源码学习
2022-08-05 16:12
回复
用户头像
源码可否提供出来学习
2022-07-12 16:32
回复
没有更多了
发现更多内容

mes系统在新材料行业中的应用价值

万界星空科技

mes 万界星空科技 生产管理MES系统 新材料mes 新材料行业

几张图带你了解TiDB架构演进

TiDB 社区干货传送门

版本升级

电感生活So EZ 长安马自达MAZDA EZ-6全场景开放道路试驾

科技热闻

万界星空科技MES系统如何实现设备数据集成

万界星空科技

数据采集 mes 设备管理 万界星空科技

音频录制软件Audio Hijack for Mac

Mac相关知识分享

音频录制软件

《黑神话:悟空》真的带火云电脑了吗?

脑极体

AI

非凸科技钻石赞助第四届Rust China Conf 2024

非凸科技

网站下载工具SiteSucker Pro for Mac

Mac相关知识分享

对比传统数据库,TiDB 强在哪?谈谈 TiDB 的适应场景和产品能力

TiDB 社区干货传送门

How to Add a Built-in Function to TiDB Using a Cursor in 20 Minutes

TiDB 社区干货传送门

TiDB 源码解读

TiDB 数据库核心原理与架构_Lesson 01 TiDB 数据库架构概述课程整理

TiDB 社区干货传送门

TiDB 底层架构

襄阳等保测评机构有哪些?在哪里?

行云管家

等保 等保测评

超级驾趣学院 长安马自达MAZDA EZ-6驾驭全场景出行

Geek_2d6073

金蝶云·苍穹OEM版产品正式发布!AI时代共创软件产业新质生产力

金蝶云·苍穹

金蝶 生态伙伴 金蝶云苍穹

关于新版本 tidb dashboard API 调用说明

TiDB 社区干货传送门

集群管理 管理与运维 故障排查/诊断 新版本/特性解读 7.x 实践

MySQL 扛不住了,来试试这款平替的“国产化改造”必入手的国产数据库吧!

TiDB 社区干货传送门

NocoBase 与 NocoDB:开源无代码工具深度对比

NocoBase

开源 低代码 无代码开发 低代码开发 无代码

一文说清楚数据集成中的流处理与批处理的区别

RestCloud

Apache 数据处理 批处理 ETL 流处理

SQL 中 Drop、Delete 与 Truncate 的区别

Chat2DB

数据库 开源 AI sql

是什么让 TiDB 从一款中国受欢迎的数据库产品在短短几年内成为全球受欢迎的数据库产品?

TiDB 社区干货传送门

防止电脑休眠软件Lungo for Mac

Mac相关知识分享

pdf转换及ocr识别软件PDFify for mac

Mac相关知识分享

NPU 与 GPU 相比,有什么差别?| 技术速览

Baihai IDP

程序员 AI gpu 企业 9 月 PK 榜 NPU

TiDB在 G7 的实践和未来

TiDB 社区干货传送门

火山引擎VeDI核心产品DataTester再进化,A/B大模型应用评测功能上线

字节跳动数据平台

大数据 A/B 测试 对比实验 数字化增长

2024 医疗 Datathon 又叕来啦~!“理-工-医-信”跨学科联合科研,以数据驱动医疗实践

ModelWhale

R 语言 datathon 医疗大数据

参与“2024,我想和 TDengine 谈谈”有奖征文活动,赢 AirPods

TDengine

数据库 tdengine 时序数据库

品牌未来式,增长进行时|2024凯度BrandZ中国品牌盛典回顾

财见

软件测试 | 手工转测试开发轻松实现薪资 50%涨幅的逆袭之路

测试人

软件测试

交换机是什么?云管理平台可以管理交换吗?

行云管家

云计算 云服务 交换机 云管理

【黄金圆环】在研发领域的实践分享

京东科技开发者

基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具_架构_丁杨_InfoQ精选文章