50万奖金+官方证书,深圳国际金融科技大赛正式启动,点击报名 了解详情
写点什么

浅谈长连接的平滑重启

  • 2020-02-26
  • 本文字数:4626 字

    阅读完需:约 15 分钟

浅谈长连接的平滑重启

最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。

难点

  • 如何做到不中断接收连接

  • 如何做到已有连接不中断

解决

如何做到不中断接受连接

以下是 linux 源码中 bind 的实现(linux-1.0)


// linux-1.0/net/socket.c 536static intsock_bind(int fd, struct sockaddr *umyaddr, int addrlen){  struct socket *sock;  int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd)); if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL) return(-EBADF); //获取fd对应的socket结构 if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK); // 转调用bind指向的函数,下层函数(inet_bind) if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) { DPRINTF((net_debug, "NET: sock_bind: bind failed\n")); return(i); } return(0);}
// linux-1.0/net/inet/sock.c 1012static intinet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len){ ...outside_loop: for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)]; sk2 != NULL; sk2 = sk2->next) {#if 1 /* should be below! */ if (sk2->num != snum) continue;/* if (sk2->saddr != sk->saddr) continue; */#endif if (sk2->dead) { destroy_sock(sk2); goto outside_loop; } if (!sk->reuse) { sti(); return(-EADDRINUSE); } if (sk2->num != snum) continue; /* more than one */ if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */ if (!sk2->reuse) { sti(); return(-EADDRINUSE); } } ... }
复制代码


  • sock_array 是一个链式哈希表,保存着各端口号的 sock 结构

  • 通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use

  • 若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题


如何做到已有连接不中断


  • 新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳

  • linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接

  • unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。


#include <sys/types.h>#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
复制代码


发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。


两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。


由此解决了文章开头提出的两个问题


Demo 实现


  • 进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态

  • 旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:

  • 1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求

  • 2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。

  • 3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。

  • 4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。

  • 5.最后旧进程给新进程发送发送完毕信号,随后退出

  • 以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。


// server.go
package main
import ( "flag" "fmt" "golang.org/x/sys/unix" "log" "net" "os" "os/signal" "path/filepath" "sync" "syscall" "time")
var ( workSpace string
logger *log.Logger
writeTimeout = time.Second * 5 readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false)
func init() { flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace") flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777) if err != nil { panic(err) } logger = log.New(file, "", 11) go beforeStart() go signalHandler()}
func main() { var err error serverListener, err = net.Listen("tcp", ":7000") if err != nil { panic(err) } for { if isUpdate == true { continue } conn, err := serverListener.Accept() if err != nil { logger.Println("conn error") continue } c := conn.(*net.TCPConn) go connectionHandler(c) }}
func connectionHandler(conn *net.TCPConn) { file, _ := conn.File() connFiles.Store(file, true) logger.Printf("conn fd %d\n", file.Fd()) defer func() { connFiles.Delete(file) _ = conn.Close() }() for { if isUpdate == true { continue } err := conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { logger.Println(err.Error()) return } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { logger.Println(err.Error()) return } if string(rBuf) != "ping" { logger.Println("failed to parse the message " + string(rBuf)) return } err = conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { logger.Println(err.Error()) return } _, err = conn.Write([]byte(`pong`)) if err != nil { logger.Println(err.Error()) return } }}
func beforeStart() { connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = connInterface.Close() }()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1) oob := make([]byte, 32) for { err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3)) if err != nil { fmt.Println(err.Error()) return } n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob) if err != nil { logger.Println(err.Error()) return } if n != 1 || b[0] != 0 { if n != 1 { logger.Printf("recv fd type error: %d\n", n) } else { logger.Println("init finish") } return } scms, err := unix.ParseSocketControlMessage(oob[0:oobn]) if err != nil { logger.Println(err.Error()) return } if len(scms) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(scms)) return } fds, err := unix.ParseUnixRights(&scms[0]) if err != nil { logger.Println(err.Error()) return } if len(fds) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(fds)) return } logger.Printf("recv fd %d\n", fds[0]) file := os.NewFile(uintptr(fds[0]), "fd-from-old") conn, err := net.FileConn(file) if err != nil { logger.Println(err.Error()) return } go connectionHandler(conn.(*net.TCPConn)) }}
func signalHandler() { signal.Notify( signalChan, syscall.SIGUSR2, ) for { sc := <-signalChan switch sc { case syscall.SIGUSR2: gracefulExit() default: continue } }}
func gracefulExit() { var connWait sync.WaitGroup _ = syscall.Unlink(filepath.Join(workSpace, "conn.sock")) listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = listenerInterface.Close() }() unixListener := listenerInterface.(*net.UnixListener) connWait.Add(1) go func() { defer connWait.Done() unixConn, err := unixListener.AcceptUnix() if err != nil { logger.Println(err.Error()) return } defer func() { _ = unixConn.Close() }() connFiles.Range(func(key, value interface{}) bool { if key == nil || value == nil { return false } file := key.(*os.File) defer func() { _ = file.Close() }() buf := make([]byte, 1) buf[0] = 0 rights := syscall.UnixRights(int(file.Fd())) _, _, err := unixConn.WriteMsgUnix(buf, rights, nil) if err != nil { logger.Println(err.Error()) } logger.Printf("send fd %d\n", file.Fd()) return true }) finish := make([]byte, 1) finish[0] = 1 _, _, err = unixConn.WriteMsgUnix(finish, nil, nil) if err != nil { logger.Println(err.Error()) } }()
isUpdate = true execSpec := &syscall.ProcAttr{ Env: os.Environ(), Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}), }
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec) if err != nil { logger.Println(err.Error()) return } logger.Printf("old process %d new process %d\n", os.Getpid(), pid) _ = serverListener.Close()
connWait.Wait() os.Exit(0)}// client.gopackage main
import ( "fmt" "net" "time")
var ( writeTimeout = time.Second * 5 readTimeout = time.Second * 5)
func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { panic(err) } defer func() { conn.Close() }() for { time.Sleep(time.Second) err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { fmt.Println(err.Error()) break } fmt.Println("send ping") _, err = conn.Write([]byte(`ping`)) if err != nil { fmt.Println(err.Error()) break } err = conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { fmt.Println(err.Error()) break } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { fmt.Println(err.Error()) } fmt.Println("recv " + string(rBuf)) }}
复制代码


本文转载自 360 云计算公众号。


原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA


2020-02-26 22:002042

评论

发布
暂无评论
发现更多内容

淘宝商品评论API调用教程:轻松获取用户评价数据(含测试Key)

代码忍者

淘宝API接口

“企业级敏捷教练课程” 4月19-20日 · CSP-SM认证周末班【报名即赠敏捷漂流记】

ShineScrum

Scrum 敏捷 Scrum Master CSM认证

如何利用MES系统进行产能分析呢?

万界星空科技

制造业 mes 万界星空科技mes 制造业转型 产能分析

挑战数据传输路由规划,与DeepSeek共探大模型算法优化

华为云开发者联盟

人工智能 动态内存 大模型 LLM 路由规划

京东工业平台商品列表API接口(京东工业API系列)

tbapi

京东API接口 京东工业平台接口 京东工业数据采集

iOS APP上线流程

北京木奇移动技术有限公司

软件外包公司 APP外包 iOS APP

化妆品行业需要使用堡垒机情形分析以及品牌推荐

行云管家

网络安全 信息安全 IT运维

Deepseek官网太卡,教你白嫖阿里云的Deepseek-R1满血版

卷福同学

阿里云 DeepSeek DeepSeek-R1、

Svelte 最新中文文档教程(15)—— Stores

冴羽

vue.js 前端 React Svelte SvelteKit

时序数据库 TDengine 化工新签约:存储降本一半,查询提速十倍

TDengine

数据库 大数据 时序数据库

智能制造与精益生产的深度融合

积木链小链

数字化转型 智能制造 精益生产

人工智能丨告别无效提问!零基础用户必备的DeepSeek高效沟通秘籍

测试人

远控软件怎么选?几款主流远控盘点对比

编程猫

2600 万表流计算分析如何做到? 时序数据库 TDengine 助力数百家超市智能化转型

TDengine

数据库 时序数据库

CRM管理系统(源码+文档+演示+讲解)

深圳亥时科技

快来报名 | KWDB 演示环境限时免费开放

KaiwuDB

数据库

CRM系统(源码+文档+讲解+演示)

深圳亥时科技

行云堡垒-助力深圳企业快速过等保

行云管家

等保 深圳 等保测评 过等保

【GreatSQL优化器-15】index merge

GreatSQL

国内十大专业SEO公司评测

科技汇

告别无效提问!零基础用户必备的DeepSeek高效沟通秘籍

测吧(北京)科技有限公司

测试

iOS APP性能优化

北京木奇移动技术有限公司

APP开发 软件外包公司 APP外包公司

浅谈长连接的平滑重启_行业深度_360云计算_InfoQ精选文章