最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。
难点
如何做到不中断接收连接
如何做到已有连接不中断
解决
如何做到不中断接受连接
以下是 linux 源码中 bind 的实现(linux-1.0)
// linux-1.0/net/socket.c 536
static int
sock_bind(int fd, struct sockaddr *umyaddr, int addrlen)
{
struct socket *sock;
int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd));
if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL)
return(-EBADF);
//获取fd对应的socket结构
if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK);
// 转调用bind指向的函数,下层函数(inet_bind)
if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) {
DPRINTF((net_debug, "NET: sock_bind: bind failed\n"));
return(i);
}
return(0);
}
// linux-1.0/net/inet/sock.c 1012
static int
inet_bind(struct socket *sock, struct sockaddr *uaddr,
int addr_len)
{
...
outside_loop:
for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)];
sk2 != NULL; sk2 = sk2->next) {
#if 1 /* should be below! */
if (sk2->num != snum) continue;
/* if (sk2->saddr != sk->saddr) continue; */
#endif
if (sk2->dead) {
destroy_sock(sk2);
goto outside_loop;
}
if (!sk->reuse) {
sti();
return(-EADDRINUSE);
}
if (sk2->num != snum) continue; /* more than one */
if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */
if (!sk2->reuse) {
sti();
return(-EADDRINUSE);
}
}
...
}
sock_array 是一个链式哈希表,保存着各端口号的 sock 结构
通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use
若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题
如何做到已有连接不中断
新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳
linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接
unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。
#include <sys/types.h>
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。
两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。
由此解决了文章开头提出的两个问题
Demo 实现
进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态
旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:
1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求
2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。
3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。
4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。
5.最后旧进程给新进程发送发送完毕信号,随后退出
以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。
// server.go
package main
import (
"flag"
"fmt"
"golang.org/x/sys/unix"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
)
var (
workSpace string
logger *log.Logger
writeTimeout = time.Second * 5
readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false
)
func init() {
flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace")
flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777)
if err != nil {
panic(err)
}
logger = log.New(file, "", 11)
go beforeStart()
go signalHandler()
}
func main() {
var err error
serverListener, err = net.Listen("tcp", ":7000")
if err != nil {
panic(err)
}
for {
if isUpdate == true {
continue
}
conn, err := serverListener.Accept()
if err != nil {
logger.Println("conn error")
continue
}
c := conn.(*net.TCPConn)
go connectionHandler(c)
}
}
func connectionHandler(conn *net.TCPConn) {
file, _ := conn.File()
connFiles.Store(file, true)
logger.Printf("conn fd %d\n", file.Fd())
defer func() {
connFiles.Delete(file)
_ = conn.Close()
}()
for {
if isUpdate == true {
continue
}
err := conn.SetReadDeadline(time.Now().Add(readTimeout))
if err != nil {
logger.Println(err.Error())
return
}
rBuf := make([]byte, 4)
_, err = conn.Read(rBuf)
if err != nil {
logger.Println(err.Error())
return
}
if string(rBuf) != "ping" {
logger.Println("failed to parse the message " + string(rBuf))
return
}
err = conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err != nil {
logger.Println(err.Error())
return
}
_, err = conn.Write([]byte(`pong`))
if err != nil {
logger.Println(err.Error())
return
}
}
}
func beforeStart() {
connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock"))
if err != nil {
logger.Println(err.Error())
return
}
defer func() {
_ = connInterface.Close()
}()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1)
oob := make([]byte, 32)
for {
err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3))
if err != nil {
fmt.Println(err.Error())
return
}
n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob)
if err != nil {
logger.Println(err.Error())
return
}
if n != 1 || b[0] != 0 {
if n != 1 {
logger.Printf("recv fd type error: %d\n", n)
} else {
logger.Println("init finish")
}
return
}
scms, err := unix.ParseSocketControlMessage(oob[0:oobn])
if err != nil {
logger.Println(err.Error())
return
}
if len(scms) != 1 {
logger.Printf("recv fd num != 1 : %d\n", len(scms))
return
}
fds, err := unix.ParseUnixRights(&scms[0])
if err != nil {
logger.Println(err.Error())
return
}
if len(fds) != 1 {
logger.Printf("recv fd num != 1 : %d\n", len(fds))
return
}
logger.Printf("recv fd %d\n", fds[0])
file := os.NewFile(uintptr(fds[0]), "fd-from-old")
conn, err := net.FileConn(file)
if err != nil {
logger.Println(err.Error())
return
}
go connectionHandler(conn.(*net.TCPConn))
}
}
func signalHandler() {
signal.Notify(
signalChan,
syscall.SIGUSR2,
)
for {
sc := <-signalChan
switch sc {
case syscall.SIGUSR2:
gracefulExit()
default:
continue
}
}
}
func gracefulExit() {
var connWait sync.WaitGroup
_ = syscall.Unlink(filepath.Join(workSpace, "conn.sock"))
listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock"))
if err != nil {
logger.Println(err.Error())
return
}
defer func() {
_ = listenerInterface.Close()
}()
unixListener := listenerInterface.(*net.UnixListener)
connWait.Add(1)
go func() {
defer connWait.Done()
unixConn, err := unixListener.AcceptUnix()
if err != nil {
logger.Println(err.Error())
return
}
defer func() {
_ = unixConn.Close()
}()
connFiles.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
}
file := key.(*os.File)
defer func() {
_ = file.Close()
}()
buf := make([]byte, 1)
buf[0] = 0
rights := syscall.UnixRights(int(file.Fd()))
_, _, err := unixConn.WriteMsgUnix(buf, rights, nil)
if err != nil {
logger.Println(err.Error())
}
logger.Printf("send fd %d\n", file.Fd())
return true
})
finish := make([]byte, 1)
finish[0] = 1
_, _, err = unixConn.WriteMsgUnix(finish, nil, nil)
if err != nil {
logger.Println(err.Error())
}
}()
isUpdate = true
execSpec := &syscall.ProcAttr{
Env: os.Environ(),
Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}),
}
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec)
if err != nil {
logger.Println(err.Error())
return
}
logger.Printf("old process %d new process %d\n", os.Getpid(), pid)
_ = serverListener.Close()
connWait.Wait()
os.Exit(0)
}
// client.go
package main
import (
"fmt"
"net"
"time"
)
var (
writeTimeout = time.Second * 5
readTimeout = time.Second * 5
)
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:7000")
if err != nil {
panic(err)
}
defer func() {
conn.Close()
}()
for {
time.Sleep(time.Second)
err := conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err != nil {
fmt.Println(err.Error())
break
}
fmt.Println("send ping")
_, err = conn.Write([]byte(`ping`))
if err != nil {
fmt.Println(err.Error())
break
}
err = conn.SetReadDeadline(time.Now().Add(readTimeout))
if err != nil {
fmt.Println(err.Error())
break
}
rBuf := make([]byte, 4)
_, err = conn.Read(rBuf)
if err != nil {
fmt.Println(err.Error())
}
fmt.Println("recv " + string(rBuf))
}
}
本文转载自 360 云计算公众号。
原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA
更多内容推荐
状态机的概念与设计
一般情况下,状态触发器的数量是有限的,其状态数也是有限的,故称为有限状态机(Finite State Machine,简称为FSM)。状态机中所有触发器的时钟输入端被连接到一个公共时钟脉冲源上,其状态的转换是在同一时钟源的同一脉冲边沿同步进行的,所以它也被称作时
2023-02-09
44|一个程序多种功能:构建子命令与 flags
这节课,让我们打开分布式开发的大门,一起看看如何开发Master服务,实现任务的调度与故障容错。
2023-01-19
【Jvm】Jvm 类加载机制
1类加载时机 虚拟机把描述类的数据从 Class 文件加载到内存,并对数据进行校验、转换解析和初始化,最终形成可以被虚拟机直接使用的 Java 类型,这就是虚拟机的类加载机制。 在Java语言里面,类型的加载、连接和初始化过程都是在程序运行期间完成的
2022-09-23
38|浏览器原理(二):浏览器进程通信与网络渲染详解
Chrome中有这么多进程,它们之间如何进行 IPC 通信呢?
2022-10-28
大数据培训:Flink 调度器性能的提高
对于以 all-to-all 分布模式连接的两个 JobVertices ,上游ExecutionVertices产生的所有IntermediateResultPartitions都是同构的,大数据培训这意味着它们所连接的下游 ExecutionVertices 完全相同。
2022-02-14
【技术干货】代码示例:使用 Apache Flink 连接 TDengine
想用 Flink 对接 TDengine?保姆级教程来了。
2022-05-27
消失的死锁:从 JSF 线程池满到 JVM 初始化原理剖析 | 京东云技术团队
在一次上线时,按照正常流程上线后,观察了线上报文、接口可用率十分钟以上,未出现异常情况,结果在上线一小时后突然收到jsf线程池耗尽的报警,并且该应用一共有30台机器,只有一台机器出现该问题,迅速下线该机器的jsf接口,恢复线上。然后开始排查问题。
2023-06-14
MQTT over QUIC:下一代物联网标准协议为消息传输场景注入新动力
解决车联网、移动终端等弱网场景下消息延迟与高连接开销问题。
2022-08-11
详解 HTTP Keep-Alive 选项说明及注意事项
keep-Alive首部只是请求将连接保持在活跃状态。发出keep-alive请求之后,客户端和服务器并不一定会同意进行keep-alive会话。它们可以在任意时刻关闭空闲的keep-alive连接,并可随意限制keep-alive连接所处理事务的数量。
2023-04-20
好用的远程控制桌面连接工具有哪些?
好用的远程控制桌面连接工具有哪些?
2023-03-23
避免惊群以及负载均衡的原理与具体实现
主进程(master进程)fork出⼀批⼦进程(worker进程),⼦进程继承了⽗进程的监听端⼝(sockfd),就会出现accept惊群效应。子进程的fd属于同一个文件,若两个⼦进程同时调⽤accept进⾏阻塞监听,两个进程都会被挂起来,内核会在这个socket的等待队列wait qu
2022-05-24
35|秒级开发体验,如何实现容器热加载和一键调试?
这节课,我们来学习如何借助 Nocalhost 实现 Kubernetes 应用秒级的开发体验,提升开发循环反馈效率。
2023-02-27
启动报名 2022 南京智博会 第十四届南京国际智慧城市、物联网、大数据博览会
2022南京智博会
2022-04-02
30 |应用间通信(二):详解 Linux 进程 IPC
在大型商业系统中,通常需要消息队列和内存共享来使模块之间进行通信和协作,这节课我们就来学习这两种通信方式。
2022-10-10
34|服务注册与监听:Worker 节点与 etcd 交互
这节课,让我们将Worker节点变为一个支持GRPC与HTTP协议访问的服务,让它最终可以被Master服务和外部服务直接访问。
2022-12-27
1. 并发编程:channel 原理、底层实现与面试要点
2023-09-27
Linux 之 cat 命令
cat命令的用途是连接文件或者标准输入并打印。这个命令常用来显示文件内容,或者将几个文件拼接起来显示,或者从标准输入读取内容并显示,它常与重定向符号配合使用。
2021-12-19
UART
通用异步收发传输器(Universal Asynchronous Receiver/Transmitter,通常称为UART)是一种异步收发传输器,是电脑硬件的一部分,将数据透过串列通信进行传输。UART通常用在与其他通信接口(如EIA RS-232)的连接上。
2022-07-24
政企数智办公巡展回顾 | 通信赋能传统行业数智化转型的应用实践
近期,“连接无界 · 智赋未来” 融云 2023 政企数智办公巡展在北京、杭州相继举办,汇聚宏信动力创始人兼 CEO 刘己伟、茂屹科技总经理胡剑、网宿科技行业技术专家王天栋
2023-04-21
druid 源码阅读(四)返回一个连接
在初始化连接池后就要开始返回真正的数据库连接了。
2022-05-13
推荐阅读
Python 连接 es 笔记三之 es 更新操作
2023-11-21
21|Web 开发(上):如何使用 Axum 框架进行 Web 后端开发?
2023-12-11
21|平台移植:Windows 平台上的 eBPF 实现
2023-09-30
用户故事|驰往:借镜观形,学以致用
2023-11-13
Royal TSX for Mac(远程管理软件) v6.0.1 完整激活版
2023-11-20
透过一台电视,看到万家星闪
2023-11-22
升级企业数智化底座,助力企业实现数智连接
2023-06-30
电子书
大厂实战PPT下载
换一换 魏永明 | 北京飞漫软件技术有限公司 创始人兼 CEO
胡光朴 | 腾讯云音视频 高级音视频产品架构师
李友科 | 京东 搜索与推荐平台质量保障部总监
评论