免费下载案例集|20+数字化领先企业人才培养实践经验 了解详情
写点什么

Kube-Proxy IPVS 模式源码分析

  • 2019-10-14
  • 本文字数:6664 字

    阅读完需:约 22 分钟

Kube-Proxy IPVS模式源码分析

kube-proxy 整体逻辑结构


这张时序图描述了 kube-proxy 的整体逻辑结构,由于 kub-proxy 组件和其它的 kube-* 组件一样都是使用 pflag 和 cobra 库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:


func main() {  command := &cobra.Command{    Use:   "echo [string to echo]",    Short: "Echo anything to the screen",    Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {      fmt.Println("Print: " + strings.Join(args, " "))    },  }
command.Execute()}
复制代码


上面这段代码就是使用 cobra 包的一个最简单的例子,首先初始化 Command 结构,其中该结构中的 Run 就是最终要执行的真正逻辑。当初始化完成 Command 之后,通过 commnad.Execute 去启动应用程序。


现在看上面的图就能比较直观的理解程序的启动机制了,这张图的整体过程就是对 Commnad 结构中的 Run 进行核心逻辑实现。也就是说 kube-proxy 核心逻辑入口就是从这里开始(Command.Run)。


在 Command.Run 中主要做了如下几件事,看下面的代码:


// Run runs the specified ProxyServer.func (o *Options) Run() error {  defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }
if o.CleanupAndExit { return proxyServer.CleanupAndExit() }
o.proxyServer = proxyServer return o.runLoop()}
复制代码


1.对 ProxyServer 实例进行初始化。


2.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 true,则会将 userspace, iptables, ipvs 三种模式之前设置的所有规则清除掉,然后直接退出。


3.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 flase,则会调用 runLoop 来启动 ProxyServer 服务。


首先先来看看 ProxyServer 的结构定义:


type ProxyServer struct {  Client                 clientset.Interface   EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface  IpvsInterface          utilipvs.Interface  IpsetInterface         utilipset.Interface  execer                 exec.Interface  Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder  ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored  ProxyMode              string  NodeRef                *v1.ObjectReference  CleanupIPVS            bool  MetricsBindAddress     string  EnableProfiling        bool  OOMScoreAdj            *int32  ConfigSyncPeriod       time.Duration  HealthzServer          *healthcheck.HealthzServer}
复制代码


在 ProxyServer 结构中包含了与 kube-apiserver 通信的 Client、操作 Iptables 的 IptInterface、操作 IPVS 的 IpvsInterface、操作 IpSet 的 IpsetInterface,以及通过 ProxyMode 参数获取基于 userspace, iptables, ipvs 三种方式中的哪种使用的 Proxier。


接下来重点介绍基于 ipvs 模式实现的 Proxier, 在 ipvs 模式下 Proxier 结构的定义:


type Proxier struct {  endpointsChanges *proxy.EndpointChangeTracker  serviceChanges   *proxy.ServiceChangeTracker
//... serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable //... syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
//... iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface exec utilexec.Interface //... ipvsScheduler string}
复制代码


在 Proxier 结构中,先介绍下 async.BoundedFrequencyRunner,其它的在介绍 ProxyServer.Run 的时候介绍。


BoundedFrequencyRunner 的定义结构如下:


type BoundedFrequencyRunner struct {  name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts  maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations fn func() // function to run lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs}
复制代码


BoundedFrequencyRunner 结构中的 run 会异步的去定期的执行任务 fn,比如定期的执行 proxier.syncProxyRules 去创建或者更新 VirtuaServer 和 RealServer 并将 VirtualServer 的 VIP 绑定到 dummy interface(kube-ipvs0)。


下面是在 NewProxier 方法中初始化 BoundedFrequencyRunner 对象的示例:


proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
复制代码


其中:


minSyncPeriod: 规则最小的更新时间


syncPeriod: 规则最大更新时间


proxier.syncProxyRules: 同步规则的实现函数(也是 kube-proxy 基于 ipvs 同步规则的核心实现)

ProxyServer 启动流程

这部分介绍下 ProxyServer.Run 的逻辑实现,ProxyServer 启动流程如下图所示:



在启动过程中,主要做了下面这几件事情:


  1. 启动健康检查服务 HealthzServer.

  2. 启动暴露监控指标的 MetricsServer.

  3. 如果需要调整系统的 conntrack 相关参数,则对系统的 conntrack 进行参数调整.

  4. 创建一个 informerFactory 实例,后面去通过 informerFactory 获取 kubernetes 的各类资源数据.

  5. 创建一个 ServiceConfig 实例,这个实例主要作用是实时的 WATCH kubernetes Service 资源的变化,并加入队列中,用于后续对变化的 Service 进行规则同步。

  6. 注册 servier event hander 到 Proxier.

  7. 启动 serviceConfig.


接下来详细的介绍下[4-7]这几步的流程。


ServiceConfig 的结构定义如下:


type ServiceConfig struct {  listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler}
复制代码


ServiceHandler 的结构定义如下:


type ServiceHandler interface {  // OnServiceAdd is called whenever creation of new service object  // is observed.  OnServiceAdd(service *v1.Service)  // OnServiceUpdate is called whenever modification of an existing  // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)  // OnServiceDelete is called whenever deletion of an existing service  // object is observed.  OnServiceDelete(service *v1.Service)  // OnServiceSynced is called once all the initial even handlers were  // called and the state is fully propagated to local cache.  OnServiceSynced()}
复制代码


创建 ServiceConfig 实例对象的具体实现如下:


func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {  result := &ServiceConfig{    listerSynced: serviceInformer.Informer().HasSynced,  }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, resyncPeriod, )
return result}
复制代码


  • 首先通过执行 serviceInformer.Informer().HasSynced 来将 kubernetes 下的所有 Service 资源同步到缓存 listerSynced 中。

  • 其次为 AddEventHandlerWithResyncPeriod 添加针对 Service 对象,添加,更新,删除的事件触发函数。当 Service 有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService 和 handleDeleteService。


我们看看 handleAddService 触发函数的实现逻辑,具体代码如下:


func (c *ServiceConfig) handleAddService(obj interface{}) {  service, ok := obj.(*v1.Service)  if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }  for i := range c.eventHandlers {    klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)  }}
复制代码


当 watch 到 kubernetes 集群中有新的 Service 被创建之后,会触发 handleAddService 函数,并在该函数中遍历 eventHandlers 分别去调用 OnServiceAdd 来对 proxier 结构中的 serviceChanages 进行更新并去同步相应的规则。


OnServiceAdd 的具体实现逻辑如下:


// OnServiceAdd is called whenever creation of new service object is observed.func (proxier *Proxier) OnServiceAdd(service *v1.Service) {  proxier.OnServiceUpdate(nil, service)}
// OnServiceUpdate is called whenever modification of an existing service object is observed.func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() }}
复制代码


ServiceChangeTracker 的结构定义如下:


// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of// Services, keyed by their namespace and name.type ServiceChangeTracker struct {  // lock protects items.  lock sync.Mutex  // items maps a service to its serviceChange.  items map[types.NamespacedName]*serviceChange  // makeServiceInfo allows proxier to inject customized information when processing service.  makeServiceInfo makeServicePortFunc  // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.  isIPv6Mode *bool  recorder   record.EventRecorder}
复制代码


serviceChanage 的结构定义如下:


// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,// changes are accumulated, i.e. previous is state from before applying the changes,// current is state after applying all of the changes.type serviceChange struct {  previous ServiceMap  current  ServiceMap}
复制代码


到这里在回过头来看上面的基于 IPVS 实现的 Proxier 的整体流程就完全通了,ProxyServer.Run 函数在启动时,通过 kubernetes LIST/WATCH 机制去实时的感知 kubernetes 集群 Service 资源的变化,然后不断的在更新 Proxier 结构中的 ServiceChanges,然后将变化的 Service 保存在 ServiceChanges 结构中的 ServiceMap 中,给后续的 async.BoundedFrequencyRunner 去执行同步规则函数 syncProxyRules 来使用。


8. endpointConfig 的实现机制和 serviceConfig 的机制完全一样,这里就不在详细的介绍了。


9.上面做的所有预处理工作,会在 informerFactory.Start 这步生效。


10. birthCry 的作用就是通过 event 的方式通知 kubernetes, kube-proxy 这边的所有准备工作都处理好了,我要启动了。


  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")}
复制代码


11. 最终通过 SyncLoop 启动 kube-proxy 服务,并立刻执行 syncProxyRules 先来一遍同步再说.之后便会通过异步的方式定期的去同步 IPVS, Iptables, Ipset 的规则。


而 syncProxyRules 函数是 kube-proxy 实现的核心。主体逻辑是遍历 ServiceMap 并遍历 ServiceMap 下的 endpointsMap 及创建的 Service 类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的 IPVS 规则。


syncProxyRules 的函数实现定义如下:


func (proxier *Proxier) syncProxyRules() {  //.....
// Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { //......
// Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { //.... }
// Capture the clusterIP. // ipset call entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }
// Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { //.... }
// Capture load-balancer ingress. for _, ingress := range svcInfo.LoadBalancerIPStrings() { //..... }
if svcInfo.NodePort() != 0 { //.... } }
// sync ipset entries for _, set := range proxier.ipsetList { set.syncIPSetEntries() }
// Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. proxier.writeIptablesRules()
// Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
复制代码

总结

kube-proxy 的代码逻辑还是比较简洁的,整体的思想就是 kube-proxy 服务去 watch kubernetes 集群的 Service 和 Endpoint 对象,当这两个资源对象有状态变化时,会把它们保存在 ServiceMap 和 EndPonintMap 中,然后会通过 async.BoundedFrequencyRunner 去异步的执行 syncProxyRules 去下发规则。


本文转载自公众号 360 云计算(ID:hulktalk)


原文链接


https://mp.weixin.qq.com/s?__biz=MzU4ODgyMDI0Mg==&mid=2247486894&idx=1&sn=c39bafbcc79e6ea0a25fcb077a0b1128&chksm=fdd7b7d3caa03ec520bb4ef2ec98c498a1646e38f66d684b1124fe1aa2eb841bf4f2f080dec9&scene=27#wechat_redirect


2019-10-14 08:002022

评论

发布
暂无评论
发现更多内容

HarmonyOS音频开发指导:使用AudioRenderer开发音频播放功能

HarmonyOS开发者

HarmonyOS

软件测试|华新学院在2022 年全国大学生“火焰杯”软件测试高校就业选拔赛取得佳绩

霍格沃兹测试开发学社

四川华新学院在“火焰杯”软件测试高校就业选拔赛取得佳绩

测试人

软件测试

10.26 来 CNCC 2023 T16 展位,TDengine 精美周边等你来领!

TDengine

时序数据库 ​TDengine

人工智能改变日常生活和工作的未来

测吧(北京)科技有限公司

测试

大模型训练,提升AI能力的关键

百度开发者中心

大模型训练 LLM

培养AI领域的未来人才

测吧(北京)科技有限公司

测试

人工智能的潜在益处与风险

测吧(北京)科技有限公司

测试

安卓设备连接Mac必备的传输工具 MacDroid

展初云

OpenHarmony Meetup成都站招募令

OpenHarmony开发者

OpenHarmony

应对全球性挑战的AI解决方案

测吧(北京)科技有限公司

测试

Parallels Desktop 19 for Mac虚拟机

展初云

虚拟机 pd虚拟机 Mac安装win

大模型训练,提升AI能力的关键

百度开发者中心

深度学习 大模型

访问控制中PIP的典型流程和关键点思考

权说安全

访问控制

软件测试|第二届、第三届<火焰杯>软件测试开发选拔赛河北赛区颁奖典礼落幕

霍格沃兹测试开发学社

AI在创新和竞争力中的关键

测吧(北京)科技有限公司

测试

天下苦定制久矣,平台化建设到底难在哪里?

权说安全

零信任 统一门户

AI监管与政策:塑造人工智能未来

测吧(北京)科技有限公司

测试

如何让大模型生成更准确、可靠的结果?

鼎道智联

GPT

AI与就业:面对未来的失业风险

测吧(北京)科技有限公司

测试

文韬武略,创新无界,华为云1024程序员节精彩抢先看

华为云开发者联盟

程序员 开发者 华为云 华为云开发者联盟 华为云1024程序员节

LLM盛行下,如何高效训练大模型

百度开发者中心

大模型训练 LLM LLMOps

一种基于闭包函数实现自动化框架断言组件的设计实践 | 京东物流技术团队

京东科技开发者

闭包函数 企业号10月PK榜 测试脚本 断言组件

一次单据图片处理的优化实践 | 京东物流技术团队

京东科技开发者

性能优化 图片处理 pdfbox 企业号10月PK榜

人工智能对教育和职业的双重冲击

测吧(北京)科技有限公司

测试

用HarmonyOS做一个可以手势控制的电子相册应用(ArkTS)

HarmonyOS开发者

HarmonyOS

FC红白机游戏600合集 for mac

展初云

Mac 游戏 FC 红白机游戏

加速深度学习创新的引擎

百度开发者中心

大模型 LLM LLMOps

透明格栅屏与传统LED显示屏有什么区别?透明格栅屏用在什么地方?

Dylan

产品 LED LED显示屏

Kube-Proxy IPVS模式源码分析_语言 & 开发_王希刚_InfoQ精选文章