KubeController GarbageCollector
This article analyzes the implementation of the kube-controller-manager GarbageCollector from the perspective of its source code.
The Kubernetes version used here is v1.27.3.
Kubernetes project: https://github.com/kubernetes/kubernetes
Controller main entry: cmd/kube-controller-manager/controller-manager.go
Controller code directory: pkg/controller
More articles: https://www.cyisme.top
In Kubernetes, resources such as Deployment and StatefulSet are higher-level resources: a Deployment controls ReplicaSets, and a ReplicaSet controls Pods; their creation, update, and deletion are all driven by controllers.
These relationships are recorded via ownerReference: a ReplicaSet managed by a Deployment carries an ownerReference pointing at that Deployment, and a Pod managed by a ReplicaSet carries an ownerReference pointing at that ReplicaSet (in the Deployment scenario).
The job of the GC controller is, when resources are deleted, to use ownerReferences to build a "dependency graph" and then use that graph to delete (or otherwise handle) the resources an owner controls.
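As a quick illustration (not part of the GC source), the sketch below uses client-go to read one hop of this owner chain; the kubeconfig path and the ReplicaSet name are placeholders:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig; the path is a placeholder.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// A ReplicaSet created by a Deployment carries an ownerReference pointing at that Deployment.
	rs, err := clientset.AppsV1().ReplicaSets("default").Get(context.TODO(), "test-5f6778868d", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// GetControllerOf returns the ownerReference with controller=true, i.e. the managing object.
	if ref := metav1.GetControllerOf(rs); ref != nil {
		fmt.Printf("ReplicaSet %s is controlled by %s %s (uid=%s)\n", rs.Name, ref.Kind, ref.Name, ref.UID)
	}
}
```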
When the GC controller starts, it calls the Run and Sync methods:
```go
func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	// ... omitted ...

	// Create the metadata client used for metadata informers.
	// Changes that only touch finalizers cannot be observed by ordinary informers,
	// so metadata informers are used to watch them.
	metadataClient, err := metadata.NewForConfig(config)
	if err != nil {
		return nil, true, err
	}

	// Create the GC controller.
	garbageCollector, err := garbagecollector.NewGarbageCollector(
		gcClientset,
		metadataClient,
		controllerContext.RESTMapper,
		ignoredResources,
		controllerContext.ObjectOrMetadataInformerFactory,
		controllerContext.InformersStarted,
	)
	if err != nil {
		return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
	}

	// Start the GC workers.
	go garbageCollector.Run(ctx, workers)

	// Periodically refresh the list of resources in the cluster and update the informers,
	// so that the set of watched resources stays complete and up to date.
	go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second)

	return garbageCollector, true, nil
}
```
Components
Functionally, the GC controller can roughly be divided into two parts:
- GarbageCollector: performs the actual cleanup
- GraphBuilder: lists/watches resources and builds and maintains the dependency graph

In other words, the GraphBuilder produces the data and the GarbageCollector consumes it, as can be seen from the GarbageCollector definition:
```go
type GarbageCollector struct {
	// Used to resolve GVK/GVR mappings.
	restMapper meta.ResettableRESTMapper
	// Used to fetch object metadata.
	metadataClient metadata.Interface
	// Queue of objects to attempt to delete.
	attemptToDelete workqueue.RateLimitingInterface
	// Queue of objects to orphan.
	attemptToOrphan workqueue.RateLimitingInterface
	// Builds the dependency graph.
	dependencyGraphBuilder *GraphBuilder
	// Cache of owners known to be absent.
	absentOwnerCache *ReferenceCache

	kubeClient       clientset.Interface
	eventBroadcaster record.EventBroadcaster

	workerLock sync.RWMutex
}
```
GraphBuilder
GraphBuilder code file: pkg/controller/garbagecollector/graph_builder.go
```go
type GraphBuilder struct {
	restMapper meta.RESTMapper

	// monitors list/watch the resources.
	monitors    monitors
	monitorLock sync.RWMutex

	informersStarted <-chan struct{}
	stopCh           <-chan struct{}
	running          bool

	// Event recording.
	eventRecorder  record.EventRecorder
	metadataClient metadata.Interface

	// Queue holding the observed resource changes.
	graphChanges workqueue.RateLimitingInterface

	// node is a vertex in the graph and stores the ownership relationships.
	// uidToNode is a map keyed by the resource UID, with the node as value.
	uidToNode *concurrentUIDToNode

	// Resources to attempt to delete.
	attemptToDelete workqueue.RateLimitingInterface
	// Resources to orphan.
	attemptToOrphan workqueue.RateLimitingInterface

	absentOwnerCache *ReferenceCache
	sharedInformers  informerfactory.InformerFactory
	ignoredResources map[schema.GroupResource]struct{}
}
```
A monitor lists/watches the metadata of a resource and, through its event handlers, pushes every change onto the graphChanges queue.
GraphBuilder's Run method starts the monitors and the processing of graphChanges.
```go
func (gb *GraphBuilder) Run(ctx context.Context) {
	// ... omitted ...

	// Start the monitors, i.e. the list/watch machinery.
	gb.startMonitors(logger)

	// Start processing graph changes.
	wait.Until(func() { gb.runProcessGraphChanges(logger) }, 1*time.Second, ctx.Done())

	// ... omitted ...
}
```
The ownership relationships between resources eventually form a tree-shaped graph.
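For reference, each vertex in this graph is a node; an abridged view of the node type from pkg/controller/garbagecollector/graph.go (roughly, with most lock fields and comments trimmed) looks like this:

```go
// Abridged sketch of the node type (not the verbatim source).
type node struct {
	// The identity (apiVersion/kind, namespace, name, uid) of the object this vertex represents.
	identity objectReference
	// dependents are the nodes whose ownerReferences point at this node.
	dependentsLock sync.RWMutex
	dependents     map[*node]struct{}
	// Set when the object is being deleted and carries the foregroundDeletion finalizer.
	deletingDependents bool
	// Set when the object's deletionTimestamp is non-nil.
	beingDeleted bool
	// Set when the node was constructed "virtually" and has not yet been observed via an informer event.
	virtual bool
	// The ownerReferences recorded when the object was last observed.
	owners []metav1.OwnerReference
}
```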
Monitors
monitors is a map keyed by GroupVersionResource, with the corresponding informer (monitor) as the value.
```go
type monitors map[schema.GroupVersionResource]*monitor

type monitor struct {
	// The informer's controller and store (cache).
	controller cache.Controller
	store      cache.Store

	stopCh chan struct{}
}
```
gb.startMonitors simply calls Run on every informer in monitors, i.e. it starts the list/watch; we won't expand on it here.
Besides being called at startup, gb.startMonitors is also called by resyncMonitors, which essentially reconciles the set of monitors. resyncMonitors is invoked by GarbageCollector.Sync.
```go
func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResources map[schema.GroupVersionResource]struct{}) error {
	if err := gc.dependencyGraphBuilder.syncMonitors(logger, deletableResources); err != nil {
		return err
	}
	gc.dependencyGraphBuilder.startMonitors(logger)
	return nil
}
```
```go
func (gb *GraphBuilder) syncMonitors(logger klog.Logger, resources map[schema.GroupVersionResource]struct{}) error {
	// ... omitted ...

	toRemove := gb.monitors
	for resource := range resources {
		if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
			continue
		}
		// If a monitor already exists for this resource, keep it instead of creating a new one.
		if m, ok := toRemove[resource]; ok {
			current[resource] = m
			delete(toRemove, resource)
			kept++
			continue
		}
		// Resolve the GVK of the resource.
		kind, err := gb.restMapper.KindFor(resource)
		// Create the informer.
		c, s, err := gb.controllerFor(logger, resource, kind)
		current[resource] = &monitor{store: s, controller: c}
		added++
	}
	gb.monitors = current

	// Stop the informers that are no longer needed.
	for _, monitor := range toRemove {
		if monitor.stopCh != nil {
			close(monitor.stopCh)
		}
	}

	// ... omitted ...
}
```
controllerFor creates the informer and registers the handlers, which push events onto the graphChanges queue.
```go
func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
	handlers := cache.ResourceEventHandlerFuncs{
		// add the event to the dependencyGraphBuilder's graphChanges.
		AddFunc: func(obj interface{}) {
			event := &event{
				eventType: addEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		// Update and Delete handlers omitted.
	}
	// Informer creation omitted.
}
```
The gb.startMonitors(logger) call made at startup simply runs every monitor.
```go
func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
	// ... omitted ...
	for _, monitor := range monitors {
		if monitor.stopCh == nil {
			monitor.stopCh = make(chan struct{})
			gb.sharedInformers.Start(gb.stopCh)
			go monitor.Run()
			started++
		}
	}
}
```
runProcessGraphChanges
When a resource is deleted, its finalizers are updated according to the chosen deletion policy, which in turn triggers an update event.
```bash
# Foreground deletion: foregroundDeletion is added to the finalizers
kubectl delete deployment nginx-deployment --cascade=foreground

# Orphan deletion: orphan is added to the finalizers
kubectl delete deployment nginx-deployment --cascade=orphan

# Background deletion (the default): the finalizers are not updated
kubectl delete deployment nginx-deployment
```
From the user's point of view, background and foreground deletion feel much the same (under normal circumstances everything ends up deleted); a client-go equivalent follows the list below:
- Foreground: dependents are deleted first, then the owner (slower; whether the dependents are deleted successfully affects the owner).
- Background: the owner is deleted first and the dependents are cleaned up in the background (faster; the outcome of deleting the dependents does not affect the owner).
- Orphan: the owner is deleted and the dependents are left untouched.
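For reference, the same cascading behaviour can be requested programmatically; a minimal client-go sketch (the namespace and Deployment name are placeholders) that mirrors the kubectl flags above:

```go
package gcexample

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteWithPolicy deletes a Deployment with an explicit cascading strategy.
// Foreground makes the API server add the metav1.FinalizerDeleteDependents ("foregroundDeletion")
// finalizer, Orphan adds metav1.FinalizerOrphanDependents ("orphan"), and Background adds neither.
func deleteWithPolicy(ctx context.Context, clientset kubernetes.Interface, policy metav1.DeletionPropagation) error {
	return clientset.AppsV1().Deployments("default").Delete(ctx, "nginx-deployment", metav1.DeleteOptions{
		PropagationPolicy: &policy, // DeletePropagationForeground / Orphan / Background
	})
}
```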
When an object change is observed, it is handled by processGraphChanges, which takes items off the graphChanges queue, updates uidToNode, and then dispatches work to attemptToDelete and attemptToOrphan.
A resource may have multiple owners, so the identity of each owner has to be processed individually.
```yaml
apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: "2023-12-27T10:11:00Z"
  generateName: test-5f6778868d-
  labels:
    app: test
    pod-template-hash: 5f6778868d
  name: test-5f6778868d-6zt4h
  namespace: default
  # ownerReferences is an array, i.e. there can be multiple owners
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: test-5f6778868d
    uid: a5a436cb-3212-4fbe-85d3-ec80ab9b4208
  resourceVersion: "50228752"
  uid: 0bce606e-1f1c-4f6f-bba0-229a55cb656b
```
Let's first look at partitionDependents, a helper that appears repeatedly. Given a list of dependents and an owner identity, it decides which dependents actually reference that owner and returns two lists: matching and non-matching.
- Check whether the dependent's namespace matches the given owner's namespace.
- Check whether any of the dependent's owners has the same UID as the given owner.
- If one of the dependent's owners matches the given owner's UID, check whether the remaining ownerReference fields also match.
```go
func partitionDependents(dependents []*node, matchOwnerIdentity objectReference) (matching, nonmatching []*node) {
	// Whether the owner is namespaced.
	ownerIsNamespaced := len(matchOwnerIdentity.Namespace) > 0
	for i := range dependents {
		dep := dependents[i]
		foundMatch := false
		foundMismatch := false
		// If the namespaces differ, the dependent cannot match.
		if ownerIsNamespaced && matchOwnerIdentity.Namespace != dep.identity.Namespace {
			// all references to the parent do not match, since the dependent namespace does not match the owner
			foundMismatch = true
		} else {
			for _, ownerRef := range dep.owners {
				// If the UID matches, compare the remaining fields.
				if ownerRef.UID == matchOwnerIdentity.UID {
					// Compare the remaining coordinates: name, apiVersion, kind.
					if ownerReferenceMatchesCoordinates(ownerRef, matchOwnerIdentity.OwnerReference) {
						foundMatch = true
					} else {
						foundMismatch = true
					}
				}
			}
		}
		if foundMatch {
			matching = append(matching, dep)
		}
		if foundMismatch {
			nonmatching = append(nonmatching, dep)
		}
	}
	return matching, nonmatching
}
```
processGraphChanges uses partitionDependents to handle these dependency relationships.
```go
func (gb *GraphBuilder) processGraphChanges(logger klog.Logger) bool {
	// Pop an item from the queue.
	item, quit := gb.graphChanges.Get()
	if quit {
		return false
	}
	// ... omitted ...

	// The item is eventually converted into an accessor.
	accessor, err := meta.Accessor(obj)
	existingNode, found := gb.uidToNode.Read(accessor.GetUID())

	// A virtual event never updates a real node; here the node already exists as a
	// virtual node and the current event is a real (non-virtual) one.
	if found && !event.virtual && !existingNode.isObserved() {
		// The identity observed from the event.
		observedIdentity := identityFromEvent(event, accessor)
		// If the observed identity differs from the identity stored on the node:
		if observedIdentity != existingNode.identity {
			// Find the dependents that do not match the observed identity.
			_, potentiallyInvalidDependents := partitionDependents(existingNode.getDependents(), observedIdentity)
			// Non-matching dependents are put on the deletion queue.
			for _, dep := range potentiallyInvalidDependents {
				if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
					// Record an event for the invalid cross-namespace ownerReference.
					gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
				}
				// Queue the dependent for cleanup.
				gb.attemptToDelete.Add(dep)
			}
			// Update the node's identity.
			existingNode = existingNode.clone()
			existingNode.identity = observedIdentity
			gb.uidToNode.Write(existingNode)
		}
		// Clear the virtual flag: the node has now been observed for real.
		existingNode.markObserved()
	}

	switch {
	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
		newNode := &node{
			identity:           identityFromEvent(event, accessor),
			dependents:         make(map[*node]struct{}),
			owners:             accessor.GetOwnerReferences(),
			deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
			beingDeleted:       beingDeleted(accessor),
		}
		// Add the node to uidToNode and record the relationships derived from its owners.
		// If an owner does not yet exist in uidToNode, a virtual node is created for it;
		// that virtual node is queued for deletion, and later processing verifies whether it really exists.
		gb.insertNode(logger, newNode)
		// For update events, changes to the ownerReferences must be handled.
		// The object's finalizers decide which queue it goes on:
		// if the finalizers contain foregroundDeletion, it goes to attemptToDelete and is marked isDeletingDependents;
		// if the finalizers contain orphan, it goes to attemptToOrphan.
		gb.processTransitions(logger, event.oldObj, accessor, newNode)

	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
		// Diff the ownerReferences and apply the differences.
		added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
		if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
			// Based on the BlockOwnerDeletion state of the removed and changed references,
			// decide whether owners should be added to the attemptToDelete queue.
			gb.addUnblockedOwnersToDeleteQueue(logger, removed, changed)
			// Update the node's owners.
			existingNode.owners = accessor.GetOwnerReferences()
			// Register the node as a dependent of the newly added owners.
			gb.addDependentToOwners(logger, existingNode, added)
			// Remove the relationships to the removed owners.
			gb.removeDependentFromOwners(existingNode, removed)
		}
		// A non-nil DeletionTimestamp means the object is being deleted: update the flag.
		if beingDeleted(accessor) {
			existingNode.markBeingDeleted()
		}
		gb.processTransitions(logger, event.oldObj, accessor, existingNode)

	case event.eventType == deleteEvent:
		// The "skip if the node does not exist" code is omitted.
		removeExistingNode := true
		// A virtual event is one that did not come from an informer.
		// When a looked-up node cannot be found, a virtual node is created together with a virtual event.
		if event.virtual {
			// this is a virtual delete event, not one observed from an informer
			deletedIdentity := identityFromEvent(event, accessor)
			if existingNode.virtual {
				// Partition the node's dependents against the deleted identity.
				if matchingDependents, nonmatchingDependents := partitionDependents(existingNode.getDependents(), deletedIdentity); len(nonmatchingDependents) > 0 {
					removeExistingNode = false
					// Matching dependents are queued for deletion, and the deleted identity is
					// added to absentOwnerCache, which records owners known to be absent.
					if len(matchingDependents) > 0 {
						gb.absentOwnerCache.Add(deletedIdentity)
						for _, dep := range matchingDependents {
							gb.attemptToDelete.Add(dep)
						}
					}
					// Update the virtual node's identity.
					if existingNode.identity == deletedIdentity {
						replacementIdentity := getAlternateOwnerIdentity(nonmatchingDependents, deletedIdentity)
						if replacementIdentity != nil {
							replacementNode := existingNode.clone()
							replacementNode.identity = *replacementIdentity
							gb.uidToNode.Write(replacementNode)
							// Re-queue the replacement node.
							gb.attemptToDelete.AddRateLimited(replacementNode)
						}
					}
				}
			} else if existingNode.identity != deletedIdentity {
				// A real (observed) node is never removed because of a virtual event.
				removeExistingNode = false
				matchingDependents, _ := partitionDependents(existingNode.getDependents(), deletedIdentity)
				// Update the absent-owner cache and trigger cascading deletion.
				if len(matchingDependents) > 0 {
					// mark the observed deleted identity as absent
					gb.absentOwnerCache.Add(deletedIdentity)
					// attempt to delete dependents that do match the verified deleted identity
					for _, dep := range matchingDependents {
						gb.attemptToDelete.Add(dep)
					}
				}
			}
		}
		if removeExistingNode {
			// Remove the node from the graph.
			gb.removeNode(existingNode)
			existingNode.dependentsLock.RLock()
			defer existingNode.dependentsLock.RUnlock()
			if len(existingNode.dependents) > 0 {
				gb.absentOwnerCache.Add(identityFromEvent(event, accessor))
			}
			for dep := range existingNode.dependents {
				gb.attemptToDelete.Add(dep)
			}
			for _, owner := range existingNode.owners {
				// If an owner is waiting for its dependents to be deleted, queue it for deletion as well.
				// The isDeletingDependents flag is set by processTransitions.
				ownerNode, found := gb.uidToNode.Read(owner.UID)
				if !found || !ownerNode.isDeletingDependents() {
					continue
				}
				gb.attemptToDelete.Add(ownerNode)
			}
		}
	}
	return true
}
```
GarbageCollector

GarbageCollector's Run method starts the GraphBuilder as well as the runAttemptToDeleteWorker and runAttemptToOrphanWorker workers.
- attemptToDelete: handles resources queued for deletion
- attemptToOrphan: handles resources to be orphaned
```go
func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
	// ... omitted ...

	// Start the GraphBuilder.
	go gc.dependencyGraphBuilder.Run(ctx)

	// ... omitted ...

	// Start the worker goroutines.
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
		go wait.Until(func() { gc.runAttemptToOrphanWorker(logger) }, 1*time.Second, ctx.Done())
	}
}
```
Since most of the data processing is done by the GraphBuilder, the GarbageCollector's own logic is comparatively simple.
runAttemptToDeleteWorker
runAttemptToDeleteWorker handles the resources queued for deletion.
```go
func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
	item, quit := gc.attemptToDelete.Get()
	gc.workerLock.RLock()
	defer gc.workerLock.RUnlock()
	if quit {
		return false
	}
	defer gc.attemptToDelete.Done(item)

	action := gc.attemptToDeleteWorker(ctx, item)
	switch action {
	case forgetItem:
		// Processing succeeded: drop the item from the queue.
		gc.attemptToDelete.Forget(item)
	case requeueItem:
		// Processing failed: re-queue the item with rate limiting.
		gc.attemptToDelete.AddRateLimited(item)
	}
	return true
}
```
We won't expand the code of gc.attemptToDeleteWorker here.
gc.attemptToDeleteWorker ultimately calls attemptToDeleteItem.
gc.attemptToDeleteWorker itself mainly performs state and error checks in order to decide which action to return.
The actual deletion is carried out by gc.deleteObject, which calls the API to delete the resource; this is how the recursive deletion is realized.
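For reference, a minimal sketch (not the verbatim GC source) of the shape of the delete call that gc.deleteObject issues: a metadata-client delete guarded by a UID precondition, together with the propagation policy chosen by the caller:

```go
package gcexample

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/metadata"
)

// deleteWithUIDPrecondition sketches the final delete call: because the delete is guarded by a
// UID precondition, a recreated object with the same name but a different UID is never deleted.
func deleteWithUIDPrecondition(ctx context.Context, client metadata.Interface,
	gvr schema.GroupVersionResource, namespace, name string, uid types.UID,
	policy *metav1.DeletionPropagation) error {
	opts := metav1.DeleteOptions{
		// Only delete the object if its UID still matches the one recorded in the graph.
		Preconditions: &metav1.Preconditions{UID: &uid},
		// Foreground / Background / Orphan, chosen by attemptToDeleteItem (shown further below).
		PropagationPolicy: policy,
	}
	return client.Resource(gvr).Namespace(namespace).Delete(ctx, name, opts)
}
```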
The gc.classifyReferences method used inside attemptToDeleteItem checks the state of each owner and returns three lists (a rough sketch of it follows below):
- solid: owners that still exist in the cluster and are not in a "deleting" state
- dangling: owners that no longer exist in the cluster
- waitingForDependentsDeletion: owners that exist but are in a "deleting" state, waiting for their dependents to be deleted
When the deletion is executed, these three lists determine which deletion strategy is used.
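An abridged, rough sketch of gc.classifyReferences (not the verbatim source): it loops over the ownerReferences and buckets each one by whether the owner exists and whether it is being deleted with the foregroundDeletion finalizer:

```go
func (gc *GarbageCollector) classifyReferences(ctx context.Context, item *node, latestReferences []metav1.OwnerReference) (
	solid, dangling, waitingForDependentsDeletion []metav1.OwnerReference, err error) {
	for _, reference := range latestReferences {
		// isDangling checks whether the owner referenced here still exists.
		isDangling, owner, err := gc.isDangling(ctx, reference, item)
		if err != nil {
			return nil, nil, nil, err
		}
		if isDangling {
			dangling = append(dangling, reference)
			continue
		}
		ownerAccessor, err := meta.Accessor(owner)
		if err != nil {
			return nil, nil, nil, err
		}
		// An owner that is being deleted and carries the foregroundDeletion finalizer is
		// waiting for its dependents to go away first.
		if ownerAccessor.GetDeletionTimestamp() != nil && hasDeleteDependentsFinalizer(ownerAccessor) {
			waitingForDependentsDeletion = append(waitingForDependentsDeletion, reference)
		} else {
			solid = append(solid, reference)
		}
	}
	return solid, dangling, waitingForDependentsDeletion, nil
}
```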
```go
func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node) error {
	// ... omitted ...

	// Fetch the latest version of the object from the API.
	latest, err := gc.getObject(item.identity)
	switch {
	// Not found: the item is a virtual node.
	case errors.IsNotFound(err):
		// Enqueue a virtual delete event.
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	case err != nil:
		return err
	}

	// If the UID does not match, enqueue a virtual delete event as well.
	if latest.GetUID() != item.identity.UID {
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	}

	// Finalizer removal: if the item is deleting its dependents, hand it to processDeletingDependentsItem.
	if item.isDeletingDependents() {
		return gc.processDeletingDependentsItem(logger, item)
	}

	ownerReferences := latest.GetOwnerReferences()
	// solid: owners that still exist and are not being deleted
	// dangling: owners that no longer exist
	// waitingForDependentsDeletion: owners that exist but are being deleted and carry the finalizer
	solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(ctx, item, ownerReferences)

	switch {
	case len(solid) != 0:
		// There is at least one live owner, so the item cannot be deleted yet.
		// ... omitted ...
		// But we can patch the object to drop, from its ownerReferences, the owners that no
		// longer exist and the owners that are being deleted.
		ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
		p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(item.identity.UID, ownerUIDs)
		_, err = gc.patch(item, p, func(n *node) ([]byte, error) {
			return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
		})
		return err
	case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
		deps := item.getDependents()
		for _, dep := range deps {
			if dep.isDeletingDependents() {
				// First patch the item to set blockOwnerDeletion to false on its dependents' references.
				patch, err := item.unblockOwnerReferencesStrategicMergePatch()
				if err != nil {
					return err
				}
				if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
					return err
				}
				break
			}
		}
		// All owners are in a deletable state, so this resource can be deleted (entering recursive deletion).
		// When this resource is observed again and its relationships have been cleared, it is deleted directly.
		policy := metav1.DeletePropagationForeground
		return gc.deleteObject(item.identity, &policy)
	default:
		// No live owner: the item should be garbage collected.
		// The deletion policy is chosen based on the finalizers present on the object.
		var policy metav1.DeletionPropagation
		switch {
		case hasOrphanFinalizer(latest):
			policy = metav1.DeletePropagationOrphan
		case hasDeleteDependentsFinalizer(latest):
			policy = metav1.DeletePropagationForeground
		default:
			policy = metav1.DeletePropagationBackground
		}
		return gc.deleteObject(item.identity, &policy)
	}
}
```
runAttemptToOrphanWorker
Handling orphaned resources is comparatively simple:
- Update the ownerReferences of all dependents to cut the relationship. For example, when a Deployment is deleted with the orphan policy, the Deployment is removed from the ownerReferences of its ReplicaSets.
- Remove the finalizer so that the owner can actually be deleted.
- Wait for the object to be observed again, at which point the deletion is carried out.
```go
func (gc *GarbageCollector) attemptToOrphanWorker(logger klog.Logger, item interface{}) workQueueItemAction {
	// ... omitted ...

	// Update the ownerReferences of the dependents.
	err := gc.orphanDependents(logger, owner.identity, dependents)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
		return requeueItem
	}

	// Remove the orphan finalizer.
	err = gc.removeFinalizer(logger, owner, metav1.FinalizerOrphanDependents)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
		return requeueItem
	}
	return forgetItem
}
```
```go
func (gc *GarbageCollector) orphanDependents(logger klog.Logger, owner objectReference, dependents []*node) error {
	// Process the dependents concurrently.
	wg := sync.WaitGroup{}
	wg.Add(len(dependents))
	for i := range dependents {
		go func(dependent *node) {
			defer wg.Done()
			// Build the patch payload.
			p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(dependent.identity.UID, []types.UID{owner.UID})
			// Apply the patch to remove the owner from the dependent's ownerReferences.
			_, err = gc.patch(dependent, p, func(n *node) ([]byte, error) {
				// Fallback for compatibility; it accomplishes the same thing.
				return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
			})
		}(dependents[i])
	}
	wg.Wait()
	close(errCh)

	// ... omitted ...
	return nil
}
```
Summary
- When a resource is deleted, different deletion policies add different entries to its finalizers, which in turn trigger different processing logic.
- Deleting a resource requires a few conditions to be met:
  - its ownership relationships have been cleared
  - it has no owner
  - its finalizers are empty
- The deletion policy determines how these conditions are fulfilled, but the conditions that ultimately have to be satisfied are the same.
- Deletion follows a fixed logic, and the whole flow can be understood as a recursive process.