kubernetes cloud controller manager
Table of Contents
本文所有代码基于 1.16.0-alpha.2 commit: bdde11a664 所以引用文档版本为 1.15.0
1. 什么是 cloud controller manager(ccm)
在说 ccm 之前要了解一个 ccm 的前身 – cloud provider
cloud provider 是为了 k8s 更加容易在公有云环境下而提出的一个方案
比如在 aws azure 等环境下可以让 k8s 的资源与云厂商的资源进行匹配
具体的演进路线等可以阅读 这篇文章
2. 能用 ccm 干什么
在思索 ccm 可以做什么时,要思考一个问题:kubernetes 的核心价值在于哪里?
云主机厂商的本质上是在售卖计算资源与服务,而 k8s 的价值是在于管理与调度容器
正如 k8s 描述的一样: Production-Grade Container Scheduling and Management
k8s 更加关心容器的调度与管理,其他的资源也都是为了容器而服务的
那么有什么资源对于 k8s 来说是可以被替代的?
负载均衡、路由、主机
k8s 不关心主机是实际在东京还是西雅图,也不关心负载均衡具体是如何实现的
它只需要主机上的 kubelet 在正常运行,可以通过负载均衡访问到暴露的服务
而这些恰恰是云厂商最为关心的事情,主机的配置、主机的位置、负载均衡的实现、路由如何到达
这时候再来看 ccm 的接口
LoadBalancer() (LoadBalancer, bool) Instances() (Instances, bool) Zones() (Zones, bool) Clusters() (Clusters, bool) Routes() (Routes, bool) ProviderName() string HasClusterID() bool
这样就不难猜测出云主机厂商可以用 ccm 在做些什么事情了
每当用户通过 k8s 创建一个资源的时候,可以同时对云主机运行的实例进行相应更改
2.1. 现有的 ccm
ccm 目前被分为两种类型: In Tree/Out of Tree
2.1.1. In Tree
2.1.2. Out of Tree
Out of Tree 指的就是那些不在 kubernetes 代码当中的(没赶上车或社区内的方案)
然而不用很久,仅会有一种 ccm 就是 Out of Tree Cloud Controller Manager
Out of Tree 的代表
3. 如何实现一个 ccm
实现一个简单的 ccm 十分的简单
只需要实现这个文件中的所有接口
4. ccm 背后的秘密
4.1. Out of Tree ccm 如何工作
1.在 kubelet 启动时会检查启动参数中是否有 "–cloud-provider=external"
//staging/src/k8s.io/cloud-provider/plugins.go func IsExternal(name string) bool { return name == externalCloudProvider }
2.当 kubelet 执行 initialNode 时判断是否 taint node
//pkg/kubelet/kubelet_nodes_status.go func (kl *Kubelet) initialNode() (*v1.Node, error) { node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: string(kl.nodeName), Labels: map[string]string{ v1.LabelHostname: kl.hostname, v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH, kubeletapis.LabelOS: goruntime.GOOS, kubeletapis.LabelArch: goruntime.GOARCH, }, }, Spec: v1.NodeSpec{ Unschedulable: !kl.registerSchedulable, }, } nodeTaints := make([]v1.Taint, 0) if len(kl.registerWithTaints) > 0 { taints := make([]v1.Taint, len(kl.registerWithTaints)) for i := range kl.registerWithTaints { if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil { return nil, err } } nodeTaints = append(nodeTaints, taints...) } unschedulableTaint := v1.Taint{ Key: schedulerapi.TaintNodeUnschedulable, Effect: v1.TaintEffectNoSchedule, } // If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing // node to avoid race condition; refer to #63897 for more detail. if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) { if node.Spec.Unschedulable && !taintutil.TaintExists(nodeTaints, &unschedulableTaint) { nodeTaints = append(nodeTaints, unschedulableTaint) } } if kl.externalCloudProvider { taint := v1.Taint{ Key: schedulerapi.TaintExternalCloudProvider, Value: "true", Effect: v1.TaintEffectNoSchedule, } nodeTaints = append(nodeTaints, taint) } if len(nodeTaints) > 0 { node.Spec.Taints = nodeTaints } // Initially, set NodeNetworkUnavailable to true. if kl.providerRequiresNetworkingConfiguration() { node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{ Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue, Reason: "NoRouteCreated", Message: "Node created without a route", LastTransitionTime: metav1.NewTime(kl.clock.Now()), }) } if kl.enableControllerAttachDetach { if node.Annotations == nil { node.Annotations = make(map[string]string) } klog.Infof("Setting node annotation to enable volume controller attach/detach") node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true" } else { klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes") } if kl.keepTerminatedPodVolumes { if node.Annotations == nil { node.Annotations = make(map[string]string) } klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node") node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true" } // @question: should this be place after the call to the cloud provider? which also applies labels for k, v := range kl.nodeLabels { if cv, found := node.ObjectMeta.Labels[k]; found { klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv) } node.ObjectMeta.Labels[k] = v } if kl.providerID != "" { node.Spec.ProviderID = kl.providerID } if kl.cloud != nil { instances, ok := kl.cloud.Instances() if !ok { return nil, fmt.Errorf("failed to get instances from cloud provider") } // TODO: We can't assume that the node has credentials to talk to the // cloudprovider from arbitrary nodes. At most, we should talk to a // local metadata server here. var err error if node.Spec.ProviderID == "" { node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName) if err != nil { return nil, err } } instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName) if err != nil { return nil, err } if instanceType != "" { klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType } // If the cloud has zone information, label the node with the zone information zones, ok := kl.cloud.Zones() if ok { zone, err := zones.GetZone(context.TODO()) if err != nil { return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err) } if zone.FailureDomain != "" { klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain) node.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain } if zone.Region != "" { klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region) node.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region } } } kl.setNodeStatus(node) return node, nil }
3.当 ccm 的 pod 启动时会执行主函数创建 cmd 并执行
//cmd/cloud-controller-manager/controller-manager.go func main() { rand.Seed(time.Now().UnixNano()) command := app.NewCloudContrcollerManagerCommand() // TODO: once we switch everything over to Cobra commands, we can go back to calling // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the // normalize func and add the go flag set by hand. // utilflag.InitFlags() logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { os.Exit(1) } }
//cmd/cloud-controller-manager/app/controllermanager.go func NewCloudControllerManagerCommand() *cobra.Command { s, err := options.NewCloudControllerManagerOptions() if err != nil { klog.Fatalf("unable to initialize command options: %v", err) } cmd := &cobra.Command{ Use: "cloud-controller-manager", Long: `The Cloud controller manager is a daemon that embeds the cloud specific control loops shipped with Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { verflag.PrintAndExitIfRequested() utilflag.PrintFlags(cmd.Flags()) c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List()) if err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } if err := Run(c.Complete(), wait.NeverStop); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }, } fs := cmd.Flags() namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List()) verflag.AddFlags(namedFlagSets.FlagSet("global")) globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name()) if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil { // hoist this flag from the global flagset to preserve the commandline until // the gce cloudprovider is removed. globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs") } for _, f := range namedFlagSets.FlagSets { fs.AddFlagSet(f) } usageFmt := "Usage:\n %s\n" cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) cmd.SetUsageFunc(func(cmd *cobra.Command) error { fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine()) cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols) return nil }) cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine()) cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols) }) return cmd }
4.在 ccm 执行 Run 时首先进行 ccm 的初始化;主要用来确定是 In Tree 还是 Out of Tree,并创建一个 cloudprovider 实例
//staging/src/k8s.io/cloud-provider/plugins.go func InitCloudProvider(name string, configFilePath string) (Interface, error) { var cloud Interface var err error if name == "" { klog.Info("No cloud provider specified.") return nil, nil } if IsExternal(name) { klog.Info("External cloud provider specified") return nil, nil } for _, provider := range deprecatedCloudProviders { if provider.name == name { detail := provider.detail if provider.external { detail = fmt.Sprintf("Please use 'external' cloud provider for %s: %s", name, provider.detail) } klog.Warningf("WARNING: %s built-in cloud provider is now deprecated. %s", name, detail) break } } if configFilePath != "" { var config *os.File config, err = os.Open(configFilePath) if err != nil { klog.Fatalf("Couldn't open cloud provider configuration %s: %#v", configFilePath, err) } defer config.Close() cloud, err = GetCloudProvider(name, config) } else { // Pass explicit nil so plugins can actually check for nil. See // "Why is my nil error value not equal to nil?" in golang.org/doc/faq. cloud, err = GetCloudProvider(name, nil) } if err != nil { return nil, fmt.Errorf("could not init cloud provider %q: %v", name, err) } if cloud == nil { return nil, fmt.Errorf("unknown cloud provider %q", name) } return cloud, nil }
5.在初始化结束后 ccm 将会启动一个 HTTP Server 并监听 10258 端口 由于 k8s 固定了 ccm 监听的端口,导致无法同时运行两个 ccm 随后会进行多个 ccm 实例之间的选举;选举机制保证了一个资源不会被 ccm 处理多次 选举成功后会进入到 ccm 的控制循环当中
if c.SecureServing != nil { unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) // TODO: handle stoppedCh returned by c.SecureServing.Serve if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } if c.InsecureServing != nil { unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}} handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn) if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { return err } } run := func(ctx context.Context) { if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil { klog.Fatalf("error running controllers: %v", err) } } if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { run(context.TODO()) panic("unreachable") }
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") }, }, WatchDog: electionChecker, Name: "cloud-controller-manager", })
6.在 startControllers 中会设置一个 informer 给这个实例,用来监听 APIServer 的事件,并运行资源相应的控制器
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error { // Initialize the cloud provider with a reference to the clientBuilder cloud.Initialize(c.ClientBuilder, stopCh) // Set the informer on the user cloud object if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok { informerUserCloud.SetInformers(c.SharedInformers) } for controllerName, initFn := range controllers { if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) { klog.Warningf("%q is disabled", controllerName) continue } klog.V(1).Infof("Starting %q", controllerName) _, started, err := initFn(c, cloud, stopCh) if err != nil { klog.Errorf("Error starting %q", controllerName) return err } if !started { klog.Warningf("Skipping %q", controllerName) continue } klog.Infof("Started %q", controllerName) time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) } // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil { klog.Fatalf("Failed to wait for apiserver being healthy: %v", err) } c.SharedInformers.Start(stopCh)
func newControllerInitializers() map[string]initFunc { controllers := map[string]initFunc{} controllers["cloud-node"] = startCloudNodeController controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController controllers["service"] = startServiceController controllers["route"] = startRouteController return controllers }
func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { // Start the service controller serviceController, err := servicecontroller.New( cloud, ctx.ClientBuilder.ClientOrDie("service-controller"), ctx.SharedInformers.Core().V1().Services(), ctx.SharedInformers.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, ) if err != nil { // This error shouldn't fail. It lives like this as a legacy. klog.Errorf("Failed to start service controller: %v", err) return nil, false, nil } go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) return nil, true, nil }
7.然后进入到 k8s service controller 的运行当中
func (s *ServiceController) init() error { if s.cloud == nil { return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail") } balancer, ok := s.cloud.LoadBalancer() if !ok { return fmt.Errorf("the cloud provider does not support external load balancers") } s.balancer = balancer return nil }