1. 引言
偶然间发现 redis 的 BLPOP 命令的阻塞时间不是精确的,并且一般都是超出 100~500ms,多阻塞几次岂不是 1s 就没了?!这一行为成功引起了我的兴趣,于是乎就有了这篇对阻塞命令源码的探究。
本文以 BLPOP 命令为例探究一波内部实现(源码大军即将到来!!!)。
2. 设置阻塞 key
首先让我们跳过客户端连接和服务器初始化的步骤,直接看命令的执行流程。BLPOP 和 BRPOP 最终都会调用同一个通用函数 blockingPopGenericCommand。
在该函数中,如果 key 对应的 list 有值,阻塞命令的行为就同普通 pop 命令一致。反之,且 client 不在事务中时,就会调用 blockForKeys 函数进行设置阻塞操作。
在解析设置阻塞 key 的源码之前,需要先补充一点 redis 基础数据结构和单机实现的一些知识。
首先 redis 对外提供的 5 种数据结构实际上是由 redisObject 持有不同的底层数据结构实现的,本文会涉及到的有 dict、adlist 和 sds。其中 dict 是一个 hashTable 的封装,查找的时间复杂度为 O(1);adlist 是一个简单的双向链表,可以很方便的进行首尾遍历;sds 是 redis 自己实现的字符串,有动态扩容、兼容二进制数据等优势,基础数据结构的具体细节在这就不赘述。
除了这些基础的数据结构,redis 还定义了 redisServer、client 和 redisDb3 个数据结构用于保存 redis 服务、客户端和 db 的相关信息。一个 redis 服务只有一个 redisServer 实例,一个服务持有多个 redisDb 实例,而每个连接上的客户端都会在服务端初始化一个 client 实例,并且映射到一个 redisDb 实例上。
回到源码,redisServer、redisDb 和 client 的结构体为设置阻塞 key 专门准备了对应属性:
1struct redisServer { // 服务端
2 // ...
3 unsigned int bpop_blocked_clients; // 阻塞的client数量
4 // ...
5};
6
7typedef struct redisDb { // redisDb
8 // ...
9 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
10 // ...
11} redisDb;
12
13typedef struct client { // 客户端
14 // ...
15 int btype; /* Type of blocking op if CLIENT_BLOCKED. */
16 blockingState bpop; /* blocking state */
17 // ...
18} client;
19
20typedef struct blockingState {
21 mstime_t timeout; // 阻塞超时时间
22 dict *keys; // 阻塞的key字典
23 robj *target; /* The key that should receive the element,
24 * for BRPOPLPUSH. */
25 /* BLOCKED_WAIT */
26 int numreplicas; /* Number of replicas we are waiting for ACK. */
27 long long reploffset; /* Replication offset to reach. */
28} blockingState;
复制代码
redisServer 中的 bpop_blocked_clients 只是简单的记录了阻塞的 client 数量,主要用于 info 的展示。
redisDb 中的 blocking_keys 是一个 dict,该 dict 的 key 为当前 db 阻塞的 key,每个 value 应着一个 client 的 adlist,如果多个 client 阻塞着同一个 key 会按照先后顺序添加到 adlist 尾部。
client 的 byte 标识了阻塞的类型,用于区分 BLPOP 和 BRPOPLPUSH 两种阻塞。bpop 记录了阻塞的各种属性,最主要的是 timeout 和 keys。
终于扯的差不多,可以把视线就转移到 blockForKeys 的源码了:
1void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { // 阻塞key 用于阻塞命令
2 dictEntry *de;
3 list *l;
4 int j;
5
6 c->bpop.timeout = timeout; // 设置阻塞时间
7 c->bpop.target = target;
8
9 if (target != NULL) incrRefCount(target);
10
11 for (j = 0; j < numkeys; j++) {
12 if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; // 将阻塞的key添加到client的bpop.keys字典中
13 incrRefCount(keys[j]); // 引用计数加一
14 de = dictFind(c->db->blocking_keys,keys[j]); // 查找db的blocking_keys中是否存在阻塞的key
15 if (de == NULL) {
16 int retval;
17 l = listCreate(); // 创建一个adlist实例
18 retval = dictAdd(c->db->blocking_keys,keys[j],l); // 添加到blocking_keys的字典中
19 incrRefCount(keys[j]);
20 serverAssertWithInfo(c,keys[j],retval == DICT_OK);
21 } else {
22 l = dictGetVal(de);
23 }
24 listAddNodeTail(l,c); // 在blocking_keys的对应list中添加上client
25 }
26 blockClient(c,BLOCKED_LIST); // 设置客户端阻塞状态
27}
28
29void blockClient(client *c, int btype) {
30 c->flags |= CLIENT_BLOCKED;
31 c->btype = btype;
32 server.bpop_blocked_clients++;
33}
复制代码
c->bpop.keys 是一个 value 为 NULL 的 dict,主要是防止阻塞的 key 重复。在设置完阻塞 key 后,client 的状态会变更为 CLIENT_BLOCKED 状态,在这种状态下正常弹出、超时或客户端断开才会断开连接。
3. 阻塞超时
蛤!?设置流程完成了,却完全没看到超时的处理步骤,看来只能跟随着 bpop.timeout 的脚步进行下一步排查了。万幸在 clientsCronHandleTimeout 函数中发现了蛛丝马迹。那么 clientsCronHandleTimeout 又是搞啥子的呢?还是要先补充一些 redis 单机的运行知识。
首先如果不考虑 BGSAVE 和 BGREWRITEAOF 的话,redis 实际上是一个单进程的服务。通过 IO 多路复用和事件循环同时处理多 client 的请求。
事件循环中的事件分为文件事件和时间事件,文件事件主要是各种网络请求的处理,时间事件是 redis 服务定期执行的一些事件,包括随机删除过期 key、rdbsave、主从同步等,时间事件的执行频率是根据配置中的 hz 参数来设定。
而 clientsCronHandleTimeout 就是时间事件中专门检测 client 超时的函数,如果当前 client 的状态为 CLIENT_BLOCKED 会就根据 bpop.timeout 进行超时判断:
1int clientsCronHandleTimeout(client *c, mstime_t now_ms) { // 校验客户端是否超时
2 time_t now = now_ms/1000;
3
4 if (server.maxidletime &&
5 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
6 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */
7 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
8 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
9 (now - c->lastinteraction > server.maxidletime))
10 { // 正常设置的client超时
11 serverLog(LL_VERBOSE,"Closing idle client");
12 freeClient(c);
13 return 1;
14 } else if (c->flags & CLIENT_BLOCKED) { // 阻塞超时
15 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
16 /* Handle blocking operation specific timeout. */
17 replyToBlockedClientTimedOut(c); // 设置超时返回
18 unblockClient(c); // 解除阻塞
19 } else if (server.cluster_enabled) { // 集群相关操作
20 if (clusterRedirectBlockedClientIfNeeded(c))
21 unblockClient(c);
22 }
23 }
24 return 0;
25}
复制代码
刨除一系列的校验,最核心的是 unblockClient 这个函数:
1void unblockClient(client *c) { // 解除阻塞
2 if (c->btype == BLOCKED_LIST) { // BLPOP和BRPOP的阻塞
3 unblockClientWaitingData(c);
4 } else if (c->btype == BLOCKED_WAIT) {
5 unblockClientWaitingReplicas(c);
6 } else {
7 serverPanic("Unknown btype in unblockClient().");
8 }
9
10 c->flags &= ~CLIENT_BLOCKED;
11 c->btype = BLOCKED_NONE;
12 server.bpop_blocked_clients--;
13 if (!(c->flags & CLIENT_UNBLOCKED)) { // 将client添加到server的非阻塞list中
14 c->flags |= CLIENT_UNBLOCKED;
15 listAddNodeTail(server.unblocked_clients,c);
16 }
17}
18
19void unblockClientWaitingData(client *c) { // 解除client的阻塞
20 dictEntry *de;
21 dictIterator *di;
22 list *l;
23
24 serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
25 di = dictGetIterator(c->bpop.keys); // 获取阻塞的keys
26 while((de = dictNext(di)) != NULL) { // 遍历client阻塞的所有key
27 robj *key = dictGetKey(de);
28
29 l = dictFetchValue(c->db->blocking_keys,key);
30 serverAssertWithInfo(c,key,l != NULL);
31 listDelNode(l,listSearchKey(l,c)); // 删除blocking_keys list中的client
32 if (listLength(l) == 0) // 删除空list
33 dictDelete(c->db->blocking_keys,key);
34 }
35 dictReleaseIterator(di);
36
37 dictEmpty(c->bpop.keys,NULL); // 释放client的bpop.keys占用内存
38 if (c->bpop.target) {
39 decrRefCount(c->bpop.target);
40 c->bpop.target = NULL;
41 }
42}
复制代码
redisServer 的 unblocked_clients 底层为一个 client 的 adlist。每个超时的 client 都会被记录在这个 adlist 中;超时的 client 中的 bpoop.keys 会被清空,同时 db 中的 blocking_keys 对应的 client 也会被删除。
好吧,流程执行到这又断了!还好有 server.unblocked_clients 作为线索,beforeSleep 作为其“包庇者”,会检测 server.unblocked_clients 是否为空,如果存在就给对应 client 返回超时并删除 list 中对应的 client:
1void beforeSleep(struct aeEventLoop *eventLoop) { // 每一次事件循环都会执行该函数
2 // ...
3 if (listLength(server.unblocked_clients))
4 processUnblockedClients();
5 // ...
6}
7
8void processUnblockedClients(void) { // 向非阻塞状态的client返回值
9 listNode *ln;
10 client *c;
11
12 while (listLength(server.unblocked_clients)) {
13 ln = listFirst(server.unblocked_clients);
14 serverAssert(ln != NULL);
15 c = ln->value;
16 listDelNode(server.unblocked_clients,ln); // 删除list节点
17 c->flags &= ~CLIENT_UNBLOCKED;
18
19 if (!(c->flags & CLIENT_BLOCKED)) {
20 if (c->querybuf && sdslen(c->querybuf) > 0) {
21 processInputBuffer(c); // 向client返回结果
22 }
23 }
24 }
25}
复制代码
到这整个超时流程已经走完了,那么消失的几百毫秒去哪了呢?
实际上 beforeSleep 函数是每次事件循环之前都会调用一个函数,因为设置阻塞、阻塞超时检测、阻塞超时返回分别分散在不同的事件中,所以阻塞的超时返回很可能需要跨越事件循环。而 redis 的默认 hz 配置为 10,每个周期间隔为 100ms,至此终于是水落石出。
4. 正常弹出
既然都追查到这了,那就继续看看正常弹出的策略吧。当有另一个 client 对阻塞的 key 进行 push 操作时,阻塞的 client 就会被正常弹出并获取返回值。该监测操作在添加数据的基础函数 dbAdd 中:
1void dbAdd(redisDb *db, robj *key, robj *val) { // db添加键值对
2 sds copy = sdsdup(key->ptr); // 复制key
3 int retval = dictAdd(db->dict, copy, val); // 往字典中添加键值对
4
5 serverAssertWithInfo(NULL,key,retval == DICT_OK);
6 if (val->type == OBJ_LIST) signalListAsReady(db, key); // 如果是list对象 判断是否有阻塞命令在监听
7 if (server.cluster_enabled) slotToKeyAdd(key); // 集群相关操作
8 }
如果添加的key是list对象,并且处于阻塞中,signalListAsReady就会标识该key已经就绪。
1typedef struct readyList { // server的待弹出list节点
2 redisDb *db;
3 robj *key;
4} readyList;
5
6void signalListAsReady(redisDb *db, robj *key) { // 标识某个阻塞key已经就绪
7 readyList *rl;
8
9 if (dictFind(db->blocking_keys,key) == NULL) return; // 不在阻塞的key中
10
11 if (dictFind(db->ready_keys,key) != NULL) return; // 防止重复弹出
12
13 rl = zmalloc(sizeof(*rl));
14 rl->key = key;
15 rl->db = db;
16 incrRefCount(key);
17 listAddNodeTail(server.ready_keys,rl); // 将ready_list添加到server.ready_keys的尾部
18
19 incrRefCount(key);
20 serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); // 将key添加到db的ready_keys字典中
21}
复制代码
redisServer 的 ready_keys 是一个 adlist,每个节点都是一个 readyList 对象,保存了 key 和对应的 db。
redisDb 的 ready_keys 是一个 dict 实例,同 client->bpop.keys 一样,value 都是 NULL,主要是利用 dictO(1)高效查找。
在最终的阻塞返回是在 handleClientsBlockedOnLists 函数中,该函数会在每次命令执行完成后被调用:
1int processCommand(client *c) { // 执行命令
2 // ...
3 /* Exec the command */
4 if (c->flags & CLIENT_MULTI &&
5 c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
6 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
7 { // 事务
8 queueMultiCommand(c);
9 addReply(c,shared.queued);
10 } else { // 普通命令
11 call(c,CMD_CALL_FULL); // 调用命令对应函数
12 c->woff = server.master_repl_offset;
13 if (listLength(server.ready_keys))
14 handleClientsBlockedOnLists(); // 处理阻塞key
15 }
16 return C_OK;
17}
复制代码
通过在每次命令完成后对 ready_keys 进行检测,从而保证阻塞的 key 不会被其他的 pop 命令弹出。
1void handleClientsBlockedOnLists(void) { // 处理list阻塞弹出返回
2 while(listLength(server.ready_keys) != 0) {
3 list *l;
4
5 l = server.ready_keys;
6 server.ready_keys = listCreate(); // 将ready_keys置空
7
8 while(listLength(l) != 0) { // 遍历server.ready_keys
9 listNode *ln = listFirst(l);
10 readyList *rl = ln->value;
11
12 dictDelete(rl->db->ready_keys,rl->key); // 清除db->ready_keys
13
14 robj *o = lookupKeyWrite(rl->db,rl->key); // 查找对应的key
15 if (o != NULL && o->type == OBJ_LIST) {
16 dictEntry *de;
17
18 de = dictFind(rl->db->blocking_keys,rl->key); // 查找db的blocking_keys 获取阻塞client list
19 if (de) {
20 list *clients = dictGetVal(de);
21 int numclients = listLength(clients);
22
23 while(numclients--) { // 遍历clients 如果有足够多的值就都弹出返回
24 listNode *clientnode = listFirst(clients);
25 client *receiver = clientnode->value; // 获取client对象
26 robj *dstkey = receiver->bpop.target;
27 int where = (receiver->lastcmd &&
28 receiver->lastcmd->proc == blpopCommand) ?
29 LIST_HEAD : LIST_TAIL;
30 robj *value = listTypePop(o,where); // pop对应list
31
32 if (value) {
33 if (dstkey) incrRefCount(dstkey);
34 unblockClient(receiver); // 解除client的阻塞
35
36 if (serveClientBlockedOnList(receiver,
37 rl->key,dstkey,rl->db,value,
38 where) == C_ERR)
39 { // 如果通知客户端出错就进行回滚,将数据push回list
40 listTypePush(o,value,where);
41 }
42
43 if (dstkey) decrRefCount(dstkey);
44 decrRefCount(value);
45 } else {
46 break;
47 }
48 }
49 }
50 if (listTypeLength(o) == 0) {
51 dbDelete(rl->db,rl->key);
52 }
53 }
54 // 释放临时变量
55 decrRefCount(rl->key);
56 zfree(rl);
57 listDelNode(l,ln);
58 }
59 listRelease(l); /* We have the new list on place at this point. */
60 }
61}
复制代码
5. 小结
redis 是一个单进程服务,通过 IO 多路复用和事件循环提供高并发的服务,因此阻塞命令的实现需要分散到整个事件循环的各个环节中。
检测阻塞超时和返回超时信息都是异步进行的,很可能需要跨越事件循环,redis 默认每个事件循环的间隔为 100ms,这就是导致多出几百 ms 的“元凶”。
redis 每次进行添加操作的时候都会检测是否为阻塞的 list,并且在每次命令执行完成后都会对待弹出 key 进行弹出,保证阻塞命令拥有最高的弹出优先级。
redisServer 持有多个 redisDb,每个 redisDb 又可能对应多个 client,因此每个结构都需要持有不同的阻塞相关属性,用于各个环节的检测和返回。
最后附上一个阻塞命令涉及到的数据结构图:
作者介绍:
吴超艺,新房研发部,16 年 11 月加入贝壳找房,任职 PHP 研发工程师。
本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。
原文链接:
https://mp.weixin.qq.com/s/ax1GP_fu8fHs9fNXMQKVyw
评论