写点什么

Kube-Proxy IPVS 模式源码分析

  • 2019-10-14
  • 本文字数:6664 字

    阅读完需:约 22 分钟

Kube-Proxy IPVS模式源码分析

kube-proxy 整体逻辑结构


这张时序图描述了 kube-proxy 的整体逻辑结构,由于 kub-proxy 组件和其它的 kube-* 组件一样都是使用 pflag 和 cobra 库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:


func main() {  command := &cobra.Command{    Use:   "echo [string to echo]",    Short: "Echo anything to the screen",    Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {      fmt.Println("Print: " + strings.Join(args, " "))    },  }
command.Execute()}
复制代码


上面这段代码就是使用 cobra 包的一个最简单的例子,首先初始化 Command 结构,其中该结构中的 Run 就是最终要执行的真正逻辑。当初始化完成 Command 之后,通过 commnad.Execute 去启动应用程序。


现在看上面的图就能比较直观的理解程序的启动机制了,这张图的整体过程就是对 Commnad 结构中的 Run 进行核心逻辑实现。也就是说 kube-proxy 核心逻辑入口就是从这里开始(Command.Run)。


在 Command.Run 中主要做了如下几件事,看下面的代码:


// Run runs the specified ProxyServer.func (o *Options) Run() error {  defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }
if o.CleanupAndExit { return proxyServer.CleanupAndExit() }
o.proxyServer = proxyServer return o.runLoop()}
复制代码


1.对 ProxyServer 实例进行初始化。


2.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 true,则会将 userspace, iptables, ipvs 三种模式之前设置的所有规则清除掉,然后直接退出。


3.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 flase,则会调用 runLoop 来启动 ProxyServer 服务。


首先先来看看 ProxyServer 的结构定义:


type ProxyServer struct {  Client                 clientset.Interface   EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface  IpvsInterface          utilipvs.Interface  IpsetInterface         utilipset.Interface  execer                 exec.Interface  Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder  ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored  ProxyMode              string  NodeRef                *v1.ObjectReference  CleanupIPVS            bool  MetricsBindAddress     string  EnableProfiling        bool  OOMScoreAdj            *int32  ConfigSyncPeriod       time.Duration  HealthzServer          *healthcheck.HealthzServer}
复制代码


在 ProxyServer 结构中包含了与 kube-apiserver 通信的 Client、操作 Iptables 的 IptInterface、操作 IPVS 的 IpvsInterface、操作 IpSet 的 IpsetInterface,以及通过 ProxyMode 参数获取基于 userspace, iptables, ipvs 三种方式中的哪种使用的 Proxier。


接下来重点介绍基于 ipvs 模式实现的 Proxier, 在 ipvs 模式下 Proxier 结构的定义:


type Proxier struct {  endpointsChanges *proxy.EndpointChangeTracker  serviceChanges   *proxy.ServiceChangeTracker
//... serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable //... syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
//... iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface exec utilexec.Interface //... ipvsScheduler string}
复制代码


在 Proxier 结构中,先介绍下 async.BoundedFrequencyRunner,其它的在介绍 ProxyServer.Run 的时候介绍。


BoundedFrequencyRunner 的定义结构如下:


type BoundedFrequencyRunner struct {  name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts  maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations fn func() // function to run lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs}
复制代码


BoundedFrequencyRunner 结构中的 run 会异步的去定期的执行任务 fn,比如定期的执行 proxier.syncProxyRules 去创建或者更新 VirtuaServer 和 RealServer 并将 VirtualServer 的 VIP 绑定到 dummy interface(kube-ipvs0)。


下面是在 NewProxier 方法中初始化 BoundedFrequencyRunner 对象的示例:


proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
复制代码


其中:


minSyncPeriod: 规则最小的更新时间


syncPeriod: 规则最大更新时间


proxier.syncProxyRules: 同步规则的实现函数(也是 kube-proxy 基于 ipvs 同步规则的核心实现)

ProxyServer 启动流程

这部分介绍下 ProxyServer.Run 的逻辑实现,ProxyServer 启动流程如下图所示:



在启动过程中,主要做了下面这几件事情:


  1. 启动健康检查服务 HealthzServer.

  2. 启动暴露监控指标的 MetricsServer.

  3. 如果需要调整系统的 conntrack 相关参数,则对系统的 conntrack 进行参数调整.

  4. 创建一个 informerFactory 实例,后面去通过 informerFactory 获取 kubernetes 的各类资源数据.

  5. 创建一个 ServiceConfig 实例,这个实例主要作用是实时的 WATCH kubernetes Service 资源的变化,并加入队列中,用于后续对变化的 Service 进行规则同步。

  6. 注册 servier event hander 到 Proxier.

  7. 启动 serviceConfig.


接下来详细的介绍下[4-7]这几步的流程。


ServiceConfig 的结构定义如下:


type ServiceConfig struct {  listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler}
复制代码


ServiceHandler 的结构定义如下:


type ServiceHandler interface {  // OnServiceAdd is called whenever creation of new service object  // is observed.  OnServiceAdd(service *v1.Service)  // OnServiceUpdate is called whenever modification of an existing  // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)  // OnServiceDelete is called whenever deletion of an existing service  // object is observed.  OnServiceDelete(service *v1.Service)  // OnServiceSynced is called once all the initial even handlers were  // called and the state is fully propagated to local cache.  OnServiceSynced()}
复制代码


创建 ServiceConfig 实例对象的具体实现如下:


func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {  result := &ServiceConfig{    listerSynced: serviceInformer.Informer().HasSynced,  }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, resyncPeriod, )
return result}
复制代码


  • 首先通过执行 serviceInformer.Informer().HasSynced 来将 kubernetes 下的所有 Service 资源同步到缓存 listerSynced 中。

  • 其次为 AddEventHandlerWithResyncPeriod 添加针对 Service 对象,添加,更新,删除的事件触发函数。当 Service 有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService 和 handleDeleteService。


我们看看 handleAddService 触发函数的实现逻辑,具体代码如下:


func (c *ServiceConfig) handleAddService(obj interface{}) {  service, ok := obj.(*v1.Service)  if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }  for i := range c.eventHandlers {    klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)  }}
复制代码


当 watch 到 kubernetes 集群中有新的 Service 被创建之后,会触发 handleAddService 函数,并在该函数中遍历 eventHandlers 分别去调用 OnServiceAdd 来对 proxier 结构中的 serviceChanages 进行更新并去同步相应的规则。


OnServiceAdd 的具体实现逻辑如下:


// OnServiceAdd is called whenever creation of new service object is observed.func (proxier *Proxier) OnServiceAdd(service *v1.Service) {  proxier.OnServiceUpdate(nil, service)}
// OnServiceUpdate is called whenever modification of an existing service object is observed.func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() }}
复制代码


ServiceChangeTracker 的结构定义如下:


// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of// Services, keyed by their namespace and name.type ServiceChangeTracker struct {  // lock protects items.  lock sync.Mutex  // items maps a service to its serviceChange.  items map[types.NamespacedName]*serviceChange  // makeServiceInfo allows proxier to inject customized information when processing service.  makeServiceInfo makeServicePortFunc  // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.  isIPv6Mode *bool  recorder   record.EventRecorder}
复制代码


serviceChanage 的结构定义如下:


// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,// changes are accumulated, i.e. previous is state from before applying the changes,// current is state after applying all of the changes.type serviceChange struct {  previous ServiceMap  current  ServiceMap}
复制代码


到这里在回过头来看上面的基于 IPVS 实现的 Proxier 的整体流程就完全通了,ProxyServer.Run 函数在启动时,通过 kubernetes LIST/WATCH 机制去实时的感知 kubernetes 集群 Service 资源的变化,然后不断的在更新 Proxier 结构中的 ServiceChanges,然后将变化的 Service 保存在 ServiceChanges 结构中的 ServiceMap 中,给后续的 async.BoundedFrequencyRunner 去执行同步规则函数 syncProxyRules 来使用。


8. endpointConfig 的实现机制和 serviceConfig 的机制完全一样,这里就不在详细的介绍了。


9.上面做的所有预处理工作,会在 informerFactory.Start 这步生效。


10. birthCry 的作用就是通过 event 的方式通知 kubernetes, kube-proxy 这边的所有准备工作都处理好了,我要启动了。


  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")}
复制代码


11. 最终通过 SyncLoop 启动 kube-proxy 服务,并立刻执行 syncProxyRules 先来一遍同步再说.之后便会通过异步的方式定期的去同步 IPVS, Iptables, Ipset 的规则。


而 syncProxyRules 函数是 kube-proxy 实现的核心。主体逻辑是遍历 ServiceMap 并遍历 ServiceMap 下的 endpointsMap 及创建的 Service 类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的 IPVS 规则。


syncProxyRules 的函数实现定义如下:


func (proxier *Proxier) syncProxyRules() {  //.....
// Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { //......
// Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { //.... }
// Capture the clusterIP. // ipset call entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }
// Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { //.... }
// Capture load-balancer ingress. for _, ingress := range svcInfo.LoadBalancerIPStrings() { //..... }
if svcInfo.NodePort() != 0 { //.... } }
// sync ipset entries for _, set := range proxier.ipsetList { set.syncIPSetEntries() }
// Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. proxier.writeIptablesRules()
// Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
复制代码

总结

kube-proxy 的代码逻辑还是比较简洁的,整体的思想就是 kube-proxy 服务去 watch kubernetes 集群的 Service 和 Endpoint 对象,当这两个资源对象有状态变化时,会把它们保存在 ServiceMap 和 EndPonintMap 中,然后会通过 async.BoundedFrequencyRunner 去异步的执行 syncProxyRules 去下发规则。


本文转载自公众号 360 云计算(ID:hulktalk)


原文链接


https://mp.weixin.qq.com/s?__biz=MzU4ODgyMDI0Mg==&mid=2247486894&idx=1&sn=c39bafbcc79e6ea0a25fcb077a0b1128&chksm=fdd7b7d3caa03ec520bb4ef2ec98c498a1646e38f66d684b1124fe1aa2eb841bf4f2f080dec9&scene=27#wechat_redirect


2019-10-14 08:002106

评论

发布
暂无评论
发现更多内容

智能导向的用户钱包画像分析:揭秘 NFT 市场的秘密

Footprint Analytics

NFT 加密钱包

文心一言 VS 讯飞星火 VS chatgpt (176)-- 算法导论13.3 5题

福大大架构师每日一题

福大大架构师每日一题

软件测试/测试开发/全日制/测试管理丨Vue 访问 api 组件-axios

测试人

软件测试

软件测试/测试开发/全日制 |实践容器化部署与微服务治理

测吧(北京)科技有限公司

测试

tb商品详情数据抓取

tbapi

淘宝商品详情数据接口 淘宝API接口 天猫商品详情数据接口 天猫API接口 tb商品详情数据接口

软件测试/测试开发/全日制 | 前端工程化与持续集成的完美结合

测吧(北京)科技有限公司

测试

软件测试/测试开发/全日制|基于Vue.js构建现代化前端应用

测吧(北京)科技有限公司

测试

数据库与低代码:加速开发,提升效率的完美结合

不在线第一只蜗牛

MySQL 数据库 sql 低代码

软件测试/测试开发/全日制/测试管理丨Vue 页面布局组件-Vuetify

测试人

软件测试

体育直播系统源码提供哪些安全购买保障策略

软件开发-梦幻运营部

软件测试/测试开发/全日制 | 解密WebSocket在游戏开发中的应用

测吧(北京)科技有限公司

测试

DAPP盲盒游戏代币质押模式系统开发丨案例

l8l259l3365

软件测试/测试开发/全日制 |使用Django Channels构建实时应用

测吧(北京)科技有限公司

测试

实时云渲染赋能2023湾区文采会元宇宙虚拟展厅

3DCAT实时渲染

云VR 元宇宙虚拟展厅 线上展厅

API设计:从基础到优秀实践

高端章鱼哥

API

2023 IoTDB Summit:天谋科技产品负责人赵馨逸《利其器:如何用 IoTDB 可视化控制台实现高效管理与运维》

Apache IoTDB

探索AI技术的奥秘:揭秘人工智能的核心原理

快乐非自愿限量之名

人工智能 机器学习 AI技术

程序员一定要知道的!屎山代码风格指南(避免被优化&&避免被接盘)

快乐非自愿限量之名

程序员 代码审查 代码

Dubbo 3.3.0-beta 版本正式发布

阿里巴巴云原生

阿里云 云原生 dubbo

市场动态:在比特币 ETF 投机和摄氏度网络向挖矿转型的背景下,2024 年将保持稳定开局

区块链软件开发推广运营

dapp开发 区块链开发 链游开发 NFT开发 公链开发

数据库内核那些事|一文Get PolarDB IMCI如何对半结构化数据进行高效分析

阿里云瑶池数据库

数据库 阿里云 数据结构 云原生

探索web技术与低代码开发的融合应用

EquatorCoco

云计算 软件开发 低代码 web3

Kube-Proxy IPVS模式源码分析_语言 & 开发_王希刚_InfoQ精选文章