这不是我第一次使用大型数据集。我为最大的英国公共 Wi-Fi 供应商设计的认证和产品管理数据库也有巨大的容量。我们每天跟踪数百万设备的身份认证。然而,该项目有资金,允许我们选择任何硬件、任何支持服务以及聘请任何数据库管理员来协助复制/数据仓库/故障排除。此外,所有分析查询/报告都是在逻辑副本之外完成的,还有多个系统管理员来负责支持基础设施。但是,这次是我自己的投资,资金有限而容量却有 20 倍之多。
别人的错误
这不是说,如果我们有大量资金,那么我们可以用于购买最先进的硬件、最炫的监控系统或数据库管理员(好吧,也许有个专门的数据库管理员就好了)。经过多年的咨询工作,我形成了一种观点,所有不好的根源都在于不必要的复杂数据处理管道。不需要 ETL 消息队列并且也不需要数据库查询的应用程序层缓存。通常,这些都是底层数据库问题(如:延迟、糟糕的索引策略)的解决方案,这些问题导致了更多问题。在理想场景中,我们希望在单个数据库中包含所有的数据,并且所有数据下载操作抽象为原子事务。我的目标是不再犯这些错误。
我们的目标
正如你已经猜到的,我们的 PostgreSQL 数据库成为了业务的核心部分(也称为“母亲”,尽管我的联合创始人坚持,我称不同的基础设施组件为“母亲”、“母舰”、“祖国”令人担忧)。我们没有独立的消息队列服务,缓存服务或用于数据仓库的副本。我没有维护支持的基础设施,而是致力于通过最大限度地减少延迟、提供最合适的硬件以及谨慎地规划数据库模式来消除瓶颈。我们所拥有的是易于扩展的基础设施,具有单个数据库和很多数据处理代理。我喜欢它的简单,如果什么东西坏了,我们可以马上知道,并在几分钟内修复问题。然而,在这个过程中,我们犯了很多错,本文将对其中一些问题做个总结。
数据集
在深入研究前,让我们快速总结一下数据集。
我是 Applaudience 公司的联合创始人。我们汇总电影院数据。我们主要的数据集包括电影放映时间、票价和入场情况。我们把这些数据和各种类型的支持数据(包括我们收集自 YouTube、推特和天气预报的数据)组合起来。最终结果是一个全面的时间序列数据集,描述了整个影院电影发行窗口。目标是预测电影未来的表现。
目前,我们跟踪遍及欧洲和美国的 22 个国家及地区的 3200 多个电影院的数据。每天大约有 47000 场电影。每当有人从这些电影院预定或购买电影票,我们都会捕捉一个快照以描述电影放映厅中各个座位的属性。
我们如何监控数据聚合和检测异常是另一个话题。无论如何,把 PostgreSQL 作为所有正在聚合数据以及聚合数据的所有进程的单一真实来源使之变得更容易。
每月累计达 12 亿条记录,而且这还仅仅是入场数据。
选择在何处托管数据库
我们尝试了几个供应商:
1.谷歌
2.亚马逊
3.Aiven.io
4.自托管
支持 PostgreSQL 的谷歌云 SQL
我们从谷歌获得了 10 万美元的启动资金。这是选择他们服务的主要决定因素。我们使用支持PostgreSQL的谷歌云大约有 6 个月。我们把 PostgreSQL 从谷歌云 SQL 迁走的主要原因是我们发现了一个导致数据损坏的错误。这是一个已知错误,在较新的 PostgreSQL 版本中已被修复。但是,支持 PostgreSQL 的谷歌云 SQL 的版本落后了好几个版本。承认知道该问题的部门缺乏响应提醒了我们需要向前走了。我很高兴我们向前走了,因为我们提出这个问题已经 8 个月了,但 PostgreSQL 的版本还是没有更新:
postgres=> SELECT version();
PostgreSQL 9.6.6 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 4.8.4-2ubuntu1~14.04.3) 4.8.4, 64-bit(1 row)
支持 PostgreSQL 的亚马逊 RDS
然后,我们从亚马逊获得了资金并迁移到支持PostgreSQL的亚马逊RDS,他们的 PostgreSQL 的版本是最新的,并且我对 RDS 社区的研究表明没有问题。但是,亚马逊 RDS 支持 PostgreSQL,但不支持TimescaleDB的扩展,而我们计划用它来对我们的数据库进行分区。随着亚马逊发布 Timestream(他们自己的时间序列数据库),因此,很显然,这个要求在可以预见的未来不会得到解决(该问题已经提出2年了)。
接着,我们迁移到Aiven.io。Avien.io 在我们选择的云服务供应商上为我们管理 PostgreSQL 数据库。其拥有所有我需要的扩展(包括 TimescaleDB),没有把我们锁定在特定的服务供应商上(这意味着,我们可以把我们的 Kubernetes 集群托管在 Aiven.io 支持的任何供应商那里),他们的支持从第一次互动开始就很有帮助,并且我的尽职调查得到的都是赞扬。但是,我忽略的是,我们无法得到超级用户访问权限。这导致了很多问题(如:我们一直在用的各种维护程序停止工作,而我们因为权限的问题无法使用我们的监控软件;无法使用auto_explain;逻辑复制需要自定义扩展),和本可以避免的长时间停机。
2019 年 2 月 5 日更新:
Aiven.io 已经发布了 auto_explain 支持,和作为可用的维护更新的已修复的 Timescale 1.2。
总的来说,我不明白 Aiven.io 提供了什么附加价值,因为我们甚至在数据库容量不足时也没收到警告。
由于一个无人值守的复制槽使 WAL 不断增加而耗尽磁盘空间
当这种情况发生时,支持系统将实例升级为有更大容量的实例。尽管这是一个好的解决方案,但是,它导致了过长的停机时间。具有 SSH 访问权的人应该诊断出该问题,并在几分钟内修复。
由于 Avien.io 所用的(后来证明是这样)TimescaleDB扩展中的错误,我们开始经历不断的停机,支持部门没有为该问题提供任何解决方法。
我们正在研究该问题,与 timescale 团队合作,但是,对大多数问题的响应不是即时的。我们的帮助文档(https://help.aiven.io/support/aiven-support-details)描述了我们提供的响应时间。
当客户服务器处于崩溃循环时,这是一个非常被动的响应(两天之后,Aiven.io 还没有跟进)。
尽管在一些问题上,我对 Aiven.io 颇有微词,但是,总体上,他们的支持是相当不错的。他们容忍我在文档中已经涵盖的问题,并且帮助解决问题。我们离开的主要原因是缺乏 SSH/超级用户权限。
自托管
我一直在试图避免不可避免的事,也即我们自己管理数据库。现在,我们租用我们自己的硬件并维护数据库。我们有很多好的硬件,比任何云服务供应商能提供的更好,能够及时恢复(多亏了 Barman),也没有供应商锁定,并且(理论上)比用谷歌云或 AWS 托管要便宜 30%。这省下来的钱我们可以用来请一位自由数据库管理员每天检查一次服务器。
要点
这里的要点是,谷歌和亚马逊优先考虑他们的专有解决方案(谷歌的 BigQuery 和亚马逊的 Redshift)。因此,我们必须计划未来我们需要的功能。对于一个简单的数据库,它既不会变成一个有数十亿条记录的数据库,也不需要自定义扩展,那么我会不假思索地选其中任何一个(即时扩展实例,把服务器迁移到不同的地区,时间点恢复,内置的监控工具和托管复制可以省下很多时间)。
如果业务全跟数据有关,并且知道需要自定义硬件配置等等,那么最好的选择是自托管并管理数据库。也就是说,逻辑迁移非常简单,如果可以从任何一个托管供应商开始,并利用他们的启动资金,那么启动一个项目是很好的方法,并且,稍后如果有必要,可以进行迁移。
如果我重新开始,那么我会把时间花在估计我们会成长得多快和多大,我会用裸机设置并从第一天就开始请一位自由数据库管理员。
福利:性能
我选择托管服务的主要标准是减少管理开销。我认为成本和硬件大致相同。Aiven.io 已经写了一篇文章,在文章中比较了PostgreSQL在AWS、GCP、Azure、DO和UpCloud的性能(GCP 的性能在所有测试中都比 AWS 高出 2 倍)。
物化数据
我有没有提到这是我第一次使用 PostgreSQL?
在此之前,我主要使用 MySQL。我决定把 PostgreSQL 用于此次创业的原因是 PostgreSQL 支持物化视图和编程语言。我认为物化视图本身就是个很好的功能,可以用来学习 PostgreSQL。相反,我以为永远不会在数据库里运行脚本(MySQL 给我的教训是,数据库只能用于存储数据,所有逻辑必须在应用程序代码中实现)。
两年后,我们摆脱了大多数物化视图,现在我们使用数百个定制程序。但是,在这之前,在使用物化视图时,有多次失败的尝试。
首次尝试使用 PostgreSQL 物化视图
我第一次使用物化视图的用例可以概括为“有一个富含元数据的基表”,例如:
CREATE MATERIALIZED VIEW venue_view AS
WITH
auditorium_with_future_events AS (
SELECT
e1.venue_id,
e1.auditorium_id
FROM event e1
WHERE
-- The 30 days interval ensures that we do not remove auditoriums
-- that are temporarily unavailable.
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL
GROUP BY
e1.venue_id,
e1.auditorium_id
),
auditorium_with_future_events_count AS (
SELECT
awfe1.venue_id,
count(*) auditorium_count
FROM auditorium_with_future_events awfe1
GROUP BY
awfe1.venue_id
),
venue_auditorium_seat_count AS (
SELECT DISTINCT ON (e1.venue_id, e1.auditorium_id)
e1.venue_id,
e1.auditorium_id,
e1.seat_count
FROM auditorium_with_future_events awfe1
INNER JOIN event e1 ON e1.venue_id = awfe1.venue_id AND e1.auditorium_id = awfe1.auditorium_id
WHERE
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL AND
e1.seat_count IS NOT NULL
ORDER BY
e1.venue_id,
e1.auditorium_id
),
venue_seat_count AS (
SELECT
vasc1.venue_id,
sum(vasc1.seat_count) seat_count
FROM venue_auditorium_seat_count vasc1
GROUP BY vasc1.venue_id
)
SELECT DISTINCT ON (v1.id)
v1.id,
v1.google_place_id,
v1.fuid,
v1.cinema_id,
v1.street_1,
v1.street_2,
v1.postcode,
v1.coordinates,
gp1.country_id,
gp1.timezone_id,
COALESCE(v1.phone_number, c1.phone_number) AS phone_number,
v1.display_name AS name,
COALESCE(v1.alternative_url, v1.url) AS url,
v1.permanently_closed_at,
awfec1.auditorium_count,
nearest_venue.id nearest_venue_id,
CASE
WHEN nearest_venue.id IS NULL
THEN NULL
ELSE round(ST_DistanceSphere(gp1.location, nearest_venue.location))
END nearest_venue_distance,
vsc1.seat_count seat_count
FROM venue v1
LEFT JOIN venue_seat_count vsc1 ON vsc1.venue_id = v1.id
LEFT JOIN google_place gp1 ON gp1.id = v1.google_place_id
LEFT JOIN LATERAL (
SELECT
v2.id,
gp2.location
FROM venue v2
INNER JOIN google_place gp2 ON gp2.id = v2.google_place_id
WHERE v2.id != v1.id
ORDER BY gp1.location <-> gp2.location
LIMIT 1
) nearest_venue ON TRUE
LEFT JOIN auditorium_with_future_events_count awfec1 ON awfec1.venue_id = v1.id
INNER JOIN cinema c1 ON c1.id = v1.cinema_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_view (id);
CREATE INDEX ON venue_view (google_place_id);
CREATE INDEX ON venue_view (cinema_id);
CREATE INDEX ON venue_view (country_id);
CREATE INDEX ON venue_view (nearest_venue_id);
复制代码
在这里,venue 是基表,我们用附加数据对其进行扩展,并称之为 venue_view。
只需遵守两个规则:
_view 必须包含基表的所有列。
_view 必须包含基表的所有行。
以上查询没有任何问题。这个方法在很长时间里都有效。但是,随着记录的数量增长到数百万和数十亿的时候,刷新物化视图的时间从几秒钟增加到几小时。(如果你对物化视图不熟悉,那么值得注意的是,你只能刷新整个物化视图,没有办法根据条件刷新视图的子集。)
第二次尝试:分而治之
我尝试通过把 MV 分解为多个更小的 MV 来解决该问题,如:
(请注意,我们已经把查询从 CTE 迁移到专用 MV。)
CREATE MATERIALIZED VIEW auditorium_with_future_events_view
SELECT
e1.venue_id,
e1.auditorium_id
FROM event e1
WHERE
-- The 30 days interval ensures that we do not remove auditoriums
-- that are temporarily unavailable.
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL
GROUP BY
e1.venue_id,
e1.auditorium_id
WITH NO DATA;
CREATE UNIQUE INDEX ON auditorium_with_future_events_view (venue_id, auditorium_id);
CREATE MATERIALIZED VIEW venue_auditorium_seat_count_view
SELECT DISTINCT ON (e1.venue_id, e1.auditorium_id)
e1.venue_id,
e1.auditorium_id,
e1.seat_count
FROM auditorium_with_future_events_view awfe1
INNER JOIN event e1 ON e1.venue_id = awfe1.venue_id AND e1.auditorium_id = awfe1.auditorium_id
WHERE
e1.start_time > now() - INTERVAL '30 day' AND
e1.auditorium_id IS NOT NULL AND
e1.seat_count IS NOT NULL
ORDER BY
e1.venue_id,
e1.auditorium_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_auditorium_seat_count_view (venue_id, auditorium_id);
CREATE MATERIALIZED VIEW venue_view AS
WITH
auditorium_with_future_events_count AS (
SELECT
awfe1.venue_id,
count(*) auditorium_count
FROM auditorium_with_future_events_view awfe1
GROUP BY
awfe1.venue_id
),
venue_seat_count AS (
SELECT
vasc1.venue_id,
sum(vasc1.seat_count) seat_count
FROM venue_auditorium_seat_count_view vasc1
GROUP BY vasc1.venue_id
)
SELECT DISTINCT ON (v1.id)
v1.id,
v1.google_place_id,
v1.fuid,
v1.cinema_id,
v1.street_1,
v1.street_2,
v1.postcode,
v1.coordinates,
gp1.country_id,
gp1.timezone_id,
COALESCE(v1.phone_number, c1.phone_number) AS phone_number,
v1.display_name AS name,
COALESCE(v1.alternative_url, v1.url) AS url,
v1.permanently_closed_at,
awfec1.auditorium_count,
nearest_venue.id nearest_venue_id,
CASE
WHEN nearest_venue.id IS NULL
THEN NULL
ELSE round(ST_DistanceSphere(gp1.location, nearest_venue.location))
END nearest_venue_distance,
vsc1.seat_count seat_count
FROM venue v1
LEFT JOIN venue_seat_count vsc1 ON vsc1.venue_id = v1.id
LEFT JOIN google_place gp1 ON gp1.id = v1.google_place_id
LEFT JOIN LATERAL (
SELECT
v2.id,
gp2.location
FROM venue v2
INNER JOIN google_place gp2 ON gp2.id = v2.google_place_id
WHERE v2.id != v1.id
ORDER BY gp1.location <-> gp2.location
LIMIT 1
) nearest_venue ON TRUE
LEFT JOIN auditorium_with_future_events_count awfec1 ON awfec1.venue_id = v1.id
INNER JOIN cinema c1 ON c1.id = v1.cinema_id
WITH NO DATA;
CREATE UNIQUE INDEX ON venue_view (id);
CREATE INDEX ON venue_view (google_place_id);
CREATE INDEX ON venue_view (cinema_id);
CREATE INDEX ON venue_view (country_id);
CREATE INDEX ON venue_view (nearest_venue_id);
复制代码
这种方法的好处是:
1.我们把一个长事务分解成很多短事务。
2.我们能够用索引来加速 JOIN。
3.我们能够刷新单个物化视图(某些数据比其他数据改变得更频繁)。
这种方法的缺点是它增加了我们使用的物化视图的数量,并需要开发一个自定义解决方案来协调物化视图的刷新。在当时,这似乎是合理的,我就接受了。于是产生了 materialized_view_refresh_schedule 表以及我们的第一个数据库内队列:
CREATE TABLE materialized_view_refresh_schedule (
id SERIAL PRIMARY KEY,
materialized_view_name citext NOT NULL,
refresh_interval interval NOT NULL,
last_attempted_at timestamp with time zone,
maximum_execution_duration interval NOT NULL DEFAULT '00:30:00'::interval
);
CREATE UNIQUE INDEX materialized_view_refresh_schedule_materialized_view_name_idx ON materialized_view_refresh_schedule(materialized_view_name citext_ops);
CREATE TABLE materialized_view_refresh_schedule_execution (
id integer DEFAULT nextval('materialized_view_refresh_id_seq'::regclass) PRIMARY KEY,
materialized_view_refresh_schedule_id integer NOT NULL REFERENCES materialized_view_refresh_schedule(id) ON DELETE CASCADE,
started_at timestamp with time zone NOT NULL,
ended_at timestamp with time zone,
execution_is_successful boolean,
error_name text,
error_message text,
terminated_at timestamp with time zone,
CONSTRAINT materialized_view_refresh_schedule_execution_check CHECK (terminated_at IS NULL OR ended_at IS NOT NULL)
);
CREATE INDEX materialized_view_refresh_schedule_execution_materialized_view_ ON materialized_view_refresh_schedule_execution(materialized_view_refresh_schedule_id int4_ops);
复制代码
物化视图的名称存于 materialized_view_refresh_schedule 表中,并包含需要刷新的频率的说明。编写了另一个程序以使用这些指令来执行物化。
CREATE OR REPLACE FUNCTION schedule_new_materialized_view_refresh_schedule_execution()
RETURNS table(materialized_view_refresh_schedule_id int)
AS $$
BEGIN
RETURN QUERY
UPDATE materialized_view_refresh_schedule
SET last_attempted_at = now()
WHERE id IN (
SELECT mvrs1.id
FROM materialized_view_refresh_schedule mvrs1
LEFT JOIN LATERAL (
SELECT 1
FROM materialized_view_refresh_schedule_execution mvrse1
WHERE
mvrse1.ended_at IS NULL AND
mvrse1.materialized_view_refresh_schedule_id = mvrs1.id
) AS unendeded_materialized_view_refresh_schedule_execution ON TRUE
WHERE
unendeded_materialized_view_refresh_schedule_execution IS NULL AND
(
mvrs1.last_attempted_at IS NULL OR
mvrs1.last_attempted_at + mvrs1.refresh_interval < now()
)
ORDER BY mvrs1.last_attempted_at ASC NULLS FIRST
LIMIT 1
FOR UPDATE OF mvrs1 SKIP LOCKED
)
RETURNING id;
END
$$
LANGUAGE plpgsql;
复制代码
该程序将调用 schedule_new_materialized_view_refresh_schedule_execution 来安排物化视图的刷新,同时评估 REFRESH MATERIALIZED VIEW…并记录结果。总的来说,该方法效果很好。但是,我们很快就不能使用该方法了。原因是需要扫描整个表的视图对拥有数十亿条记录的大型表来说不可行。
第三次尝试:使用 MV 来抽象数据的子集
我已经描述了我们如何使用 MV 来有效地扩展表。这种方法不适用于扩展大型表。于是,产生了第三次迭代:我们没有使用物化视图来扩展基本表,而是创建抽象数据领域的物化视图。由于它的大小,venue_view 能够保持原样,但是,像有数十亿条记录的 event_view 的假设视图将变为 last_week_event、future_event 等。这种方法有效果,我们继续使用几个这样的物化视图。
第四次尝试:物化表的列
尽管后一种方式涵盖了所有我们的日常操作,但是,我们还是需要在历史数据上进行查询。在没有物化视图的情况下来运行这些查询将对单个查询进行很多索引规划。此外,针对主实例运行长事务将防止自动真空并导致表膨胀。我本可以创建一个逻辑复制并允许分析师在那个实例上运行任何查询而不妨碍自动真空。然而,更大的问题是,作为一个初创公司,我们无法承担需要几个小时或几天来运行的查询。我们需要比其他人更快。因此诞生了目前的解决方案:物化表的列。
原则很简单:
描述我们希望用附加信息充实的实体的表,被改为包含一个 materialized_at 时间戳列,和我们想要物化的每个数据点的列。在 venue_view 的示例中,我们将摆脱整个物化视图,并将原来的 venue_view 物化视图中的 materialized_at、country_id、timezone_id、phone_number 和其他列都添加到场地表。
然后,有一个脚本,它将观察所有拥有 materialized_at 列的表,并且每当其检测到某一行的 materialized_at 是空时,它会为物化的列计算新值并更新该行,如:
CREATE OR REPLACE FUNCTION materialize_event_seat_state_change()
RETURNS void
AS $$
BEGIN
WITH
event_seat_state_count AS (
SELECT
essc1.id,
count(*)::smallint seat_count,
count(*) FILTER (WHERE ss1.nid = 'BLOCKED')::smallint seat_blocked_count,
count(*) FILTER (WHERE ss1.nid = 'BROKEN')::smallint seat_broken_count,
count(*) FILTER (WHERE ss1.nid = 'EMPTY')::smallint seat_empty_count,
count(*) FILTER (WHERE ss1.nid = 'HOUSE')::smallint seat_house_count,
count(*) FILTER (WHERE ss1.nid = 'SOLD')::smallint seat_sold_count,
count(*) FILTER (WHERE ss1.nid = 'UNKNOWN')::smallint seat_unknown_count,
count(*) FILTER (WHERE ss1.id IS NULL)::smallint seat_unmapped_count,
count(*) FILTER (WHERE ss1.nid IN ('BLOCKED', 'BROKEN', 'HOUSE', 'SOLD', 'UNKNOWN')) seat_unavailable_count
FROM event e1
LEFT JOIN event_seat_state_change essc1 ON essc1.event_id = e1.id
LEFT JOIN event_seat_state_change_seat_state esscss1 ON esscss1.event_seat_state_change_id = essc1.id
LEFT JOIN cinema_foreign_seat_state fcss1 ON fcss1.id = cinema_foreign_seat_state_id
LEFT JOIN seat_state ss1 ON ss1.id = fcss1.seat_state_id
WHERE
essc1.id IN (
SELECT id
FROM event_seat_state_change
WHERE
materialized_at IS NULL
ORDER BY materialized_at DESC
LIMIT 100
)
GROUP BY essc1.id
)
UPDATE event_seat_state_change essc1
SET
materialized_at = now(),
seat_count = essc2.seat_count,
seat_blocked_count = essc2.seat_blocked_count,
seat_broken_count = essc2.seat_broken_count,
seat_empty_count = essc2.seat_empty_count,
seat_house_count = essc2.seat_house_count,
seat_sold_count = essc2.seat_sold_count,
seat_unknown_count = essc2.seat_unknown_count,
seat_unmapped_count = essc2.seat_unmapped_count
FROM event_seat_state_count essc2
WHERE
essc1.id = essc2.id;
END
$$
LANGUAGE plpgsql
SET work_mem='1GB'
SET max_parallel_workers_per_gather=4;
复制代码
同样,这需要编写一个定制解决方案,以观察表并管理其物化、行和列的到期逻辑等等。目前,我在开发一个开源版本,我计划在不久的将来进行发布。
这种方法的最大好处是可以按所希望的粒度来更新物化表的列:可以更新单个行,可以更新单个列(如,当添加了新的物化列,需要填充新的列值时,可以只需要生成该列的值;不需要运行完整的物化查询)。此外,因为更新是粒度的,所以它们都能够近乎实时地得到应用。
要点
这里的要点是,PostgreSQL 物化视图对小型数据集是个极好的功能。但是,随着数据集的增长,需要谨慎地计划如何访问数据以及支持这种需求的物化策略。利用粒度物化视图和物化表的列的组合,我们可以实时地丰富数据库,并用于我们所有的分析查询,而不必增加数据仓储逻辑复制的复杂性。
使用数据库作为作业队列
这和我们处理的数据量关系不大,而与我们使用数据库的方式有关。如前所述,我的目标是减少数据处理管道中参与的服务数量。在数据库中包含作业队列的另一个好处是,可以保存和查询所有与数据库中每个数据点关联的作业(以及其属性)的记录。能够查询与每个数据点关联的作业和日志,把它和父作业及子作业连接起来等等,事实证明,这对于标记失败作业和指出问题的根源是很有价值的。
用 PostgreSQL 构建一个简单、可靠和高效的并发工作队列。
值得注意的是,通常对于并发作业队列,RDBM 是个糟糕的选择(原因请参考What is SKIP LOCKED for in PostgreSQL 9.5?)。但是,在 PostgreSQL 的情况下,我们可以用于 UPDATE…SKIP LOCKED 来构建简单、可靠和高效的并发工作队列。其缺点在性能方面:
每个事务都会扫描表并跳过锁定的行,因此,对于大量活跃的工作节点来说,其可能需要做一些工作来获得新项目。这不只是把项目弹出栈。查询可能必须使用索引扫描遍历索引,从堆中获取每个候选项目,并检查锁定状态。对于任何合理的队列,这将都存在于内存中,但其仍然是相当大的变动。
– https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
我对该警告没有引起足够的重视,结果给自己惹上了相当大的麻烦。简而言之,用来调度作用的查询的第一个版本执行时间很长,这意味着工作节点基本上处于闲置状态,我们在浪费宝贵的资源,而重要的任务没有及时完成。
解决方案相当简单:一个专用表,用未完成的任务列表来填充。从该表中选取一个作业很简单:
CREATE OR REPLACE FUNCTION schedule_cinema_data_task()
RETURNS table(cinema_data_task_id int)
AS $$
DECLARE
scheduled_cinema_data_task_id int;
BEGIN
UPDATE
cinema_data_task_queue
SET
attempted_at = now()
WHERE
id = (
SELECT cdtq1.id
FROM cinema_data_task_queue cdtq1
WHERE cdtq1.attempted_at IS NULL
ORDER BY cdtq1.id ASC
LIMIT 1
FOR UPDATE OF cdtq1 SKIP LOCKED
)
RETURNING cinema_data_task_queue.cinema_data_task_id
INTO scheduled_cinema_data_task_id;
UPDATE cinema_data_task
SET last_attempted_at = now()
WHERE id = scheduled_cinema_data_task_id;
RETURN QUERY SELECT scheduled_cinema_data_task_id;
END
$$
LANGUAGE plpgsql
SET work_mem='100MB';
主任务定义存于cinema_data_task。cinema_data_task_queue只用于对准备执行的任务进行排队。
最大的问题是,每次执行新任务时,哪些任务可以运行的优先级和限制都会发生变化。因此,我们不是调度大量的作业,而是运行一个流程,每秒都去检查一下队列是否快空了,并用新任务填充它等等。
CREATE OR REPLACE FUNCTION update_cinema_data_task_queue()
RETURNS void
AS $$
DECLARE
outstanding_task_count int;
BEGIN
SELECT count(*)
FROM cinema_data_task_queue
WHERE attempted_at IS NULL
INTO outstanding_task_count;
IF outstanding_task_count < 100 THEN
INSERT INTO cinema_data_task_queue (cinema_data_task_id)
SELECT
cdtq1.cinema_data_task_id
FROM cinema_data_task_queue(100, 50, 100, false) cdtq1
WHERE
NOT EXISTS (
SELECT 1
FROM cinema_data_task_queue
WHERE
cinema_data_task_id = cdtq1.cinema_data_task_id AND
attempted_at IS NULL
)
ON CONFLICT (cinema_data_task_id) WHERE attempted_at IS NULL
DO NOTHING;
END IF;
END
$$
LANGUAGE plpgsql
SET work_mem='50MB';
复制代码
在任务结束后,任务的引用将从 cinema_data_task_queue 中删除。这确保表的扫描很快,不让 CPU 一直繁忙。
该方法允许我们扩展到 2000 多个并发数据聚合代理。
注意:100 个未完成任务的限制有点武断。我已经试验了大到 10000 的值,而没有任何可测量的性能损失。但是,只要我们保持队列里一直有数据,那么调度越精细,我们就能越好地平衡不同来源之间的数据聚合的负载平衡,我们也可以越快地停止从故障数据源中提取数据等等。
要点
如果准备把数据库用作作业队列,那么包含作业的表必须有合理的大小,并且用于调度下一个作业执行的查询时间不得超过几个毫秒。
其他
在扩展数据库时,这 3 件事是最大的挑战。其他一些问题包括:
识别不同云供应商之间的延迟。
列的顺序很重要。我们有一些具有 60 多个列的表。对列进行排序以避免填充操作,可以节约 20%以上的存储空间(请参看 https://blog.2ndquadrant.com/on-rocks-and-sand/)。
如果准备在主节点上运行长查询,那么评估 vacuum_freeze_table_age 以避免表膨胀。
这两个配置没有得到充分的讨论:from_collapse_limit、join_collapse_limit。这两个配置默认为 8。如果不知道这些配置,可能会导致很多让人头痛的调试,搞乱执行计划。我们把 from_collapse_limit 提高到 20,把 join_collapse_limit 提高到 50。我还不清楚为什么默认值那么低。把它们提到那么高也没有什么惩罚。
为表的膨胀做计划,并知道如何修复。随着数据库变得越来越大,vacuum full 变得不可行。探索一下 pg_repack 和 pg_squeeze。
持续监控 pg_stat_statements。用 total_time 排序。顶层查询最先得到服务。
持续监控 pg_stat_user_tables。识别未充分利用的索引并监控死元组的积累。
持续监控 pg_stat_activity。识别由于锁定造成的瓶颈并重构有问题的事务。
福利:Slonik PostgreSQL 客户端
我们经常使用 PostgreSQL。我们从使用 node-postgres 开始。node-postgres 提供了很好的协议抽象。但是,感觉代码冗长,我们不断添加新的帮助程序来抽象重复的模式并支持调试体验。我们在很多不同的程序中都需要这些帮助程序。因此,我最终开发了 Slonik,这是一个有严格类型、详细的日志记录和断言的 PostgreSQL 客户端。
Slonik 有助于我们保持代码简洁,防止 SQL 注入,支持详细登录和与 auto_explain 相关的应用程序日志。
阅读英文原文:Lessons learned scaling PostgreSQL database to 1.2bn records/month
评论