欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

kube-scheduler 磁盘调度源码分析

发布时间:2024/8/23 编程问答 67 豆豆
生活随笔 收集整理的这篇文章主要介绍了 kube-scheduler 磁盘调度源码分析 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

作者 | leadersnowy

来源 | CSDN博客

kube-scheduler介绍

首先我们知道,kube-scheduler的根本工作任务是根据各种调度算法将Pod调度到最合适的工作节点上

一、整个调度流程分为两个阶段:

1、预选(Predicates):

输入是所有节点,输出是满足预选条件的节点。kube-scheduler根据预选策略过滤掉不满足策略的Nodes。例如,如果某节点的资源不足或者不满足预选策略的条件如“Node的label必须与Pod的Selector一致”时则无法通过预选。

2、优选(Priorities):

输入是预选阶段筛选出的节点,优选会根据优先策略为通过预选的Nodes进行打分排名,选择得分最高的Node。例如,资源越富裕、负载越小的Node可能具有越高的排名。

而整个调度过程是有很多因素影响的,包括节点状态(cpu,内存,磁盘),节点与pod的亲和性,节点标签等,而我们本次只做与磁盘相关的源码分析。

源码分析

2.1、磁盘预选(Predicates)

这一部分核心代码在pkg/scheduler/framework/plugins/nodevolumelimits/这个目录下,分两个文件

csi.go,non_csi.go 两个文件的核心思想基本一致,都是通过Filter来限制一个节点上的最大盘的数量,这个最大数量是默认配置在k8s源码中的。csi的就是限制一个节点上最多能挂几块csi的volume,而non_csi的相对比较复杂,因为牵扯到in-tree插件跟flexvolume插件,而且对每一种in-tree插件都进行了分别的可挂载盘的数量的配置,下面我们对于non-csi的进行具体的源码分析。

话不多说,直接上代码:

2.1.1 nonCSILimits

type nonCSILimits struct {name stringfilter VolumeFiltervolumeLimitKey v1.ResourceNamemaxVolumeFunc func(node *v1.Node) intcsiNodeLister storagelisters.CSINodeListerpvLister corelisters.PersistentVolumeListerpvcLister corelisters.PersistentVolumeClaimListerscLister storagelisters.StorageClassLister// The string below is generated randomly during the struct's initialization.// It is used to prefix volumeID generated inside the predicate() method to// avoid conflicts with any real volume.randomVolumeIDPrefix string }

最核心的结构体nonCSILimits ,提供了几个成员

  • name 顾名思义,每一个nonCSILimits结构体变量的名称

  • filter VolumeFilter类型的变量,具体包括几个方法,FilterVolume(),FilterPersistentVolume(),MatchProvisioner()以及IsMigrated()具体调用我们后面再讲

  • volumeLimitKey 其实就是一个string类型,指定了几种类型的key

  • maxVolumeFunc() 一个获取该Limits的最大数量的方法

  • csiNodeLister csinode的监听对象

  • pvLister pv的监听对象

  • pvcLister pvc的监听对象

  • scLister sc的监听对象

  • randomVolumeIDPrefix 一个string类型,用于生成唯一的pvID

  • 2.1.2 nonCSILimits初始化

    func newNonCSILimits(filterName string,csiNodeLister storagelisters.CSINodeLister,scLister storagelisters.StorageClassLister,pvLister corelisters.PersistentVolumeLister,pvcLister corelisters.PersistentVolumeClaimLister, ) framework.Plugin {var filter VolumeFiltervar volumeLimitKey v1.ResourceNamevar name stringswitch filterName {case ebsVolumeFilterType:name = EBSNamefilter = ebsVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)case gcePDVolumeFilterType:name = GCEPDNamefilter = gcePDVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)case azureDiskVolumeFilterType:name = AzureDiskNamefilter = azureDiskVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)case indiskVolumeFilterType:name = IndiskNamefilter = indiskVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.IndiskVolumeLimitKey)default:klog.Fatalf("Wrong filterName, Only Support %v %v %v %v %v", ebsVolumeFilterType,gcePDVolumeFilterType, azureDiskVolumeFilterType, cinderVolumeFilterType, indiskVolumeFilterType)return nil}pl := &nonCSILimits{name: name,filter: filter,volumeLimitKey: volumeLimitKey,maxVolumeFunc: getMaxVolumeFunc(filterName),csiNodeLister: csiNodeLister,pvLister: pvLister,pvcLister: pvcLister,scLister: scLister,randomVolumeIDPrefix: rand.String(32),}return pl }

    初始化时引用了大量的常量

    const (// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.// GCE instances can have up to 16 PD volumes attached.defaultMaxGCEPDVolumes = 16// defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.// Larger Azure VMs can actually have much more disks attached.// TODO We should determine the max based on VM sizedefaultMaxAzureDiskVolumes = 16// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.ebsVolumeFilterType = "EBS"// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.gcePDVolumeFilterType = "GCE"// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.azureDiskVolumeFilterType = "AzureDisk"// cinderVolumeFilterType defines the filter name for cinderVolumeFilter.indiskVolumeFilterType = "Indisk"// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.KubeMaxPDVols = "KUBE_MAX_PD_VOLS"// MaxIndiskVolumes defines the maximum number of Indisk Volumes per node.MaxIndiskVolumes = "MAX_INDISK_VOLUMES"// DefaultIndiskVolumes defines the default number of Indisk Volumes per node.DefaultIndiskVolumes = "DEFAULT_INDISK_VOLUMES" )

    这些常量包括各intree存储类型的名字,以及默认的最大volume数量等信息,在nonCSILimits初始化时进行赋值。

    除了这些常量之外,最关键的就是filter方法的初始化,看一下cinder的例子,是怎么区分这个volume是cinder的volume的。

    var cinderVolumeFilter = VolumeFilter{FilterVolume: func(vol *v1.Volume) (string, bool) {if vol.Cinder != nil {return vol.Cinder.VolumeID, true}return "", false},FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {if pv.Spec.Cinder != nil {return pv.Spec.Cinder.VolumeID, true}return "", false},MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {if sc.Provisioner == csilibplugins.CinderInTreePluginName {return true}return false},IsMigrated: func(csiNode *storage.CSINode) bool {return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)}, }

    可以看到,filer其实是通过对元数据的字段(vol.Cinder,pv.Spec.Cinder,sc.Provisioner)等进行判断来分辨是不是本类型的volume的。

    如果不是intree的,可以通过其它的字段来进行过滤

    Flexvolume的我们可以用这些字段:

    vol.FlexVolume.Driver,pv.Spec.FlexVolume.Driver

    CSI的我们可以用这些字段:

    vol.CSI.Driver,pv.Spec.CSI.Driver

    2.1.3 核心方法Filter

    // Filter invoked at the filter extension point. func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {// If a pod doesn't have any volume attached to it, the predicate will always be true.// Thus we make a fast path for it, to avoid unnecessary computations in this case.if len(pod.Spec.Volumes) == 0 {return nil}newVolumes := make(map[string]bool)if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {return framework.NewStatus(framework.Error, err.Error())}// quick returnif len(newVolumes) == 0 {return nil}node := nodeInfo.Node()if node == nil {return framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))}var csiNode *storage.CSINodevar err errorif pl.csiNodeLister != nil {csiNode, err = pl.csiNodeLister.Get(node.Name)if err != nil {// we don't fail here because the CSINode object is only necessary// for determining whether the migration is enabled or notklog.V(5).Infof("Could not get a CSINode object for the node: %v", err)}}// if a plugin has been migrated to a CSI driver, defer to the CSI predicateif pl.filter.IsMigrated(csiNode) {return nil}// count unique volumesexistingVolumes := make(map[string]bool)for _, existingPod := range nodeInfo.Pods {if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {return framework.NewStatus(framework.Error, err.Error())}}numExistingVolumes := len(existingVolumes)// filter out already-mounted volumesfor k := range existingVolumes {delete(newVolumes, k)}numNewVolumes := len(newVolumes)maxAttachLimit := pl.maxVolumeFunc(node)volumeLimits := volumeLimits(nodeInfo)if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {maxAttachLimit = int(maxAttachLimitFromAllocatable)}if numExistingVolumes+numNewVolumes > maxAttachLimit {return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)}if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {nodeInfo.TransientInfo.TransientLock.Lock()defer nodeInfo.TransientInfo.TransientLock.Unlock()nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes}return nil }

    接下来看一下核心代码的逻辑

    最开始是一些校验,首先校验这个pod有没有使用volume,再校验这个pod使用的volume是什么类型的volume,然后由对应类型的volume的filter进行处理。

    看一下最核心的处理流程

    numExistingVolumes := len(existingVolumes)// filter out already-mounted volumesfor k := range existingVolumes {delete(newVolumes, k)}numNewVolumes := len(newVolumes)maxAttachLimit := pl.maxVolumeFunc(node)volumeLimits := volumeLimits(nodeInfo)if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {maxAttachLimit = int(maxAttachLimitFromAllocatable)}if numExistingVolumes+numNewVolumes > maxAttachLimit {return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)}if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {nodeInfo.TransientInfo.TransientLock.Lock()defer nodeInfo.TransientInfo.TransientLock.Unlock()nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes}return nil

    逻辑非常清楚

  • numExistingVolumes 表示该节点上已经有的volume数

  • numNewVolumes是该pod需要创建的volume数

  • maxAttachLimit 是该类型的volume在节点上所能创建的最大volume数

  • 如果 numExistingVolumes+numNewVolumes > maxAttachLimit 则该节点不可调度

  • 如果可以调度,则return nil,然后释放资源

  • 到此,预选阶段的工作做完。

    这里还需要注意一个地方

    nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes

    AllocatableVolumesCount,RequestedVolumes这两个变量的赋值是给后面进行优选打分的时候用的

    2.2 磁盘优选(Priorities)

    优选打分的条件也很多,这里我们只处理跟存储相关的,具体源码在

    pkg\scheduler\framework\plugins\noderesources\resource_allocation.go

    pkg\scheduler\framework\plugins\noderesources\least_allocated.go

    2.2.1 resource_allocation

    resource_allocation 字体意思就可以理解,就是资源分配,而资源分配有多种方,比如least_allocated,most_allocated,requested_to_capacity_ratio都是资源分配的方法,而resource_allocation只是提供一个score方法,代码如下:

    func (r *resourceAllocationScorer) score(pod *v1.Pod,nodeInfo *framework.NodeInfo) (int64, *framework.Status) {node := nodeInfo.Node()if node == nil {return 0, framework.NewStatus(framework.Error, "node not found")}if r.resourceToWeightMap == nil {return 0, framework.NewStatus(framework.Error, "resources not found")}requested := make(resourceToValueMap, len(r.resourceToWeightMap))allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))for resource := range r.resourceToWeightMap {allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)}var score int64// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)} else {score = r.scorer(requested, allocatable, false, 0, 0)}if klog.V(10).Enabled() {if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",pod.Name, node.Name, r.Name,allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,score,)} else {klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",pod.Name, node.Name, r.Name,allocatable, requested, score,)}}return score, nil }

    核心代码就一句

    if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)} else {score = r.scorer(requested, allocatable, false, 0, 0)}

    如果pod有volume并且BalanceAttachedNodeVolumes这个feature打开了,并且节点有TransientInfo

    那么就走存储相关的打分,否则就不走。

    我们看一下r.scorer的参数的后两个,就是我们在预选阶段最后赋值的两个参数

    AllocatableVolumesCount 表示还可以创建的volume数量

    RequestedVolumes 表示该pod需要的volume数量

    2.2.2 least_allocated

    least_allocated代码最少资源优先,也就是节点上资源越少,分越高

    直接看源码

    type resourceAllocationScorer struct {Name stringscorer func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64resourceToWeightMap resourceToWeightMap }func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {var nodeScore, weightSum int64for resource, weight := range resToWeightMap {resourceScore := leastRequestedScore(requested[resource], allocable[resource])nodeScore += resourceScore * weightweightSum += weight}return nodeScore / weightSum} }// The unused capacity is calculated on a scale of 0-MaxNodeScore // 0 being the lowest priority and `MaxNodeScore` being the highest. // The more unused resources the higher the score is. func leastRequestedScore(requested, capacity int64) int64 {if capacity == 0 {return 0}if requested > capacity {return 0}return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity }

    我们来看核心算法

    capacity表示还可以剩余资源数量

    requested表示该pod需求资源数量

    比如:capacity=10,requested=3,framework.MaxNodeScorel默认是100

    那么得分就是 (10-3)*100/10=70

    但是我们看到 leastResourceScorer中并没有引用存储的部分,所以我们可以手动添加上

    if includeVolumes && allocatableVolumes - requestedVolumes > 0 && allocatableVolumes > 0 {nodeScore += int64(((allocatableVolumes - requestedVolumes) * int(framework.MaxNodeScore)) / allocatableVolumes)weightSum += 1}

    至此,kube-scheduler存储部分代码解读。

    往期推荐

    从 40% 跌至 4%,“糊”了的 Firefox 还能重回巅峰吗?

    Gartner 发布 2022 年汽车行业五大技术趋势

    使用这个库,让你的服务操作 Redis 速度飞起

    漫画:什么是“低代码”开发平台?

    点分享

    点收藏

    点点赞

    点在看

    总结

    以上是生活随笔为你收集整理的kube-scheduler 磁盘调度源码分析的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。