AICon 上海站|90%日程已就绪,解锁Al未来! 了解详情
写点什么

浅谈 Puma 的并发模型与实现

  • 2019-12-05
  • 本文字数:7784 字

    阅读完需:约 26 分钟

浅谈 Puma 的并发模型与实现


这篇文章已经是整个 Rack 系列文章的第五篇了,在前面的文章中我们见到了多线程模型、多进程模型以及事件驱动的 I/O 模型,对于几种常见的 webserver 已经很了解了,其实无论是 Ruby 还是其他社区对于 webserver 的实现也就是这么几种方式:多线程、多线程和 Reactor。



在这篇文章中要介绍的 Puma 只是混合了两种 I/O 模型,同时使用多进程和多线程来提高应用的并行能力。


文中使用的 Puma 版本是 v3.10.0,如果你使用了不同版本的 Puma,原理上的区别不会太大,只是在一些方法的实现上会有一些细微的不同。

Rack 默认处理器

Puma 是目前 Rack 中优先级最高的默认 webserver,如果直接使用 rackup 命令并且当前机器上安装了 puma,那么 Rack 会自动选择 Puma 作为当前处理 HTTP 请求的服务器:


Ruby


def self.default  pick ['puma', 'thin', 'webrick']end
$ rackupPuma starting in single mode...* Version 3.10.0 (ruby 2.3.3-p222), codename: Russell's Teapot* Min threads: 0, max threads: 16* Environment: development* Listening on tcp://localhost:9292Use Ctrl-C to stop
复制代码


通过在 Rack::Handler 下创建一个新的 module Puma 再实现类方法 .run,我们就可以直接将启动的过程转交给 Puma::Launcher 处理:


Ruby


module Rack  module Handler    module Puma      def self.run(app, options = {})        conf   = self.config(app, options)        events = options.delete(:Silent) ? ::Puma::Events.strings : ::Puma::Events.stdio        launcher = ::Puma::Launcher.new(conf, :events => events)
yield launcher if block_given? begin launcher.run rescue Interrupt puts "* Gracefully stopping, waiting for requests to finish" launcher.stop puts "* Goodbye!" end end end endend
复制代码

启动器 Launcher

Puma 中的启动器确实没有做太多的工作,大部分的代码其实都是在做配置,从 ENV 和上下文的环境中读取参数,而整个初始化方法中需要注意的地方也只有不同 @runner 的初始化了:


Ruby


From: lib/puma/launcher.rb @ line 44:Owner: Puma::Launcher
def initialize(conf, launcher_args={}) @runner = nil @events = launcher_args[:events] || Events::DEFAULT @argv = launcher_args[:argv] || [] @config = conf @config.load
Dir.chdir(@restart_dir)
if clustered? @events.formatter = Events::PidFormatter.new @options[:logger] = @events
@runner = Cluster.new(self, @events) else @runner = Single.new(self, @events) end
@status = :runend
复制代码


#initialize 方法中,@runner 的初始化是根据当前配置中的 worker 数决定的,如果当前的 worker > 0,那么就会选择 Cluster 作为 @runner,否则就会选择 Single,在初始化结束之后会执行 Launcher#run 方法启动当前的 Puma 进程:


Ruby


From: lib/puma/launcher.rb @ line 165:Owner: Puma::Launcher
def run previous_env = ENV.to_h
setup_signals @runner.run
case @status when :halt log "* Stopping immediately!" when :run, :stop graceful_stop when :restart log "* Restarting..." ENV.replace(previous_env) @runner.before_restart restart! when :exit # nothing endend
复制代码


在这个简单的 #run 方法中,Puma 通过 #setup_singals 设置了一些信号的响应过程,在这之后执行 Runner#run 启动 Puma 的服务。

启动服务

根据配置文件中不同的配置项,Puma 在启动时有两种不同的选择,一种是当前的 worker 数为 0,这时会通过 Single 启动单机模式的 Puma 进程,另一种情况是 worker 数大于 0,它使用 Cluster 的 runner 启动一组 Puma 进程。



在这一节中文章将会简单介绍不同的 runner 是如何启动 Puma 进程的。

单机模式

Puma 单机模式的启动通过 Single 类来处理,而定义这个类的文件 single.rb 中其实并没有多少代码,我们从中就可以看到单机模式下 Puma 的启动其实并不复杂:


Ruby


From: lib/puma/single.rb @ line 40:Owner: Puma::Single
def run output_header "single"
if daemon? log "* Daemonizing..." Process.daemon(true) redirect_io end
load_and_bind @launcher.write_state @server = server = start_server
begin server.run.join rescue Interrupt # Swallow it endend
复制代码


如果我们启动了后台模式,就会通过 Puma 为 Process 模块扩展的方法 .daemon 在后台启动新的 Puma 进程,启动的过程其实和 Unicorn 中的差不多:


Ruby


From: lib/puma/daemon_ext.rb @ line 12:Owner: #<Class:Process>
def self.daemon(nochdir=false, noclose=false) exit if fork Process.setsid exit if fork
Dir.chdir "/" unless nochdir
if !noclose STDIN.reopen File.open("/dev/null", "r") null_out = File.open "/dev/null", "w" STDOUT.reopen null_out STDERR.reopen null_out end
0end
复制代码


在 Puma 中通过两次 fork 同时将当前进程从终端中分离出来,最终就可以得到一个独立的 Puma 进程,你可以通过下面的图片简单理解这个过程:



当我们在后台启动了一个 Puma 的 master 进程之后就可以开始启动 Puma 的服务器了,也就是 Puma::Server 的实例:


Ruby


From: lib/puma/runner.rb @ line 151:Owner: Puma::Runner
def start_server min_t = @options[:min_threads] max_t = @options[:max_threads]
server = Puma::Server.new app, @launcher.events, @options server.min_threads = min_t server.max_threads = max_t server.inherit_binder @launcher.binder
if @options[:mode] == :tcp server.tcp_mode! end
unless development? server.leak_stack_on_error = false end
serverend
复制代码


这里有很多不是特别重要的代码,需要注意的是 Server 初始化的过程以及最大、最小线程数的设置,这些信息都是通过命令行或者配置文件传入的,例如 puma -t 8:32 表示当前的最小线程数为 8、最大线程数为 32 个,Puma 会根据当前的流量自动调节同一个进程中的线程个数。


服务在启动时会创建一个线程池 ThreadPool 并传入一个用于处理请求的 block,这个方法的实现其实非常长,这里省略了很多代码;


Ruby


From: lib/puma/server.rb @ line 255:Owner: Puma::Server
def run(background=true) queue_requests = @queue_requests
@thread_pool = ThreadPool.new(@min_threads, @max_threads, IOBuffer) do |client, buffer| process_now = false
begin if queue_requests process_now = client.eagerly_finish else client.finish process_now = true end rescue MiniSSL::SSLError, HttpParserError => e # ... rescue ConnectionError client.close else if process_now process_client client, buffer else client.set_timeout @first_data_timeout @reactor.add client end end end
if queue_requests @reactor = Reactor.new self, @thread_pool @reactor.run_in_thread end
@thread = Thread.new { handle_servers } @threadend
复制代码


上述代码创建了一个新的 Reactor 对象并在一个新的线程中执行 #handle_servers 接受客户端的请求,文章会在后面介绍请求的处理。

集群模式

如果在启动 puma 进程时使用 -w 参数,例如下面的命令:


$ puma -w 3[20904] Puma starting in cluster mode...[20904] * Version 3.10.0 (ruby 2.3.3-p222), codename: Russell's Teapot[20904] * Min threads: 0, max threads: 16[20904] * Environment: development[20904] * Process workers: 3[20904] * Phased restart available[20904] * Listening on tcp://0.0.0.0:9292[20904] Use Ctrl-C to stop[20904] - Worker 2 (pid: 20907) booted, phase: 0[20904] - Worker 1 (pid: 20906) booted, phase: 0[20904] - Worker 0 (pid: 20905) booted, phase: 0
$ ps aux | grep pumadraveness 20909 0.0 0.0 4296440 952 s001 S+ 10:23AM 0:00.01 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn pumadraveness 20907 0.0 0.1 4358888 12128 s003 S+ 10:23AM 0:00.07 puma: cluster worker 2: 20904 [Desktop]draveness 20906 0.0 0.1 4358888 12148 s003 S+ 10:23AM 0:00.07 puma: cluster worker 1: 20904 [Desktop]draveness 20905 0.0 0.1 4358888 12196 s003 S+ 10:23AM 0:00.07 puma: cluster worker 0: 20904 [Desktop]draveness 20904 0.0 0.2 4346784 25632 s003 S+ 10:23AM 0:00.67 puma 3.10.0 (tcp://0.0.0.0:9292) [Desktop]
复制代码


上述命令就会启动一个 Puma 的 master 进程和三个 worker 进程,Puma 集群模式就是通过 Puma::Cluster 类来启动的,而启动集群的方法 #run 仍然是一个非常长的方法,在这里仍然省去了很多的代码:


Ruby


From: lib/puma/cluster.rb @ line 386:Owner: Puma::Cluster
def run @status = :run
output_header "cluster" log "* Process workers: #{@options[:workers]}"
read, @wakeup = Puma::Util.pipe
Process.daemon(true) spawn_workers
begin while @status == :run begin res = IO.select([read], nil, nil, WORKER_CHECK_INTERVAL)
if res req = read.read_nonblock(1) result = read.gets pid = result.to_i
if w = @workers.find { |x| x.pid == pid } case req when "b" w.boot! when "t" w.dead! when "p" w.ping!(result.sub(/^\d+/,'').chomp) end else log "! Out-of-sync worker list, no #{pid} worker" end end
rescue Interrupt @status = :stop end end
stop_workers unless @status == :halt ensure read.close @wakeup.close endend
复制代码


在使用 #spawn_workers 之后,当前 master 进程就开始通过 Socket 监听所有来自 worker 的消息,例如当前的状态以及心跳检查等等。


#spawn_workers 方法会通过 fork 创建当前集群中缺少的 worker 数,在新的进程中执行 #worker 方法并将 worker 保存在 master 的 @workers 数组中:


Ruby


From: lib/puma/cluster.rb @ line 116:Owner: Puma::Cluster
def spawn_workers diff = @options[:workers] - @workers.size return if diff < 1
master = Process.pid
diff.times do idx = next_worker_index pid = fork { worker(idx, master) }
debug "Spawned worker: #{pid}" @workers << Worker.new(idx, pid, @phase, @options) endend
复制代码


在 fork 出的新进程中,#worker 方法与单机模式中一样都创建了新的 Server 实例,调用 #run#join 方法启动服务:


Ruby


From: lib/puma/cluster.rb @ line 231:Owner: Puma::Cluster
def worker(index, master) title = "puma: cluster worker #{index}: #{master}" $0 = title
server = start_server server.run.joinend
复制代码


与 Unicorn 完全相同,Puma 使用了一个 master 进程来管理所有的 worker 进程:



虽然 Puma 集群中的所有节点也都是由 master 管理的,但是所有的事件和信号会由各个接受信号的进程处理的,只有在特定事件发生时会通知主进程。

处理请求

在 Puma 中所有的请求都是通过 ServerThreadPool 协作来响应的,我们在 #handler_servers 方法中通过 IO.select 监听一组套接字上的读写事件:


Ruby


From: lib/puma/server.rb @ line 334:Owner: Puma::Server
def handle_servers begin sockets = @binder.ios pool = @thread_pool
while @status == :run begin ios = IO.select sockets ios.first.each do |sock| begin if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) pool << client pool.wait_until_not_full end rescue Errno::ECONNABORTED io.close rescue nil end rescue Object => e @events.unknown_error self, e, "Listen loop" end end rescue Exception => e # ... endend
复制代码


当有读写事件发生时会非阻塞的接受 Socket,创建新的 Client 对象最后加入到线程池中交给线程池来处理接下来的请求。


Ruby


From: lib/puma/thread_pool.rb @ line 140:Owner: Puma::ThreadPool
def <<(work) @mutex.synchronize do if @shutdown raise "Unable to add work while shutting down" end
@todo << work
if @waiting < @todo.size and @spawned < @max spawn_thread end
@not_empty.signal endend
复制代码


ThreadPool 覆写了 #<< 方法,在这个方法中它将 Client 对象加入到 @todo 数组中,通过对比几个参数选择是否创建一个新的线程来处理当前队列中的任务。


重新回到 ThreadPool 的初始化方法 #initialize 中,线程池在初始化时就会创建最低数量的线程保证当前的 worker 进程中有足够的工作线程能够处理客户端的请求:


Ruby


From: lib/puma/thread_pool.rb @ line 21:Owner: Puma::ThreadPool
def initialize(min, max, *extra, &block) @mutex = Mutex.new
@todo = []
@spawned = 0 @waiting = 0
@min = Integer(min) @max = Integer(max) @block = block @extra = extra
@workers = []
@mutex.synchronize do @min.times { spawn_thread } endend
复制代码


每一个线程都是通过 Thread.new 创建的,我们会在这个线程执行的过程中执行传入的 block:


Ruby


From: lib/puma/thread_pool.rb @ line 21:Owner: Puma::ThreadPool
def spawn_thread @spawned += 1
th = Thread.new(@spawned) do |spawned| todo = @todo block = @block mutex = @mutex
extra = @extra.map { |i| i.new }
while true work = nil
continue = true
mutex.synchronize do work = todo.shift end
begin block.call(work, *extra) rescue Exception => e STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})" end end
mutex.synchronize do @spawned -= 1 @workers.delete th end end
@workers << th thend
复制代码


在每一个工作完成之后,也会在一个互斥锁内部使用 #delete 方法将当前线程从数组中删除,在这里执行的 block 中将客户端对象 Client 加入了 Reactor 中等待之后的处理。


Ruby


@thread_pool = ThreadPool.new(@min_threads,                              @max_threads,                              IOBuffer) do |client, buffer|  begin    client.finish  rescue MiniSSL::SSLError => e    # ...  else    process_client client, buffer  endend
复制代码


如过当前任务不需要立即处理,就会向 Reactor 加入任务等待一段时间,否则就会立即由 #process_client 方法进行处理,其中调用了 #handle_request 方法尝试处理当前的网络请求:


Ruby


From: lib/puma/server.rb @ line 439:Owner: Puma::Server
def process_client(client, buffer) begin while true case handle_request(client, buffer) when false return when true return unless @queue_requests buffer.reset unless client.reset(@status == :run) client.set_timeout @persistent_timeout @reactor.add client return end end end rescue StandardError => e # ... ensure # ... endend
复制代码


用于处理网络请求的方法 #handle_request 足足有 200 多行,代码中处理非常多的实现细节,在这里实在是不想一行一行代码看过去,也就简单梳理一下这段代码的脉络了:


Ruby


From: lib/puma/server.rb @ line 574:Owner: Puma::Server
def handle_request(req, lines) env = req.env client = req.io
# ...
begin status, headers, res_body = @app.call(env)
headers.each do |k, vs| # ... end
fast_write client, lines.to_s res_body.each do |part| fast_write client, part client.flush end ensure body.close endend
复制代码


我们在这里直接将这段代码压缩至 20 行左右,你可以看到与其他的 webserver 完全相同,这里也调用了 Rack 应用的 #call 方法获得了一个三元组,然后通过 #fast_write 将请求写回客户端的 Socket 结束这个 HTTP 请求。

并发模型

到目前为止,我们已经对 Puma 是如何处理 HTTP 请求的有一个比较清晰的认识了,对于每一个 HTTP 请求都会由操作系统选择不同的进程来处理,这部分的负载均衡完全是由 OS 层来做的,当请求被分配给某一个进程时,当前进程会根据持有的线程数选择是否对请求进行处理,在这时可能会创建新的 Thread 对象来处理这个请求,也可能会把当前请求暂时扔到 Reactor 中进行等待。



Reactor 主要是为了提高 Puma 服务的性能存在的产物,它能够让当前的 worker 接受所有请求并将它们以队列的形式传入处理器中;如果当前的系统中存在慢客户端,那么也会占用处理请求的资源,不过由于 Puma 是多进程多线程模型的,所以影响没有那么严重,但是我们也经常会通过反向代理来解决慢客户端的问题。

总结

相比于多进程单线程的 Unicorn,Puma 提供了更灵活的配置功能,每一个进程的线程数都能在一定范围内进行收缩,目前也是绝大多数的 Ruby 项目使用的 webserver,从不同 webserver 的发展我们其实可以看出混合方式的并发模型虽然实现更加复杂,但是确实能够提供更高的性能和容错。


Puma 项目使用了 Rubocop 来规范项目中的代码风格,相比其他的 webserver 来说确实有更好的阅读体验,只是偶尔出现的长方法会让代码在理解时出现一些问题。

相关文章

关于图片和转载

本作品采用知识共享署名 4.0 国际许可协议进行许可。


  转载时请注明原文链接,图片在使用时请保留图片中的全部内容,可适当缩放并在引用处附上图片所在的文章链接,图片使用 Sketch 进行绘制,你可以在 [](https://draveness.me/draveness.me/sketch-sketch) 一文中找到画图的方法和素材。
复制代码


本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/rack-puma


2019-12-05 18:151201

评论

发布
暂无评论
发现更多内容

千万级数据深分页查询SQL性能优化实践-京东零售技术团队

京东零售技术

Java MySQL 后端

Wireshark中的http协议包分析

小齐写代码

通义灵码——灵动指间,快码加编,你的智能编码助手

阿里巴巴云原生

阿里云 云原生

安卓动态链接库文件体积优化探索实践

京东科技开发者

高效率软件开发工具,提速开发,真的很赞!

互联网工科生

软件开发 低代码 JNPF

wrk压测

WiFi 7/QCN9274: Connecting the super network of the future

wallysSK

Seal 新春大挑战等你来参与!

SEAL安全

AI DevOps Walrus

迎龙年接新春,来华为手机里寻找祥龙

最新动态

有了ERP和MES,还需要质量管理QMS系统吗?

万界星空科技

数字化 生产管理系统 mes 万界星空科技 QMS

中文版3d lut creator pro调色软件下载 兼容M1

Rose

Mac软件 3d lut creator pro 调色

4份报告简读Java生态

4ye

JVM, Java’

探索大模型训练与多模态数据处理

百度开发者中心

人工智能 图像 大模型训练

这篇深入浅出贴 助你早日实现Stable diffusion自由

京东科技开发者

自动化测试,有最佳实践吗?

老张

软件测试 自动化测试

部署Palworld幻兽帕鲁服务器最佳实践(Ubuntu)

天翼云开发者社区

云计算 最佳实践 服务器 云服务器

KubeEdge v1.16.0 版本发布!10项新增特性

华为云开发者联盟

k8s 开发 华为云 kubeedge 华为云开发者联盟

OpenSPG新版发布:大模型知识抽取与快速知识图谱构建

百度开发者中心

人工智能 知识图谱 智能客服 大模型

Microsoft Outlook将邮件、日历和联系人汇集一处,让你轻松管理一切

Rose

Office 邮件客户端 Microsoft Outlook

如何使用低代码+定制,打造一个个性化的社交媒体平台?

天津汇柏科技有限公司

低代码 定制软件开发 软件开发定制

100%中奖、会员回馈礼…星河会员新春福利到!

飞桨PaddlePaddle

百度 飞桨 飞桨AI 飞桨星河社区

全新 Amazon S3 Express One Zone 高性能存储类服务,震撼发布!

亚马逊云科技 (Amazon Web Services)

大文件上传原理及实现方案 | 京东物流技术团队

京东科技开发者

SublimeText中文破解版 简单易用的代码编辑器

Rose

代码编辑器 SublimeText

macs fan control pro破解版序列号 mac电脑风扇控制 v1.5.17中文版

Rose

苹果电脑 风扇转速控制 Macs Fan Control Pro

2023 年已知被利用最多的十大CWE漏洞排名

华为云PaaS服务小智

软件开发 华为云

假期想学习,送你测试开发+人工智能大礼包

霍格沃兹测试开发学社

Bookends for Mac(文献书籍管理工具)v14.2.9注册激活版

Rose

IT工单治理野史:由每周最高150+治理到20+ | 京东物流技术团队

京东科技开发者

测试开发+人工智能大礼包,让你在假期实现弯道超车

测试人

软件测试

浅谈 Puma 的并发模型与实现_文化 & 方法_Draveness_InfoQ精选文章