
最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。
难点
如何做到不中断接收连接
如何做到已有连接不中断
解决
如何做到不中断接受连接
以下是 linux 源码中 bind 的实现(linux-1.0)
// linux-1.0/net/socket.c 536static intsock_bind(int fd, struct sockaddr *umyaddr, int addrlen){ struct socket *sock; int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd)); if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL) return(-EBADF); //获取fd对应的socket结构 if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK); // 转调用bind指向的函数,下层函数(inet_bind) if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) { DPRINTF((net_debug, "NET: sock_bind: bind failed\n")); return(i); } return(0);}
// linux-1.0/net/inet/sock.c 1012static intinet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len){ ...outside_loop: for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)]; sk2 != NULL; sk2 = sk2->next) {#if 1 /* should be below! */ if (sk2->num != snum) continue;/* if (sk2->saddr != sk->saddr) continue; */#endif if (sk2->dead) { destroy_sock(sk2); goto outside_loop; } if (!sk->reuse) { sti(); return(-EADDRINUSE); } if (sk2->num != snum) continue; /* more than one */ if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */ if (!sk2->reuse) { sti(); return(-EADDRINUSE); } } ... }
sock_array 是一个链式哈希表,保存着各端口号的 sock 结构
通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use
若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题
如何做到已有连接不中断
新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳
linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接
unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。
#include <sys/types.h>#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。
两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。
由此解决了文章开头提出的两个问题
Demo 实现
进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态
旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:
1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求
2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。
3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。
4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。
5.最后旧进程给新进程发送发送完毕信号,随后退出
以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。
// server.go
package main
import ( "flag" "fmt" "golang.org/x/sys/unix" "log" "net" "os" "os/signal" "path/filepath" "sync" "syscall" "time")
var ( workSpace string
logger *log.Logger
writeTimeout = time.Second * 5 readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false)
func init() { flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace") flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777) if err != nil { panic(err) } logger = log.New(file, "", 11) go beforeStart() go signalHandler()}
func main() { var err error serverListener, err = net.Listen("tcp", ":7000") if err != nil { panic(err) } for { if isUpdate == true { continue } conn, err := serverListener.Accept() if err != nil { logger.Println("conn error") continue } c := conn.(*net.TCPConn) go connectionHandler(c) }}
func connectionHandler(conn *net.TCPConn) { file, _ := conn.File() connFiles.Store(file, true) logger.Printf("conn fd %d\n", file.Fd()) defer func() { connFiles.Delete(file) _ = conn.Close() }() for { if isUpdate == true { continue } err := conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { logger.Println(err.Error()) return } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { logger.Println(err.Error()) return } if string(rBuf) != "ping" { logger.Println("failed to parse the message " + string(rBuf)) return } err = conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { logger.Println(err.Error()) return } _, err = conn.Write([]byte(`pong`)) if err != nil { logger.Println(err.Error()) return } }}
func beforeStart() { connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = connInterface.Close() }()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1) oob := make([]byte, 32) for { err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3)) if err != nil { fmt.Println(err.Error()) return } n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob) if err != nil { logger.Println(err.Error()) return } if n != 1 || b[0] != 0 { if n != 1 { logger.Printf("recv fd type error: %d\n", n) } else { logger.Println("init finish") } return } scms, err := unix.ParseSocketControlMessage(oob[0:oobn]) if err != nil { logger.Println(err.Error()) return } if len(scms) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(scms)) return } fds, err := unix.ParseUnixRights(&scms[0]) if err != nil { logger.Println(err.Error()) return } if len(fds) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(fds)) return } logger.Printf("recv fd %d\n", fds[0]) file := os.NewFile(uintptr(fds[0]), "fd-from-old") conn, err := net.FileConn(file) if err != nil { logger.Println(err.Error()) return } go connectionHandler(conn.(*net.TCPConn)) }}
func signalHandler() { signal.Notify( signalChan, syscall.SIGUSR2, ) for { sc := <-signalChan switch sc { case syscall.SIGUSR2: gracefulExit() default: continue } }}
func gracefulExit() { var connWait sync.WaitGroup _ = syscall.Unlink(filepath.Join(workSpace, "conn.sock")) listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = listenerInterface.Close() }() unixListener := listenerInterface.(*net.UnixListener) connWait.Add(1) go func() { defer connWait.Done() unixConn, err := unixListener.AcceptUnix() if err != nil { logger.Println(err.Error()) return } defer func() { _ = unixConn.Close() }() connFiles.Range(func(key, value interface{}) bool { if key == nil || value == nil { return false } file := key.(*os.File) defer func() { _ = file.Close() }() buf := make([]byte, 1) buf[0] = 0 rights := syscall.UnixRights(int(file.Fd())) _, _, err := unixConn.WriteMsgUnix(buf, rights, nil) if err != nil { logger.Println(err.Error()) } logger.Printf("send fd %d\n", file.Fd()) return true }) finish := make([]byte, 1) finish[0] = 1 _, _, err = unixConn.WriteMsgUnix(finish, nil, nil) if err != nil { logger.Println(err.Error()) } }()
isUpdate = true execSpec := &syscall.ProcAttr{ Env: os.Environ(), Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}), }
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec) if err != nil { logger.Println(err.Error()) return } logger.Printf("old process %d new process %d\n", os.Getpid(), pid) _ = serverListener.Close()
connWait.Wait() os.Exit(0)}// client.gopackage main
import ( "fmt" "net" "time")
var ( writeTimeout = time.Second * 5 readTimeout = time.Second * 5)
func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { panic(err) } defer func() { conn.Close() }() for { time.Sleep(time.Second) err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { fmt.Println(err.Error()) break } fmt.Println("send ping") _, err = conn.Write([]byte(`ping`)) if err != nil { fmt.Println(err.Error()) break } err = conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { fmt.Println(err.Error()) break } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { fmt.Println(err.Error()) } fmt.Println("recv " + string(rBuf)) }}
本文转载自 360 云计算公众号。
原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA
更多内容推荐
30 |应用间通信(二):详解 Linux 进程 IPC
在大型商业系统中,通常需要消息队列和内存共享来使模块之间进行通信和协作,这节课我们就来学习这两种通信方式。
2022-10-10
Android-Framework 学习笔记(三)SystemServer 进程启动过程
{gCurRuntime->onZygoteInit();}
2021-11-05
升级企业数智化底座,助力企业实现数智连接
数字经济时代,数智化成为企业发展的根基和动力。
2023-06-30
(1-18/18) 推播式营销 vs. 集客式营销
天下再无推播式营销?或者说,推播式营销其实在互联网时代多多少少都会进化到集客式?
2022-01-22
【我和 openGauss 的故事】Navicat 连接 openGauss_5.0.0 企业版数据库
心有阳光 [openGauss](javascript:void(0);) 2023-08-03 16:49 发表于四川
2023-08-10
38|浏览器原理(二):浏览器进程通信与网络渲染详解
Chrome中有这么多进程,它们之间如何进行 IPC 通信呢?
2022-10-28
Django 笔记二十五之数据库函数之日期函数
本篇笔记主要介绍 Django 中使用连接的数据库的日期函数
2023-05-08
微服务连接:Subset 子集划分算法
微服务Subset子集划分算法
2022-11-19
Java | 继承
本文主要详细的介绍了Java中的继承,并且通过大量的举例,代码实战,带你深入浅出的理解继承。
2022-10-27
34|服务注册与监听:Worker 节点与 etcd 交互
这节课,让我们将Worker节点变为一个支持GRPC与HTTP协议访问的服务,让它最终可以被Master服务和外部服务直接访问。
2022-12-27
07|冰川之下:深入 Go 高并发网络模型
很多人认为,Go语言是开发网络服务的极佳选择。这节课,我们就深入看看这背后的道理是什么。
2022-10-25
C++ 中的 exec() 函数
exec()函数在C++中是一个进程控制函数,用于创建新进程执行其他程序或命令行指令。exec()函数可以替换当前进程的代码和数据,创建新的进程运行其他程序。exec()函数有多个版本,例如execl、execv、execle、execve等,根据不同的参数类型和个数来使用。
2023-06-30
软件工程高效学 | 软件工程基础
软件工程是一门指导进行计算机软件开发和维护的工程学科,涉及计算机科学、工程科学、管理科学等多学科,主要研究如何应用软件开发的科学理论和工程技术来指导大型软件系统的开发。
2023-03-28
4 种 LED 显示屏箱体连接方式
在安装的时候,LED显示屏箱体的连接显得异常重要,如果显示屏箱体连接不好,那么LED显示屏屏面的平整度、光滑度和清晰度都会大打折扣。5分钟带你了解户外LED显示屏。
2023-01-11
除了运行、休眠…进程居然还有僵尸、孤儿状态
本章我们将认识几种进程状态——运行状态、休眠状态、暂停状态、退出状态等。还要介绍两种具有惨烈身世的僵尸进程与孤儿进程~
2023-05-29
Android- 高级开发面试题以及答案整理,android 基础开发
onNewIntent()和onConfigurationChanged()onSaveInstanceState()和onRestoreInstanceState()Activity 到底是如何启动的
2021-11-02
C++ 中的继承和派生
C++ 中的继承是类与类之间的关系,是一个很简单很直观的概念,与现实世界中的继承类似,例如儿子继承父亲的财产。
2023-08-30
22|优雅地离场: Context 超时控制与原理
Context的作用是什么?应该如何去使用它?Context的最佳实践又是怎样的?
2022-11-29
17|巨人的肩膀:HTTP 协议与 Go 标准库原理
这节课,我们来看看当数据包到达对端服务器之后,操作系统和硬件会如何处理数据包。
2022-11-17
【机器学习】浅谈正规方程法 & 梯度下降
🤵♂️ 个人主页: @计算机魔术师👨💻 作者简介:CSDN内容合伙人,全栈领域优质创作者。
2022-08-28
推荐阅读
第一财经《大发求带回血的良心导师》MBA 智库百科
2023-09-02
1. 并发编程:channel 原理、底层实现与面试要点
2023-09-27
开源之夏 2023 | Databend 社区项目总结与分享
2023-11-22
从大模型到 MaaS 的新生态
2023-10-30
3、yml 配置文件加载流程源码解析
2023-09-28
21|平台移植:Windows 平台上的 eBPF 实现
2023-09-30
百家国企走进云投集团,探索世界 500 强数智化转型之路
2023-11-10
电子书

大厂实战PPT下载
换一换 
张敏 | 华为 AI 科学家
江岚 | 阿里云智能集团瓴羊 高级技术专家
吴彪 | 美团 数据科学与平台部/技术专家






评论