kubernetes cloud controller manager


All code in this article is based on 1.16.0-alpha.2, commit bdde11a664;
the documentation referenced is accordingly from version 1.15.0

1. What is the cloud controller manager (ccm)

Before talking about the ccm, you need to know its predecessor: the cloud provider.

The cloud provider was proposed to make k8s easier to run in public cloud environments.

For example, on AWS, Azure, and similar platforms it lets k8s resources be matched to the cloud vendor's resources.

For the evolution of this design, see this article.

2. What can the ccm do

When thinking about what the ccm can do, consider one question: where does the core value of kubernetes lie?

Cloud vendors are essentially selling compute resources and services, while the value of k8s lies in managing and scheduling containers.

Just as k8s describes itself: Production-Grade Container Scheduling and Management.

k8s cares about scheduling and managing containers; every other resource exists to serve them.

So which resources are replaceable as far as k8s is concerned?

Load balancers, routes, and hosts.

k8s does not care whether a host actually sits in Tokyo or Seattle, nor how a load balancer is implemented.

It only needs the kubelet on the host to be running normally, and the exposed services to be reachable through the load balancer.

Those are exactly the things cloud vendors care about most: host configuration, host location, how the load balancer is implemented, and how routes reach their destinations.

With that in mind, look at the ccm interface:

LoadBalancer() (LoadBalancer, bool)
Instances() (Instances, bool)
Zones() (Zones, bool)
Clusters() (Clusters, bool)
Routes() (Routes, bool)
ProviderName() string
HasClusterID() bool

From this it is not hard to guess what cloud vendors can do with the ccm:

whenever a user creates a resource through k8s, the corresponding change can be applied to the instances running in the cloud at the same time.
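As a toy sketch of that pattern (the interfaces and the fake provider below are invented for illustration; the real sub-interfaces live in k8s.io/cloud-provider), a controller checks whether the provider supports a capability via the second return value, then asks the provider to make the cloud match the desired state:

```go
package main

import "fmt"

// LoadBalancer is a toy stand-in for the cloud provider's load
// balancer sub-interface.
type LoadBalancer interface {
	EnsureLoadBalancer(service string) error
}

// Cloud mirrors the capability-check pattern of the ccm interface:
// the second return value reports whether the feature is supported.
type Cloud interface {
	LoadBalancer() (LoadBalancer, bool)
}

type fakeCloud struct{}

func (c *fakeCloud) LoadBalancer() (LoadBalancer, bool) { return &fakeLB{}, true }

type fakeLB struct{}

func (l *fakeLB) EnsureLoadBalancer(service string) error {
	// A real provider would call the vendor's API here.
	fmt.Printf("provisioned cloud load balancer for service %q\n", service)
	return nil
}

// onServiceCreated is what a controller would run when the APIServer
// reports a new LoadBalancer-type Service.
func onServiceCreated(cloud Cloud, service string) error {
	lb, ok := cloud.LoadBalancer()
	if !ok {
		return fmt.Errorf("cloud provider does not support load balancers")
	}
	return lb.EnsureLoadBalancer(service)
}

func main() {
	if err := onServiceCreated(&fakeCloud{}, "default/nginx"); err != nil {
		fmt.Println(err)
	}
}
```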

2.1. Existing ccms

ccms currently come in two types: In Tree and Out of Tree.

2.1.1. In Tree

In Tree refers to the cloud providers contained in the kubernetes codebase.

In Tree can be seen as a compromise k8s made to accommodate cloud vendors before the ccm concept was proposed.

Now that k8s has become the dominant player, it is the cloud vendors' turn to accommodate it, so k8s is working on removing the In Tree ccm code.

Here you can see the k8s removal plan for In Tree ccms.

Here you can see the In Tree ccms still remaining in k8s.

Representative In Tree vendors:

  1. AWS
  2. GCE
  3. OpenStack
  4. Azure
  5. vSphere

2.1.2. Out of Tree

Out of Tree refers to the providers whose code is not in the kubernetes codebase (they missed the bus, or are community solutions).

Before long, however, only one kind of ccm will remain: the Out of Tree Cloud Controller Manager.

Representative Out of Tree providers:

  1. DigitalOcean
  2. Oracle Cloud Infrastructure
  3. Rancher

3. How to implement a ccm

Implementing a simple ccm is quite straightforward:

you only need to implement all the interfaces in this file.

4. The secrets behind the ccm

4.1. How an Out of Tree ccm works

1. When kubelet starts, it checks whether the startup flags include "--cloud-provider=external"

//staging/src/k8s.io/cloud-provider/plugins.go
const externalCloudProvider = "external"

func IsExternal(name string) bool {
	return name == externalCloudProvider
}

2. When kubelet builds the node object in initialNode, it decides whether to taint the node:

//pkg/kubelet/kubelet_node_status.go
func (kl *Kubelet) initialNode() (*v1.Node, error) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(kl.nodeName),
			Labels: map[string]string{
				v1.LabelHostname:      kl.hostname,
				v1.LabelOSStable:      goruntime.GOOS,
				v1.LabelArchStable:    goruntime.GOARCH,
				kubeletapis.LabelOS:   goruntime.GOOS,
				kubeletapis.LabelArch: goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: !kl.registerSchedulable,
		},
	}
	nodeTaints := make([]v1.Taint, 0)
	if len(kl.registerWithTaints) > 0 {
		taints := make([]v1.Taint, len(kl.registerWithTaints))
		for i := range kl.registerWithTaints {
			if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
				return nil, err
			}
		}
		nodeTaints = append(nodeTaints, taints...)
	}

	unschedulableTaint := v1.Taint{
		Key:    schedulerapi.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	}

	// If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing
	// node to avoid race condition; refer to #63897 for more detail.
	if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
		if node.Spec.Unschedulable &&
			!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
			nodeTaints = append(nodeTaints, unschedulableTaint)
		}
	}

	if kl.externalCloudProvider {
		taint := v1.Taint{
			Key:    schedulerapi.TaintExternalCloudProvider,
			Value:  "true",
			Effect: v1.TaintEffectNoSchedule,
		}

		nodeTaints = append(nodeTaints, taint)
	}
	if len(nodeTaints) > 0 {
		node.Spec.Taints = nodeTaints
	}
	// Initially, set NodeNetworkUnavailable to true.
	if kl.providerRequiresNetworkingConfiguration() {
		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
			Type:               v1.NodeNetworkUnavailable,
			Status:             v1.ConditionTrue,
			Reason:             "NoRouteCreated",
			Message:            "Node created without a route",
			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
		})
	}

	if kl.enableControllerAttachDetach {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		klog.Infof("Setting node annotation to enable volume controller attach/detach")
		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
	} else {
		klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
	}

	if kl.keepTerminatedPodVolumes {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node")
		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
	}

	// @question: should this be place after the call to the cloud provider? which also applies labels
	for k, v := range kl.nodeLabels {
		if cv, found := node.ObjectMeta.Labels[k]; found {
			klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
		}
		node.ObjectMeta.Labels[k] = v
	}

	if kl.providerID != "" {
		node.Spec.ProviderID = kl.providerID
	}

	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		var err error
		if node.Spec.ProviderID == "" {
			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
			if err != nil {
				return nil, err
			}
		}

		instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName)
		if err != nil {
			return nil, err
		}
		if instanceType != "" {
			klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
		}
		// If the cloud has zone information, label the node with the zone information
		zones, ok := kl.cloud.Zones()
		if ok {
			zone, err := zones.GetZone(context.TODO())
			if err != nil {
				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
				node.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
			}
		}
	}

	kl.setNodeStatus(node)

	return node, nil
}
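The key added here, schedulerapi.TaintExternalCloudProvider, is "node.cloudprovider.kubernetes.io/uninitialized"; with the NoSchedule effect it keeps ordinary pods off the node until the ccm's cloud-node controller has filled in the cloud metadata and removed the taint. A minimal sketch of that check (Taint and TaintExists are local stand-ins for the v1.Taint and taintutil helpers used above):

```go
package main

import "fmt"

// Taint is a local stand-in for v1.Taint.
type Taint struct {
	Key    string
	Value  string
	Effect string
}

// TaintExists mirrors taintutil.TaintExists: a taint matches on
// key and effect.
func TaintExists(taints []Taint, t Taint) bool {
	for _, x := range taints {
		if x.Key == t.Key && x.Effect == t.Effect {
			return true
		}
	}
	return false
}

func main() {
	uninitialized := Taint{
		Key:    "node.cloudprovider.kubernetes.io/uninitialized",
		Value:  "true",
		Effect: "NoSchedule",
	}
	nodeTaints := []Taint{uninitialized}

	// The ccm's cloud-node controller removes this taint once the
	// provider has initialized the node.
	fmt.Println(TaintExists(nodeTaints, uninitialized)) // true
}
```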

3. When the ccm pod starts, its main function builds the cobra command and executes it:

//cmd/cloud-controller-manager/controller-manager.go
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewCloudControllerManagerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

//cmd/cloud-controller-manager/app/controllermanager.go
func NewCloudControllerManagerCommand() *cobra.Command {
	s, err := options.NewCloudControllerManagerOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "cloud-controller-manager",
		Long: `The Cloud controller manager is a daemon that embeds
the cloud specific control loops shipped with Kubernetes.`,
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cmd.Flags())

			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

		},
	}

	fs := cmd.Flags()
	namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())

	if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil {
		// hoist this flag from the global flagset to preserve the commandline until
		// the gce cloudprovider is removed.
		globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs")
	}
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	usageFmt := "Usage:\n  %s\n"
	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
		return nil
	})
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
	})

	return cmd
}

4. When the ccm's Run executes, it first initializes the cloud provider; this step mainly determines whether the provider is In Tree or Out of Tree, and creates a cloudprovider instance:

//staging/src/k8s.io/cloud-provider/plugins.go
func InitCloudProvider(name string, configFilePath string) (Interface, error) {
	var cloud Interface
	var err error

	if name == "" {
		klog.Info("No cloud provider specified.")
		return nil, nil
	}

	if IsExternal(name) {
		klog.Info("External cloud provider specified")
		return nil, nil
	}

	for _, provider := range deprecatedCloudProviders {
		if provider.name == name {
			detail := provider.detail
			if provider.external {
				detail = fmt.Sprintf("Please use 'external' cloud provider for %s: %s", name, provider.detail)
			}
			klog.Warningf("WARNING: %s built-in cloud provider is now deprecated. %s", name, detail)

			break
		}
	}

	if configFilePath != "" {
		var config *os.File
		config, err = os.Open(configFilePath)
		if err != nil {
			klog.Fatalf("Couldn't open cloud provider configuration %s: %#v",
				configFilePath, err)
		}

		defer config.Close()
		cloud, err = GetCloudProvider(name, config)
	} else {
		// Pass explicit nil so plugins can actually check for nil. See
		// "Why is my nil error value not equal to nil?" in golang.org/doc/faq.
		cloud, err = GetCloudProvider(name, nil)
	}

	if err != nil {
		return nil, fmt.Errorf("could not init cloud provider %q: %v", name, err)
	}
	if cloud == nil {
		return nil, fmt.Errorf("unknown cloud provider %q", name)
	}

	return cloud, nil
}

5. After initialization, the ccm starts an HTTP server listening on port 10258. Because k8s hard-codes this port, two ccms cannot run side by side on the same host. The ccm instances then hold a leader election; the election guarantees that no resource is processed by more than one ccm. The winner enters the ccm's control loops:

if c.SecureServing != nil {
	unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
	handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
	// TODO: handle stoppedCh returned by c.SecureServing.Serve
	if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
		return err
	}
}
if c.InsecureServing != nil {
	unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
	insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
	handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
	if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
		return err
	}
}

run := func(ctx context.Context) {
	if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
		klog.Fatalf("error running controllers: %v", err)
	}
}

if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
	run(context.TODO())
	panic("unreachable")
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
	Lock:          rl,
	LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
	RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
	RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: run,
		OnStoppedLeading: func() {
			klog.Fatalf("leaderelection lost")
		},
	},
	WatchDog: electionChecker,
	Name:     "cloud-controller-manager",
})

6. In startControllers, an informer is attached to the cloud instance to watch APIServer events, and the controller for each resource is started:

func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
	// Initialize the cloud provider with a reference to the clientBuilder
	cloud.Initialize(c.ClientBuilder, stopCh)
	// Set the informer on the user cloud object
	if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
		informerUserCloud.SetInformers(c.SharedInformers)
	}

	for controllerName, initFn := range controllers {
		if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		klog.V(1).Infof("Starting %q", controllerName)
		_, started, err := initFn(c, cloud, stopCh)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		klog.Infof("Started %q", controllerName)

		time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
	}

	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
		klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
	}

	c.SharedInformers.Start(stopCh)

	select {}
}

func newControllerInitializers() map[string]initFunc {
	controllers := map[string]initFunc{}
	controllers["cloud-node"] = startCloudNodeController
	controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	controllers["service"] = startServiceController
	controllers["route"] = startRouteController
	return controllers
}

func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	// Start the service controller
	serviceController, err := servicecontroller.New(
		cloud,
		ctx.ClientBuilder.ClientOrDie("service-controller"),
		ctx.SharedInformers.Core().V1().Services(),
		ctx.SharedInformers.Core().V1().Nodes(),
		ctx.ComponentConfig.KubeCloudShared.ClusterName,
	)
	if err != nil {
		// This error shouldn't fail. It lives like this as a legacy.
		klog.Errorf("Failed to start service controller: %v", err)
		return nil, false, nil
	}

	go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))

	return nil, true, nil
}

7. Execution then enters the k8s service controller:

func (s *ServiceController) init() error {
	if s.cloud == nil {
		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
	}

	balancer, ok := s.cloud.LoadBalancer()
	if !ok {
		return fmt.Errorf("the cloud provider does not support external load balancers")
	}
	s.balancer = balancer

	return nil
}


Created: 2023-11-24 Fri 09:50
