写点什么

通过 TCP/IP 每分钟发送数十亿条消息

作者:George Ball

  • 2023-05-29
    北京
  • 本文字数:9026 字

    阅读完需:约 30 分钟

通过TCP/IP每分钟发送数十亿条消息

在构建分布式应用程序时,数据表示是最重要的问题之一。我们必须确保一个组件发送给另一个“远程”组件(即属于不同进程的组件)的数据被正确接收,且保持不变。这可能看起来很简单,但需要注意的是,不同的通信组件可能是用完全不同的语言开发的。除此之外,不同的硬件/系统架构可能使用不同的方式来表示“相同”的数据,这让事情变得更加复杂。只是简单地将数据字节从一个组件复制到另一个组件是远远不够的。即使在 Java 中,我们可能认为可以免受这种情况的困扰,但不同的 JVM 实现或来自同一个实现的两个不同的 JVM 版本也并不一定百分之百使用相同的对象内部表示。


这个问题最常见的解决方案是定义一种可以被不同进程(甚至不同编程语言)理解的“规范”数据表示,并在发送数据之前将其转换为这种格式,然后在接收到数据后再转换为接收方自己的格式。现在已经有几种这样的“有线格式”,从基于文本的标准,如 YAML、JSON 或 XML,到二进制格式,如Protobuf


Chronicle公司,我们开发了许多库来支持基于低延迟消息传递的应用程序,主要用于金融服务行业。我们为来自世界各地的客户提供定制的解决方案开发和咨询服务,其中大多数客户来自金融领域。


其中一个库是Chronicle Wire,它为 Java 对象的内部 JVM 表示和持久化状态(或与其他 Java 进程通信的格式)之间提供了高性能转换。


Chronicle Wire 源于Chronicle Queue项目。在每秒数百万条消息的规模下,Chronicle Queue 可以为同一机器不同 JVM 之间的消息传递提供个位数的微秒级延迟,或者为不同的机器之间提供数十微秒的稳定延迟。Wire 现在成为 Chronicle 开发的大多数软件组件的关键组成部分,从用于组件之间对象状态的序列化和反序列化,到用于管理这些组件配置的高效模型。


随着软件架构越来越多地转向分布式、基于事件驱动,我们希望扩展 Chronicle Wire 的使用场景,支持组件之间的 TCP/IP 互连。本文将对此特性进行基本的概述,并提供一些如何使用这些特性的简单示例。


我们已经看到了一些令人吃惊的性能数据——例如,Peter Lawrey 的文章“如果不创建很多对象,Java其实非常快”提供的一个基准测试显示,在一台机器上构建的环回 TCP/IP 网络每分钟能够传递超过 40 亿个事件。


我们将其与其他类似的数据交换技术(特别是 Jackson 和 BSON)进行了基准测试比较。在处理 100 字节消息的测试中,使用 Chronicle Wire,每条消息的 99.99 可用性处理时间约为 10.5 微秒,而使用 Jackson/BSON 则为 1400 微秒,这是一个显著的差异。


我们将在本文中介绍一些相关的关键概念。我们正在将这些特性设计得既灵活又高效,以后的文章将会展示一些更高级的用例。

什么是 Chronicle Wire

Chronicle Wire 作为应用程序和字节流之间的一个层,充当数据的源或接收器。Wire 序列化 Java 对象的状态并将其放到字节流中,或者从字节流中读取字节序列并基于消息中携带的信息将其反序列化成 Java 对象。


我们来看一个简单的例子。我们将模拟 Java 对象的持久化,将对象的状态序列化到 Wire,然后再读取成对象。我们将使用一个叫作 Person 的类。


public class Person extends SelfDescribingMarshallable {   private String name;   @NanoTime   private long timestampNS;   @Base85   private long userName;
}
复制代码


这个类的完整代码可以在 Chronicle Wire 的Github代码库中找到。


父类型 SelfDescribingMarshallable 包含与 Wire 交互所必需的功能——它大致相当于 Java 序列化所使用的 java.io.Serializable 接口,不过它更强大并且不存在安全缺陷。SelfDescribingMarshallable 对象不需要额外的东西来支持编组和解组——比如 XML 的模式或 Protobuf(或 SBE)的代码生成器。此外,这个接口还提供了 Java 数据对象方法 equals()、hashcode()和 toString()的实现。


Chronicle Wire 使用 @NanoTime 注解将属性值编码为时间戳,使用 @Base85 注解编码短字符串来节省空间。这两个注解还提供了从紧凑的内部表示到友好的字符串表示的转换。


我们来创建一个 Chronicle Wire 实例,它将使用 Java 堆中的一个内存区域将对象编组成 YAML 并解组。


Wire yWire = Wire.newYamlWireOnHeap();
复制代码


创建和初始化 Person 实例:


Person p1 = new Person()       .name("George Ball")       .timestampNS(CLOCK.currentTimeNanos())       .userName(Base85.INSTANCE.parse("georgeb"));System.out.println("p1: " + p1);
复制代码


我们使用了重载方法和链式方法而不是 get…()和 set…()方法来访问和修改对象属性。代码打印出了 Person 对象的初始化状态,调用了父类 SelfDescribingMarshallable 的 toString()方法:


p1: !Person {  name: George Ball,  timestampNS: 2022-11-11T10:11:26.1922124,  userName: georgeb}
复制代码


现在我们将对象序列化到 Wire。因为创建的 Wire 使用了 YAML,所以可以很容易显示其中的内容:


Wire yWire = Wire.newYamlWireOnHeap();p1.writeMarshallable(yWire);System.out.println(yWire);
复制代码


我们可以看到被序列化的属性:


name: George BalltimestampNS: 2022-11-11T10:11:54.7071341userName: georgeb
复制代码


现在我们可以创建一个空的 Person 实例,然后用从 Wire 中回读的属性值来填充它,并将其打印出来:


Person p2 = new Person();p2.readMarshallable(yWire);System.out.println("p2: " + p2);
复制代码


从输出可以看到,新创建的对象的状态是对的:


p2: !Person {  name: George Ball,  timestampNS: 2022-11-11T10:13:29.388,  userName: georgeb}
复制代码


完整的代码可以在 Chronicle Wire 的Github代码库中找到。

MethodWriter 和 MethodReader

通常,使用 Wire 序列化和反序列化的对象都是与我们的应用程序相关的某种类型的数据。如果使用 Chronicle Queue 作为消息传输,那么这些对象将构成消息的有效负荷,我们把它们叫作数据传输对象(Data Transfer Object,DTO)。


我们也可以从不同的角度来看待这个功能。序列化的 Person 对象包含了 YAML 格式的属性:


name: George BalltimestampNS: 2022-11-11T10:11:54.7071341userName: georgeb
复制代码


如果再进一步,我们可以使用 Wire 编码和发送请求来调用带有参数的方法。由于消息传输的单向性,这些方法必须是 viod 的,即不能返回值。我们假设有一个可以操作 Person 对象的接口,暂时还没有提供方法的实现:


public interface PersonOps {   void addPerson(Person p);}
复制代码


为简单起见,这里只指定了一个方法。它只接受一个 Person 类型的参数,并将其添加到集合中。根据前面的示例,我们或许会将这个类的实例编码为:


addPerson: {  name: George Ball,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: georgeb}
复制代码


然后解码为方法调用的形式:


personOps.addPerson(       Marshallable.fromString(Person.class, "" +               "name: Alice Smithl\n" +               "timestampNS: 2022-11-11T10:11:54.7071341\n" +               "userName: alices\n"));
复制代码


Chronicle Wire 提供了这种对方法调用进行编码和解码的能力。发送方使用 MethodWriter 类,接收方使用 MethodReader 类。


例如,对于上面显示的 PersonOps 类,我们可以创建一个 MethodWriter:


final PersonOps personOps = yWire.methodWriter(PersonOps.class);
复制代码


methodWriter 将返回一个包含 addPerson()存根实现的接口实例,用于将调用请求编码到 Wire。我们可以这样调用这个方法:


personOps.addPerson(p1);
personOps.addPerson(new Person() .name("Bob Singh") .timestampNS(CLOCK.currentTimeNanos()) .userName(Base85.INSTANCE.parse("bobs")));
复制代码


如果我们看一下 Wire,将会看到调用请求被编码成消息:


addPerson: {  name: Alice Smith,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: alices}...addPerson: {  name: George Ball,  timestampNS: 2022-11-11T10:28:47.466,  userName: georgeb}...addPerson: {  name: Bob Singh,  timestampNS: 2022-11-11T10:28:48.3001121,  userName: bobs}...
复制代码


在接收端,我们可以创建一个 MethodReader 对象,它将提供在解码时被调用的方法的实现:


MethodReader reader = yWire.methodReader(       (PersonOps) p -> System.out.println("added " + p));
复制代码


当消息被读取和解码时,这个方法将被调用:


for (int i = 0; i < 3; i++)   reader.readOne();
复制代码


当方法被调用时,我们将看到 System.out.println()的输出:


added !Person {  name: Alice Smith,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: alices}
added !Person { name: George Ball, timestampNS: 2022-11-11T10:28:47.466, userName: georgeb}
added !Person { name: Bob Jones, timestampNS: 2022-11-11T10:28:48.3001121, userName: bobj}
复制代码


这看起来非常强大,因为它为我们提供了一种高度灵活和高效的方式来编码事件或消息,并将它们与处理程序关联起来。Wire 编码的灵活性都是可用的——文本格式或二进制格式——正如 Wire 许多不同类型的底层传输一样。


接下来,我们来了解一下基于 TCP/IP 网络通信的 Wire 传输将带来怎样的可能性。

概念简介

新的功能基于以下三个抽象概念。

Channel

Chronicle Channel 是对两个组件之间的双向点对点连接的抽象。在创建 Channel 时指定的通道类型定义了底层传输的类型。初始实现使用异步套接字或连接同一进程内两个端点的内部通道来支持 TCP/IP,主要目标是支持更高级的传输,如 GRPC、REST 或 WebSocket 等。


Channel 在两个组件之间来回传输被打包成 Chronicle Wire 消息的 Event。初始实现支持 TCP/IP 或“本地”(进程内)通道,但也可以为不同的传输定义 Chennel 类型。

Context

Context 是 Channel 的管理容器,负责管理 Channel 的配置和生命周期。

Handler

Handler 是与 Channel 绑定在一起的组件,它定义了如何处理传入的事件,以及如何传输传出(结果)事件。这样可以实现各种形式的会话管理。框架提供了许多预定义的 Handler,也支持自定义。


在建立连接时,一个 Handler 会与 Channel 相关联,通常由连接的“发起者”(即客户端)指定。

使用 Channel

我们来看一些实际的示例。

示例 1:Hello, World

一般来说,第一个示例是简单地打印“Hello”消息。代码注释中的编号表示关键点,并与下面的列表对应:


public class Channel1ReadWrite {
private static final String URL = System.getProperty("url", "tcp://:3334"); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("Channel1"); // ===> (2) ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(Channel1.class, "Channel set up on port: " + channel.channelCfg().port());
Says says = channel.methodWriter(Says.class); // ===> (3) says.say("Well hello there");
StringBuilder eventType = new StringBuilder(); // ===> (4) String text = channel.readOne(eventType, String.class); Jvm.startup().on(Channel1.class, ">>>> " + eventType + ": " + text);
} }}
复制代码


  1. 创建 Channel 的关键参数是一个 URL 字符串。目前只支持 TCP/IP 作为传输机制,但未来会支持更多的传输机制。这个字符串在 Chronicle Channel 中的含义如下表所示。


URL格式含义
internal://Channel内部到进程
tcp://:{port}Channel接受传入请求
tcp://{hostname}:{port}Channel的客户端


  1. 我们使用 try-with-resources 来确保所有创建的组件在使用完以后都会被关闭。首先,我们创建 Context,用于管理 Channel 的生命周期和配置。Context 提供了一个工厂方法,可以用来创建新的 Channel。在请求新的 Channel 时,我们指定使用哪个 Handler 来处理传入的事件。在本例中,我们使用 EchoHandler,顾名思义,它会将事件发送回发送方。


为连接设置服务器端套接字所需的工作都由工厂方法完成,它返回的 Channel 可以被我们使用。


  1. TCP/IP 是全双工协议,所以我们获得的 Channel 是双向的。我们可以通过 Channel 发送事件,使用下面的类生成的 MethodWriter:


public interface Says extends Syncable {   void say(String say);}

Says says = channel.methodWriter(Says.class);says.say("Well hello there");
复制代码


  1. 然后,我们可以使用 Chronicle Wire 从通道读取回传的事件并将其显示出来。当运行这个简单的示例时,我们可以看到这样的输出:


[main] INFO run.chronicle.wire.channel.demo1.Channel1 - Channel set up on port: 3334[main] INFO run.chronicle.wire.channel.demo1.Channel1 - >>>> say: Well hello there
复制代码

示例 2:客户端和服务器端分离

第一个示例太过简单,因为它将客户端和服务器端的功能都放在同一个进程里。这对于测试或调试来说可能比较方便,但现在我们希望将它们分离到各自的进程中。我们来看看分离之后的服务器:


public class ChannelService {   static final int PORT = Integer.getInteger("port", 4441);
public static void main(String[] args) throws IOException { System.setProperty("port", "" + PORT); // set if not set. ChronicleGatewayMain.main(args); }}
复制代码


由于我们使用了辅助类 ChronicleGatewayMain,代码变得非常简短。辅助类封装了设置服务器端(Channel 接收器)、移除模板代码和尽可能多地使用默认设置的功能。


客户端的代码如下所示,注释中的编号表示关键点:


public class ChannelClient {
private static final String URL = System.getProperty("url", "tcp://localhost:" + ChannelService.PORT); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("ChannelClient"); // ===> (2) ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(ChannelClient.class, "Channel set up on port: " + channel.channelCfg().port()); Says says = channel.methodWriter(Says.class); // ===> (3) says.say("Well hello there");
StringBuilder eventType = new StringBuilder(); String text = channel.readOne(eventType, String.class);
Jvm.startup().on(ChannelClient.class, ">>>> " + eventType + ": " + text); } }}
复制代码


  1. URL 字符串包含了主机名和端口号,它告诉创建通道的逻辑我们正在初始化客户端通道。

  2. 根据 URL 字符串格式创建客户端 Context。在从客户端 Context 创建通道时,我们指定了在接收端使用哪个 Handler。

  3. 通道建立起来之后,剩下的代码与第一个示例中的代码一样。当客户端和服务器端应用程序同时运行起来时,输出如下所示:


[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - Channel set up on port: 4441[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - >>>> say: Well hello there
复制代码

示例 3:简单的请求/响应交互

前面我们已经了解如何使用 Wire 的 MethodReader 和 MethodWriter 来实现请求进程外方法调用。现在,我们可以扩展这个示例,演示使用基于 TCP/IP 通道的 Wire 来实现类似于远程过程调用的服务请求/响应处理能力。


服务本身很简单,只提供了一个方法——我们的目的是演示构造服务和访问服务所需的步骤。


这个例子涉及四个部分:


  1. Service——根据输入和输出的消息类型实现业务逻辑。

  2. Channel Handler——将服务连接到底层的 Channel。

  3. Service Driver——作为服务器端的入口点,创建和配置服务和 Channel Handler。

  4. Client——一个单独的应用程序,创建和发送请求,并接收响应。

Service

服务提供了一个可以处理受支持请求的接口,其定义为:


public interface PersonOps {   void addPerson ( Person p );}
复制代码


Person 类与之前定义的一样。


Chronicle 中的消息传递是单向的,所以服务 API 的方法是 void 的。因此,我们需要为响应消息定义第二个接口:


public interface ResponseSender {   void respond(ReqStatus status);}
复制代码


ReqStatus 类表示方法是否执行成功,其定义为:


public enum ReqStatus {   OK,   ERROR}
复制代码


这两个接口连接在一起形成了一个处理传入请求的 Handler:


public class PersonOpsProcessor implements PersonOpsHandler {   private transient ResponseSender responder;                                                  // ===> (1)   public PersonOpsProcessor responder(ResponseSender responseSender) {        // ===> (2)       this.responder = responseSender;       return this;   }   @Override   public void addPerson(Person p) {                                                                  // ===> (3)       responder.respond(ReqStatus.OK);   }}
复制代码


  1. 这个字段将保存对该服务的输出的引用。

  2. 在本例中,ResponseSender 是通过 setter 方法注入的,当然也可以通过构造函数注入。

  3. 实现了 PersonOps 接口中的方法,为简单起见,它只发送一个成功的状态响应。

Channel Handler

根据之前的概念简介,Channel Handler 的职责是处理在其关联通道上传递的消息/事件。


对于本例,我们需要定义一个类,它将通道上的传入消息分派给服务的 Handler,并将服务输出连接到通道:


public class PersonSvcHandler extends AbstractHandler<PersonSvcHandler> {                  // ===> (1)   private final PersonOpsHandler personOpsHandler;                                                       // ===> (2)   public PersonSvcHandler(PersonOpsHandler personOpsHandler) {                                  // ===> (3)       this.personOpsHandler = personOpsHandler;   }   public void run(ChronicleContext context, ChronicleChannel channel) {                           // ===> (4)       channel.eventHandlerAsRunnable(           personOpsHandler.responder(channel.methodWriter(ResponseSender.class))       ).run();   }   @Override   public ChronicleChannel asInternalChannel(ChronicleContext context,                             // ===> (5)                                                                          ChronicleChannelCfg channelCfg) {       throw new UnsupportedOperationException("Internal Channel not supported");   }}
复制代码


  1. 基类实现了通用的平台功能,子类将为我们的服务提供定制的逻辑。

  2. 对 Handler 实现类的引用。

  3. PersonOpsHandler 是通过构造函数注入的。

  4. 当发起一个新的通道连接时,就会启动一个 Handler,并初始化必要的 MethodReader 和 MethodWriter 对象。这些逻辑被封装在 run()方法中,每个发起的通道连接都会执行这一步。

  5. 在这个示例类中,我们显式禁止创建在内部通道运行的 Handler。

Service Driver

完成了这些步骤后,编写服务的驱动类就简单了,与之前的例子或多或少相同,就是使用 ChronicleGatewayMain 类来创建配置通道。


public class PersonSvcMain {   static final int PORT = Integer.getInteger("port", 7771);   public static void main(String... args) throws IOException {       System.setProperty("port", "" + PORT);       ChronicleGatewayMain.main(args);   }}
复制代码

Client

要实现一个简单的 Person 服务客户端,我们可以创建一个通道,然后向服务发出请求。


public class PersonClient {   private static final String URL = System.getProperty("url", "tcp://localhost:" + PersonSvcMain.PORT);                           // ===> (1)   public static void main(String[] args) {       try (ChronicleContext context = ChronicleContext.newContext(URL)) {           ChronicleChannel channel = context.newChannelSupplier(new PersonSvcHandler(new PersonOpsProcessor()))      // ===> (2)                                                               .get();           final PersonOps personOps = channel.methodWriter(PersonOps.class);                                                               // ===> (3)           Person thePerson = new Person()                                                   .name("George")                                                   .timestampNS(SystemTimeProvider.CLOCK.currentTimeNanos())                                                   .userName(Base85.INSTANCE.parse("georgeb")));;           personOps.addPerson(thePerson);           StringBuilder evtType = new StringBuilder();           ReqStatus response = channel.readOne(evtType, ReqStatus.class);           Jvm.startup().on(PersonClient.class, " >>> " + evtType + ": " + response);       }   }}
复制代码


  1. 默认情况下,URL 的端口号与服务器中配置的端口号一样。

  2. 创建 Channel,注入自定义 Handler 实例。

  3. 在创建好以后,我们就可以使用 Channel 的 MethodWriter 方法来生成存根方法,这些方法将向服务发送序列化的事件。

总结

Chronicle Wire 增加了一些新功能,允许通过 TCP/IP 与其他组件通信。本文介绍了 Wire 实现这些功能的基本思想,并提供了一些简单的示例。


这种快速高效的通信在分布式服务架构中还有很多应用场景。除了本文的示例之外,Chronicle Wire 的GitHub项目库中还提供了其他示例。

作者简介

George Ball 目前在 Chronicle 公司的技术文档组工作,致力于构建 Chronicle 低延迟 Java 库和框架文档。在此之前,他是摩根士丹利 Java 平台工程团队的一员,在公司的 Java 基础设施向云端迁移过程中增强 Java 库,并改善开发者体验。他在分布式系统方面有超过 35 年的经验,特别是在 JVM 生态系统方面。


原文链接

https://www.infoq.com/articles/billions-messages-minute/


相关阅读:

TCP协议已不适用现今的数据中心

性能提升 57% ,SMC-R 透明加速 TCP 实战解析


2023-05-29 11:084833

评论

发布
暂无评论
发现更多内容

人工智能 | 聊聊AutoGPT那些事儿

测吧(北京)科技有限公司

测试

企业如何通过熔断降级增强服务稳定性和系统可用性?

袋鼠云数栈

熔断 API 降级 数据服务 熔断降级

小程序开发实战案例四 | 小程序标题栏如何设置

盐焗代码虾

支付宝 小程序开发 导航栏

软件测试/人工智能丨如何利用 ChatGPT 编写测试方案

测试人

人工智能 软件测试

软件测试/人工智能丨利用人工智能 ChatGPT 自动进行测试需求分析

测试人

人工智能 软件测试

ChatGPT插件:沉浸式体验人工智能

测吧(北京)科技有限公司

测试

【教程】Ipa Guard为iOS应用提供免费加密混淆方案

雪奈椰子

人工智能 | LangChain 核心模块PromptsModelsParsers

测吧(北京)科技有限公司

测试

一文搞懂得物前端监控

得物技术

大前端

文物数字化建模纹理贴图

3D建模设计

纹理贴图 模型渲染 材质纹理 材质编辑

软件测试/人工智能|人工智能与自动化测试结合实战-探索人工智能在测试领域中的应用

霍格沃兹测试开发学社

软件测试/测试开发|Docker+Jmeter+InfluxDB+Grafana 搭建性能监控平台

霍格沃兹测试开发学社

软件测试/人工智能|教你轻松玩转Edge浏览器

霍格沃兹测试开发学社

定档12月28日,WAVE SUMMIT+深度学习开发者大会2023狂欢来袭!

飞桨PaddlePaddle

人工智能 深度学习 开发者 WAVE SUMMIT

Illustrator 2024 for mac(标准矢量插画设计软件) v28.1完整激活版

mac

苹果mac Windows软件 矢量图形编辑软件 Illustrator 2023 AI023

使用 Taro 开发鸿蒙原生应用 —— 当 Taro 遇到纯血鸿蒙 | 京东云技术团队

京东科技开发者

taro 前端 Web 鸿蒙Next

2023 年中国 IT 用户满意度调查结果公布,融云获评「中国数字化转型新锐企业」

融云 RongCloud

数字化转型 网络 IT 企业 政企

【论文解读】System 2 Attention提高大语言模型客观性和事实性

合合技术团队

人工智能 自然语言处理 大模型 语言模型

如何零成本的提高3D模型的加载速度

3D建模设计

纹理贴图 模型渲染 材质纹理 材质编辑

JavaScipt验证URL新方法(2023 年版)

凌览

JavaScript node.js 前端

从零创建一个带action的GPT(1/2)

Bob Lin

AI ChatGPT LLM GPTs

从零创建带action的GPT(2/2)

Bob Lin

openai ChatGPT LLM GPT-4 #LangChain

2023年中国IT用户满意度征集结果公布

Geek_2d6073

最常用的4种光纤接口结构是什么样式呢?

小齐写代码

LED显示屏行业:消费驱动和零售渠道的新发展

Dylan

技术 LED显示屏 led显示屏厂家 消费

技术分享 | ChatGPT API 调用总超时?破题思路在这

LigaAI

Python 后端 openai chatgpt api chatpt

详细了解云堡垒机的作用,提高企业数据信息安全

行云管家

云计算 云服务 数据安全 企业上云 云堡垒机

非专业的建模人员如何给模型设置材质纹理贴图?

3D建模设计

材质贴图 纹理贴图 模型渲染 材质编辑

TCP连接断开:为什么要挥手四次

华为云开发者联盟

开发 华为云 数据传输 华为云开发者联盟

通过TCP/IP每分钟发送数十亿条消息_软件工程_InfoQ精选文章