Reading the K8s Source: How the Kubelet Creates Containers (Part 1)

How Kubelet Creates Containers I

Posted by PYQ on April 8, 2024

When we create a pod/deployment from the k8s master node, the request passes through many steps. This post only covers steps 6-9.

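  1. Submit the request: when you create a Pod/Deployment with kubectl or the Kubernetes API, the request is first sent to the k8s-apiserver.
  2. API server validation and authorization: the k8s-apiserver validates the request (for example, checks that it is well-formed) and runs an authorization check to make sure the user or service is allowed to perform the operation.
  3. Store in etcd: once the request passes validation and authorization, the k8s-apiserver stores the pod/deployment definition in etcd.
  4. Scheduling: after the request has been stored in etcd, the k8s-scheduler is notified that a new pod/deployment needs scheduling. Based on its requirements (such as resource requests and affinity rules) and the current state of the cluster (such as resource utilization on each node), the scheduler picks the most suitable node to run it.
  5. Scheduling decision written to etcd: once the k8s-scheduler has decided, the information about which node the pod/deployment should run on is written to etcd (through the k8s-apiserver).
  6. Kubelet responds: the Kubelet watches the k8s-apiserver for the pods assigned to its node. When it finds a new pod/deployment that should run on its node, it starts preparing to create the containers.
  7. Container runtime pulls images: if an image used by the pod's containers is not on the node, the Kubelet instructs the container runtime (Docker, containerd, etc.) to pull it.
  8. Create containers: after the images are pulled, the Kubelet creates the containers. This includes necessary preparation such as setting up networking and mounting volumes.
  9. Start containers: after the containers are created, the Kubelet starts them. If the Pod has several containers, the Kubelet starts them one by one in the order defined in the Pod.
  10. Health checks: after the containers start, the Kubelet runs the health checks defined in the Pod (if any). The Pod is considered healthy only when all checks pass.
  11. Service registration and load balancing: if the Pod is part of a service, K8s updates its internal services and load balancers to include the new Pod.
  12. Monitoring and logs: once the Pod is running, the Kubelet keeps monitoring its state and reports logs and metrics to the cluster's monitoring system.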

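The source I read is the release-1.29 branch of K8s. The Kubelet code base is huge and intricate, so this post only gives a brief record of the flow related to creating containers (many details are skipped and only the function call chain is recorded; consult the source for specifics). It is complicated enough that these notes are somewhat disorganized.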

Some blog posts I referred to:

  1. 源码解析:K8s 创建 pod 时,背后发生了什么 (covers a fairly old version)
  2. kubernetes源码-kubelet 原理和源码分析 (covers a relatively recent version)

About the kubelet

The description given by the kubelet tool itself:

The kubelet is the primary "node agent" that runs on each node. It can register the node with the apiserver using one of: the hostname; a flag to override the hostname; or specific logic for a cloud provider.

The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object that describes a pod. The kubelet takes a set of PodSpecs that are provided through various mechanisms (primarily through the apiserver) and ensures that the containers described in those PodSpecs are running and healthy. The kubelet doesn't manage containers which were not created by Kubernetes.

Other than from an PodSpec from the apiserver, there are two ways that a container manifest can be provided to the Kubelet.

File: Path passed as a flag on the command line. Files under this path will be monitored periodically for updates. The monitoring period is 20s by default and is configurable via a flag.

HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint is checked every 20 seconds (also configurable with a flag).

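In short, the kubelet is the key node agent on every node: it registers the node and makes sure the containers described in PodSpecs are running and healthy. It can register the node using the hostname, a flag that overrides the hostname, or cloud-provider-specific logic. PodSpecs (YAML or JSON) can be supplied by the apiserver or in two other ways: by monitoring files under a given path for changes, or by periodically checking an HTTP endpoint. Both use a 20-second period by default, which can be changed with command-line flags.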

Starting the Kubelet

// cmd/kubelet/kubelet.go
func main() {
	// Build the kubelet command as a cobra command
	command := app.NewKubeletCommand()
	// Start the kubelet
	code := cli.Run(command)
	os.Exit(code)
}

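NewKubeletCommand is fairly long. It mainly deals with creating and configuring the kubelet command-line interface, which is built on the cobra library. It shows how the command is defined, how command-line flags are parsed and validated, how the configuration is loaded and validated, and finally how the command is executed.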

// cmd/kubelet/app/server.go
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	// omitted
	cmd := &cobra.Command{
		Use: componentKubelet,
		// omitted
		RunE: func(cmd *cobra.Command, args []string) error {
			// omitted
			// run the kubelet
			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
		},
	}
	// omitted
	return cmd
}

As you can see, Run is the kubelet's entry point. It is a thin wrapper around run that also logs some information to help with debugging.

// cmd/kubelet/app/server.go
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	// To help debugging, immediately log version
	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %w", err)
	}
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}
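
The way Run wraps run can be tried in isolation. The following is a minimal sketch rather than kubelet code: runOuter, runInner, and errNoConfig are hypothetical names invented for the illustration. It only shows that wrapping with %w adds context while still letting callers inspect the original error.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoConfig is a hypothetical sentinel error used only in this sketch.
var errNoConfig = errors.New("no configuration provided")

// runInner stands in for the long, unexported run function.
func runInner(configured bool) error {
	if !configured {
		return errNoConfig
	}
	return nil
}

// runOuter stands in for the exported Run wrapper: it adds context to
// any failure with %w so the original error stays inspectable.
func runOuter(configured bool) error {
	if err := runInner(configured); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}

func main() {
	err := runOuter(false)
	fmt.Println(err)
	// errors.Is walks the wrap chain and still finds the sentinel.
	fmt.Println(errors.Is(err, errNoConfig))
}
```

Because the wrapper uses %w instead of %v, a caller further up can still branch on the underlying cause.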

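run is extremely long (details are omitted below). It starts the kubelet through a series of initialization and configuration steps, including setting feature gates, validating the configuration, preparing dependencies, creating clients, configuring authentication, setting up resource management, and setting up the event recorder.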

// cmd/kubelet/app/server.go
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	// (feature gates enable or disable individual kubelet features according to the configuration)
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	// Warn if MemoryQoS enabled with cgroups v1
	// Obtain Kubelet Lock File (lock file handling)
	// Register current configuration with /configz endpoint
	// About to get clients and such, detect standaloneMode
	// if in standalone mode, indicate as much by setting all clients to nil
	// make a separate client for events
	// make a separate client for heartbeat with throttling disabled and a timeout attached
	// The timeout is the minimum of the lease duration and status update frequency
	// Get cgroup driver setting from CRI
	// Setup event recorder if required.
	// Initialize the container manager (surrounding code omitted)
	kubeDeps.ContainerManager, err = cm.NewContainerManager(
		kubeDeps.Mounter,
		kubeDeps.CAdvisorInterface,
		cm.NodeConfig{
			RuntimeCgroupsName: s.RuntimeCgroups,
			// omitted
		},
		s.FailSwapOn,
		kubeDeps.Recorder,
		kubeDeps.KubeClient,
	)
	if err != nil {
		return err
	}
	// Start the kubelet
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}
	// omitted
	return nil
}

RunKubelet is the core of kubelet startup: it initializes the kubelet's configuration, dependencies, and runtime environment, and then calls startKubelet to start the Kubelet.

// cmd/kubelet/app/server.go
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//
//	1 Integration tests
//	2 Kubelet binary
//	3 Standalone 'kubernetes' binary
//
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	// Setup event recorder if required.
	k, err := createAndInitKubelet(kubeServer,
		kubeDeps,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %w", err)
	}
	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig
	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.InfoS("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}
	return nil
}

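startKubelet starts the kubelet main loop, the kubelet's API server, and the pod resources server (plus the read-only server when a read-only port is configured). Each one runs asynchronously in its own goroutine.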

// cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	go k.ListenAndServePodResources()
}
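
The "one goroutine per component" structure of startKubelet can be sketched with the standard library alone. Everything below is an assumption made for illustration: component, startAll, blockUntilDone, and demo are hypothetical names, and the WaitGroup and cancellation exist only so the sketch can terminate, which the real startKubelet does not do.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// component is a hypothetical stand-in for a long-running kubelet
// subsystem such as the main loop or one of its servers.
type component struct {
	name    string
	enabled bool
	run     func(ctx context.Context)
}

// startAll launches every enabled component in its own goroutine and
// returns immediately with the names of the components it started.
func startAll(ctx context.Context, wg *sync.WaitGroup, comps []component) []string {
	var started []string
	for _, c := range comps {
		if !c.enabled {
			continue
		}
		wg.Add(1)
		go func(c component) {
			defer wg.Done()
			c.run(ctx)
		}(c)
		started = append(started, c.name)
	}
	return started
}

// blockUntilDone simulates a server loop that runs until shutdown.
func blockUntilDone(ctx context.Context) { <-ctx.Done() }

// demo starts four components; the read-only server is enabled only
// when readOnlyPort is positive, mirroring the check in startKubelet.
func demo(readOnlyPort int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	started := startAll(ctx, &wg, []component{
		{"main loop", true, blockUntilDone},
		{"server", true, blockUntilDone},
		{"read-only server", readOnlyPort > 0, blockUntilDone},
		{"pod resources server", true, blockUntilDone},
	})
	cancel() // unlike the real kubelet, the sketch shuts down right away
	wg.Wait()
	return started
}

func main() {
	fmt.Println(demo(0))
}
```

The caller returns as soon as the goroutines are launched, which is why RunKubelet can log "Started kubelet" right after calling startKubelet.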

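In K8s, the core component for managing pod configuration is the PodConfig struct. Its main job is to merge pod configuration coming from different sources into a single consistent structure. In the kubelet these sources are the apiserver, manifest files, and an HTTP endpoint. Whenever new configuration arrives or existing configuration changes, PodConfig delivers the change to its listeners as an incremental notification, so that the rest of the system can react promptly and accurately.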

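PodConfig follows the observer pattern: PodConfig is the subject and the listeners are the observers. When its data changes, it sends update notifications to the listeners through the updates channel. This lets the system react flexibly to configuration changes while keeping the components decoupled, which helps maintainability and extensibility.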
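
The idea of funnelling several sources into one updates channel can be sketched as a plain fan-in. This is not how config.Mux is implemented; update, merge, source, and collect are hypothetical names, and the sketch only assumes that each source delivers its updates on its own channel.

```go
package main

import (
	"fmt"
	"sync"
)

// update is a simplified, hypothetical stand-in for kubetypes.PodUpdate.
type update struct {
	Source string
	Pods   []string
}

// merge fans several per-source channels into a single channel and
// closes it once every source has been drained.
func merge(sources ...<-chan update) <-chan update {
	out := make(chan update)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan update) {
			defer wg.Done()
			for u := range src {
				out <- u
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source returns a channel that emits one update and then closes.
func source(name string, pods ...string) <-chan update {
	ch := make(chan update, 1)
	ch <- update{Source: name, Pods: pods}
	close(ch)
	return ch
}

// collect drains the merged channel and counts pods per source.
func collect(merged <-chan update) map[string]int {
	seen := map[string]int{}
	for u := range merged {
		seen[u.Source] += len(u.Pods)
	}
	return seen
}

func main() {
	merged := merge(source("api", "web-1", "web-2"), source("file", "static-1"))
	fmt.Println(collect(merged))
}
```

The consumer only ever sees one channel, so it does not need to know how many sources exist or where a pod came from beyond the Source field.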

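Internally, PodConfig uses the podStorage struct to manage the current pod state and to make sure notifications are delivered on the updates channel in order. podStorage uses several locks (such as podLock and updateLock) to handle concurrent operations, keeping pod state consistent and notifications ordered. podStorage also maintains a sourcesSeen set that records which configuration sources have sent at least one SET operation, which helps PodConfig identify and manage configuration from different sources.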

// pkg/kubelet/config/config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	pods *podStorage
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String
}

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order.  Note that this object is an in-memory source of
// "truth" and on creation contains zero entries.  Once all previously read sources are
// available, then this object should be considered authoritative.
// It uses several locks to handle concurrent operations on pods
type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder

	startupSLIObserver podStartupSLIObserver
}

// pkg/kubelet/types/pod_update.go
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
//
// Additionally, Pods should never be nil - it should always point to an empty slice. While
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
	Pods   []*v1.Pod
	Op     PodOperation
	Source string
}

// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int

// These constants identify the PodOperations that can be made on a pod configuration.
const (
	// SET is the current pod configuration.
	SET PodOperation = iota
	// ADD signifies pods that are new to this source.
	ADD
	// DELETE signifies pods that are gracefully deleted from this source.
	DELETE
	// REMOVE signifies pods that have been removed from this source.
	REMOVE
	// UPDATE signifies pods have been updated in this source.
	UPDATE
	// RECONCILE signifies pods that have unexpected status in this source,
	// kubelet should reconcile status with this source.
	RECONCILE
)

// k8s.io/api/core/v1/types.go
// Pod is a collection of containers that can run on a host. This resource is created
// by clients and scheduled onto hosts.
type Pod struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	// Spec holds the desired state of the Pod and Status its most recently observed state
	// Specification of the desired behavior of the pod.
	Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
	// Most recently observed status of the pod.
	// This data may not be up to date.
	// Populated by the system.
	// Read-only.
	Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
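
To make the relationship between SET, incremental notifications, and sourcesSeen more concrete, here is a heavily reduced sketch. It is an assumption-laden illustration, not the podStorage implementation: storage, newStorage, set, and allSeen are hypothetical names, pods are identified by plain strings, and locking is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// storage is a hypothetical, much reduced stand-in for podStorage.
type storage struct {
	pods        map[string]map[string]bool // source name -> pod name -> present
	sourcesSeen map[string]bool            // sources that have sent at least one SET
}

func newStorage() *storage {
	return &storage{
		pods:        map[string]map[string]bool{},
		sourcesSeen: map[string]bool{},
	}
}

// set replaces the state of one source (a SET operation) and reports the
// incremental difference: pods that are new and pods that disappeared.
func (s *storage) set(source string, pods []string) (added, removed []string) {
	old := s.pods[source]
	next := make(map[string]bool, len(pods))
	for _, p := range pods {
		if next[p] {
			continue // ignore duplicates within one SET
		}
		next[p] = true
		if !old[p] {
			added = append(added, p)
		}
	}
	for p := range old {
		if !next[p] {
			removed = append(removed, p)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	s.pods[source] = next
	s.sourcesSeen[source] = true
	return added, removed
}

// allSeen reports whether every expected source has delivered a SET.
func (s *storage) allSeen(expected ...string) bool {
	for _, e := range expected {
		if !s.sourcesSeen[e] {
			return false
		}
	}
	return true
}

func main() {
	s := newStorage()
	added, removed := s.set("api", []string{"a", "b"})
	fmt.Println(added, removed)
	added, removed = s.set("api", []string{"b", "c"})
	fmt.Println(added, removed)
	fmt.Println(s.allSeen("api", "file"))
}
```

A check like allSeen is the kind of readiness test that sourcesSeen makes possible: until every source has reported once, the merged view cannot be treated as complete.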

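Back in the kubelet startup path, look at the final entry point, k.Run(podCfg.Updates()) (defined in pkg/kubelet/kubelet.go). Its signature shows that starting the Kubelet instance makes it listen on the updates channel, so the Kubelet can asynchronously receive pod update notifications from other components (such as the k8s-apiserver) and manage the lifecycle of the pods on the node accordingly (starting, stopping, and updating containers, for example).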

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	ctx := context.Background()
	if kl.logServer == nil {
		file := http.FileServer(http.Dir(nodeLogDir))
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
			kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
				if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
					http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
					return
				} else if nlq != nil {
					if req.URL.Path != "/" && req.URL.Path != "" {
						http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
						return
					}
					if errs := nlq.validate(); len(errs) > 0 {
						http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
						return
					}
					// Validation ensures that the request does not query services and files at the same time
					if len(nlq.Services) > 0 {
						journal.ServeHTTP(w, req)
						return
					}
					// Validation ensures that the request does not explicitly query multiple files at the same time
					if len(nlq.Files) == 1 {
						// Account for the \ being used on Windows clients
						req.URL.Path = filepath.ToSlash(nlq.Files[0])
					}
				}
				// Fall back in case the caller is directly trying to query a file
				// Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
				file.ServeHTTP(w, req)
			}))
		} else {
			kl.logServer = http.StripPrefix("/logs/", file)
		}
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start two go-routines to update the status.
		//
		// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
		// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
		// once the node becomes ready, then exits afterwards.
		//
		// Introduce some small jittering to ensure that over time the requests won't start
		// accumulating at approximately the same time from the set of nodes due to priority and
		// fairness effect.
		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(context.Background())
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops. (The status manager syncs pod status and updates it in its cache.)
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled. (Used to select a suitable container runtime for a pod.)
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator. (It manages and reports pod lifecycle events.)
	kl.pleg.Start()

	// Start eventedPLEG only if EventedPLEG feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
		kl.eventedPleg.Start()
	}
	// Listen for events (PodUpdate) sent by the file/apiserver/http sources and sync accordingly.
	kl.syncLoop(ctx, updates, kl)
}
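
The last call in Run hands the updates channel to a loop that blocks until something arrives. A minimal sketch of such a loop is shown below. It is not the kubelet's syncLoop, which handles more event sources; podUpdate, syncLoop's signature here, and demo are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"time"
)

// podUpdate is a simplified, hypothetical stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   string
	Pods []string
}

// syncLoop handles updates until the channel is closed and returns how
// many periodic ticks it observed in the meantime.
func syncLoop(updates <-chan podUpdate, tick <-chan time.Time, handle func(podUpdate)) int {
	ticks := 0
	for {
		select {
		case u, ok := <-updates:
			if !ok {
				return ticks // source closed, stop the loop
			}
			handle(u)
		case <-tick:
			ticks++ // a real loop would trigger a periodic resync here
		}
	}
}

// demo feeds two updates through the loop and records their operations.
func demo() []string {
	updates := make(chan podUpdate, 2)
	updates <- podUpdate{Op: "ADD", Pods: []string{"web-1"}}
	updates <- podUpdate{Op: "REMOVE", Pods: []string{"web-1"}}
	close(updates)

	var handled []string
	// A nil tick channel never fires, which keeps the demo deterministic.
	syncLoop(updates, nil, func(u podUpdate) {
		handled = append(handled, u.Op)
	})
	return handled
}

func main() {
	fmt.Println(demo())
}
```

In a long-running process the tick channel would typically come from a time.Ticker, so the loop wakes up even when no configuration changes arrive.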

kl.initializeModules(), called in Run above, initializes several modules:

  • metrics.Register: registers Prometheus metrics.

  • setupDataDirs: sets up the data directories (under /var/lib/kubelet).

    • the root directory
    • the pods directory
    • the plugins directory
    • the pod-resources directory
  • ContainerLogsDir: creates the container log directory (/var/log/containers).
  • imageManager.Start(): starts the image manager.
  • serverCertificateManager.Start(): starts the certificate manager.
  • oomWatcher.Start(): starts the OOM watcher.
  • resourceAnalyzer.Start(): starts the resource analyzer.
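
The directory setup step in the list above can be approximated as follows. This is a sketch under assumptions, not the kubelet's setupDataDirs: makeDataDirs and demo are hypothetical names, the subdirectory names are taken from the list above, the permission bits are an illustrative choice, and the demo uses a temporary directory instead of /var/lib/kubelet.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// makeDataDirs creates the root directory and the subdirectories named
// in the list above, stopping at the first failure.
func makeDataDirs(root string) error {
	dirs := []string{
		root,
		filepath.Join(root, "pods"),
		filepath.Join(root, "plugins"),
		filepath.Join(root, "pod-resources"),
	}
	for _, dir := range dirs {
		// MkdirAll succeeds if the directory already exists, so the
		// step can be repeated safely on every start.
		if err := os.MkdirAll(dir, 0750); err != nil {
			return fmt.Errorf("error creating directory %q: %w", dir, err)
		}
	}
	return nil
}

// demo runs makeDataDirs under a temporary root and returns the number
// of subdirectories that were created.
func demo() (int, error) {
	tmp, err := os.MkdirTemp("", "kubelet-sketch-")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(tmp)

	root := filepath.Join(tmp, "kubelet")
	if err := makeDataDirs(root); err != nil {
		return 0, err
	}
	entries, err := os.ReadDir(root)
	if err != nil {
		return 0, err
	}
	return len(entries), nil
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

Returning on the first error matches how Run treats a failure in initializeModules: it logs the problem and exits instead of continuing with a partially prepared node.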