写点什么

浅谈长连接的平滑重启

  • 2020-02-26
  • 本文字数:4626 字

    阅读完需:约 15 分钟

浅谈长连接的平滑重启

最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。

难点

  • 如何做到不中断接收连接

  • 如何做到已有连接不中断

解决

如何做到不中断接受连接

以下是 linux 源码中 bind 的实现(linux-1.0)


// linux-1.0/net/socket.c 536static intsock_bind(int fd, struct sockaddr *umyaddr, int addrlen){  struct socket *sock;  int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd)); if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL) return(-EBADF); //获取fd对应的socket结构 if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK); // 转调用bind指向的函数,下层函数(inet_bind) if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) { DPRINTF((net_debug, "NET: sock_bind: bind failed\n")); return(i); } return(0);}
// linux-1.0/net/inet/sock.c 1012static intinet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len){ ...outside_loop: for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)]; sk2 != NULL; sk2 = sk2->next) {#if 1 /* should be below! */ if (sk2->num != snum) continue;/* if (sk2->saddr != sk->saddr) continue; */#endif if (sk2->dead) { destroy_sock(sk2); goto outside_loop; } if (!sk->reuse) { sti(); return(-EADDRINUSE); } if (sk2->num != snum) continue; /* more than one */ if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */ if (!sk2->reuse) { sti(); return(-EADDRINUSE); } } ... }
复制代码


  • sock_array 是一个链式哈希表,保存着各端口号的 sock 结构

  • 通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use

  • 若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题


如何做到已有连接不中断


  • 新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳

  • linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接

  • unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。


#include <sys/types.h>#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
复制代码


发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。


两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。


由此解决了文章开头提出的两个问题


Demo 实现


  • 进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态

  • 旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:

  • 1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求

  • 2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。

  • 3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。

  • 4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。

  • 5.最后旧进程给新进程发送发送完毕信号,随后退出

  • 以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。


// server.go
package main
import ( "flag" "fmt" "golang.org/x/sys/unix" "log" "net" "os" "os/signal" "path/filepath" "sync" "syscall" "time")
var ( workSpace string
logger *log.Logger
writeTimeout = time.Second * 5 readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false)
func init() { flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace") flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777) if err != nil { panic(err) } logger = log.New(file, "", 11) go beforeStart() go signalHandler()}
func main() { var err error serverListener, err = net.Listen("tcp", ":7000") if err != nil { panic(err) } for { if isUpdate == true { continue } conn, err := serverListener.Accept() if err != nil { logger.Println("conn error") continue } c := conn.(*net.TCPConn) go connectionHandler(c) }}
func connectionHandler(conn *net.TCPConn) { file, _ := conn.File() connFiles.Store(file, true) logger.Printf("conn fd %d\n", file.Fd()) defer func() { connFiles.Delete(file) _ = conn.Close() }() for { if isUpdate == true { continue } err := conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { logger.Println(err.Error()) return } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { logger.Println(err.Error()) return } if string(rBuf) != "ping" { logger.Println("failed to parse the message " + string(rBuf)) return } err = conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { logger.Println(err.Error()) return } _, err = conn.Write([]byte(`pong`)) if err != nil { logger.Println(err.Error()) return } }}
func beforeStart() { connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = connInterface.Close() }()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1) oob := make([]byte, 32) for { err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3)) if err != nil { fmt.Println(err.Error()) return } n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob) if err != nil { logger.Println(err.Error()) return } if n != 1 || b[0] != 0 { if n != 1 { logger.Printf("recv fd type error: %d\n", n) } else { logger.Println("init finish") } return } scms, err := unix.ParseSocketControlMessage(oob[0:oobn]) if err != nil { logger.Println(err.Error()) return } if len(scms) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(scms)) return } fds, err := unix.ParseUnixRights(&scms[0]) if err != nil { logger.Println(err.Error()) return } if len(fds) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(fds)) return } logger.Printf("recv fd %d\n", fds[0]) file := os.NewFile(uintptr(fds[0]), "fd-from-old") conn, err := net.FileConn(file) if err != nil { logger.Println(err.Error()) return } go connectionHandler(conn.(*net.TCPConn)) }}
func signalHandler() { signal.Notify( signalChan, syscall.SIGUSR2, ) for { sc := <-signalChan switch sc { case syscall.SIGUSR2: gracefulExit() default: continue } }}
func gracefulExit() { var connWait sync.WaitGroup _ = syscall.Unlink(filepath.Join(workSpace, "conn.sock")) listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = listenerInterface.Close() }() unixListener := listenerInterface.(*net.UnixListener) connWait.Add(1) go func() { defer connWait.Done() unixConn, err := unixListener.AcceptUnix() if err != nil { logger.Println(err.Error()) return } defer func() { _ = unixConn.Close() }() connFiles.Range(func(key, value interface{}) bool { if key == nil || value == nil { return false } file := key.(*os.File) defer func() { _ = file.Close() }() buf := make([]byte, 1) buf[0] = 0 rights := syscall.UnixRights(int(file.Fd())) _, _, err := unixConn.WriteMsgUnix(buf, rights, nil) if err != nil { logger.Println(err.Error()) } logger.Printf("send fd %d\n", file.Fd()) return true }) finish := make([]byte, 1) finish[0] = 1 _, _, err = unixConn.WriteMsgUnix(finish, nil, nil) if err != nil { logger.Println(err.Error()) } }()
isUpdate = true execSpec := &syscall.ProcAttr{ Env: os.Environ(), Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}), }
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec) if err != nil { logger.Println(err.Error()) return } logger.Printf("old process %d new process %d\n", os.Getpid(), pid) _ = serverListener.Close()
connWait.Wait() os.Exit(0)}// client.gopackage main
import ( "fmt" "net" "time")
var ( writeTimeout = time.Second * 5 readTimeout = time.Second * 5)
func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { panic(err) } defer func() { conn.Close() }() for { time.Sleep(time.Second) err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { fmt.Println(err.Error()) break } fmt.Println("send ping") _, err = conn.Write([]byte(`ping`)) if err != nil { fmt.Println(err.Error()) break } err = conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { fmt.Println(err.Error()) break } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { fmt.Println(err.Error()) } fmt.Println("recv " + string(rBuf)) }}
复制代码


本文转载自 360 云计算公众号。


原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA


2020-02-26 22:002081

评论

发布
暂无评论
发现更多内容

当中国诗词大会邂逅了Flutter,从此我的眼里只有你!,附架构师必备技术详解

android 程序员 移动开发

惨遭社会毒打,公司倒闭突然失业,程序员该如何在下次危机对准时狠狠还击

android 程序员 移动开发

微信小程序之商品属性分类 —— 微信小程序实战商城系列

android 程序员 移动开发

想掌握Android面试官必问的 Binder 机制?那别想绕开 Binder 驱动源码分析!

android 程序员 移动开发

我又开发了一个非常好用的开源库,调试Android数据库有救了 (1)

android 程序员 移动开发

我又开发了一个非常好用的开源库,调试Android数据库有救了

android 程序员 移动开发

当中国诗词大会邂逅了Flutter,从此我的眼里只有你!(1)

android 程序员 移动开发

微信小程序之加载更多(分页加载)实例 ,flutter瀑布流列表

android 程序员 移动开发

微信逆向之朋友圈,2021最新Android大厂面试真题大全

android 程序员 移动开发

我学习Android的一些套路,这份333页关于性能优化知识点的PDF你不能不看

android 程序员 移动开发

我才知道原来Flutter内置了10多种Button控件,音视频开发工程师抖音

android 程序员 移动开发

当 Android 的 Compat 库不能拯救你的时候,覆盖所有面试知识点

android 程序员 移动开发

怎样让你更快的完成工作去“摸鱼”,我的Android美团求职之路

android 程序员 移动开发

想搞懂Jetpack架构可以不搞懂生命周期知识吗?,阿里Android面试必问

android 程序员 移动开发

我想谈谈关于-Android-面试那些事,写给有开发经验的你们

android 程序员 移动开发

想掌握Android面试官必问的-Binder-机制?那别想绕开-Binder-驱动源码分析!

android 程序员 移动开发

当你面试的时候,被问到关于Fragment的种种,5年经验Android程序员面试27天

android 程序员 移动开发

当面试官要你说一下Activity的启动模式时,怎么回答最合适?标准答案在这里

android 程序员 移动开发

彻底理解Android架构,移动应用开发就业工资

android 程序员 移动开发

性能优化,还得看AspectJ,android高级开发实战

android 程序员 移动开发

我们来剖析一下这个Android猴子的面试过程,以及被问到的问题

android 程序员 移动开发

怎样才是刷面试题的正确姿势?Android400道面试题+通关知识宝典助你进大厂

android 程序员 移动开发

想进阶高级架构师,你需要养成这10个习惯!,掌握这套精编Android高级面试题解析

android 程序员 移动开发

成功逆袭:越来越胖怎么能忍,我的APK瘦身之路,完整版开放免费下载

android 程序员 移动开发

我们来剖析一下这个Android猴子的面试过程,以及被问到的问题(1)

android 程序员 移动开发

往事只能回味!春招 Android 开发岗:我居然三天就拿到了offer

android 程序员 移动开发

微博热门清华学霸的计划表刷屏,程序员该如何制定你的学习计划?

android 程序员 移动开发

我敢打赌!你从未见过如此简单的Dagger-导航---基于-Android-Studio-4-1

android 程序员 移动开发

当事人:现在就是非常后悔,开工那天没去上班,Flutter中网络图片加载和缓存源码分析

android 程序员 移动开发

总结了30个例子之后,我悟到了Flutter的布局原理,android移动开发基础答案

android 程序员 移动开发

成为一个优秀的Android开发者,需要必备哪些技术&工作技能?

android 程序员 移动开发

浅谈长连接的平滑重启_行业深度_360云计算_InfoQ精选文章