【技术】从POD创建看Kubernetes源码实现 (五)- kubelet

✍️作者:茶水间Tech

🏷️标签:#云计算#云原生#kubernetes#容器

📖 前言

​ kubernetes的模块比较多,架构复杂,代码量更是庞大,看代码比较麻烦,我们从现实场景出发,从创建POD分析在Kubernetes内部的代码流程,本系列文章从POD创建,整体梳理Kubernetes源码实现,其中本节主要分析kubelet侧的流程实现。

​ 本文基于Client Version: v1.34.3 , Server Version: v1.34.2

​ 📌POD创建的整体架构图

💻 正文

📑 一、关于kubelet

​ 在kubernetes集群中,每个Node节点都会启动kubelet进程,用来处理Master节点下发到本节点的任务,管理Pod和其中的容器。

​ kubelet 是基于 PodSpec 来工作的。每个 PodSpec 是一个描述 Pod 的 YAML 或 JSON 对象。 kubelet 接受通过各种机制(主要是通过 apiserver)提供的一组 PodSpec,并确保这些 PodSpec 中描述的容器处于运行状态且运行状况良好。

在每个 Node 上运行的 Kubelet,都会维持一个与 API Server 的长连接。

  1. 带过滤条件的 Watch:
    Kubelet 只关心属于它自己的 Pod。它会向 API Server 发起一个类似这样的请求:
    GET /v1/pods?watch=true&fieldSelector=spec.nodeName={My_Node_Name}
    • 这里的fieldSelector极其重要,它确保了 Node-A 不会收到发给 Node-B 的 Pod 信息。
  2. 事件触达:
    当 Scheduler 完成 Bind 动作,API Server 中的 Pod 对象更新了nodeName字段。API Server 会立刻通过这个长连接,向该节点所在的 Kubelet 推送一个“MODIFIED”事件。
📑 二、代码分析
程序入口:Run (scheduler.go)

Run中启动了syncLoop循环同步

代码路径:kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { //...(略) kl.syncLoop(ctx, updates, kl) }
详细流程如下:

kubelet

2.1 主循环:syncLoop(kubelet.go)

这个 syncLoop 是 Kubelet 的心脏,确保节点上的 Pod 状态与期望状态保持一致。

代码路径:kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) { klog.InfoS("Starting kubelet main sync loop") // The syncTicker wakes up kubelet to checks if there are any pod workers // that need to be sync'd. A one-second period is sufficient because the // sync interval is defaulted to 10s. // 同步定时器:每秒触发一次,检查需要同步的 Pod syncTicker := time.NewTicker(time.Second) defer syncTicker.Stop() // 清理定时器:执行周期性清理任务(默认2分钟) housekeepingTicker := time.NewTicker(housekeepingPeriod) defer housekeepingTicker.Stop() plegCh := kl.pleg.Watch() const ( base = 100 * time.Millisecond // 初始延迟:100ms max = 5 * time.Second // 最大延迟:5秒 factor = 2 // 指数因子:2倍增长 ) duration := base // Responsible for checking limits in resolv.conf // The limits do not have anything to do with individual pods // Since this is called in syncLoop, we don't need to call it anywhere else if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" { kl.dnsConfigurer.CheckLimitsForResolvConf(klog.FromContext(ctx)) } for { if err := kl.runtimeState.runtimeErrors(); err != nil { klog.ErrorS(err, "Skipping pod synchronization") // exponential backoff time.Sleep(duration) // 计算下次退避时间(最大5秒) duration = time.Duration(math.Min(float64(max), factor*float64(duration))) continue } // reset backoff if we have a success duration = base kl.syncLoopMonitor.Store(kl.clock.Now()) // 执行一次同步迭代 if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { break } kl.syncLoopMonitor.Store(kl.clock.Now()) } }
2.2 同步迭代:syncLoopIteration(kubelet.go)

syncLoopIteration 对POD的不同操作做对应的处理,这里发现是创建POD,则会调用HandlePodAdditions 进行创建POD

代码路径:kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { logger := klog.FromContext(ctx) select { case u, open := <-configCh: // Update from a config source; dispatch it to the right handler // callback. if !open { klog.ErrorS(nil, "Update channel is closed, exiting the sync loop") return false } switch u.Op { case kubetypes.ADD: //接收到POD创建事件 klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) // After restarting, kubelet will get all existing pods through // ADD as if they are new pods. These pods will then go through the // admission process and *may* be rejected. This can be resolved // once we have checkpointing. //主POD处理函数 handler.HandlePodAdditions(u.Pods) case kubetypes.UPDATE: klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) handler.HandlePodUpdates(u.Pods) case kubetypes.REMOVE: klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) handler.HandlePodRemoves(u.Pods) // ...(略) } return true }
2.3 主处理函数:HandlePodAdditions(kubelet.go)

HandlePodAdditions是 kubelet 处理新 Pod 添加的核心函数。当 kubelet 从 API server 接收到新 Pod 时,此函数负责

  • Pod 注册:将新 Pod 添加到 pod manager 中作为期望状态的单一事实来源
  • 准入控制:检查节点资源是否足够接纳新 Pod
  • 证书管理:跟踪 Pod 的证书信息
  • Mirror Pod 处理:处理静态 Pod 的 mirror pod
  • 垂直扩缩容:支持 Pod 的原地垂直扩缩容(InPlacePodVerticalScaling)
  • 工作调度:将 Pod 传递给 pod workers 进行实际创建和同步

代码路径:kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { start := kl.clock.Now() // 对 Pod 进行排序(静态 Pod 优先,然后按创建时间) sort.Sort(sliceutils.PodsByCreationTime(pods)) var pendingResizes []types.UID for _, pod := range pods { // Always add the pod to the pod manager. Kubelet relies on the pod // manager as the source of truth for the desired state. If a pod does // not exist in the pod manager, it means that it has been deleted in // the apiserver and no action (other than cleanup) is required. // 将 Pod 添加到 Pod Manager // Pod Manager 是 Kubelet 的真实状态源 kl.podManager.AddPod(pod) kl.podCertificateManager.TrackPod(context.TODO(), pod) // 获取 Pod 和它的 Mirror Pod pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { if pod == nil { klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodUpdate, StartTime: start, }) continue } // Only go through the admission process if the pod is not requested // for termination by another part of the kubelet. If the pod is already // using resources (previously admitted), the pod worker is going to be // shutting it down. If the pod hasn't started yet, we know that when // the pod worker is invoked it will also avoid setting up the pod, so // we simply avoid doing any work. // We also do not try to admit the pod that is already in terminated state. // 检查 Pod 是否正在终止或已终止 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) { // Check if we can admit the pod; if not, reject it. // We failed pods that we rejected, so activePods include all admitted // pods that are alive. // 检查资源是否足够 // 返回:是否接受、拒绝原因、详细消息 if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok { kl.rejectPod(pod, reason, message) // We avoid recording the metric in canAdmitPod because it's called // repeatedly during a resize, which would inflate the metric. // Instead, we record the metric here in HandlePodAdditions for new pods // and capture resize events separately. recordAdmissionRejection(reason) continue } if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { // Backfill the queue of pending resizes, but only after all the pods have // been added. This ensures that no resizes get resolved until all the // existing pods are added. _, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod) if updatedFromAllocation { pendingResizes = append(pendingResizes, pod.UID) } } } // 通过 podWorkers.UpdatePod 异步创建 Pod kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: kubetypes.SyncPodCreate, // 创建类型 StartTime: start, }) } if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { // 回填 Pod 调整大小的条件 kl.statusManager.BackfillPodResizeConditions(pods) // 推送待处理的调整大小请求 for _, uid := range pendingResizes { kl.allocationManager.PushPendingResize(uid) } // 重试待处理的调整 if len(pendingResizes) > 0 { kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded) } } }
a. Pod Manager
type podManager struct { // 存储所有 Pod 的期望状态 pods map[types.UID]*v1.Pod // Mirror Pod 映射 mirrorPods map[types.UID]*v1.Pod // 静态 Pod 映射 staticPods map[types.UID]*v1.Pod }
b. Allocation Manager
type allocationManager struct { // 管理节点资源分配 // 包括 CPU、内存、设备等 } func (m *allocationManager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) { // 检查资源是否足够 // 返回:是否接受、拒绝原因、详细消息 }
c. InPlace Pod 垂直扩缩容

这是一个新特性,允许在不重启 Pod 的情况下调整资源:

if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { kl.statusManager.BackfillPodResizeConditions(pods) for _, uid := range pendingResizes { kl.allocationManager.PushPendingResize(uid) } if len(pendingResizes) > 0 { kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded) } }
2.4 更新创建POD:UpdatePod(pod_workers.go)

UpdatePod是 kubelet 中处理 Pod 更新的核心入口函数,负责:

  • Pod 状态管理:维护 Pod 的同步状态,跟踪 Pod 的生命周期(首次同步、终止中、已终止)
  • 更新调度:接收 Pod 更新请求并调度到相应的 worker goroutine
  • 生命周期转换:处理 Pod 从创建到终止的状态转换
  • 资源管理:集成资源分配管理器,支持原地垂直扩缩容
  • 并发控制:协调多个 Pod 更新请求,确保状态一致性

代码路径:kubernetes/pkg/kubelet/pod_workers.go

func (p *podWorkers) UpdatePod(options UpdatePodOptions) { // ...(略) // start the pod worker goroutine if it doesn't exist podUpdates, exists := p.podUpdates[uid] if !exists { // buffer the channel to avoid blocking this method podUpdates = make(chan struct{}, 1) p.podUpdates[uid] = podUpdates // ensure that static pods start in the order they are received by UpdatePod if kubetypes.IsStaticPod(pod) { p.waitingToStartStaticPodsByFullname[status.fullname] = append(p.waitingToStartStaticPodsByFullname[status.fullname], uid) } // allow testing of delays in the pod update channel var outCh <-chan struct{} if p.workerChannelFn != nil { outCh = p.workerChannelFn(uid, podUpdates) } else { outCh = podUpdates } // spawn a pod worker // 启动 worker goroutine go func() { // TODO: this should be a wait.Until with backoff to handle panics, and // accept a context for shutdown defer runtime.HandleCrash() defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid) p.podWorkerLoop(uid, outCh) }() } // ...(略) }
2.5 真实创建:podWorkerLoop(pod_workers.go)

podWorkerLoop是 kubelet 中每个 Pod 的独立工作循环,负责处理单个 Pod 的所有同步操作。每个 Pod 都有自己专属的 goroutine 运行此函数,主要功能包括:

  • 事件驱动处理:通过podUpdateschannel 接收并处理 Pod 的更新事件
  • 状态同步:根据 Pod 的当前工作类型执行相应的同步操作
  • 生命周期管理:处理 Pod 从创建、运行到终止的完整生命周期
  • 错误恢复:在操作失败时进行重试,确保 Pod 状态最终一致
  • 资源清理:在 Pod 终止时清理相关资源并关闭 worker

最终调用调用SyncPod执行同步。

代码路径:kubernetes/pkg/kubelet/pod_workerss.go

func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) { var lastSyncTime time.Time for range podUpdates { //启动同步操作 ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID) // If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate. //...(略) // Take the appropriate action (illegal phases are prevented by UpdatePod) switch { case update.WorkType == TerminatedPod: err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status) case update.WorkType == TerminatingPod: var gracePeriod *int64 if opt := update.Options.KillPodOptions; opt != nil { gracePeriod = opt.PodTerminationGracePeriodSecondsOverride } podStatusFn := p.acknowledgeTerminating(podUID) // if we only have a running pod, terminate it directly if update.Options.RunningPod != nil { err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod) } else { err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn) } default: //正常同步 Pod 状态 isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status) } lastSyncTime = p.clock.Now() return err }() //...(略) } }
2.6 kubelet创建:SyncPod(kubelet.go)

[SyncPod] 是 kubelet 中同步 Pod 状态的核心函数,负责将 Pod 的期望状态与实际状态对齐。主要功能包括:

  • Pod 状态同步:根据 Pod 的期望状态和当前运行时状态,执行必要的操作使两者一致
  • 生命周期管理:处理 Pod 从创建、运行到终止的完整生命周期
  • 资源管理:创建和管理 Pod 的 Cgroups,确保资源隔离和 QoS
  • 卷管理:等待并挂载 Pod 所需的存储卷
  • 网络配置:检查网络插件状态,处理主机网络 Pod
  • 可观测性:记录指标、日志和追踪信息,支持性能监控和问题诊断

最后调用kl.containerRuntime.SyncPod调用容器运行时同步回调

代码路径:kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { // ...(略) // Make data directories for the pod // 创建 Pod 目录 if err := kl.makePodDataDirs(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) return false, err } // Wait for volumes to attach/mount // 等待挂载卷 if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil { var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError if errors.As(err, &volumeAttachLimitErr) { kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error()) recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason) return true, nil } if !wait.Interrupted(err) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) } return false, err } // Fetch the pull secrets for the pod //为POD下载secret pullSecrets := kl.getPullSecretsForPod(pod) //...(略) //调用容器运行时创建和启动容器 result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.crashLoopBackOff) kl.reasonCache.Update(pod.UID, result) if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { for _, r := range result.SyncResults { if r.Action == kubecontainer.ResizePodInPlace && r.Error != nil { // If the condition already exists, the observedGeneration does not get updated. kl.statusManager.SetPodResizeInProgressCondition(pod.UID, v1.PodReasonError, r.Message, pod.Generation) } } } return false, result.Error() }

运行时

2.7 运行时:SyncPod(kuberuntime_manager.go)

SyncPod是 kubelet 容器运行时管理器的核心同步函数,负责将 Pod 的期望状态与容器运行时的实际状态对齐。主要功能包括:

  • Pod 同步:根据 Pod 规范和当前运行时状态,执行必要的操作使两者一致
  • Sandbox 管理:创建、更新或删除 Pod sandbox(容器运行时隔离环境)
  • 容器生命周期管理:启动、停止、重启容器(包括 init 容器、临时容器和主容器)
  • 资源管理:处理容器资源分配和原地垂直扩缩容
  • 网络配置:管理 Pod IP 地址和网络配置
  • 镜像管理:拉取容器镜像和镜像卷
  • 错误处理:记录同步结果和错误,支持重试机制

代码路径:kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go

// SyncPod syncs the running pod into the desired pod by executing following steps: // // 1. Compute sandbox and container changes. // 2. Kill pod sandbox if necessary. // 3. Kill any containers that should not be running. // 4. Create sandbox if necessary. // 5. Create ephemeral containers. // 6. Create init containers. // 7. Resize running containers (if InPlacePodVerticalScaling==true) // 8. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { logger := klog.FromContext(ctx) //...(略) // Step 4: Create a sandbox for the pod if necessary. podSandboxID := podContainerChanges.SandboxID if podContainerChanges.CreateSandbox { var msg string var err error logger.V(4).Info("Creating PodSandbox for pod", "pod", klog.KObj(pod)) //...(略) //开始创建沙箱 podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt) //...(略) logger.V(4).Info("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod)) //获取沙箱状态 resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false) //...(略) // If we ever allow updating a pod from non-host-network to // host-network, we may use a stale IP. //给POD分配IP if !kubecontainer.IsHostNetworkPod(pod) { // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox. podIPs = m.determinePodSandboxIPs(ctx, pod.Namespace, pod.Name, resp.GetStatus()) logger.V(4).Info("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod)) } } // the start containers routines depend on pod ip(as in primary pod ip) // instead of trying to figure out if we have 0 < len(podIPs) // everytime, we short circuit it here podIP := "" if len(podIPs) != 0 { podIP = podIPs[0] } // Get podSandboxConfig for containers to start. configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID) result.AddSyncResult(configPodSandboxResult) podSandboxConfig, err := m.generatePodSandboxConfig(ctx, pod, podContainerChanges.Attempt) if err != nil { message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) logger.Error(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod)) configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message) return } imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets) if err != nil { logger.Error(err, "Get image volumes for pod failed", "pod", klog.KObj(pod)) configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error()) return } // Helper containing boilerplate common to starting all types of containers. // typeName is a description used to describe this type of container in log messages, // currently: "container", "init container" or "ephemeral container" // metricLabel is the label used to describe this type of container in monitoring metrics. // currently: "container", "init_container" or "ephemeral_container" //沙箱建好了,接下来在POD内创建容器 start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error { startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name) result.AddSyncResult(startContainerResult) isInBackOff, msg, err := m.doBackOff(ctx, pod, spec.container, podStatus, backOff) if isInBackOff { startContainerResult.Fail(err, msg) logger.V(4).Info("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod)) return err } metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc() if sc.HasWindowsHostProcessRequest(pod, spec.container) { metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc() } logger.V(4).Info("Creating container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod)) // We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user. imageVolumes, err := m.toKubeContainerImageVolumes(ctx, imageVolumePullResults, spec.container, pod, startContainerResult) if err != nil { return err } // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs. msg, err = m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes) incrementImageVolumeMetrics(err, msg, spec.container, imageVolumes) if err != nil { // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are // useful to cluster administrators to distinguish "server errors" from "user errors". metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc() if sc.HasWindowsHostProcessRequest(pod, spec.container) { metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc() } startContainerResult.Fail(err, msg) // known errors that are logged in other places are logged at higher levels here to avoid // repetitive log spam switch { case err == images.ErrImagePullBackOff: logger.V(3).Info("Container start failed in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod), "containerMessage", msg, "err", err) default: utilruntime.HandleError(fmt.Errorf("%v %v start failed in pod %v: %w: %s", typeName, spec.container.Name, format.Pod(pod), err, msg)) } return err } return nil } // Step 5: start ephemeral containers // These are started "prior" to init containers to allow running ephemeral containers even when there // are errors starting an init container. In practice init containers will start first since ephemeral // containers cannot be specified on pod creation. for _, idx := range podContainerChanges.EphemeralContainersToStart { start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx])) } // Step 6: start init containers. for _, idx := range podContainerChanges.InitContainersToStart { container := &pod.Spec.InitContainers[idx] // Start the next init container. if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil { if podutil.IsRestartableInitContainer(container) { logger.V(4).Info("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod)) continue } logger.V(4).Info("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod)) return } // Successfully started the container; clear the entry in the failure logger.V(4).Info("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod)) } // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources if resizable, _, _ := allocation.IsInPlacePodVerticalScalingAllowed(pod); resizable { if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources { result.SyncResults = append(result.SyncResults, m.doPodResizeAction(ctx, pod, podStatus, podContainerChanges)) } } // Step 8: start containers in podContainerChanges.ContainersToStart. for _, idx := range podContainerChanges.ContainersToStart { start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx])) } return }
2.8 创建沙箱:createPodSandbox(kuberuntime_sandbox.go)

createPodSandbox是 kubelet 容器运行时管理器中创建 Pod sandbox 的核心函数。Sandbox 是容器运行时的隔离环境,为 Pod 提供共享的网络、PID 等命名空间。主要功能包括:

  • 生成 Sandbox 配置:根据 Pod 规范生成 sandbox 配置
  • 创建日志目录:为 Pod 创建日志存储目录
  • 查找运行时处理器:根据 RuntimeClass 查找对应的容器运行时
  • 创建 Sandbox:调用容器运行时接口创建实际的 sandbox
  • 错误处理:在每个步骤进行错误检查和日志记录

代码路径:kubernetes/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go

func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) { // ...(略) podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler) if err != nil { message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err) logger.Error(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod)) return "", message, err } return podSandBoxID, "", nil }
2.9 调用CRI:RunPodSandbox(remote_runtime.go)

[RunPodSandbox]是 CRI(容器运行时接口)客户端的核心方法,负责通过 gRPC 调用远程容器运行时创建 Pod sandbox。主要功能包括:

  • 超时管理:为 sandbox 创建操作设置超时时间(默认 4 分钟)
  • 远程调用:通过 gRPC 客户端调用容器运行时的RunPodSandbox接口
  • 响应验证:验证返回的 sandbox ID 是否有效
  • 错误处理:记录错误日志并返回适当的错误信息
  • 日志记录:记录请求和响应的详细信息,便于调试

代码路径:kubernetes/staging/src/k8s.io/cri-client/pkg/remote_runtime.go

func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) { // ...(略) resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{ Config: config, RuntimeHandler: runtimeHandler, }) // ...(略) return podSandboxID, nil }

Containerd

进入containerd 的内容

📝 总结与展望

Kubelet 的syncLoop定期(默认 10 秒)执行一次全量同步,。

  • 如果容器崩了:在下一次同步时,Kubelet 发现“API Server 说应该有这个 Pod,但 containerd 说没这个容器”,Kubelet 会立即重新调用 CRI 创建它。
  • 如果网络断了:等网络恢复,Kubelet 会通过 Watch 补偿机制补齐断网期间错过的所有 Pod 更新。

创建POD过程中,会通过CRI调用containerd 先创建pause沙箱,再在沙箱内创建业务容器,实现整个POD的生命周期管理。

这就是为什么 Kubernetes 是“状态驱动”而不是“指令驱动”:Kubelet 永远在对比“API 记录的准则”和“我这台机器上的实际容器”,一旦不对劲就立刻修正。

📚 参考资料

https://kubernetes.io/zh-cn/docs/reference/command-line-tools-reference/kubelet/

https://blog.huweihuang.com/kubernetes-notes/principle/component/kubernetes-core-principle-kubelet/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1200395.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sci文献检索入口指南:快速掌握SCI文献检索入口及使用方法

做科研的第一道坎&#xff0c;往往不是做实验&#xff0c;也不是写论文&#xff0c;而是——找文献。 很多新手科研小白会陷入一个怪圈&#xff1a;在知网、Google Scholar 上不断换关键词&#xff0c;结果要么信息过载&#xff0c;要么完全抓不到重点。今天分享几个长期使用的…

【开题答辩全过程】以 基于SpringBoot的律师事务所管理系统的设计与实现为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

【技术】从POD创建看Kubernetes源码实现 (六)- containerd

✍️ 作者&#xff1a;茶水间Tech &#x1f3f7;️ 标签&#xff1a;#云计算#云原生#kubernetes#容器&#x1f4d6; 前言 ​ kubernetes的模块比较多&#xff0c;架构复杂&#xff0c;代码量更是庞大&#xff0c;看代码比较麻烦&#xff0c;我们从现实场景出发&#xff0c;从…

供应链预测科学:机器学习与优化技术

Ping Xu 在亚马逊的供应链优化技术&#xff08;SCOT&#xff09;组织中担任预测科学总监&#xff0c;她是今年消费者科学峰会的组织者之一。 她已在亚马逊担任了近 15 年的各种优化和需求预测职务。她于 2005 年在麻省理工学院获得运筹学博士学位后不久&#xff0c;作为全职员工…

2026年DevOps平台全景观察:本土化与云原生双轨并行下的企业选择

2026年DevOps平台全景观察&#xff1a;本土化与云原生双轨并行下的企业选择 随着数字化转型进入深水区&#xff0c;DevOps平台正从单纯的技术工具演变为企业研发效能的战略基础设施。2026年的技术版图上&#xff0c;DevOps领域呈现出明显的本土化与全球化双轨并行态势&#xff…

一文带你上手 Skills:构建可复用的 AI 能力体系

标准化、可复用、渐进式——让 AI 高效完成重复性任务一、 为什么需要 Skills在传统 LLM 使用场景中&#xff0c;我们通常依赖 Prompt 来让模型完成任务&#xff0c;例如&#xff1a;"你是一个项目经理&#xff0c;请根据输入内容生成符合公司规范的周报……"这种方式…

制造业海外社媒代运营服务商:外贸 B2B 营销 + 海外整合营销 + 海外展会推广平台全链路服务

在全球贸易格局深度调整的背景下,中国外贸正稳步复苏并呈现结构性转型态势。海关总署数据显示,2025年前十个月,我国货物贸易出口达22.12万亿元,同比增长6.2%,外贸“逐季回暖”趋势明显。与此同时,共建“一带一路…

高效<|关键词|>指南:提升学术资源检索效率与科研文献获取能力的实用方法

做科研的第一道坎&#xff0c;往往不是做实验&#xff0c;也不是写论文&#xff0c;而是——找文献。 很多新手科研小白会陷入一个怪圈&#xff1a;在知网、Google Scholar 上不断换关键词&#xff0c;结果要么信息过载&#xff0c;要么完全抓不到重点。今天分享几个长期使用的…

搞定100+表迁移 Navicat实战复盘

需求清单&#xff1a; 100张数据表要迁移&#xff08;还要支持后续动态新增&#xff09;双链路同步&#xff1a;MySQL到MySQL、MongoDB到PostgreSQL不能写死配置&#xff0c;要能灵活扩展 技术约束&#xff1a; 源环境&#xff08;塔外&#xff09;和目标环境&#xff08;塔…

寻找可靠碳酸镁货源?这些厂家口碑获认可,国外碳酸镁厂家选哪家优质品牌榜单更新

近年来,随着菱镁矿资源精深加工技术的突破,碳酸镁作为功能性无机材料在有色金属冶炼、医药食品、运动防护等领域的应用需求持续增长。然而,市场上游厂家技术水平参差不齐,部分企业存在原料供应不稳定、产品杂质超标…

AI训练存储系统对象存储为后端的文件系统概论

存储系统按照抽象级别分类&#xff0c;分为三种&#xff1a;文件存储、对象存储、块存储。此处我们不讨论块存储&#xff0c;只讨论文件存储与对象存储。文件存储是我们在日常生活中最熟悉的存储方式。它将数据组织成树状结构&#xff08;目录/文件夹&#xff09;。每个文件都位…

现阶段最经典的天猫购物券回收省心平台

如今网购已经深度融入日常,不管是节日福利、活动赠送还是自行领取,很多人手中都会积攒不少天猫购物券。但实际生活里,计划往往难以跟上变化,不少天猫购物券最终没能派上用场,只能静静躺在账户里闲置过期 面对闲置…

Python+tkinter程序中ttk.Progressbar进度条组件用法演示

董付国老师Python系列教材&#xff08;累计印刷超过240次&#xff09;推荐与选用参考 中国大学MOOC董付国老师“Python程序设计基础”可以发证书啦 开学第一课&#xff1a;一定不要这样问老师Python问题 Python小屋7500道习题免费在线练习 “Python小屋”1400篇历史文章分类速查…

微算法科技(NASDAQ :MLGO)量子安全区块链:PQ-DPoL与Falcon签名的双重防御体系

量子计算技术正以前所未有的速度发展,Shor算法展现出能在多项式时间内破解传统公钥密码术如RSA和ECC的强大潜力,这使得依赖传统加密手段的区块链预加密安全性面临严峻危机。微算法科技(NASDAQ :MLGO)提出的量子安…

2026本地汽车托运物流怎么选?性价比优选,国内正规的汽车托运物流平台赋能企业生产效率提升与成本优化

近年来,汽车托运物流行业迎来高速发展期,全国运输需求年均增长率超15%,市场扩容的同时,价格不透明、服务标准缺失、安全保障薄弱等问题逐渐凸显。对于个人车主、二手车商及主机厂等采购方而言,如何在众多服务商中…

救命神器9个AI论文软件,助你轻松搞定本科生毕业论文!

救命神器9个AI论文软件&#xff0c;助你轻松搞定本科生毕业论文&#xff01; 论文写作的救星&#xff1a;AI工具如何改变你的学术之路 在当今这个信息爆炸的时代&#xff0c;本科生的毕业论文写作已经不再是单纯的脑力劳动&#xff0c;而是需要借助高效工具来提升效率和质量。…

ctfshow web入门

web1 打开得到:方法1: f12可在元素里面看到注释从而得到flag:方法2: 在网站的url前加上view-source:即可得到当前url页码的源码从而看到flag:方法3: ctrl+u查看前端源码 总结: 注释,view-source看到前端代码,…

2025.12.18 NAT地址转换、PAT - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2026最新专注力培训机构top5评测!服务深度覆盖锦江区、青羊区、双流区等地,辐射成都本地,优质学校权威榜单发布,科学体系铸就儿童成长优势.

在儿童成长关键期,专注力培养已成为幼小衔接阶段的核心课题。随着《教育部关于大力推进幼儿园与小学科学衔接的指导意见》深入落实,成都地区专注于专注力培养的幼小衔接机构迎来发展新机遇。本榜单基于课程体系专业性…

学长亲荐2026专科生必用AI论文软件TOP10:开题报告文献综述全测评

学长亲荐2026专科生必用AI论文软件TOP10&#xff1a;开题报告文献综述全测评 2026年专科生论文写作工具测评&#xff1a;为何需要一份精准指南 随着人工智能技术的不断进步&#xff0c;AI论文写作工具逐渐成为高校学生&#xff0c;尤其是专科生群体的重要辅助工具。然而&#x…