1. Environment
- Kubernetes source version: remotes/origin/release-1.25
- Compiled Kubelet version: Kubernetes v1.24.0-beta.0.2463+ee7799bab469d7
- Kubernetes cluster test environment: a single-node cluster set up from the v1.25.4 binaries
- Golang version: go1.19.3 linux/amd64
- IDEA version: 2022.2.3
- Delve version: 1.9.1
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# dlv version
Delve Debugger
Version: 1.9.1
Build: $Id: d81b9fd12bfa603f3cf7a4bc842398bd61c42940 $
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# go version
go version go1.19.3 linux/amd64
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl version
WARNING: This version information is deprecated and will be replaced with the output from kubectl version --short. Use --output=yaml|json to get the full version.
Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:36:36Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:29:58Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get nodes -owide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
k8s-master1 Ready <none> 31h v1.25.4 192.168.11.71 <none> CentOS Linux 7 (Core) 3.10.0-1160.80.1.el7.x86_64 containerd://1.6.10
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get componentstatus
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME STATUS MESSAGE ERROR
etcd-0 Healthy {"health":"true","reason":""}
controller-manager Healthy ok
scheduler Healthy ok
[root@k8s-master1 kubernetes]#
The Kubelet startup arguments are as follows:
[root@k8s-master1 kubernetes]# ps -ef|grep "/usr/local/bin/kubelet"
root 7972 1 6 07:06 ? 00:00:06 /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.kubeconfig --kubeconfig=/etc/kubernetes/kubelet.kubeconfig --config=/etc/kubernetes/kubelet-conf.yml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --node-labels=node.kubernetes.io/node= --v=8
root 9549 6424 0 07:07 pts/0 00:00:00 grep --color=auto /usr/local/bin/kubelet
[root@k8s-master1 kubernetes]#
The Kubelet configuration file (kubelet-conf.yml) is as follows:
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
address: 0.0.0.0
port: 10250
readOnlyPort: 10255
authentication:
  anonymous:
    enabled: false
  webhook:
    cacheTTL: 2m0s
    enabled: true
  x509:
    clientCAFile: /etc/kubernetes/pki/ca.pem
authorization:
  mode: Webhook
  webhook:
    cacheAuthorizedTTL: 5m0s
    cacheUnauthorizedTTL: 30s
cgroupDriver: systemd
cgroupsPerQOS: true
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true
cpuManagerPolicy: none
cpuManagerReconcilePeriod: 10s
enableControllerAttachDetach: true
enableDebuggingHandlers: true
enforceNodeAllocatable:
- pods
eventBurst: 10
eventRecordQPS: 5
evictionHard:
  imagefs.available: 15%
  memory.available: 100Mi
  nodefs.available: 10%
  nodefs.inodesFree: 5%
evictionPressureTransitionPeriod: 5m0s
failSwapOn: true
fileCheckFrequency: 20s
hairpinMode: promiscuous-bridge
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 20s
imageGCHighThresholdPercent: 85
imageGCLowThresholdPercent: 80
imageMinimumGCAge: 2m0s
iptablesDropBit: 15
iptablesMasqueradeBit: 14
kubeAPIBurst: 10
kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1
registryBurst: 10
registryPullQPS: 5
resolvConf: /etc/resolv.conf
rotateCertificates: true
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
volumeStatsAggPeriod: 1m0s
2. Source Code Analysis
The syncLoop function is not long and does not do much of the real work itself; the heavy lifting is delegated to syncLoopIteration.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// The housekeeping ticker drives the periodic cleanup work (HandlePodCleanups)
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
// Here is where it all clicks: the PodLifecycleEvents produced by the PLEG end up in syncLoop through this channel
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
// From here on it is an infinite loop, living up to the "Loop" in syncLoop
for {
// If the runtime reports an error, skip this round of pod synchronization and back off
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
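Structurally, syncLoop is a very common Go pattern: tickers plus event channels drained in a single select loop, with exponential backoff while a health check keeps failing. Below is a minimal, self-contained sketch of that pattern; every name in it (loop, runtimeHealthy, handle, events) is illustrative and not a kubelet identifier.
// A stand-alone sketch of the syncLoop pattern; not kubelet code.
package main

import (
	"fmt"
	"math"
	"time"
)

func loop(events <-chan string, runtimeHealthy func() bool, handle func(string)) {
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()

	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base

	for {
		if !runtimeHealthy() {
			// exponential backoff while the "runtime" is unhealthy
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		duration = base // reset backoff on success

		select {
		case e, ok := <-events:
			if !ok {
				return // channel closed: exit, like syncLoopIteration returning false
			}
			handle(e)
		case <-syncTicker.C:
			handle("periodic sync")
		}
	}
}

func main() {
	events := make(chan string, 1)
	events <- "pod update"
	go func() {
		time.Sleep(3 * time.Second)
		close(events)
	}()
	loop(events, func() bool { return true }, func(e string) { fmt.Println("handled:", e) })
}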
2.1. syncLoopIteration
The full logic of syncLoopIteration is shown below. At first glance it looks intimidating: a single select statement listening on no fewer than seven channels. Let's go through them one by one.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
2.1.1. configCh
First up is configCh. It is nothing other than PodConfig.updates, the channel that the PodStorage.Merge() and PodStorage.Sync() pair write into.
The pods kept in PodStorage come from three sources: HTTP, File, and API. As the names suggest, the HTTP source fetches manifests over the network and is configured in the kubelet configuration via StaticPodURL (plus optional StaticPodURLHeader); we have not configured it. The File source reads manifests from disk and is where static pods come from; it is configured via StaticPodPath, which we set to /etc/kubernetes/manifests, although that directory is currently empty. The API source comes from kube-apiserver: every change we make to pods through kubectl or client-go shows up here.
So whenever we create, modify, or delete a Pod with kubectl, configCh receives an update.
Depending on the operation type of the PodUpdate, the update is dispatched to the corresponding SyncHandler method, and reading the SyncHandler implementation shows that the actual work is carried out by the pod workers:
// pkg/kubelet/kubelet.go
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
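One line worth pausing on is the comment "DELETE is treated as a UPDATE because of graceful deletion". Deleting a pod with kubectl does not remove the API object right away: the object is first marked with a deletion timestamp, the kubelet starts graceful termination, and only when the object actually disappears from kube-apiserver does a REMOVE arrive. The sketch below only illustrates that distinction; the classify helper is made up and not part of the kubelet.
// Illustrative only: a deletion timestamp means "terminate gracefully", not "gone".
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// classify shows how a pod object coming from the apiserver can be read:
// a set DeletionTimestamp means graceful termination is in progress.
func classify(pod *v1.Pod) string {
	if pod.DeletionTimestamp != nil {
		return "graceful deletion in progress (DELETE -> HandlePodUpdates)"
	}
	return "normal desired state (ADD/UPDATE)"
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx"}}
	fmt.Println(classify(pod))

	now := metav1.NewTime(time.Now())
	pod.DeletionTimestamp = &now
	fmt.Println(classify(pod))
}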
2.1.2. plegCh
From the earlier post "Kubelet 1.25.x源码——PLEG" we know that the PLEG keeps comparing each pod's state between two consecutive relists; whenever the state of the same pod differs between those two points in time, it sends a PodLifecycleEvent to its eventChannel. The plegCh here is exactly that eventChannel, just old wine in a new bottle.
// pkg/kubelet/kubelet.go
if e.Type == pleg.ContainerStarted {
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
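Not every PLEG event is worth a sync: the events are filtered by isSyncPodWorthy (not shown above), which, as far as I can tell from the 1.25 tree, simply ignores ContainerRemoved events, because removing an already-dead container does not change the pod's state. Below is a simplified, illustrative version of that filter; eventType and syncWorthy are stand-ins, not kubelet identifiers.
// Illustrative only: a simplified stand-in for the isSyncPodWorthy filter.
package main

import "fmt"

type eventType string

const (
	containerStarted eventType = "ContainerStarted"
	containerDied    eventType = "ContainerDied"
	containerRemoved eventType = "ContainerRemoved"
)

// syncWorthy mirrors the idea: removal of an already-dead container does not
// change the pod's state, so it does not trigger a sync on its own.
func syncWorthy(t eventType) bool {
	return t != containerRemoved
}

func main() {
	for _, t := range []eventType{containerStarted, containerDied, containerRemoved} {
		fmt.Printf("%s -> sync? %v\n", t, syncWorthy(t))
	}
}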
2.1.3. syncCh
syncCh is the one-second syncTicker.C created at the top of syncLoop. On every tick, getPodsToSync() returns the pods that are due for a periodic sync, and they are handed to handler.HandlePodSyncs.
2.1.4. livenessManager.Updates()
Liveness probe results arrive here; only failures trigger handleProbeSync, which re-syncs the affected pod.
2.1.5. readinessManager.Updates()
Readiness probe results arrive here; the container's readiness is recorded via statusManager.SetContainerReadiness and the pod is re-synced through handleProbeSync.
2.1.6. startupManager.Updates()
Startup probe results arrive here; the container's started flag is recorded via statusManager.SetContainerStartup and the pod is re-synced through handleProbeSync.
2.1.7. housekeepingCh
housekeepingCh is the housekeepingTicker.C created in syncLoop. Ticks are skipped until all pod sources are ready; after that, each tick runs handler.HandlePodCleanups() and logs an error if it takes longer than housekeepingWarningDuration.
2.2. PodUpdate
Now let's look at PodUpdate. You may have noticed that the first thing to catch the eye when entering syncLoop is its parameter list; intuition says these two parameters matter quite a bit.
Tracing upwards, we find that the PodUpdate channel is passed in via podCfg.Updates():
// cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
// 此处省略其它逻辑
}
Stepping into it, we see that PodConfig.Updates() simply hands out its internal updates field:
// pkg/kubelet/config/config.go
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
return c.updates
}
So where, when, and by whom is this field written? With that question in mind we keep digging, using IDEA (ctrl + G) to find every place that writes to updates.
The search result is delightfully simple: it is written in exactly one place, when PodConfig is created, and even there it is merely created as an empty channel via updates := make(chan kubetypes.PodUpdate, 50).
Obviously this channel has to be used somewhere. Looking more closely at storage := newPodStorage(updates, mode, recorder), the same updates channel is also handed to PodStorage when it is created. In other words, PodConfig.updates must be written by PodStorage, so let's search again in IDEA to see how PodStorage uses PodConfig.updates.
It turns out quite a few places write to PodConfig.updates. Within PodStorage the writes happen in two spots: in Merge and in Sync. Analyzing PodStorage is clearly unavoidable, so let's see what it does.
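Before digging into the struct itself, the wiring we just uncovered can be boiled down to a tiny sketch: the config object hands out only the receive side of the channel, while the storage object, created with the very same channel, is the one that writes into it. All names below (podConfig, podStorage, newPodConfig, podUpdate) are simplified stand-ins, not the real types.
// Illustrative only: the shared-channel relationship between PodConfig and podStorage.
package main

import "fmt"

type podUpdate struct {
	Op     string
	Source string
	Pods   []string
}

type podStorage struct {
	updates chan<- podUpdate // the same channel as podConfig.updates, send side only
}

func (s *podStorage) Merge(source string, pods ...string) {
	// In the real code this is where adds/updates/deletes/removes/reconciles
	// are computed and pushed onto the shared channel.
	s.updates <- podUpdate{Op: "ADD", Source: source, Pods: pods}
}

type podConfig struct {
	updates chan podUpdate
	storage *podStorage
}

func newPodConfig() *podConfig {
	updates := make(chan podUpdate, 50)
	return &podConfig{updates: updates, storage: &podStorage{updates: updates}}
}

// Updates exposes only the receive side, exactly like PodConfig.Updates().
func (c *podConfig) Updates() <-chan podUpdate { return c.updates }

func main() {
	cfg := newPodConfig()
	cfg.storage.Merge("api", "nginx")
	fmt.Printf("%+v\n", <-cfg.Updates())
}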
2.2.1. PodStorage
As the name suggests, PodStorage is a cache of pods. Before seeing the definition I guessed it would internally be a map with the pod UID as the key and the Pod as the value. The actual struct is a bit of a surprise: the map I guessed turns out to be only the value, and the real key, according to the comment, is the source. So it looks like pods are grouped by where they come from, since different pods may have different sources.
So what sources are there, and how do pods from different sources differ? We keep searching upwards with IDEA. (A quick aside: a comfortable tool really matters. Reading code in IDEA is very smooth; VS Code still falls a bit short, especially when searching up the call chain, though that may just be my own lack of familiarity with it.)
// pkg/kubelet/config/config.go
type podStorage struct {
podLock sync.RWMutex
// True to the name Storage, pods are kept in a map here. I expected a plain map from pod UID to Pod, but there is one extra level of nesting: the outer key is the source
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode
updateLock sync.Mutex
// This updates field is the very same channel as PodConfig.updates; the two hold the same reference
updates chan<- kubetypes.PodUpdate
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String
recorder record.EventRecorder
}
Finally, after a few more rounds of searching, here are the definitions of the different sources. Pods come from three places: File, HTTP, and API (plus AllSource, a wildcard covering all of them).
// pkg/kubelet/types/pod_update.go
const (
// FileSource identifies updates from a file.
FileSource = "file"
// HTTPSource identifies updates from querying a web page.
HTTPSource = "http"
// ApiserverSource identifies updates from Kubernetes API Server.
ApiserverSource = "api"
// AllSource identifies updates from all sources.
AllSource = "*"
)
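As a side note, every pod that passes through podStorage.merge (shown later in the Merge section) gets its source recorded in an annotation, so it can be told apart later. To the best of my knowledge the key is kubetypes.ConfigSourceAnnotationKey, i.e. "kubernetes.io/config.source"; treat that value as an assumption and verify it against your tree. A small sketch of reading it back:
// Hedged sketch: recover a pod's config source from its annotations.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Assumed value of kubetypes.ConfigSourceAnnotationKey.
const configSourceAnnotation = "kubernetes.io/config.source"

// sourceOf reports which source a pod came from; this sketch falls back to
// "api" when the annotation is missing.
func sourceOf(pod *v1.Pod) string {
	if s, ok := pod.Annotations[configSourceAnnotation]; ok {
		return s
	}
	return "api"
}

func main() {
	staticPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:        "etcd-k8s-master1",
		Annotations: map[string]string{configSourceAnnotation: "file"},
	}}
	fmt.Println(sourceOf(staticPod)) // file
}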
In the blog post "Kubelet v1.25.x源码——总体概览" we already caught a glimpse of makePodSourceConfig being called, although at the time we did not know what it was for and paid little attention to it. If you have forgotten, it is worth going through the kubelet startup flow again. Let's look at what makePodSourceConfig actually does.
The logic is not complicated. As the name suggests, it merely wires up the different pod sources. Two of them are configurable: StaticPodURL (the HTTP source, with optional StaticPodURLHeader) and StaticPodPath (the file source). If you look back at the environment section, the kubelet configuration posted there contains StaticPodPath, set to /etc/kubernetes/manifests. Anyone familiar with Kubernetes will recognize this as the static pod mechanism: once the kubelet is up, it applies every manifest found under StaticPodPath. As for the HTTP source configured via StaticPodURL and StaticPodURLHeader, I have not yet come across a use case for it.
The API source, as the code makes clear, is simply pods coming from kube-apiserver; pods from this source are the ones we care about most.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
}
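Conceptually, the file source just keeps re-reading StaticPodPath every FileCheckFrequency (20s in our configuration) and pushes the full set of manifests as a SET update for the "file" source. The real implementation in pkg/kubelet/config/file.go also watches the directory for changes and actually decodes the manifests; the sketch below only lists file names and is purely illustrative.
// Illustrative only: periodically re-list the static pod directory.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

func watchStaticPodDir(dir string, period time.Duration, updates chan<- []string) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for range ticker.C {
		entries, err := os.ReadDir(dir)
		if err != nil {
			fmt.Println("listing static pod path:", err)
			continue
		}
		var manifests []string
		for _, e := range entries {
			if !e.IsDir() {
				manifests = append(manifests, filepath.Join(dir, e.Name()))
			}
		}
		updates <- manifests // conceptually a SET update for the file source
	}
}

func main() {
	updates := make(chan []string)
	go watchStaticPodDir("/etc/kubernetes/manifests", 20*time.Second, updates)
	fmt.Println("current static pod manifests:", <-updates)
}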
2.2.1.1. Merge
// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
case PodConfigNotificationSnapshotAndUpdates:
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
case PodConfigNotificationUnknown:
fallthrough
default:
panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
}
return nil
}
// pkg/kubelet/config/config.go
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else if update.Op == kubetypes.DELETE {
klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else {
klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
// this is a no-op
}
case kubetypes.SET:
klog.V(4).InfoS("Setting pods for source", "source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
default:
klog.InfoS("Received invalid update type", "type", update)
}
s.pods[source] = pods
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
return adds, updates, deletes, removes, reconciles
}
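merge is long, but the SET branch (the operation the apiserver source delivers) boils down to: compare the new full snapshot from one source with what was previously stored for that source, and classify every pod as an add, update, or remove. The sketch below reproduces just that bookkeeping; it ignores annotations, deep copies, reconciles, and graceful deletion, and its pod type is a made-up stand-in.
// Illustrative only: the SET-branch bookkeeping of merge, heavily simplified.
package main

import "fmt"

type pod struct {
	UID  string
	Spec string
}

func mergeSet(old map[string]pod, snapshot []pod) (adds, updates, removes []pod) {
	next := make(map[string]pod, len(snapshot))
	for _, p := range snapshot {
		next[p.UID] = p
		if existing, found := old[p.UID]; found {
			if existing.Spec != p.Spec {
				updates = append(updates, p) // stands in for checkAndUpdatePod
			}
			continue
		}
		adds = append(adds, p)
	}
	for uid, existing := range old {
		if _, found := next[uid]; !found {
			removes = append(removes, existing) // known before, gone from the snapshot
		}
	}
	return adds, updates, removes
}

func main() {
	old := map[string]pod{"a": {UID: "a", Spec: "v1"}, "c": {UID: "c", Spec: "v1"}}
	snapshot := []pod{{UID: "a", Spec: "v1"}, {UID: "b", Spec: "v1"}}
	adds, updates, removes := mergeSet(old, snapshot)
	fmt.Println("adds:", adds, "updates:", updates, "removes:", removes)
}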
2.2.1.2. Sync
// pkg/kubelet/config/config.go
// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
}
2.3. SyncHandler
Now let's take a look at SyncHandler. This interface is extremely important: every pod change driven by the seven channels watched in syncLoop is applied through SyncHandler.
SyncHandler defines six methods. The first three are easy to understand: adding, updating, and removing pods. What the last three do is not yet obvious from their names.
// pkg/kubelet/kubelet.go
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}
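To get a feel for what syncLoopIteration expects from its handler, here is a do-nothing stub with the same method set as the interface above. The noopHandler type is mine, not kubelet code; it is merely the kind of fake you could wire into a test of the dispatch logic.
// Illustrative only: a stub with SyncHandler's method set.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type noopHandler struct{}

func (noopHandler) HandlePodAdditions(pods []*v1.Pod) { fmt.Println("additions:", names(pods)) }
func (noopHandler) HandlePodUpdates(pods []*v1.Pod)   { fmt.Println("updates:", names(pods)) }
func (noopHandler) HandlePodRemoves(pods []*v1.Pod)   { fmt.Println("removes:", names(pods)) }
func (noopHandler) HandlePodReconcile(pods []*v1.Pod) { fmt.Println("reconcile:", names(pods)) }
func (noopHandler) HandlePodSyncs(pods []*v1.Pod)     { fmt.Println("syncs:", names(pods)) }
func (noopHandler) HandlePodCleanups() error          { fmt.Println("cleanups"); return nil }

func names(pods []*v1.Pod) []string {
	out := make([]string, 0, len(pods))
	for _, p := range pods {
		out = append(out, p.Name)
	}
	return out
}

func main() {
	var h noopHandler
	h.HandlePodAdditions([]*v1.Pod{{ObjectMeta: metav1.ObjectMeta{Name: "nginx"}}})
	_ = h.HandlePodCleanups()
}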
2.3.1. HandlePodAdditions
The logic is as follows:
- 1. Sort the newly added pods by creation time, so pods created earlier are handled first, which is perfectly reasonable (a small sort.Slice sketch follows the code below)
- 2. Fetch all known pods from the PodManager
- 3. Add the pod being created to the PodManager
- 4. Check whether the pod being created is a mirror pod; if it is, handle it through the mirror-pod path
  - (Which raises the question: what exactly is a mirror pod, what is it for, and how does it differ from an ordinary pod?)
- 5. If it is not a mirror pod, ask the pod workers whether the pod is already being asked to terminate; if not, run the admission check and reject the pod if it cannot be admitted
- 6. Hand the pod over to the pod workers via dispatchWork; how the pod workers process it from there will be covered in a later post
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Only go through admission if the pod is not already requested for termination.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
activePods := kl.filterOutInactivePods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
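Step 1 of the list above relies on sliceutils.PodsByCreationTime for the oldest-first ordering. The same ordering can be written with a plain sort.Slice over CreationTimestamp, as in this small sketch (the newPod helper is made up for illustration):
// Illustrative only: oldest-first ordering by CreationTimestamp.
package main

import (
	"fmt"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	newPod := func(name string, age time.Duration) *v1.Pod {
		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{
			Name:              name,
			CreationTimestamp: metav1.NewTime(time.Now().Add(-age)),
		}}
	}
	pods := []*v1.Pod{newPod("young", time.Minute), newPod("old", time.Hour)}

	sort.Slice(pods, func(i, j int) bool {
		return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
	})
	for _, p := range pods {
		fmt.Println(p.Name, p.CreationTimestamp.Time)
	}
}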
2.3.2. HandlePodUpdates
Similar to HandlePodAdditions: first update the PodManager, then check whether the pod is a mirror pod; if it is, handle it through the mirror-pod path, otherwise hand it to the pod workers as usual.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
2.3.3. HandlePodRemoves
Same recipe, same flavor: first delete the pod from the PodManager, then check whether it is a mirror pod, and finally let the pod workers handle the kill via deletePod.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
}
}
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
2.3.4. HandlePodReconcile
First update the PodManager, then decide whether the pod's Ready condition needs reconciling and, if so, hand it to the pod workers; finally, if the pod has been evicted, remove all of its dead containers.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
// Update the pod in pod manager, status manager will do periodically reconcile according
// to the pod manager.
kl.podManager.UpdatePod(pod)
// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
if status.NeedToReconcilePodReadiness(pod) {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
// After an evicted pod is synced, all dead containers in the pod can be removed.
if eviction.PodIsEvicted(pod.Status) {
if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
kl.containerDeletor.deleteContainersInPod("", podStatus, true)
}
}
}
}
func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
if len(pod.Spec.ReadinessGates) == 0 {
return false
}
podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase)
i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady)
// Only reconcile if "Ready" condition is present and Status or Message is not expected
if i >= 0 && (curCondition.Status != podReadyCondition.Status || curCondition.Message != podReadyCondition.Message) {
return true
}
return false
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
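NeedToReconcilePodReadiness returns false immediately unless the pod declares spec.readinessGates. For reference, the sketch below builds the kind of pod that can take the reconcile path; the gate name "example.com/feature-ready" is invented, and only the early-return condition is mirrored here.
// Illustrative only: a pod with a readiness gate.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "gated"},
		Spec: v1.PodSpec{
			Containers:     []v1.Container{{Name: "app", Image: "nginx"}},
			ReadinessGates: []v1.PodReadinessGate{{ConditionType: "example.com/feature-ready"}},
		},
	}
	// Mirrors the early-return check in NeedToReconcilePodReadiness.
	fmt.Println("has readiness gates:", len(pod.Spec.ReadinessGates) > 0)
}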
2.3.5. HandlePodSyncs
HandlePodSyncs simply looks up each pod's mirror pod (if any) and dispatches a SyncPodSync to the pod workers.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
2.3.6. HandlePodCleanups
There is quite a bit of logic in here; I'm running out of steam, so a detailed walkthrough will have to wait for a later post.
// pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodCleanups() error {
var (
cgroupPods map[types.UID]cm.CgroupName
err error
)
if kl.cgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
cgroupPods, err = pcm.GetAllPodsFromCgroups()
if err != nil {
return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
}
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
klog.V(3).InfoS("Clean up pod workers for terminated pods")
workingPods := kl.podWorkers.SyncKnownPods(allPods)
allPodsByUID := make(map[types.UID]*v1.Pod)
for _, pod := range allPods {
allPodsByUID[pod.UID] = pod
}
runningPods := make(map[types.UID]sets.Empty)
possiblyRunningPods := make(map[types.UID]sets.Empty)
restartablePods := make(map[types.UID]sets.Empty)
for uid, sync := range workingPods {
switch sync {
case SyncPod:
runningPods[uid] = struct{}{}
possiblyRunningPods[uid] = struct{}{}
case TerminatingPod:
possiblyRunningPods[uid] = struct{}{}
case TerminatedAndRecreatedPod:
restartablePods[uid] = struct{}{}
}
}
// Stop probing pods that are not running
klog.V(3).InfoS("Clean up probes for terminated pods")
kl.probeManager.CleanupPods(possiblyRunningPods)
// Terminate any pods that are observed in the runtime but not
// present in the list of known running pods from config.
runningRuntimePods, err := kl.runtimeCache.GetPods()
if err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
for _, runningPod := range runningRuntimePods {
switch workerState, ok := workingPods[runningPod.ID]; {
case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
// if the pod worker is already in charge of this pod, we don't need to do anything
continue
default:
if _, ok := allPodsByUID[runningPod.ID]; !ok {
klog.V(3).InfoS("Clean up orphaned pod containers", "podUID", runningPod.ID)
one := int64(1)
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: runningPod,
KillPodOptions: &KillPodOptions{
PodTerminationGracePeriodSecondsOverride: &one,
},
})
}
}
}
// Remove orphaned pod statuses not in the total list of known config pods
klog.V(3).InfoS("Clean up orphaned pod statuses")
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
runningRuntimePods, err = kl.containerRuntime.GetPods(false)
if err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
// Remove orphaned pod user namespace allocations (if any).
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
if err = kl.usernsManager.CleanupOrphanedPodUsernsAllocations(allPods, runningRuntimePods); err != nil {
klog.ErrorS(err, "Failed cleaning up orphaned pod user namespaces allocations")
}
klog.V(3).InfoS("Clean up orphaned pod directories")
err = kl.cleanupOrphanedPodDirs(allPods, runningRuntimePods)
if err != nil {
klog.ErrorS(err, "Failed cleaning up orphaned pod directories")
}
// Remove any orphaned mirror pods (mirror pods are tracked by name via the
// pod worker)
klog.V(3).InfoS("Clean up orphaned mirror pods")
kl.deleteOrphanedMirrorPods()
if kl.cgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
klog.V(3).InfoS("Clean up orphaned pod cgroups")
kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, possiblyRunningPods)
}
kl.backOff.GC()
for uid := range restartablePods {
pod, ok := allPodsByUID[uid]
if !ok {
continue
}
if kl.isAdmittedPodTerminal(pod) {
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
start := kl.clock.Now()
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
return nil
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}