HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

Kube-Proxy IPVS 模式源码分析

  • 2019-10-14
  • 本文字数:6664 字

    阅读完需:约 22 分钟

Kube-Proxy IPVS模式源码分析

kube-proxy 整体逻辑结构


这张时序图描述了 kube-proxy 的整体逻辑结构,由于 kub-proxy 组件和其它的 kube-* 组件一样都是使用 pflag 和 cobra 库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:


func main() {  command := &cobra.Command{    Use:   "echo [string to echo]",    Short: "Echo anything to the screen",    Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {      fmt.Println("Print: " + strings.Join(args, " "))    },  }
command.Execute()}
复制代码


上面这段代码就是使用 cobra 包的一个最简单的例子,首先初始化 Command 结构,其中该结构中的 Run 就是最终要执行的真正逻辑。当初始化完成 Command 之后,通过 commnad.Execute 去启动应用程序。


现在看上面的图就能比较直观的理解程序的启动机制了,这张图的整体过程就是对 Commnad 结构中的 Run 进行核心逻辑实现。也就是说 kube-proxy 核心逻辑入口就是从这里开始(Command.Run)。


在 Command.Run 中主要做了如下几件事,看下面的代码:


// Run runs the specified ProxyServer.func (o *Options) Run() error {  defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }
if o.CleanupAndExit { return proxyServer.CleanupAndExit() }
o.proxyServer = proxyServer return o.runLoop()}
复制代码


1.对 ProxyServer 实例进行初始化。


2.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 true,则会将 userspace, iptables, ipvs 三种模式之前设置的所有规则清除掉,然后直接退出。


3.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 flase,则会调用 runLoop 来启动 ProxyServer 服务。


首先先来看看 ProxyServer 的结构定义:


type ProxyServer struct {  Client                 clientset.Interface   EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface  IpvsInterface          utilipvs.Interface  IpsetInterface         utilipset.Interface  execer                 exec.Interface  Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder  ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored  ProxyMode              string  NodeRef                *v1.ObjectReference  CleanupIPVS            bool  MetricsBindAddress     string  EnableProfiling        bool  OOMScoreAdj            *int32  ConfigSyncPeriod       time.Duration  HealthzServer          *healthcheck.HealthzServer}
复制代码


在 ProxyServer 结构中包含了与 kube-apiserver 通信的 Client、操作 Iptables 的 IptInterface、操作 IPVS 的 IpvsInterface、操作 IpSet 的 IpsetInterface,以及通过 ProxyMode 参数获取基于 userspace, iptables, ipvs 三种方式中的哪种使用的 Proxier。


接下来重点介绍基于 ipvs 模式实现的 Proxier, 在 ipvs 模式下 Proxier 结构的定义:


type Proxier struct {  endpointsChanges *proxy.EndpointChangeTracker  serviceChanges   *proxy.ServiceChangeTracker
//... serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable //... syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
//... iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface exec utilexec.Interface //... ipvsScheduler string}
复制代码


在 Proxier 结构中,先介绍下 async.BoundedFrequencyRunner,其它的在介绍 ProxyServer.Run 的时候介绍。


BoundedFrequencyRunner 的定义结构如下:


type BoundedFrequencyRunner struct {  name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts  maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations fn func() // function to run lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs}
复制代码


BoundedFrequencyRunner 结构中的 run 会异步的去定期的执行任务 fn,比如定期的执行 proxier.syncProxyRules 去创建或者更新 VirtuaServer 和 RealServer 并将 VirtualServer 的 VIP 绑定到 dummy interface(kube-ipvs0)。


下面是在 NewProxier 方法中初始化 BoundedFrequencyRunner 对象的示例:


proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
复制代码


其中:


minSyncPeriod: 规则最小的更新时间


syncPeriod: 规则最大更新时间


proxier.syncProxyRules: 同步规则的实现函数(也是 kube-proxy 基于 ipvs 同步规则的核心实现)

ProxyServer 启动流程

这部分介绍下 ProxyServer.Run 的逻辑实现,ProxyServer 启动流程如下图所示:



在启动过程中,主要做了下面这几件事情:


  1. 启动健康检查服务 HealthzServer.

  2. 启动暴露监控指标的 MetricsServer.

  3. 如果需要调整系统的 conntrack 相关参数,则对系统的 conntrack 进行参数调整.

  4. 创建一个 informerFactory 实例,后面去通过 informerFactory 获取 kubernetes 的各类资源数据.

  5. 创建一个 ServiceConfig 实例,这个实例主要作用是实时的 WATCH kubernetes Service 资源的变化,并加入队列中,用于后续对变化的 Service 进行规则同步。

  6. 注册 servier event hander 到 Proxier.

  7. 启动 serviceConfig.


接下来详细的介绍下[4-7]这几步的流程。


ServiceConfig 的结构定义如下:


type ServiceConfig struct {  listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler}
复制代码


ServiceHandler 的结构定义如下:


type ServiceHandler interface {  // OnServiceAdd is called whenever creation of new service object  // is observed.  OnServiceAdd(service *v1.Service)  // OnServiceUpdate is called whenever modification of an existing  // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)  // OnServiceDelete is called whenever deletion of an existing service  // object is observed.  OnServiceDelete(service *v1.Service)  // OnServiceSynced is called once all the initial even handlers were  // called and the state is fully propagated to local cache.  OnServiceSynced()}
复制代码


创建 ServiceConfig 实例对象的具体实现如下:


func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {  result := &ServiceConfig{    listerSynced: serviceInformer.Informer().HasSynced,  }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, resyncPeriod, )
return result}
复制代码


  • 首先通过执行 serviceInformer.Informer().HasSynced 来将 kubernetes 下的所有 Service 资源同步到缓存 listerSynced 中。

  • 其次为 AddEventHandlerWithResyncPeriod 添加针对 Service 对象,添加,更新,删除的事件触发函数。当 Service 有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService 和 handleDeleteService。


我们看看 handleAddService 触发函数的实现逻辑,具体代码如下:


func (c *ServiceConfig) handleAddService(obj interface{}) {  service, ok := obj.(*v1.Service)  if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }  for i := range c.eventHandlers {    klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)  }}
复制代码


当 watch 到 kubernetes 集群中有新的 Service 被创建之后,会触发 handleAddService 函数,并在该函数中遍历 eventHandlers 分别去调用 OnServiceAdd 来对 proxier 结构中的 serviceChanages 进行更新并去同步相应的规则。


OnServiceAdd 的具体实现逻辑如下:


// OnServiceAdd is called whenever creation of new service object is observed.func (proxier *Proxier) OnServiceAdd(service *v1.Service) {  proxier.OnServiceUpdate(nil, service)}
// OnServiceUpdate is called whenever modification of an existing service object is observed.func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() }}
复制代码


ServiceChangeTracker 的结构定义如下:


// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of// Services, keyed by their namespace and name.type ServiceChangeTracker struct {  // lock protects items.  lock sync.Mutex  // items maps a service to its serviceChange.  items map[types.NamespacedName]*serviceChange  // makeServiceInfo allows proxier to inject customized information when processing service.  makeServiceInfo makeServicePortFunc  // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.  isIPv6Mode *bool  recorder   record.EventRecorder}
复制代码


serviceChanage 的结构定义如下:


// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,// changes are accumulated, i.e. previous is state from before applying the changes,// current is state after applying all of the changes.type serviceChange struct {  previous ServiceMap  current  ServiceMap}
复制代码


到这里在回过头来看上面的基于 IPVS 实现的 Proxier 的整体流程就完全通了,ProxyServer.Run 函数在启动时,通过 kubernetes LIST/WATCH 机制去实时的感知 kubernetes 集群 Service 资源的变化,然后不断的在更新 Proxier 结构中的 ServiceChanges,然后将变化的 Service 保存在 ServiceChanges 结构中的 ServiceMap 中,给后续的 async.BoundedFrequencyRunner 去执行同步规则函数 syncProxyRules 来使用。


8. endpointConfig 的实现机制和 serviceConfig 的机制完全一样,这里就不在详细的介绍了。


9.上面做的所有预处理工作,会在 informerFactory.Start 这步生效。


10. birthCry 的作用就是通过 event 的方式通知 kubernetes, kube-proxy 这边的所有准备工作都处理好了,我要启动了。


  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")}
复制代码


11. 最终通过 SyncLoop 启动 kube-proxy 服务,并立刻执行 syncProxyRules 先来一遍同步再说.之后便会通过异步的方式定期的去同步 IPVS, Iptables, Ipset 的规则。


而 syncProxyRules 函数是 kube-proxy 实现的核心。主体逻辑是遍历 ServiceMap 并遍历 ServiceMap 下的 endpointsMap 及创建的 Service 类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的 IPVS 规则。


syncProxyRules 的函数实现定义如下:


func (proxier *Proxier) syncProxyRules() {  //.....
// Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { //......
// Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { //.... }
// Capture the clusterIP. // ipset call entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }
// Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { //.... }
// Capture load-balancer ingress. for _, ingress := range svcInfo.LoadBalancerIPStrings() { //..... }
if svcInfo.NodePort() != 0 { //.... } }
// sync ipset entries for _, set := range proxier.ipsetList { set.syncIPSetEntries() }
// Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. proxier.writeIptablesRules()
// Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
复制代码

总结

kube-proxy 的代码逻辑还是比较简洁的,整体的思想就是 kube-proxy 服务去 watch kubernetes 集群的 Service 和 Endpoint 对象,当这两个资源对象有状态变化时,会把它们保存在 ServiceMap 和 EndPonintMap 中,然后会通过 async.BoundedFrequencyRunner 去异步的执行 syncProxyRules 去下发规则。


本文转载自公众号 360 云计算(ID:hulktalk)


原文链接


https://mp.weixin.qq.com/s?__biz=MzU4ODgyMDI0Mg==&mid=2247486894&idx=1&sn=c39bafbcc79e6ea0a25fcb077a0b1128&chksm=fdd7b7d3caa03ec520bb4ef2ec98c498a1646e38f66d684b1124fe1aa2eb841bf4f2f080dec9&scene=27#wechat_redirect


2019-10-14 08:002039

评论

发布
暂无评论
发现更多内容

Linux C/C++ 学习路线总结!助我拿下腾讯offer

赖猫

后台开发 C/C++ Linux服务器开发

anyRTC 六周年 打造全网最低音视频价格

anyRTC开发者

音视频 WebRTC RTC sdk

Amazon Glue 版本 2.0 将作业启动时间缩短了 10 倍,现已全面开放!

亚马逊云科技 (Amazon Web Services)

限流与Guava RateLimiter原理解析

千珏

Java 微服务 限流算法 Guava 令牌桶

嵌入式程序调用函数的内部过程和机制

不脱发的程序猿

单片机 嵌入式程序 嵌入式设计

智慧党建三维云展厅可视化

一只数据鲸鱼

数据可视化 智慧党建 三维可视化

为啥你写的代码总是这么复杂?

华为云开发者联盟

软件 代码 代码注释 bug 复杂度

我崩溃了!BTAJ面试有关散列(哈希)表的面试题详解,电子版已问世

欢喜学安卓

android 程序员 面试 移动开发

Nginx负载均衡配置误区

运维研习社

nginx 负载均衡 5月日更

Amazon Route 53 Resolver 落地中国区,轻松玩转私有域名互访不是梦!| 新服务上线

亚马逊云科技 (Amazon Web Services)

堪称完美!淘宝内部百亿级Java高并发系统架构设计PDF手册分享

Java架构追梦

Java 架构 高并发 淘宝网 亿级架构设计

Spring Cloud Bus 消息总线介绍

阿里巴巴云原生

Java 微服务 云原生 中间件 数据格式

华为云PB级数据库GaussDB(for Redis)揭秘第十期:GaussDB(for Redis)迁移系列(上)

华为云开发者联盟

数据仓库 华为云 数据迁移 GaussDB(for Redis) PB级数据库

云图说|不要小看不起眼的日志,“小日志,大作用”

华为云开发者联盟

运维 日志 云日志服务 安全监控审计

官宣:恭喜 ChaosBlade 项目进入 CNCF Sandbox

阿里巴巴云原生

容器 云原生 k8s 监控 Go 语言

STM32电源框图解析(VDD、VSS、VDDA、VSSA、VREF+、VREF-、VBAT等的区别)

不脱发的程序猿

嵌入式 stm32 单片机 电源框图解析

数据采集之js自定义采集

大数据技术指南

大数据

再次荣获最受观众喜爱奖

Serverless Devs

阿里云 云原生 cncf #Serverless

CampusBulider(模模搭)学习笔记5:创建自定义建筑

ThingJS数字孪生引擎

大前端 可视化 3D 3D可视化 数字孪生

HuskyLens人工智能摄像头

不脱发的程序猿

人工智能 智能硬件 AIOT HuskyLens 人工智能摄像头

更灵活的边缘云原生运维:OpenYurt 单元化部署新增 Patch 特性

阿里巴巴云原生

容器 运维 云原生 中间件 边缘计算

如何做一场高质量的分享

阿里巴巴云原生

深度学习 开发者 云原生 分享

揭秘 Amazon Go 无人商店是如何炼成的!

亚马逊云科技 (Amazon Web Services)

“云演唱会”也有仪式感!能检票、可转赠,爱奇艺“云票”如何重构线上购票逻辑

爱奇艺技术产品团队

iMazing比iTunes好用在哪些地方

懒得勤快

2021年5月国产数据库排行榜:“华为高斯模式”取得成功,阿里OPA持续攀升

墨天轮

数据库 dba tdsql TiDB Gauss DB

论好文章和烂文章

阿里巴巴云原生

程序员 开发者 云原生 写作技巧 成长与思考

源码解析之Seata项目中的分布式ID生成算法

Coder的技术之路

分布式 分布式ID

怎么进大厂?166位Java工程师的大厂面试经验分享

北游学Java

Java 面试 大厂

MapReduce排序以及序列化

五分钟学大数据

大数据 hadoop mapreduce

如何高效地存储与检索大规模的图谱数据?

华为云开发者联盟

存储 知识图谱 检索 图结构 表结构

Kube-Proxy IPVS模式源码分析_语言 & 开发_王希刚_InfoQ精选文章