AICon议程上新60%,阿里国际、360智脑、科大讯飞、蔚来汽车分享大模型探索与实践 了解详情
写点什么

Debezium 和 Quarkus:通过 CDC 模式来避免双重写入

  • 2022-09-08
    北京
  • 本文字数:11712 字

    阅读完需:约 38 分钟

Debezium和Quarkus:通过CDC模式来避免双重写入

在本系列文章的第2部分,我们学习了 Kafka Streams 与 Quarkus 的集成。我们开发了一个简单的应用程序向一个 Kafka 主题生成事件,并使用 Kafka Streams 实时消费和处理这些事件。


在那个示例中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,当用户停止观看电影,我们将这个事件和已播放的时间保存到另一个主题中。我们对这些事件进行实时的后续处理,计算出一部电影播放超过 10 分钟的次数。


应用程序的架构如下图所示。

所有的信息都保存在 Kafka 主题中,但这在现实的项目中是不太可能发生的。


在现实当中,电影信息可能保存在传统的数据库中,并用一些分布式缓存来加快查询速度,或者使用搜索引擎建立索引。为简单起见,我们假设电影信息保存在数据库中。


这就提出了一个问题——我们如何在两个不同的系统中维护相同的数据,数据库作为主要保存数据的位置,Kafka 主题中的数据将使用 Kafka Streams 来处理。


本文将教你如何正确地以不同形式保存相同的数据。

双重写入

要解决这个问题,我们首先想到的可能是双重写入。这是一种最为简单的方法,应用程序负责维护所有位置的数据。例如,当有新的电影信息需要插入时,会执行一个数据库插入,并发送一个事件到 Kafka 主题。


代码可能像下面这样。

@Channel("movies")Emitter<Record<Long, String>> movieEmitter; private static ObjectMapper objectMapper = new ObjectMapper(); public Movie dualWriteInsert(Movie movie) throws JsonProcessingException {    // Inserts to DB    movie.persist();     // Send an event to movies topic    final String payloadJson = objectMapper.writeValueAsString(movie);    long id = movie.id;     movieEmitter.send(Record.of(id, payloadJson));
复制代码


这看起来没什么问题,很容易实现,也很有效,如果没有遇到什么奇怪的问题的话。下面让我们来看看这种方式会遇到怎样的问题。


  • 如果数据被持久化在数据库中,但发送到 Kafka 主题时失败了,你可以把这两个操作包装在一个事务块中。这可以解决事务问题,因为在出错时可以回滚。但你在性能方面付出了巨大的代价,事务范围越大,阻塞数据库的时间就越长。

  • 如果两个并发用户想同时更新一个电影信息,会发生什么情况?可能会发生这样的情况——第一个请求更新了数据库,并将事件发送给 Kafka,然后第二个请求再次更新数据库和 Kafka。在这种情况下,数据库和 Kafka 主题中的数据是对齐的。但是如果第一个请求只将数据持久化到数据库,第二个请求持久化并发送事件到 Kafka,然后第一个请求再将事件发送到 Kafka 主题,那么此时数据库和 Kafka 主题中的数据就发生了分歧,产生了不同的值,导致数据之间不一致。当然,你可以使用同步方法,但这将意味着巨大的性能损失。出现第二个问题是因为混合使用了不同的系统,数据库事务的保证范围只限于数据库本身,无法在不同的系统之间起作用。

两阶段提交

这个问题的一个可能的解决方案是使用两阶段提交协议。虽然这可能是一个很好的解决方案,但也存在两个问题。


  • 首先,并不是所有的系统都支持分布式事务和两阶段提交。

  • 这个协议的问题在于各方之间的通信需要进行额外的协调。这是一个可能的解决方案,但它不是一个通用的解决方案。在我们的示例中,Kafka 不支持分布式事务,所以让我们来看看另一个解决方案。

变更数据捕获

变更数据捕获(Change Data CaptureCDC)是一种模式,用于跟踪已更改的数据(例如,添加的新记录、更新的注册表等)并触发事件,让应用程序能够对变更作出反应。


有几种实现 CDC 的方法,例如,在使用行级别的时间戳、版本号或状态指示器,这样就可以定时从一个特定的点检查数据(例如,SELECT * WHERE status=not_read)。但这种方法有一个缺点,你需要经常访问数据库,但这些访问与业务无关,而且需要处理数据被删除的情况。


另一种方法是使用数据库触发器,即任何一个数据变更都会触发一个事件,并将事件保存在特定的事件表中。你可以捕获任何一个事件,但仍然需要定时轮询数据库。


大多数数据库都有事务日志,它记录了数据库的所有变更。日志扫描器会扫描这个日志,并以非侵入式的方式捕获变更。这种方法的好处如下所示。


  • 对数据库的影响最小。

  • 变更对应用程序来说是透明的,不需要插入特殊的列。

  • 事务完整性。

  • 不需要修改数据库 Schema。日志扫描是最好的方法,而Debezium是最流行的开源日志扫描器项目。

Debezium

Debezium 是一个通过扫描日志实现变更数据捕获的开源项目。启动数据库并配置 Debezium,用它消费数据库事务日志中的数据。对于提交给数据库的每一次插入、删除或更新,Debezium 都将触发一个事件,应用程序可以向它注册并作出相应的反应。


那么为什么说 Debezium、CDC 和 Kafka 可以帮助我们解决双重写入的问题呢?Kafka 主题由一个或多个分区组成,每个分区按照事件到达的顺序对事件进行排序(事件总是被追加到分区的末尾)。因此,如果我们想要维护并发操作的顺序问题(避免在系统之间有错位的数据),Kafka 主题可以帮我们解决这个问题。


当然,还有另外一个问题,即在并发操作的情况下,如何按照正确的顺序从数据库中读取数据。CDC 和日志扫描器可以确保事务提交后数据的顺序是正确的,并且是非侵入性的,而 Debezium 可以在这方面发挥作用。


你可以用两种不同的方式来操作 Debezium,这两种方式都是有效的,使用哪一种取决于具体情况。这两种方式分别是 Debezium 服务器或 Debezium 引擎(嵌入式)。

Debezium 服务器

Debezium 服务器将 Debezium 作为 Kafka Connect 实例运行。Kafka Connect 是一个独立的进程,由消费者和生产者启动,用于从 Kafka 读取数据。Kafka Connect 定义了不同数据系统的连接器,然后将大型数据集移入或移出 Kafka。由于连接器使用了 Kafka API,所以它们是可伸缩的,具有容错能力和较低的延迟。


在下面的例子中,假设你想将数据从一个 Kafka 主题导出到一个索引引擎,比如 ElasticSearch。你有两个选择。


使用 Kafka API 创建一个应用程序(就像我们在本系列的第1部分中看到的那样)从 Kafka 主题读取事件,然后使用 ElasticSearch 客户端将数据填充到索引中。


使用 ElasticSearch Kafka Connect,它已经实现了所有这些逻辑,你只需要配置和启动即可。


Debezium 做的是同样的事情,它从数据库读取事务日志,并将其发送到 Kafka 主题。


Debezium 最大的优点之一是它可以连接到多种数据库,如 MySQL、MongoDB、PostgreSQL、Oracle DB、SQL Server、DB 2、Cassandra 和 Vitesse。

Debezium 引擎

通常情况下我们会使用 Debezium 服务器,因为它不会干扰应用程序。它是一个用于接收数据变更并填充 Kafka 主题的服务。


但并不是所有的应用程序都需要 Kafka Connect 提供的容错能力或可伸缩性。此外,有时候应用程序必须自己捕获数据变更事件,并执行一些自定义逻辑,而不只将变更发送到消息传递系统中。


对这些情况,debezium-api 模块定义了一个 API 来将 Debezium 引擎嵌入到应用程序中。


到目前为止,我们知道了应该要尽量避免双重写入。我们的解决方案是使用 CDC 直接从事务日志中获取数据,并将其推送到 Kafka 主题,这样其他系统就可以以“事务性”的方式和顺序消费这些数据。

发件箱模式

看到这里,你可能会想:“好吧,我可以通过 CDC 对数据变更作出反应,但内部实体被暴露给了外部系统。”虽然这是真的,但请允许我向你介绍发件箱模式来避免这个问题。


发件箱模式提供了一个发件箱表,你可以在其中记录所有实体的操作(可能使用非规范化数据)。然后 CDC 系统(在我们的例子中使用的是 Debezium)对发件箱表(而不是实体表)中的变更做出反应,这样就实现了数据模型与其他系统的隔离。


需要注意的是,实体变更和发件箱必须在同一个事务中。


让我们把所有这些碎片放在一个 Quarkus 项目中,并解决我们在一开始提出的问题——如何在数据库中插入与电影相关的信息,并将其填充到外部系统(Kafka 主题)中。

我们的 Debezium 示例

我们不再为每一种场景手动编写代码,而是使用 Debezium 引擎并将它与 Quarkus 集成来解决这个问题。

创建项目

到 Quarkus起始页,选择 RestEasy Reactive 和 RestEasy Reactive Jackson 插件(用于编码/解码数据),实现 JAX-RS 端点,使用 Panache 和 MySQL 驱动程序将电影信息插入到 MySQL 数据库,使用 SmallRye Reactive Messaging 与 Kafka 发生交互。另外,取消选中 Started Code 选项,如下图所示。

你可以跳过这个手动步骤,并打开Quarkus Generator链接,这里所有的依赖项都被选中。然后按下“Generate your application”按钮,下载搭建好的应用程序 zip 文件包。


解压缩文件并在你最喜欢的 IDE 中打开项目。

开发

在开始编码之前,我们需要添加两个新的依赖项:一个用于使用 Debezium 引擎,另一个用于添加 Debezium Quarkus Outbox 插件。

Debezium 引擎

打开 pom.xml 文件并添加以下依赖项。


在 dependencyManagement 部分:

<dependency>   <groupId>io.debezium</groupId>   <artifactId>debezium-bom</artifactId>   <version>1.9.4.Final</version>   <type>pom</type>   <scope>import</scope></dependency>
复制代码


在 dependencies 部分:

<dependency>  <groupId>io.debezium</groupId>  <artifactId>debezium-ddl-parser</artifactId></dependency><dependency>  <groupId>io.debezium</groupId>  <artifactId>debezium-embedded</artifactId></dependency><!-- We connect to a MySQL database, so we need debezium MySQL connector --><dependency>  <groupId>io.debezium</groupId>  <artifactId>debezium-connector-mysql</artifactId></dependency>
复制代码


这是为了使用嵌入在应用程序中的 Debezium 引擎。如果我们使用 Debezium 服务器,就不需要这些依赖项,因为它是一个独立的服务。

Debezium Quarkus Outbox 插件

Quarkus 通过Debezium Quarkus Outbox插件实现发件箱模式。


打开 pom.xml 文件并添加以下依赖项。


在 dependencyManagement 部分:

<dependency>   <groupId>${quarkus.platform.group-id}</groupId>   <artifactId>quarkus-debezium-bom</artifactId>   <version>${quarkus.platform.version}</version>   <type>pom</type>   <scope>import</scope> </dependency>
复制代码


请注意 BOM 的版本与 Quarkus 版本要对齐,在这里是 2.10.1.Final。


在 dependencies 部分:

<dependency>  <groupId>io.debezium</groupId>  <artifactId>debezium-quarkus-outbox</artifactId></dependency>
复制代码

实现

你可以选择不使用发件箱模式或自己实现它,如果是这样,那么这些依赖项就都不需要。但为简单起见,我们在这里使用它。


有了这些依赖项,我们就可以创建带有 JPA 注解并扩展了 PanacheEntity 类的的 Movie 实体。

import javax.persistence.Entity; import io.quarkus.hibernate.orm.panache.PanacheEntity; @Entitypublic class Movie extends PanacheEntity {     // No worries Quarkus will change them   // to private and auto-generate getters/setters at compilation time   public String name;   public String director;   public String genre; }
复制代码


下一步是创建一个 HTTP 端点,使用 JAX-RS 注解将电影信息插入到数据库中。

import javax.inject.Inject;import javax.ws.rs.POST;import javax.ws.rs.Path; import org.jboss.logging.Logger; @Path("/movie")public class MovieResource {    // Service to insert the movie data into Movie and Outbox tables   @Inject   MovieService movieService;    // Injects the logger   @Inject   Logger logger;    // Http Post method to insert a movie   @POST   public Movie insert(Movie movie) {       logger.info("New Movie inserted " + movie.name);       System.out.println(":)");             return movieService.insertMovie(movie);   }}
复制代码


因为我们使用的是 Debezium Quarkus Outbox 插件,所以需要创建一个用于表示发件箱表中存储的内容的实体。实体必须实现 ExportedEvent 接口,并实现接口方法来识别发件箱表中放置的事件类型。

import java.time.Instant; import com.fasterxml.jackson.databind.JsonNode;import com.fasterxml.jackson.databind.ObjectMapper;import com.fasterxml.jackson.databind.node.ObjectNode; import io.debezium.outbox.quarkus.ExportedEvent; public class MovieEvent implements ExportedEvent<String, JsonNode> {    private static ObjectMapper mapper = new ObjectMapper();    // Set the type enclosed inside the event   private static final String TYPE = "Movie";   // Set the event type   private static final String EVENT_TYPE = "MovieCreated";    private final long gameId;   private final JsonNode jsonNode;   private final Instant timestamp;    // Saves Game info in the class   public MovieEvent(Movie movie) {       this.gameId = movie.id;       this.timestamp = Instant.now();       // Saves game content in a string column in JSON format       this.jsonNode = convertToJson(movie);   }    @Override   public String getAggregateId() {       return String.valueOf(this.gameId);   }    @Override   public String getAggregateType() {       return TYPE;   }    @Override   public JsonNode getPayload() {       return jsonNode;   }    @Override   public Instant getTimestamp() {       return timestamp;   }    @Override   public String getType() {       return EVENT_TYPE;   }     private JsonNode convertToJson(Movie movie) {       ObjectNode asJson = mapper.createObjectNode()               .put("id", movie.id)               .put("name", movie.name)               .put("director", movie.director)               .put("genre", movie.genre);             return asJson;   } }
复制代码


在将 Debezium 逻辑添加到代码之前,我们还需要实现 MovieService 类,加入插入数据的逻辑。这个逻辑应该将电影信息持久化到 Movie 表中,并将 MovieEvent 实体持久化到由 OutboxEvent 插件管理的表中。


这个插件提供了一个特定的 CDI 事件来持久化实现了 ExportedEvent 接口的事件。我们唯一要做的事情是触发一个事件,数据将自动被持久化。

import javax.enterprise.context.ApplicationScoped;import javax.enterprise.event.Event;import javax.inject.Inject;import javax.transaction.Transactional; import io.debezium.outbox.quarkus.ExportedEvent; @ApplicationScopedpublic class MovieService {     // CDI event interface triggering Outbox entities   @Inject   Event<ExportedEvent<?, ?>> event;    // Transaction method   @Transactional   public Movie insertMovie(Movie movie) {        // Persists data       movie.persist();             // Persists outbox content       event.fire(new MovieEvent(movie));             return movie;   }}
复制代码


最后一步是配置 Debezium 引擎,并将其嵌入到应用程序中。


要配置引擎,你需要设置数据库信息(主机名、端口、凭证)以及 Debezium 要监控的数据库和表。


import java.io.File;import java.io.IOException; import javax.enterprise.inject.Produces; import org.eclipse.microprofile.config.inject.ConfigProperty; import io.debezium.config.Configuration; public class DebeziumConfiguration {     // Debezium needs Database URL and credentials to login and   // monitor transaction logs   @ConfigProperty(name = "quarkus.datasource.jdbc.url")   String url;    @ConfigProperty(name = "quarkus.datasource.password")   String password;    @ConfigProperty(name = "quarkus.datasource.username")   String username;
@Produces public Configuration configureDebezium() throws IOException { // Custom class to get database name or hostname of Database server MySqlJdbcParser jdbcParser = MySqlJdbcParser.parse(url); File fileOffset = File.createTempFile("offset", ".dat"); File fileDbHistory = File.createTempFile("dbhistory", ".dat"); return io.debezium.config.Configuration.create() .with("name", "movies-mysql-connector") // configures MySQL connector .with("connector.class", "io.debezium.connector.mysql.MySqlConnector") .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") .with("offset.storage.file.filename", fileOffset.getAbsolutePath()) .with("offset.flush.interval.ms", "60000") // Configures database location .with("database.hostname", jdbcParser.getHost()) .with("database.port", jdbcParser.getPort()) .with("database.user", "root") .with("database.allowPublicKeyRetrieval", "true") .with("database.password", password) .with("database.dbname", jdbcParser.getDatabase()) .with("database.include.list", jdbcParser.getDatabase()) // Debezium only sends events for the modifications of OutboxEvent table and not all tables .with("table.include.list", jdbcParser.getDatabase() + ".OutboxEvent") .with("include.schema.changes", "false") .with("database.server.id", "10181") .with("database.server.name", "movies-mysql-db-server") .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") .with("database.history.file.filename", fileDbHistory.getAbsolutePath()) .build(); } }
复制代码


DebeziumListener CDI 类会在应用程序启动时启动 Debezium。


Debezium 引擎并不是在单独的线程中运行,所以我们需要提供一个并行运行的线程,而不是阻塞应用程序的线程。在 Quarkus 中,我们可以使用 ManagedExecutor 提供执行程序线程来运行 Debezium。


然后,我们需要使用 DebeziumEngine 类来实例化 Debezium 引擎,并设置在上一步中创建的配置属性。最重要的一个步骤是注册一个在 Debezium 每次生成事件时触发的方法。notifying 方法会对这个自定义方法进行注册,在我们的示例中,这个方法叫作 handleChangeEvent。


这个方法用于接收事件,我们可以实现任何我们想要的逻辑——将事件发送到 Kafka 主题或者其他服务——任何你可以在 Java 中实现的东西。

import java.io.IOException; import javax.enterprise.context.ApplicationScoped;import javax.enterprise.event.Observes; import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import org.eclipse.microprofile.context.ManagedExecutor;import org.eclipse.microprofile.reactive.messaging.Channel;import org.eclipse.microprofile.reactive.messaging.Emitter; import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.JsonNode;import com.fasterxml.jackson.databind.ObjectMapper; import io.debezium.config.Configuration;import io.debezium.embedded.Connect;import io.debezium.engine.DebeziumEngine;import io.debezium.engine.RecordChangeEvent;import io.debezium.engine.format.ChangeEventFormat;import io.quarkus.runtime.ShutdownEvent;import io.quarkus.runtime.StartupEvent;import io.smallrye.reactive.messaging.kafka.Record; import static io.debezium.data.Envelope.FieldName.*;import static io.debezium.data.Envelope.Operation;
@ApplicationScopedpublic class DebeziumListener { private static ObjectMapper objectMapper = new ObjectMapper(); // Start the Debezium engine in a different thread ManagedExecutor executor; // Debezium configuration object Configuration configuration; private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine; public DebeziumListener(ManagedExecutor executor, Configuration configuration) { this.executor = executor; this.configuration = configuration; } // Interface to send events to movies Kafka topic @Channel("movies") Emitter<Record<Long, JsonNode>> movieEmitter; void onStart(@Observes StartupEvent event) { // Configures Debezium engine this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) .using(this.configuration.asProperties()) // For each event triggered by Debezium, the handleChangeEvnt method is called .notifying(this::handleChangeEvent) .build(); // Starts Debezium in different thread this.executor.execute(this.engine); }
void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) { // For each triggered event, we get the information SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record(); Struct sourceRecordChangeValue= (Struct) sourceRecord.value(); if (sourceRecordChangeValue != null) { Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION)); // Only insert operations are processed if(operation == Operation.CREATE) { // Get insertation info Struct struct = (Struct) sourceRecordChangeValue.get(AFTER); String type = struct.getString("type"); String payload = struct.getString("payload"); if ("GameCreated".equals(type)) { try { final JsonNode payloadJson = objectMapper.readValue(payload, JsonNode.class); long id = payloadJson.get("id").asLong(); // Populate content to Kafka topic movieEmitter.send(Record.of(id, payloadJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } } } } }
void onStop(@Observes ShutdownEvent event) throws IOException { if (this.engine != null) { this.engine.close(); } } }
复制代码

运行

这个示例是自包含的,因此你不需要启动任何东西,因为 Quarkus 会为你启动它。


Panache 和 Kafka Connector 已经与 Quarkus DevServices集成,因此我们不需要启动 Kafka 集群或 MySQL 数据库,也不需要将它们配置为 Quarkus Dev 模式。电脑上需要有一个可运行的容器运行时,比如 Podman 或任何其他兼容 OCI 的工具。


为了便于跟踪,在运行应用程序之前,我们向应用程序中添加两个配置属性。在 application.properties 文件中添加下面的两行。


quarkus.hibernate-orm.log.sql=truequarkus.debezium-outbox.remove-after-insert=false
复制代码


第一行记录执行的 SQL 语句。这有助于在插入数据时对两张表(Movies 和 OutboxEvent)进行验证。


第二行避免 Debezium 在使用发件箱表后删除数据。


在终端窗口中启动服务:

./mvnw clean quarkus:dev
2022-07-07 11:36:22,942 INFO [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Waiting for keepalive thread to start2022-07-07 11:36:22,948 INFO [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Keepalive thread is running2022-07-07 11:37:43,889 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted string
复制代码


几秒钟后,Kafka 集群、MySQL 实例和应用程序就启动起来了。


通过检查运行的容器来验证实例:

docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS fa316bfae219 vectorized/redpanda:v21.11.3 "sh -c 'while [ ! -f…" 49 seconds ago Up 45 seconds 8081-8082/tcp, 9644/tcp, 0.0.0.0:55002->9092/tcp
4c220f7ee066 mysql:8.0 "docker-entrypoint.s…" 50 seconds ago Up 46 seconds 33060/tcp, 0.0.0.0:60652->3306/tcp
e41cae02ff02 testcontainers/ryuk:0.3.3 "/app" 53 seconds ago Up 50 seconds 0.0.0.0:60650->8080/tcp
复制代码


Kafka 集群运行在端口 55002 上,和 MySQL(ID 为 4c220f7ee066)运行在端口 60652 上。


注意:不同情况下端口和 ID 可能不同。


在另一个终端窗口中运行 curl 命令,插入一个新的 Movie 记录。

curl -X 'POST' \  'http://localhost:8080/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码


检查 Quarkus 终端窗口,可以看到数据库运行的 SQL 语句。

:)Hibernate:    select        next_val as id_val    from        hibernate_sequence for update


Hibernate: update hibernate_sequence set next_val= ? where next_val=?
// Insert into Movie
Hibernate: insert into Movie (director, genre, name, id) values (?, ?, ?, ?)
// Automatically OutboxEvent table receives an insert
Hibernate: insert into OutboxEvent (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id) values (?, ?, ?, ?, ?, ?, ?)
复制代码


为了验证 Debezium 可以检测到变更并将其推送到 Movies Kafka 主题,我们将运行kcat工具来查询 Kafka 主题。


kcat -b localhost:55002 -C -t movies
{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}% Reached end of topic movies [0] at offset 1
复制代码

结论

我们实现了一种解决方案,通过使用 Debezium 读取事务日志并为每一个变更触发一个事件,解决了数据库和外部系统之间的双重写入问题。


在本例中,我们使用了 Debezium 引擎,并实现了在触发事件时执行的逻辑。


嵌入式方式可能在某些场景中有用,但在其他场景中(特别是在需要高可伸缩性和容错能力的项目中),Debezium 服务器可能更适合。如果使用 Debezium 服务器(作为一个 Kafka Connect 进程),你的代码就不需要做出修改(没有依赖项),因为 Debezium 是一个独立的进程,它会自己连接到数据库事务日志,检测变更,并将它们发送到 Kafka 主题。由于事件是有序的,所以任何系统都可以消费主题中的变更事件。

尽管在使用 Debezium 时,发件箱模式并不是必需的(到最后,Debezium 可以监听任何一张表中的变更),但隔离数据是一个很好的实践,发件箱模式可以帮助你做到这一点。


集成(微)服务架构最初看起来可能很容易,但当你开始集成数据时,事情就变复杂了,而 Debezium 项目可以帮助你完成这项任务。


源代码可以在GitHub上找到。


作者简介:

Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning)和《Quarkus Cookbook》(O'Reilly)的合著者,也是几个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL University 的国际演讲者和教师。你可以在 Twitter 上关注他(Alex Soto),了解 Kubernetes 和 Java 世界正在发生的事情。


原文链接

https://www.infoq.com/articles/change-data-capture-debezium/


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

本系列第二部分:Kafka Streams 与 Quarkus:实时处理事件

2022-09-08 09:175710

评论

发布
暂无评论
发现更多内容

ATRS Week 5

Geek_c25301

产品解读 | 分布式多模数据库:KaiwuDB

KaiwuDB

数据库

Mac电脑轻松gps定位:AnyGo for Mac激活中文最新

mac大玩家j

GPS定位 Mac软件 虚拟定位

Unite for Mac(将网站转化为应用程序) v4.6中文激活版

mac

苹果mac Windows软件 Unite 应用程序转换工具

【直播预约中】 腾讯大数据 x StarRocks|构建新一代实时湖仓

StarRocks

数据库 大数据 数据湖

自动化量化交易APP软件系统开发

V\TG【ch3nguang】

高频量化交易系统开发(高频程序化交易)

V\TG【ch3nguang】

glTF模型骨骼动画

3D建模设计

GLTF 骨骼动画

用无代码搭建数据中台,竟然如此丝滑

陈橘又青

低代码 无代码开发 无代码 无代码平台

蓝易云:Linux ps命令详解,Linux查看进程。

百度搜索:蓝易云

云计算 Linux 运维 云服务器 ps

征服数据宇宙,新华三存储护卫队早有准备?

白洞计划

存储

合约量化交易软件搭建开发部署

V\TG【ch3nguang】

量化合约

量化交易app软件定制开发

V\TG【ch3nguang】

【介绍篇】Supabase起源和演进过程

张文平

postgres Supabase firebase

蓝易云:CPU、内存、缓存的关系详细解释!

百度搜索:蓝易云

云计算 运维 cpu 内存 硬盘

在加密货币交易所开发中使用人工智能和机器学习

区块链软件开发推广运营

交易所开发 数字藏品开发 dapp开发 区块链开发 NFT开发

浏览器缓存清理推荐 Cookie激活中文最新版

胖墩儿不胖y

缓存清理 Mac软件 清理缓存

中国“好房子”亮相东博会,东方式生活凭什么走向世界?

脑极体

全屋智能

Python变量:创建、类型、命名规则和作用域详解

小万哥

Python 程序员 软件 后端 开发

聚焦企业开放OpenAPI痛难点,华为云API Explorer助力伙伴构建API门户

华为云PaaS服务小智

软件开发 API 华为云

IPQ5018 VS IPQ6010 VS IPQ9574|Advancing Wireless Connectivity: The Power of Wi-Fi 6 & Wi-Fi 7

wallyslilly

IPQ6010 ipq5018 ipq9574 IPQ9274

Java并发Map的面试指南:线程安全数据结构的奥秘

程序那些事

Java 多线程 程序那些事 面试秘籍

什么是顶点颜色

3D建模设计

GLTF

GLTF动画

3D建模设计

3D动画 GLTF

对话在行人|厦门航空:紧抓数智化转型关键因素实现业财融合

用友BIP

2023全球商业创新大会 对话在行人

深拷贝和浅拷贝介绍

梦笔生花

c++

量化交易系统源码开发,对冲交易机器人系统开发

V\TG【ch3nguang】

Debezium和Quarkus:通过CDC模式来避免双重写入_语言 & 开发_Alex Soto_InfoQ精选文章