写点什么

Kube-Proxy IPVS 模式源码分析

  • 2019-10-14
  • 本文字数:6664 字

    阅读完需:约 22 分钟

Kube-Proxy IPVS模式源码分析

kube-proxy 整体逻辑结构


这张时序图描述了 kube-proxy 的整体逻辑结构,由于 kub-proxy 组件和其它的 kube-* 组件一样都是使用 pflag 和 cobra 库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:


func main() {  command := &cobra.Command{    Use:   "echo [string to echo]",    Short: "Echo anything to the screen",    Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {      fmt.Println("Print: " + strings.Join(args, " "))    },  }
command.Execute()}
复制代码


上面这段代码就是使用 cobra 包的一个最简单的例子,首先初始化 Command 结构,其中该结构中的 Run 就是最终要执行的真正逻辑。当初始化完成 Command 之后,通过 commnad.Execute 去启动应用程序。


现在看上面的图就能比较直观的理解程序的启动机制了,这张图的整体过程就是对 Commnad 结构中的 Run 进行核心逻辑实现。也就是说 kube-proxy 核心逻辑入口就是从这里开始(Command.Run)。


在 Command.Run 中主要做了如下几件事,看下面的代码:


// Run runs the specified ProxyServer.func (o *Options) Run() error {  defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }
if o.CleanupAndExit { return proxyServer.CleanupAndExit() }
o.proxyServer = proxyServer return o.runLoop()}
复制代码


1.对 ProxyServer 实例进行初始化。


2.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 true,则会将 userspace, iptables, ipvs 三种模式之前设置的所有规则清除掉,然后直接退出。


3.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 flase,则会调用 runLoop 来启动 ProxyServer 服务。


首先先来看看 ProxyServer 的结构定义:


type ProxyServer struct {  Client                 clientset.Interface   EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface  IpvsInterface          utilipvs.Interface  IpsetInterface         utilipset.Interface  execer                 exec.Interface  Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder  ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored  ProxyMode              string  NodeRef                *v1.ObjectReference  CleanupIPVS            bool  MetricsBindAddress     string  EnableProfiling        bool  OOMScoreAdj            *int32  ConfigSyncPeriod       time.Duration  HealthzServer          *healthcheck.HealthzServer}
复制代码


在 ProxyServer 结构中包含了与 kube-apiserver 通信的 Client、操作 Iptables 的 IptInterface、操作 IPVS 的 IpvsInterface、操作 IpSet 的 IpsetInterface,以及通过 ProxyMode 参数获取基于 userspace, iptables, ipvs 三种方式中的哪种使用的 Proxier。


接下来重点介绍基于 ipvs 模式实现的 Proxier, 在 ipvs 模式下 Proxier 结构的定义:


type Proxier struct {  endpointsChanges *proxy.EndpointChangeTracker  serviceChanges   *proxy.ServiceChangeTracker
//... serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable //... syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
//... iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface exec utilexec.Interface //... ipvsScheduler string}
复制代码


在 Proxier 结构中,先介绍下 async.BoundedFrequencyRunner,其它的在介绍 ProxyServer.Run 的时候介绍。


BoundedFrequencyRunner 的定义结构如下:


type BoundedFrequencyRunner struct {  name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts  maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations fn func() // function to run lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs}
复制代码


BoundedFrequencyRunner 结构中的 run 会异步的去定期的执行任务 fn,比如定期的执行 proxier.syncProxyRules 去创建或者更新 VirtuaServer 和 RealServer 并将 VirtualServer 的 VIP 绑定到 dummy interface(kube-ipvs0)。


下面是在 NewProxier 方法中初始化 BoundedFrequencyRunner 对象的示例:


proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
复制代码


其中:


minSyncPeriod: 规则最小的更新时间


syncPeriod: 规则最大更新时间


proxier.syncProxyRules: 同步规则的实现函数(也是 kube-proxy 基于 ipvs 同步规则的核心实现)

ProxyServer 启动流程

这部分介绍下 ProxyServer.Run 的逻辑实现,ProxyServer 启动流程如下图所示:



在启动过程中,主要做了下面这几件事情:


  1. 启动健康检查服务 HealthzServer.

  2. 启动暴露监控指标的 MetricsServer.

  3. 如果需要调整系统的 conntrack 相关参数,则对系统的 conntrack 进行参数调整.

  4. 创建一个 informerFactory 实例,后面去通过 informerFactory 获取 kubernetes 的各类资源数据.

  5. 创建一个 ServiceConfig 实例,这个实例主要作用是实时的 WATCH kubernetes Service 资源的变化,并加入队列中,用于后续对变化的 Service 进行规则同步。

  6. 注册 servier event hander 到 Proxier.

  7. 启动 serviceConfig.


接下来详细的介绍下[4-7]这几步的流程。


ServiceConfig 的结构定义如下:


type ServiceConfig struct {  listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler}
复制代码


ServiceHandler 的结构定义如下:


type ServiceHandler interface {  // OnServiceAdd is called whenever creation of new service object  // is observed.  OnServiceAdd(service *v1.Service)  // OnServiceUpdate is called whenever modification of an existing  // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)  // OnServiceDelete is called whenever deletion of an existing service  // object is observed.  OnServiceDelete(service *v1.Service)  // OnServiceSynced is called once all the initial even handlers were  // called and the state is fully propagated to local cache.  OnServiceSynced()}
复制代码


创建 ServiceConfig 实例对象的具体实现如下:


func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {  result := &ServiceConfig{    listerSynced: serviceInformer.Informer().HasSynced,  }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, resyncPeriod, )
return result}
复制代码


  • 首先通过执行 serviceInformer.Informer().HasSynced 来将 kubernetes 下的所有 Service 资源同步到缓存 listerSynced 中。

  • 其次为 AddEventHandlerWithResyncPeriod 添加针对 Service 对象,添加,更新,删除的事件触发函数。当 Service 有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService 和 handleDeleteService。


我们看看 handleAddService 触发函数的实现逻辑,具体代码如下:


func (c *ServiceConfig) handleAddService(obj interface{}) {  service, ok := obj.(*v1.Service)  if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }  for i := range c.eventHandlers {    klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)  }}
复制代码


当 watch 到 kubernetes 集群中有新的 Service 被创建之后,会触发 handleAddService 函数,并在该函数中遍历 eventHandlers 分别去调用 OnServiceAdd 来对 proxier 结构中的 serviceChanages 进行更新并去同步相应的规则。


OnServiceAdd 的具体实现逻辑如下:


// OnServiceAdd is called whenever creation of new service object is observed.func (proxier *Proxier) OnServiceAdd(service *v1.Service) {  proxier.OnServiceUpdate(nil, service)}
// OnServiceUpdate is called whenever modification of an existing service object is observed.func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() }}
复制代码


ServiceChangeTracker 的结构定义如下:


// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of// Services, keyed by their namespace and name.type ServiceChangeTracker struct {  // lock protects items.  lock sync.Mutex  // items maps a service to its serviceChange.  items map[types.NamespacedName]*serviceChange  // makeServiceInfo allows proxier to inject customized information when processing service.  makeServiceInfo makeServicePortFunc  // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.  isIPv6Mode *bool  recorder   record.EventRecorder}
复制代码


serviceChanage 的结构定义如下:


// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,// changes are accumulated, i.e. previous is state from before applying the changes,// current is state after applying all of the changes.type serviceChange struct {  previous ServiceMap  current  ServiceMap}
复制代码


到这里在回过头来看上面的基于 IPVS 实现的 Proxier 的整体流程就完全通了,ProxyServer.Run 函数在启动时,通过 kubernetes LIST/WATCH 机制去实时的感知 kubernetes 集群 Service 资源的变化,然后不断的在更新 Proxier 结构中的 ServiceChanges,然后将变化的 Service 保存在 ServiceChanges 结构中的 ServiceMap 中,给后续的 async.BoundedFrequencyRunner 去执行同步规则函数 syncProxyRules 来使用。


8. endpointConfig 的实现机制和 serviceConfig 的机制完全一样,这里就不在详细的介绍了。


9.上面做的所有预处理工作,会在 informerFactory.Start 这步生效。


10. birthCry 的作用就是通过 event 的方式通知 kubernetes, kube-proxy 这边的所有准备工作都处理好了,我要启动了。


  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")}
复制代码


11. 最终通过 SyncLoop 启动 kube-proxy 服务,并立刻执行 syncProxyRules 先来一遍同步再说.之后便会通过异步的方式定期的去同步 IPVS, Iptables, Ipset 的规则。


而 syncProxyRules 函数是 kube-proxy 实现的核心。主体逻辑是遍历 ServiceMap 并遍历 ServiceMap 下的 endpointsMap 及创建的 Service 类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的 IPVS 规则。


syncProxyRules 的函数实现定义如下:


func (proxier *Proxier) syncProxyRules() {  //.....
// Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { //......
// Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { //.... }
// Capture the clusterIP. // ipset call entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }
// Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { //.... }
// Capture load-balancer ingress. for _, ingress := range svcInfo.LoadBalancerIPStrings() { //..... }
if svcInfo.NodePort() != 0 { //.... } }
// sync ipset entries for _, set := range proxier.ipsetList { set.syncIPSetEntries() }
// Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. proxier.writeIptablesRules()
// Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
复制代码

总结

kube-proxy 的代码逻辑还是比较简洁的,整体的思想就是 kube-proxy 服务去 watch kubernetes 集群的 Service 和 Endpoint 对象,当这两个资源对象有状态变化时,会把它们保存在 ServiceMap 和 EndPonintMap 中,然后会通过 async.BoundedFrequencyRunner 去异步的执行 syncProxyRules 去下发规则。


本文转载自公众号 360 云计算(ID:hulktalk)


原文链接


https://mp.weixin.qq.com/s?__biz=MzU4ODgyMDI0Mg==&mid=2247486894&idx=1&sn=c39bafbcc79e6ea0a25fcb077a0b1128&chksm=fdd7b7d3caa03ec520bb4ef2ec98c498a1646e38f66d684b1124fe1aa2eb841bf4f2f080dec9&scene=27#wechat_redirect


2019-10-14 08:002073

评论

发布
暂无评论
发现更多内容

云原生2.0时代下,DevOps实践如何才能更加高效敏捷?

华为云开发者联盟

云计算 数字化 华为云

Nginx-技术专题-技术介绍

洛神灬殇

【活动回顾】WebRTC服务端工程实践和优化探索

ZEGO即构

WebRTC 服务端工程

一瞬间让我秒变“快男”!腾讯内部强推Java性能优化手册,快了不止一点点。

Java架构追梦

Java 架构 jdk 面试 性能优化

什么是云服务?

anyRTC开发者

音视频 WebRTC 云服务 RTC

#不吐不快# 三观很正的Boss,你遇到过么?

架构精进之路

职场成长 奇葩的经历 不吐不快

DàYé的CTO姗姗学步路

曲水流觞TechRill

管理 CTO

年轻人不讲武德不仅白piao接口测试知识还白piao接口测试工具会员

测试人生路

接口测试

【涂鸦物联网足迹】涂鸦云平台消息服务—顺带Pulsar简单介绍

IoT云工坊

人工智能 物联网 云服务 Apache Pulsar 云平台

分布式事务太繁琐?官方推荐Atomikos,5分钟帮你搞定

互联网应用架构

分布式事务 springboot

Dubbo 接口,导出 Markdown ,这些功能 DocView 现在都有了!

程序员小航

markdown idea插件 IntelliJ IDEA 文档生成 Doc View

SQL数据库:窗口函数

正向成长

窗口函数

Glide.with(view)挂在了谁的生命周期上

mengxn

生命周期 Glide Activity Fragment

#不吐不快# CV千千条,修改最重要。代码不规范,伙伴两行泪!

程序员小航

奇葩的经历 不吐不快

高性能利器!华为云MRS ClickHouse重磅推出!

华为云开发者联盟

数据库 Clickhouse MRS

SpringBoot:整合Swagger3.0与RESTful接口整合返回值(2020最新最易懂)

比伯

Java 编程 架构 面试 计算机

区块链在债券市场如何应用

CECBC

区块链 债券

IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

不吃米饭

阿里云 最佳实践 物联网 IoT

synchronized 到底该不该用

古时的风筝

Java synchronized

科普干货|漫谈鸿蒙LiteOS-M与HUAWEI LiteOS内核的几大不同

华为云开发者联盟

华为 鸿蒙 IoT

太赞了!腾讯T3-3架构师整理了5000页的Java学习手册免费开放下载

Java架构之路

Java 程序员 架构 面试 编程语言

MySQL从库维护经验分享

Simon

MySQL 主从复制

CSS 排版与正常流 —— 重学CSS

三钻

CSS 排版

Jira停售Server版政策客观解读——如何最小化风险?

爱吃小舅的鱼

项目管理 研发管理 Jira Atlassian

什么是低代码(Low-Code)?

移动研发平台EMAS

工具 研发效能 低代码 开发 代码

小学妹问我:如何利用可视化工具排查问题?

田维常

可视化

一次 Java 进程 OOM 的排查分析(glibc 篇)

996小迁

Java 编程 架构 面试 计算机

区块链,音乐,流媒体和版税

CECBC

区块链 艺术

前嗅教你大数据——什么是代理IP?

前嗅大数据

爬虫 数据采集 静态IP 代理IP 动态IP

圆通快递回应内鬼泄露用户信息:严打数据倒卖灰色产业

石头IT视角

《垃圾回收的算法与实现》.pdf

田维常

垃圾回收

Kube-Proxy IPVS模式源码分析_语言 & 开发_王希刚_InfoQ精选文章