速来报名!AICon北京站鸿蒙专场~ 了解详情
写点什么

浅谈长连接的平滑重启

  • 2020-02-26
  • 本文字数:4626 字

    阅读完需:约 15 分钟

浅谈长连接的平滑重启

最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。

难点

  • 如何做到不中断接收连接

  • 如何做到已有连接不中断

解决

如何做到不中断接受连接

以下是 linux 源码中 bind 的实现(linux-1.0)


// linux-1.0/net/socket.c 536static intsock_bind(int fd, struct sockaddr *umyaddr, int addrlen){  struct socket *sock;  int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd)); if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL) return(-EBADF); //获取fd对应的socket结构 if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK); // 转调用bind指向的函数,下层函数(inet_bind) if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) { DPRINTF((net_debug, "NET: sock_bind: bind failed\n")); return(i); } return(0);}
// linux-1.0/net/inet/sock.c 1012static intinet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len){ ...outside_loop: for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)]; sk2 != NULL; sk2 = sk2->next) {#if 1 /* should be below! */ if (sk2->num != snum) continue;/* if (sk2->saddr != sk->saddr) continue; */#endif if (sk2->dead) { destroy_sock(sk2); goto outside_loop; } if (!sk->reuse) { sti(); return(-EADDRINUSE); } if (sk2->num != snum) continue; /* more than one */ if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */ if (!sk2->reuse) { sti(); return(-EADDRINUSE); } } ... }
复制代码


  • sock_array 是一个链式哈希表,保存着各端口号的 sock 结构

  • 通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use

  • 若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题


如何做到已有连接不中断


  • 新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳

  • linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接

  • unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。


#include <sys/types.h>#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
复制代码


发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。


两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。


由此解决了文章开头提出的两个问题


Demo 实现


  • 进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态

  • 旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:

  • 1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求

  • 2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。

  • 3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。

  • 4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。

  • 5.最后旧进程给新进程发送发送完毕信号,随后退出

  • 以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。


// server.go
package main
import ( "flag" "fmt" "golang.org/x/sys/unix" "log" "net" "os" "os/signal" "path/filepath" "sync" "syscall" "time")
var ( workSpace string
logger *log.Logger
writeTimeout = time.Second * 5 readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false)
func init() { flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace") flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777) if err != nil { panic(err) } logger = log.New(file, "", 11) go beforeStart() go signalHandler()}
func main() { var err error serverListener, err = net.Listen("tcp", ":7000") if err != nil { panic(err) } for { if isUpdate == true { continue } conn, err := serverListener.Accept() if err != nil { logger.Println("conn error") continue } c := conn.(*net.TCPConn) go connectionHandler(c) }}
func connectionHandler(conn *net.TCPConn) { file, _ := conn.File() connFiles.Store(file, true) logger.Printf("conn fd %d\n", file.Fd()) defer func() { connFiles.Delete(file) _ = conn.Close() }() for { if isUpdate == true { continue } err := conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { logger.Println(err.Error()) return } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { logger.Println(err.Error()) return } if string(rBuf) != "ping" { logger.Println("failed to parse the message " + string(rBuf)) return } err = conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { logger.Println(err.Error()) return } _, err = conn.Write([]byte(`pong`)) if err != nil { logger.Println(err.Error()) return } }}
func beforeStart() { connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = connInterface.Close() }()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1) oob := make([]byte, 32) for { err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3)) if err != nil { fmt.Println(err.Error()) return } n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob) if err != nil { logger.Println(err.Error()) return } if n != 1 || b[0] != 0 { if n != 1 { logger.Printf("recv fd type error: %d\n", n) } else { logger.Println("init finish") } return } scms, err := unix.ParseSocketControlMessage(oob[0:oobn]) if err != nil { logger.Println(err.Error()) return } if len(scms) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(scms)) return } fds, err := unix.ParseUnixRights(&scms[0]) if err != nil { logger.Println(err.Error()) return } if len(fds) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(fds)) return } logger.Printf("recv fd %d\n", fds[0]) file := os.NewFile(uintptr(fds[0]), "fd-from-old") conn, err := net.FileConn(file) if err != nil { logger.Println(err.Error()) return } go connectionHandler(conn.(*net.TCPConn)) }}
func signalHandler() { signal.Notify( signalChan, syscall.SIGUSR2, ) for { sc := <-signalChan switch sc { case syscall.SIGUSR2: gracefulExit() default: continue } }}
func gracefulExit() { var connWait sync.WaitGroup _ = syscall.Unlink(filepath.Join(workSpace, "conn.sock")) listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = listenerInterface.Close() }() unixListener := listenerInterface.(*net.UnixListener) connWait.Add(1) go func() { defer connWait.Done() unixConn, err := unixListener.AcceptUnix() if err != nil { logger.Println(err.Error()) return } defer func() { _ = unixConn.Close() }() connFiles.Range(func(key, value interface{}) bool { if key == nil || value == nil { return false } file := key.(*os.File) defer func() { _ = file.Close() }() buf := make([]byte, 1) buf[0] = 0 rights := syscall.UnixRights(int(file.Fd())) _, _, err := unixConn.WriteMsgUnix(buf, rights, nil) if err != nil { logger.Println(err.Error()) } logger.Printf("send fd %d\n", file.Fd()) return true }) finish := make([]byte, 1) finish[0] = 1 _, _, err = unixConn.WriteMsgUnix(finish, nil, nil) if err != nil { logger.Println(err.Error()) } }()
isUpdate = true execSpec := &syscall.ProcAttr{ Env: os.Environ(), Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}), }
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec) if err != nil { logger.Println(err.Error()) return } logger.Printf("old process %d new process %d\n", os.Getpid(), pid) _ = serverListener.Close()
connWait.Wait() os.Exit(0)}// client.gopackage main
import ( "fmt" "net" "time")
var ( writeTimeout = time.Second * 5 readTimeout = time.Second * 5)
func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { panic(err) } defer func() { conn.Close() }() for { time.Sleep(time.Second) err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { fmt.Println(err.Error()) break } fmt.Println("send ping") _, err = conn.Write([]byte(`ping`)) if err != nil { fmt.Println(err.Error()) break } err = conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { fmt.Println(err.Error()) break } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { fmt.Println(err.Error()) } fmt.Println("recv " + string(rBuf)) }}
复制代码


本文转载自 360 云计算公众号。


原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA


2020-02-26 22:001763

评论

发布
暂无评论
发现更多内容

女朋友要我讲解@Controller注解的原理,真是难为我了

Java你猿哥

Java spring Spring 配置解析

BT!GitHub开源阿里Java性能调优百宝书仅3小时,标星竟超过30k

Java你猿哥

Java JVM 性能调优 SSM框架 Java工程师

FastAPI入门

Liam

程序员 开发工具 API FastApi API 开发

用友iuap平台一站式服务,助力央国企推进数智化转型

用友BIP

苹果手机里面的udid怎么查出来

雪奈椰子

如何通过Java应用程序在PPT中创建SmartArt图形

在下毛毛雨

Java PowerPoint 添加艺术图形

行云管家堡垒机有免费的吗?谁能告诉一下!

行云管家

高新企业 堡垒机 行云管家

前端沙箱利用这些特性实现代码的隔离与限制

没有用户名丶

知你懂你,聪明得简直不像一台车,问界M5智驾版重磅升级鸿蒙3

Geek_2d6073

Zabbix5.0配置企业微信告警

A-刘晨阳

Linux zabbix 三周年连更

Python本地SQL文件对比工具

YUKI0506

Python 文件对比 对比工具 difflib

DLRover:云上自动扩缩容 DeepRec 分布式训练作业案例分享

AI Infra

程序员 AI 互联网 DLRover

这是你的云-云起实验室

六月的雨在InfoQ

开发者 实验室 三周年连更 云起实验室

国内Google翻译失效的解决方法(MAC/WIN)

互联网搬砖工作者

浪潮海岳低代码平台inBuilder开源社区版正式发布

科技热闻

站群SEO是什么意思?站群SEO怎么做效果才好?

海拥(haiyong.site)

三周年连更

硬核!万字神文精解高并发高可用系统实战,分布式系统一致性文档

做梦都在改BUG

Java 高可用 高并发 分布式一致性

最佳云转码,腾讯云MPS夺得MSU编码器大赛21项第一

科技热闻

【堡垒机小知识】堡垒机能记录操作时间、操作数据等等吗?

行云管家

网络安全 堡垒机

阿里大神整理的Java核心知识点和面试官常问到的知识点,压压惊

会踢球的程序源

Java 面试 求职 java面试 Java构架

什么是企业数智化的创新加速器?

用友BIP

技术大会 用友iuap 用友BIP 用友技术大会

Solr和Elasticsearch,搜索框架怎么选?

会踢球的程序源

Java

听说谛听闹退休?感知网络接班啦!

脑极体

感知网络

AutoCAD2024最新版介绍及autocad 2024系统要求

互联网搬砖工作者

集简云开放平台是什么?

集简云开放平台

简单的视频格式转换器:MacX Video Converter Pro中文版

真大的脸盆

Mac Mac 软件 视频格式转换 格式转换器

chrome调试技巧(一)

知心宝贝

前端 后端 调试 三周年连更

「云原生」Elasticsearch + Kibana on k8s 讲解与实战操作

会踢球的程序源

Java elasticsearch Kibana

GitHub和 Gitee联合编写最新版20w字Java全栈面试手册,简直无敌!

Java你猿哥

Java java面试 SSM框架 Java面经

ChatGPT无需API开发连接第三方系统,让舆情自动监控

集简云开放平台

数据集成 数据集成平台 Chat

企业全面数智化转型,国产替代成为安全保障

用友BIP

技术大会 用友iuap 数智化转型 用友BIP 用友技术大会

浅谈长连接的平滑重启_行业深度_360云计算_InfoQ精选文章