立即领取|华润集团、宁德核电、东风岚图等 20+ 标杆企业数字化人才培养实践案例 了解详情
写点什么

如何使用流处理器 Pipy 来创建网络代理

作者:Ali Naqvi

  • 2022-02-14
  • 本文字数:7378 字

    阅读完需:约 24 分钟

如何使用流处理器Pipy来创建网络代理

在这篇文章中,我们将介绍 Pipy,一个开源的云原生网络流处理器。本文将首先描述它的模块化设计,然后介绍如何快速构建一个高性能的网络代理来满足特定的需求。Pipy 经过了实战检验,已经被多个商业客户所使用。

 

Pipy 是一个开源、轻量级、高性能、模块化、可编程的云原生网络流处理器。它适用于众多应用场景,包括但不限于边缘路由器、负载均衡器 &代理解决方案、API 网关、静态 HTTP 服务器、服务网格挎斗等。

 

事实证明,这些属性对 Pipy 来说都有特定的意义,让我们逐项来看一下。

轻量级

编译后的 Pipy 可执行文件大小约为 6MB,只需要很小的内存就能运行。

高性能

Pipy 是用 C++编写的,以 Asio 异步 I/O 库为基础构建。

模块化

Pipy 的内核采用了模块化设计,有许多可重用的小模块(过滤器),把它们连接在一起就可以形成一个管道,网络数据在这个管道中流动并被处理。

流处理器

Pipy 使用一个事件驱动的管道来操作网络流,它消耗输入流,执行用户提供的转换,并输出流。Pipy 流将数据字节抽象为以下其中一类事件:

Data

网络流由数据字节组成并成块出现。Pipy将大块的数据抽象成一个数据事件。

MessageStart

MessageEnd

StreamEnd

这三个非数据事件作为标记,为原始字节流提供了高层次的语义,作为业务逻辑实现的基础。

可编程

Pipy 通过其定制开发的组件 PipyJS 提供内置的 JavaScript 支持,该组件是 Pipy 代码库的一部分,但不依赖于该代码库。 PipyJS 具有高度的可定制性,性能可预测,而且没有垃圾收集开销。将来,PipyJS 可能会转移到单独的包中。

Pipy 的设计

Pipy 的内部工作原理类似于Unix管道,但不同的是,Unix 管道处理的是离散的字节,而 Pipy 处理的是事件流。

 

Pipy 通过一个过滤器链来处理传入的数据流,过滤器负责处理请求记录、认证、SSL 卸载、请求转发等常规问题。每个过滤器都从其输入中读取事件流并写入输出,一个过滤器的输出与下一个过滤器的输入相连。

管道

一条过滤器链即一个管道,Pipy 根据其输入源将管道分为 3 个不同的类别。

  • 端口管道

从一个网络端口读入数据事件,处理它们,然后将结果写回同一端口。这就是最常用的请求和响应模式。

 

例如,当 Pipy 作为 HTTP 服务器时,端口管道的输入是来自客户端的 HTTP 请求,而管道的输出则是发回客户端的 HTTP 响应。

 

  • 计时器管道

周期性地获取 MessageStart MessageEnd 事件对作为输入。当需要类似于cron job的功能时很有用。

 

  • 子管道

它与连接过滤器(例如 link)协同工作,它从前面的管道接收事件,将其送入子管道进行处理,然后再从子管道读回输出,并将其传递给下一个过滤器。

 

理解子管道连接过滤器的最好方法是,把它们看成过程编程中子程序的被调用者和调用者。连接过滤器的输入是子程序的参数,连接过滤器的输出是其返回值。

 

注意:像端口 &计时器这样的管道不能从连接过滤器调用。

上下文

Pipy 另一个重要的概念是上下文。上下文是隶属于一个管道的一组变量。在 Pipy 实例中,每条管道都可以访问相同的变量集。换句话说,上下文具有相同的形状。当启动一个 Pipy 实例时,所做的第一件事就是通过定义变量和它们的初始值来定义上下文的形状。

 

每个管道都会克隆你在开始时定义的初始上下文。当一个子管道启动时,它要么共享要么克隆其父管道的上下文,这取决于你使用了哪一个连接过滤器。例如,link 过滤器共享其父管道的上下文,而 demux 过滤器则克隆它。

 

对于嵌入管道的脚本来说,这些上下文变量就是它们的全局变量,也就是说,只要这些变量存在于同一个脚本文件中,这些脚本就可以从任何地方访问它们。

 

对于一名经验丰富的程序员来说,这可能显得很奇怪,因为全局变量通常意味着它们是全局唯一的。这些变量只能有一组,但在 Pipy 中,我们可以有很多组这样的变量(又称上下文),这取决于针对传入的网络连接开放了多少管道,以及有多少子管道克隆了其父管道的上下文。

PipyJS 

PipyJS 是一个嵌入式的小型 JavaScript 引擎,其设计旨在实现高性能,消除垃圾收集开销。它支持 ECMAScript 标准的一个子集,某些方面有偏离。目前,它支持 JavaScript 表达式、函数,并实现了 JavaScript 标准 API,如 String、Array 等。

 

如上所述,上下文是 PipyJS 一个非常关键的特性,Pipy 用一种特定的方式对其进行了扩展,以满足代理服务器的特殊需求,后者需要为每个连接提供多组全局变量。每个上下文状态对其他上下文都是不可见的,因此,它是唯一的,只有它的定义者才能访问。

 

如果你熟悉多线程编程的概念,那么你也可以把上下文看作是TLS(线程本地存储),其中全局变量在不同的线程中具有不同的值。

兼容性

Pipy 的设计旨在跨不同的操作系统和 CPU 架构实现高度兼容。Pipy 已经在以下这些平台上进行了全面测试:

  • CentOS 7

  • Ubuntu 18/20

  • FreeBSD 12/13

  • macOS Big Sur

在生产环境中,建议使用 CentOS7/REHL7 或 FreeBSD。

快速入门

对于那些缺乏耐心的读者,可以使用 docker 运行 Pipy 的生产版本,使用 Pipy 官方 GitHub 仓库提供的一个教程脚本即可。这里,让我们遵循经典示例Hello World!的规范,但把这句话改为Hi there!

 

Pipy 的 Docker 镜像可以通过几个环境变量来配置:

  • PIPY_CONFIG_FILE=</path/to/config-file> 指定 Pipy 配置文件的位置;

  • PIPY_SPAWN=n 是你想要启动的 Pipy 实例的数量,其中 n 为实例数,这个数是从 0 开始的,0 代表 1 个实例。例如,如果想要启动 4 个实例,则使用PIPY_SPAWN=3 。

$ docker run --rm -e PIPY_CONFIG_FILE=\https://raw.githubusercontent.com/flomesh-io/pipy/main/tutorial/01-hello/hello.js \-e PIPY_SPAWN=1 -p 8080:8080 flomesh/pipy-pjs:latest
复制代码

上述命令会用提供的脚本启动 Pipy 服务器。敏锐的用户可能已经注意到,我们通过环境变量PIPY_CONFIG_FILE提供了一个远程 Pipy 脚本的链接,而不是一个本地文件,Pipy 足够智能,可以处理这种情况。

 

下面是tutorial/01-hello/hello.js文件的内容,供参考:

pipy()

.listen(8080) .serveHTTP( new Message('Hi, there!\n') )
复制代码

在这个脚本中,我们定义了一个端口管道,它监听 8080 端口,并为从监听端口收到的每个 HTTP 请求返回“Hi, there!”。

 

既然我们已经通过上面的docker run命令暴露了本地 8080 端口,那么我们可以在同一端口上进行测试了:

$ curl http://localhost:8080
复制代码

执行上述命令,控制台中应该显示“Hi, there!”。

 

如果是出于学习、开发或调试的目的,建议在本地安装 Pipy(从源代码构建 Pipy 或针对你的操作系统下载一个预构建版本),因为它提供了 Web 管理控制台以及相关的文档和教程。

 

安装到本地后,运行pipy,不需要任何参数,就可以在6060端口启动管理控制台,但如果要监听不同的端口,可以通过--admin-port=参数配置。

 

监听 6060 端口的 Pipy 管理控制台

 

要从源代码构建 Pipy 或针对你的操作系统安装预编译的二进制文件,请参考PipyGithub 库的 README.md 文件。

通过 CLI 运行

要启动 Pipy 代理,可以用一个 PipyJS 脚本文件运行 Pipy。例如,如果需要一个简单的回显服务器,针对每个传入的请求都用所接收到的消息体进行响应,那么就用脚本tutorial/01-hello/hello.js

$ pipy tutorial/01-hel lo/hello.js
复制代码

另外,在开发和调试时,可以启动带有内置 Web UI 的 Pipy:

$ pipy tutorial/01-hello/hello.js --admin-port=6060
复制代码

显示命令行选项

$ pipy --help
复制代码

列出内置过滤器及其参数

$ pipy --list-filters$ pipy --help-filters   
复制代码

前文从概念和技术上对 Pipy 做了一个简短的介绍,这些内容也是我们实现一个支持缓存和负载均衡的网络代理所需要了解的,这一点我们在下一节会看到。

编写一个网络代理

假设我们正在运行不同服务的单独实例,我们想要添加一个代理,根据请求的 URL 路径将流量转发到相关服务。这样做的好处是,我们只需要提供一个 URL,并在后端扩展我们的服务,而用户不需要分别记住不同服务的 URL。在正常情况下,服务会在不同的节点上运行,每个服务可以有多个实例在运行。假设在这个例子中,我们正在运行下面的服务,我们希望根据 URI 将流量分配给它们。

服务

URI

主机:端口

service-hi

/hi/*

"127.0.0.1:8080", "127.0.0.1:8082"

service-echo

/echo

"127.0.0.1:8081"

service-tell-ip

/ip/*

"127.0.0.1:8082"

Pipy 的脚本是用 JavaScript 编写的,你可以用任何文本编辑器来编辑它们。另外,如果你在本地安装了 Pipy,就可以使用 Pipy 提供的 Web 端管理 UI,它提供了语法高亮、自动完成、提示等特性,你甚至可以运行脚本,所有这些都在同一个控制台上。

 

好了,让我们启动一个 Pipy 实例,不需要任何参数,这样,Pipy 管理控制台将在 6060 端口启动。现在,打开你喜欢的 Web 浏览器,导航到 http://localhost:6060,就会看到 Pipy 内置的 Web 端管理 UI(如图 1)。

创建一个 Pipy 程序

将代码和配置分开是一种很好的设计实践。Pipy 通过插件(你可以把它想成是 JavaScript 模块)来支持这种模块化设计。也就是说,我们将把配置数据存储在 config 文件夹下,把编码逻辑存储在 plugins 文件夹下不同的文件中。主代理服务器脚本将存储在根目录下,主代理脚本(proxy.js)将包含并组合这些单独的模块所定义的功能。 一旦我们完成了下述步骤,最终的文件夹结构将是下面这个样子:

├── config│   ├── balancer.json│   ├── proxy.json│   └── router.json├── plugins│   ├── balancer.js│   ├── default.js│   └── router.js└── proxy.js
复制代码

让我们开始吧:

  1. 点击新建代码库,在对话框中输入/proxy(或任何你想使用的名称)作为代码库名称,然后点击创建。你将进入到新创建的代码库的代码编辑器。

  2. 点击上面的“+”按钮,添加一个新文件。输入/config/proxy.json(这是配置文件,我们将用来配置代理)作为文件名,然后点击创建

  3. 现在,你会看到,左侧窗格的config文件夹下多了一个proxy.json文件。点击该文件把它打开,并添加如下所示的配置信息,务必点击顶部面板上的磁盘图标来保存文件:

{  "listen": 8000,  "plugins": [    "plugins/router.js",    "plugins/balancer.js",    "plugins/default.js"  ]}
复制代码
  1. 重复步骤 2 和 3,创建另一个文件/config/router.json,它将存储路由信息,配置数据如下:

{  "routes": {    "/hi/*": "service-hi",    "/echo": "service-echo",    "/ip/*": "service-tell-ip"  }}
复制代码
  1. 重复步骤 2 和 3,创建另一个文件/config/balancer.json,它将存储服务到目标的映射信息,内容如下:

 {  "services": {    "service-hi"      : ["127.0.0.1:8080", "127.0.0.1:8082"],    "service-echo"    : ["127.0.0.1:8081"],    "service-tell-ip" : ["127.0.0.1:8082"]  }}
复制代码
  1. 现在,我们编写第一个 Pipy 脚本,当我们收到一个没有配置任何目标(端点/url)的请求时,它将被用作默认的后备选项。重复上述步骤,创建文件/plugins/default.js。使用 default 作为文件名只是一个习惯做法,并不是 Pipy 的要求,你可以选择任何你喜欢的名字。该脚本将包含如下代码,返回 HTTP 状态代码 404,信息为 No handler found:

pipy()

.pipeline('request') .replaceMessage( new Message({ status: 404 }, 'No handler found') )
复制代码
  1. 创建/plugins/router.js文件,存储路由逻辑:

(config =>

pipy({ _router: new algo.URLRouter(config.routes),})

.export('router', { __serviceID: '',})

.pipeline('request') .handleMessageStart( msg => ( __serviceID = _router.find( msg.head.headers.host, msg.head.path, ) ) )

)(JSON.decode(pipy.load('config/router.json')))
复制代码
  1. 创建/plugins/balancer.js文件,存储了我们的负载均衡逻辑。顺便说明一下,Pipy 提供了多种负载均衡算法,但简单起见,我们这里将使用 Round Robin 算法。

(config =>

pipy({ _services: ( Object.fromEntries( Object.entries(config.services).map( ([k, v]) => [ k, new algo.RoundRobinLoadBalancer(v) ] ) ) ),

_balancer: null, _balancerCache: null, _target: '',})

.import({ __turnDown: 'proxy', __serviceID: 'router',})

.pipeline('session') .handleStreamStart( () => ( _balancerCache = new algo.Cache( // k is a balancer, v is a target (k ) => k.select(), (k,v) => k.deselect(v), ) ) ) .handleStreamEnd( () => ( _balancerCache.clear() ) )

.pipeline('request') .handleMessageStart( () => ( _balancer = _services[__serviceID], _balancer && (_target = _balancerCache.get(_balancer)), _target && (__turnDown = true) ) ) .link( 'forward', () => Boolean(_target), '' )

.pipeline('forward') .muxHTTP( 'connection', () => _target )

.pipeline('connection') .connect( () => _target )

)(JSON.decode(pipy.load('config/balancer.json')))
复制代码
  1. 现在,我们来编写入口点或代理服务器脚本,它会使用上述插件。创建一个新的代码库(步骤 1),这个过程会创建一个默认的main.js文件作为入口点。我们可以用它作为我们的主入口点,或者如果你希望换个名字,可以随时删除main.js,然后用你选的名字新建一个文件。让我们删除它并新建一个名为/proxy.js的文件。务必点下顶部的旗标,将其设置为主入口点,这可以确保在你点击运行按钮(右侧的箭头图标)时开始执行脚本:

(config =>

pipy()

.export('proxy', { __turnDown: false,})

.listen(config.listen) .use(config.plugins, 'session') .demuxHTTP('request')

.pipeline('request') .use( config.plugins, 'request', 'response', () => __turnDown )

)(JSON.decode(pipy.load('config/proxy.json')))
复制代码

如果你已经按照上面的步骤进行了操作,就可以看到类似于以下截图的东西:


 

现在,我们点击播放图标按钮(右起第四个)来运行我们的脚本。如果脚本没有任何错误,我们将看到 Pipy 运行我们的代理脚本,输出类似下面这样:


 

这表明我们的代理服务器正在监听 8000 端口(这是在/config/proxy.json中配置的)。我们用 curl 来运行一个测试:

$ curl -i http://localhost:8000

HTTP/1.1 404 Not Foundcontent-length: 10connection: keep-alive

No handler found
复制代码

这没问题,因为我们没有为 root 配置任何目标。让我们试下配置过的路由,如/hi

$ curl -i http://localhost:8000/hi

HTTP/1.1 502 Connection Refusedcontent-length: 0connection: keep-alive
复制代码

我们看到了 502 Connection Refused 这个消息,因为我们没有在配置的目标端口上运行服务。

 

你可以更新/config/balancer.json,加入你已经运行的服务的主机、端口等细节,以匹配你的实际情况,或者我们在 Pipy 中编写一个脚本,监听我们配置的端口,并返回简单的消息。

 

将以下代码片段保存到你本地计算机上的一个文件中,命名为mock-proxy.js,并记住文件的存储位置。

pipy()

.listen(8080) .serveHTTP( new Message('Hi, there!\n') )

.listen(8081) .serveHTTP( msg => new Message(msg.body) )

.listen(8082) .serveHTTP( msg => new Message( `You are requesting ${msg.head.path} from ${__inbound.remoteAddress}\n` ) )
复制代码

打开一个新的终端窗口,通过 Pipy 运行这个脚本(其中/path/to是存储该脚本文件的位置):

$ pipy /path/to/mock-proxy.js

2022-01-11 18:56:31 [INF] [config]2022-01-11 18:56:31 [INF] [config] Module /mock-proxy.js2022-01-11 18:56:31 [INF] [config] ================2022-01-11 18:56:31 [INF] [config]2022-01-11 18:56:31 [INF] [config] [Listen on :::8080]2022-01-11 18:56:31 [INF] [config] ----->|2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] serveHTTP2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] <-----|2022-01-11 18:56:31 [INF] [config] 2022-01-11 18:56:31 [INF] [config] [Listen on :::8081]2022-01-11 18:56:31 [INF] [config] ----->|2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] serveHTTP2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] <-----|2022-01-11 18:56:31 [INF] [config] 2022-01-11 18:56:31 [INF] [config] [Listen on :::8082]2022-01-11 18:56:31 [INF] [config] ----->|2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] serveHTTP2022-01-11 18:56:31 [INF] [config] |2022-01-11 18:56:31 [INF] [config] <-----|2022-01-11 18:56:31 [INF] [config] 2022-01-11 18:56:31 [INF] [listener] Listening on port 8080 at ::2022-01-11 18:56:31 [INF] [listener] Listening on port 8081 at ::2022-01-11 18:56:31 [INF] [listener] Listening on port 8082 at ::
复制代码

现在,我们已经模拟了监听 8080、8081 和 8082 端口的服务。让我们在代理服务器上再做一次测试,你会看到,模拟服务返回了正确的响应。

小结

我们使用了 Pipy 的许多特性,包括变量声明、导入/导出变量、插件管道子管道过滤器链handleMessageStarthandleStreamStartlink等 Pipy 过滤器,以及JSONalgo.URLRouteralgo.RoundRobinLoadBalanceralgo.Cache等 Pipy 类。彻底解释所有这些概念超出了本文的范围,如果你希望了解更多信息,请阅读 Pipy 的文档。你可以通过 Pipy 的 Web 端管理 UI 查看这些文档,并按照入门教程一步步操作。

结语

来自Flomesh的 Pipy 是一个开源、高性能、轻量级的网络流量处理器,适用于多种场景,包括边缘路由器、负载平衡 &代理(正向/反向)、API 网关、静态 HTTP 服务器、服务网格挎斗等。Pipy 仍在积极开发之中,并由全职的提交者和贡献者维护,虽然仍是早期版本,但已有多个商业客户完成了测试并投入生产应用。它的创建者和维护者Flomesh.cn提供的商用解决方案就是以 Pipy 为核心。

 

这篇文章对 Pipy 做了一个非常简要的介绍和概述。GitHub 上提供了入门教程和文档,你也可以通过 Pipy 管理控制台的 Web UI 查看。社区非常欢迎大家为 Pipy 的发展做贡献,也欢迎大家在自己特定的场景下进行试用,或者提供反馈和意见。

 

作者简介:

Ali Naqvi 是一位拥有超过 20 年 IT 行业经验的专业人士。他非常热衷于开发以及为开源软件做贡献。他主要关注开发、软件架构、DevOps 等领域。他经常发表演讲,是当地社区/分会的活跃成员,致力于传播 OSS、DevOps 和 Agile 理念和知识。

 

原文链接:

How to Create a Network Proxy Using Stream Processor Pipy

 

2022-02-14 08:008965

评论

发布
暂无评论
发现更多内容

前端培训:Vue3计算属性比普通函数好的原因

@零度

Vue 前端开发

java培训:JVM垃圾回收

@零度

JVM JAVA开发

直播系统聊天技术(七):直播间海量聊天消息的架构设计难点实践

JackJiang

网络编程 即时通讯 IM 直播技术 音视频技术

VIPKID基于Karmada的容器PaaS平台落地实践

华为云原生团队

开源 Kubernetes k8s多集群管理 混合云 分布式云

OpenHarmony移植案例与原理:如何适配服务启动引导部件bootstrap_lite

华为云开发者联盟

OpenHarmony 移植 bootstrap_lite startup 系统服务

ko在数栈中的应用

袋鼠云数栈

Netty如何高效接收网络数据?一文聊透ByteBuffer动态自适应扩缩容机制

bin的技术小屋

网络编程 Netty nio 中间件 Java【

OBCE 认证第一人莅临直播间|助你快速拿下 OBCA & OBCP 证书

OceanBase 数据库

直播 OceanBase 社区版 OBCE

一文了解如何源码编译Rainbond基础组件

北京好雨科技有限公司

Kubernetes PaaS rainbond

C++异常处理机制

正向成长

c++ 异常处理

《数字经济全景白皮书》数字人民币篇 重磅发布

易观分析

数字经济 数字人民币

易观分析获评2021年度北京市专精特新“小巨人”企业

易观分析

易观新闻 “小巨人”企业

Nebula Graph 源码解读系列|客户端的通信秘密——fbthrift

NebulaGraph

数据库 图数据库

本着什么原则,才能写出优秀的代码?

AlwaysBeta

程序员 设计模式 代码规范

开源| 直播推拉流2.0升级了什么

anyRTC开发者

开源 音视频 屏幕共享 视频直播 美颜滤镜

上海市宝山区委书记陈杰一行参访旺链科技

旺链科技

区块链 产业区块链 Vone新闻

学生管理系统架构设计文档

阿卷

架构实战营

张海宁:首个 CNCF 中国开源项目 Harbor 的修炼之道

腾源会

开源 腾源会

Kotlin语法手册(四)

寻找生命中的美好

android kotlin 安卓

[Python]第一章(建议收藏)

謓泽

Python 2月月更

大数据培训:Flink CDC 高频面试题

@零度

大数据 flink

netty系列之:EventLoop,EventLoopGroup和netty的默认实现

程序那些事

Java Netty nio 程序那些事 2月月更

2022年2月国产数据库排行榜:冠军宝座面临挑战,OceanBase 重返 TOP3

墨天轮

数据库 tdengine TiDB 国产数据库

80%的软件环境管理问题,根因都在这里 | 研发效能提升36计

阿里云云效

阿里云 DevOps 云原生 持续交付 部署

阳振坤:从电动汽车看分布式数据库的发展和崛起

OceanBase 数据库

数据库 OceanBase 开源 OceanBase 社区版 HTAP

ModStart:拥抱新技术,率先支持 Laravel 9.0

ModStart开源

凡泰极客成为W3C成员并加入MiniApps工作组,将积极参与小程序快应用技术标准化进程

FinClip

小程序

腾讯云联合信通院发布《超低延时直播白皮书》,推动直播延时降低90%以上

科技热闻

福建省福州市网络安全等级测评机构名单目录看这里!

行云管家

等保 等级保护 等保测评

高可用之SkybilityHA简单介绍-行云管家

行云管家

高可用 ha

MatrixOne 0.2.0 发布!最快的SQL计算引擎来了!

MatrixOrigin

开源 MatrixOrigin MatrixOne 超融合异构云原生数据库 矩阵起源

如何使用流处理器Pipy来创建网络代理_语言 & 开发_InfoQ精选文章