QCon 演讲火热征集中,快来分享技术实践与洞见! 了解详情
写点什么

KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用

作者:Guilherme Ferreira

  • 2023-10-11
    北京
  • 本文字数:5379 字

    阅读完需:约 18 分钟

大小:2.06M时长:12:00
KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用

在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。

为何要关注它?


KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。


假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。


常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。


采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:


  1. 中间件(Middleware):KafkaFlow 允许开发人员创建中间件来处理消息,从而实现对 Kafka 生产者/消费者管道的更多控制和定制。

  2. 处理器(Handler):引入了处理器的概念,允许开发人员将主题中的消息处理转发给专用消息类型的处理器。

  3. 反序列化算法(Deserialization Algorithms):提供了一套开箱即用的序列化和反序列化算法。

  4. 多线程消费者:提供了保证消息处理顺序的多线程功能,有助于优化系统资源的使用。

  5. 管理 API 和仪表盘:提供了 API 和仪表盘来管理消费者和消费者群组,可以在运行时进行暂停、恢复或倒回偏移。

  6. 消费者限流:提供了一种简便的方式,为主题的消费提供优先级。接下来,我们探讨一下这些特性,看看它们在解决类似问题方面的潜力。

KafkaFlow 生产者:简化消息的生成


我们从消息的生产者开始。


向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。


下面是一个如何使用 KafkaFlow 生产者发送消息的样例:


await _producers["my-topic-events"]    .ProduceAsync("my-topic", message.Id.ToString(), message);
复制代码


这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddProducer(            "product-events",            producer =>                producer            ...        )    ));
复制代码


生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。

在 KafkaFlow 中自定义序列化/反序列化


在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。


KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。


.AddProducer<ProductEventsProducer>(producer => producer       ...       .AddMiddlewares(middlewares => middleware           ...           .AddSerializer<JsonMessageSerializer>()       ))
复制代码


鉴于我们可以为消息使用自定义的序列化器/反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化/反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:


public class MySerializer : ISerializer{       public Task SerializeAsync(object message, Stream output, ISerializerContext context)       {             // 序列化逻辑在这里       }
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context) { // 反序列化逻辑在这里 }}
// 在设置Kafka消费者/生产者的时候,注册自定义的序列化器
.AddProducer<MyProducer>(producer => producer ... .AddMiddlewares(middlewares => middleware ... .AddSerializer<MySerializer>() ))
复制代码

KafkaFlow 中的消息处理


消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”


我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求/消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。


KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。


如下是一个消息处理器的示例:


public class MyMessageHandler : IMessageHandler<MyMessageType>{    public Task Handle(IMessageContext context, MyMessageType message)    {        // 消息处理逻辑在这里    }}
复制代码


这个处理器可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer...       .AddMiddlewares(middlewares => middlewares           ...             .AddTypedHandlers(handlers => handlers                     .AddHandler<MyMessageHandler>()              )       ))
复制代码


通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。

KafkaFlow 中的中间件


KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。


中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。


KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。


如下是一个中间件的样例:


public class MyMiddleware : IMessageMiddleware{    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)    {         // 预处理逻辑位于这里                  await next(context);                   // 后处理逻辑位于这里           }}
复制代码


要使用该中间件,可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer       ...       .AddMiddlewares(middlewares => middlewares             ...             .Add<MyMiddleware>()         ))   
复制代码


通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。

在 KafkaFlow 中处理并发


一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。


KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。


如下是一个如何为消费者设置并发 worker 数量的样例:


.AddConsumer(consumer => consumer.Topic("topic-name")       .WithGroupId("sample-group")       .WithBufferSize(100)       .WithWorkersCount(10) // 设置worker的数量       .AddMiddlewares(middlewares => middlewares        ...        ))
复制代码


即便有并发 worker,KafkaFlow 也能确保顺序。

批处理


随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。

什么是批量消费?


在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。

KafkaFlow 的批量消费方式


KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddConsumer(            consumerBuilder => consumerBuilder            ...            .AddMiddlewares(                middlewares => middlewares                    ...                    .BatchConsume(100, TimeSpan.FromSeconds(10))                    .Add<HandlingMiddleware>()            )        )    ));
复制代码

批量消费对性能的影响


通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。


同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。

KafkaFlow 的消费者管理


KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。


管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。



KafkaFlow 的管理仪表盘

消费者限流


通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。


消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。

优先级


假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。


在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。


那结果是什么呢?高效、灵活和优化的消费流程。


借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:


.AddConsumer(    consumer => consumer        .Topic("bulk-topic")        .WithName("bulkConsumer")        .AddMiddlewares(            middlewares => middlewares                .ThrottleConsumer(                    t => t                        .ByOtherConsumersLag("singleConsumer")                        .WithInterval(TimeSpan.FromSeconds(5))                        .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))                        .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))                        .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))                .AddSerializer<JsonCoreSerializer>()        ))
复制代码

KafkaFlow:展望未来


目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。


项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。


由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。

结论


KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。


除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。


原文链接:

Building Kafka Event-Driven Applications with KafkaFlow

2023-10-11 08:0011116

评论

发布
暂无评论
发现更多内容

Spring-boot 单元测试

陈靓-哲露

Java异常面试题(2020最新版)

Java架构师迁哥

区块链支付系统开发技术方案,USDT支付系统搭建

13530558032

数字经济时代来临 区块链护航数字资产安全

CECBC

金融 数字时代

易观方舟Argo+CRM | 让企业数据发挥更大价值

易观大数据

架构大作业

赵龙

未来已来!全球一流科技盛会——云栖大会9月17日线上隆重举办

北柯

天猫成立房产部门,利用区块链承载交易多项服务功能

CECBC

区块链 房地产

Java-技术专题-JMX超详细解读

洛神灬殇

Java程序员博客系统推荐!我调研了100来个 Java 开源博客系统,发现这 5 个最好用!

Java 项目管理 计算机 框架设计

银行数仓体系发展之路

易观大数据

一文详解分布式缓存(附代码)

架构师修行之路

缓存 分布式 分布式缓存

架构师训练营第 1 期-第一周命题作业

arthur

QPS、TPS、RT、并发数、吞吐量理解和性能优化深入思考

艾小仙

架构 编程语言

架构师训练营第一周作业

邓昀垚

架构师训练营第一周学习总结

邓昀垚

极客大学架构师训练营

实战案例丨GaussDB for DWS如何识别坏味道的SQL

华为云开发者联盟

数据库 sql 算子

数字货币交易所技术开发,交易所源码

13530558032

区块链技术智能合约有哪些实际的应用场景

CECBC

智能合约 区块链技术

本以为自己MySQL够牛逼了,直到亲自去阿里受虐了一次!

Java架构师迁哥

Java-技术专题-AQS和Volatile和Synchronized实现原理

洛神灬殇

TCP和HTTP中的KeepAlive机制总结

陈德伟

nginx TCP 性能 网络 HTTP

面试官:你说说互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景

小林coding

乐观锁 高并发 操作系统 计算机基础

架构师训练营1期 -- 第一周作业

曾彪彪

极客大学架构师训练营

Week15

一叶知秋

LeetCode题解:622. 设计循环队列,使用双向链表,JavaScript,详细注释

Lee Chen

大前端 LeetCode

Spring Boot CLI 介绍

hungxy

Spring Boot Spring Boot CLI

DDD+微服务实战:什么是DDD?

AI代笔

微服务 领域驱动设计 DDD

甲方日常 17

句子

生活 随笔杂谈

数字资产钱包开发方案,区块链数字钱包软件源码

13530558032

华为HMS的“生态雪球”,滚动在万物智联的新跑道

脑极体

KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用_业务架构_InfoQ精选文章