源码分析系列文章已经开源到github,地址如下:

  • github:https://github.com/farmer-hutao/k8s-source-code-analysis
  • gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis

本文大纲

  • 走近priority过程
  • PrioritizeNodes整体流程
    • Results
    • Old Priority Function
    • Map-Reduce
    • Combine Scores
  • Fun和Map-Reduce实例分析
    • InterPodAffinityPriority(Function)
    • CalculateNodeAffinityPriorityMap(Map)
    • CalculateNodeAffinityPriorityReduce(Reduce)
    • 小结

1. 走近priority过程

pkg/scheduler/core/generic_scheduler.go:186

priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)

今天的分析从这行代码开始。

PrioritizeNodes要做的事情是给已经通过predicate的nodes赋上一个分值,从而抉出一个最优node用于运行当前pod. 第一次看priority可能会一脸蒙,和predicate中的逻辑不太一样;大伙得耐下性子多思考,实在有障碍也可以先不求甚解,整体过完后再二刷代码,再不行三刷,总会大彻大悟的!

从注释中可以找到关于PrioritizeNodes的原理(pkg/scheduler/core/generic_scheduler.go:624):

然后我们先函数签名入手:

pkg/scheduler/core/generic_scheduler.go:624

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error)

形参定义和返回值:

  • pod *v1.Pod* // pod就不用说了;
  • *nodeNameToInfo map[string]*schedulercache.NodeInfo // 这个也不需要讲,字面意思代表一切;
  • meta interface{} // 和predicate里的meta不太一样,下面会贴个debug的图先,具体后面再看;
  • priorityConfigs []algorithm.PriorityConfig // 包含优选算法各种信息,比较重要;
  • nodes []*v1.Node // node集合,不需要解释了;
  • extenders []algorithm.SchedulerExtender // extender逻辑放到后面单独讲。

meta实参长这个样子:

《k8s-1.13版本源码分析》-调度优选-LMLPHP

返回值只需要看一下schedulerapi.HostPriorityList类型的含义了,这个类型之前也提过,后面频繁涉及到操作这个结构,所以这里再贴一次,大伙得烂熟于心才行!

pkg/scheduler/api/types.go:305

type HostPriority struct {
    Host string
    Score int
}
type HostPriorityList []HostPriority

着重分析一下这2个type,虽然很简单,还是有必要啰嗦一下,必须记在心里。HostPriority这个struct的属性是HostScore,一个是string一个是int,所以很明显HostPriority所能够保存的信息是一个节点的名字和分值,再仔细一点说就是这个结构保存的是一个node在一个priority算法计算后所得到的结果;然后看HostPriorityList类型,这个类型是上一个类型的“集合”,集合表达的是一个node多个算法还是多个node一个算法呢?稍微思考一下可以知道HostPriorityList中存的是多个Host和Score的组合,所以HostPriorityList这个结构是要保存一个算法作用于所有node之后,得到的所有node的Score信息的。(这里我们先理解成一个算法的结果,作为函数返回值这里肯定是要保留所有算法作用后的最终node的Score,所以函数后半部分肯定有combine分值的步骤。)

2. PrioritizeNodes整体流程

前面说到PrioritizeNodes()函数也就是node优选的具体逻辑,这个函数略长,我们分段讲解。

2.1. Results

PrioritizeNodes()函数开头的逻辑很简单,我们先从第一行看到results定义的这一行。

pkg/scheduler/core/generic_scheduler.go:634

if len(priorityConfigs) == 0 && len(extenders) == 0 {
    // 这个if很明显是处理特殊场景的,就是优选算法一个都没有配置(extenders同样没有配置)的时候怎么做;
    // 这个result是要当作返回值的,HostPriorityList类型前面唠叨了很多了,大家得心里有数;
   result := make(schedulerapi.HostPriorityList, 0, len(nodes))
   for i := range nodes {
       // 这一行代码是唯一的“逻辑了”,下面直到for结束都是简单代码;所以我们看一下EqualPriorityMap
       // 函数的作用就行了。这里我不贴代码,这个函数很短,作用就是设置每个node的Score相同(都为1)
       // hostPriority的类型也就是schedulerapi.HostPriority类型,再次强调这个类型是要烂熟于心的;
      hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
      if err != nil {
         return nil, err
      }
       // 最终的result也就是设置了每个node的Score为1的schedulerapi.HostPriorityList类型数据;
      result = append(result, hostPriority)
   }
   return result, nil
}
// 这里只是简单定义3个变量,一把锁,一个并发等待相关的wg,一个错误集合errs;
var (
   mu   = sync.Mutex{}
   wg   = sync.WaitGroup{}
   errs []error
)
// 这里定义了一个appendError小函数,逻辑很简单,并发场景下将错误信息收集到errs中;
appendError := func(err error) {
   mu.Lock()
   defer mu.Unlock()
   errs = append(errs, err)
}
// 最后一个变量results也不难理解,类型是[]schedulerapi.HostPriorityList,这里需要注意这个类型
// 的作用,它保存的是所有算法作用所有node之后得到的结果集,相当于一个二维数组,每个格子是1个算法
// 作用于1个节点的结果,一行也就是1个算法作用于所有节点的结果;一行展成一个二维就是所有算法作用于所有节点;
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

到这里要求大家心中能够想象上面提到的results是什么样的,可以借助纸笔画一画。下面的代码会往这个二维结构里面存储数据。

2.2. Old Priority Function

我们既然讲到“老式”,后面肯定有对应的“新式”。虽然这种函数已经DEPRECATED了,不过对于我们学习掌握优选流程还是很有帮助的。默认的优选算法里其实也只有1个是这在old形式的了:

《k8s-1.13版本源码分析》-调度优选-LMLPHP

贴这块代码之前我们先关注一下多次出现的priorityConfigs这个变量的类型:

函数形参中有写到:priorityConfigs []algorithm.PriorityConfig,所以我们直接看PriorityConfig是什么类型:

pkg/scheduler/algorithm/types.go:62

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
   Name   string
   Map    PriorityMapFunction
   Reduce PriorityReduceFunction
   // TODO: Remove it after migrating all functions to
   // Map-Reduce pattern.
   Function PriorityFunction
   Weight   int
}

PriorityConfig中有一个Name,一个Weight,很好猜到意思,名字和权重嘛。剩下的Map、Reduce和Function目测代表的就是优选函数的新旧两种表达方式了。我们先看旧的Function属性的类型PriorityFunction是什么:

pkg/scheduler/algorithm/types.go:59

type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)

很明显这个类型代表了一个priority函数,入参是pod、nodeNameToInfo和nodes,返回值是HostPriorityList,也就是我们前面提到的1个priority函数作用于每个node后得到了Score信息,存结果的结构就是这个HostPriorityList;看起来很和谐~

然后讲回PrioritizeNodes过程:

pkg/scheduler/core/generic_scheduler.go:661

for i := range priorityConfigs {
    // 如果第i个优选配置(priorityConfig)定义了老函数,则调用之;
    if priorityConfigs[i].Function != nil {
        wg.Add(1)
        // 注意这里的参数index,这里传入的实参是上面的i;
        go func(index int) {
            defer wg.Done()
            var err error
            // 所以这里的results[index]就好理解了;后面priorityConfigs[index]的索引也是index,
            // 这里表达的是第N个优选配置里有Function,那么这个Function的计算结果保存在
            // results的第N个格子里;
            results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
            if err != nil {
                appendError(err)
            }
        }(i)
    } else {
        // 如果没有定义Function,其实也就是使用了Map-Reduce方式的,这里先存个空的结构占位;
        results[i] = make(schedulerapi.HostPriorityList, len(nodes))
    }
}

上面这段代码逻辑还算好理解,唯一有点小绕的还是前面强调的HostPriorityList相关类型的操作上。

2.3. Map-Reduce

关于map-reduce思想我就不在这里赘述了,大数据行业很流行的一个词汇,百度一下(如果你能够google,自然更好咯)可以找到一大堆介绍的文章。

简单说map-reduce就是:Map是映射,Reduce是归约;map是统计一本书中的一页出现了多少次k8s这个词,reduce是将这些map结果汇总在一起得到最终结果。(map一般都是将一个算法作用于一堆数据集的每一个元素,得到一个结果集,reduce有各种形式,可以是累加这些结果,或者是对这个结果集做其他复杂的f(x)操作。

看看在Scheduler里面是怎么用Map-Reduce的吧:

// 这个并发逻辑之前介绍过了,我们直接看ParallelizeUntil的最后一个参数就行,这里直接写了一个匿名函数;
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
    // 这里的index是[0,len(nodes)-1],相当于遍历所有的nodes;
   nodeInfo := nodeNameToInfo[nodes[index].Name]
    // 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;
   for i := range priorityConfigs {
      if priorityConfigs[i].Function != nil {
          // 因为前面old已经运行过了
         continue
      }

      var err error
       // 这里的i和前面老Fun的互补,老Fun中没有赋值的results中的元素就在这里赋值了;
       // 注意到这里调用了一个Map函数就直接赋值给了results[i][index],这里的index是第一行这个
       // 匿名函数的形参,通过ParallelizeUntil这个并发实现所有node对应一个优选算法的分值计算;
      results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
      if err != nil {
         appendError(err)
         results[i][index].Host = nodes[index].Name
      }
   }
})

for i := range priorityConfigs {
    // 没有定义Reduce函数就不处理;
   if priorityConfigs[i].Reduce == nil {
      continue
   }
   wg.Add(1)
   go func(index int) {
      defer wg.Done()
       // 调用Reduce函数
      if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
         appendError(err)
      }
      if klog.V(10) {
         for _, hostPriority := range results[index] {
            klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
         }
      }
   }(i)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
   return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}

看到这里我们可以发现老Fun和Map的区别不大,都是优选函数的执行过程。那为什么会存在两种形式呢?我们看完PrioritizeNodes整体流程后通过具体的Fun和Map-Reduce实现来看二者的区别。

2.4. Combine Scores

这块的代码很简单,我们先抛开extender的逻辑,剩下的代码如下:

// Summarize all scores.
// 这个result和前面的results类似,result用于存储每个node的Score,到这里已经没有必要区分算法了;
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// 循环执行len(nodes)次
for i := range nodes {
    // 先在result中塞满所有node的Name,Score初始化为0;
   result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
    // 执行了多少个priorityConfig就有多少个Score,所以这里遍历len(priorityConfigs)次;
   for j := range priorityConfigs {
       // 每个算法对应第i个node的结果分值加权后累加;
      result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
   }
}

return result, nil

这块逻辑很清晰,要将前面得到的二维结果results压缩成一维的加权分值集合result,最终返回这个result.

从这里我们还可以得到一个结论,不管是Fun还是Map-Reduce,处理的结果都是填充results这个二维结构,所以Map-Reduce也没有什么神秘的,下面通过具体的算法来看二者有何异同。

3. Fun和Map-Reduce实例分析

3.1. InterPodAffinityPriority(Function)

这个算法做的是Pod间亲和性优选,也就是亲和pod越多的节点分值越高,反亲和pod越多分值越低。

我们撇开具体的亲和性计算规则,从优选函数的形式上看一下这段代码的逻辑:

pkg/scheduler/algorithm/priorities/interpod_affinity.go:119

func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
    affinity := pod.Spec.Affinity
    // 是否有亲和性约束;
    hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
    // 是否有反亲和性约束;
    hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil

    // 这里有一段根据亲和性和反亲和性来计算一个node上匹配的pod数量的逻辑,我们先跳过这些逻辑,从优选算法实现的角度看这个算法的架子;

    // 当遍历完所有的node之后,可以得到1个最高分和1个最低分,分别记为maxCount和minCount;
    for _, node := range nodes {
        if pm.counts[node.Name] > maxCount {
            maxCount = pm.counts[node.Name]
        }
        if pm.counts[node.Name] < minCount {
            minCount = pm.counts[node.Name]
        }
    }
    // 这个result类型和前面看到的一样,都是存储单个算法的计算结果的;
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    for _, node := range nodes {
        fScore := float64(0)
        // 如果分差大于0,也就是说不是所有的node都一样的情况,需要对分值做一个处理;
        if (maxCount - minCount) > 0 {
            // MaxPriority定义的是优选最高分10,第二个因数是当前node的count-最小count,
            // 然后除以(maxCount - minCount);举个例子,当前node的计算结果是5,最大count是20,最小
            // count是-3,那么这里就是10*[5-(-3)/20-(-3)]
            // 这个计算的结果显然会在[0-10]之间;
            fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
        }
        // 如果分差不大于0,这时候int(fScore)也就是0,对于各个node的结果都是0;
        result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
    }
    return result, nil
}

如上,我们可以发现最终这个函数计算出了每个node的分值,这个分值在[0-10]之间。所以说到底Function做的事情就是根据一定的规则给每个node赋一个分值,这个分值要求在[0-10]之间,然后把这个HostPriorityList返回就行。

3.2. CalculateNodeAffinityPriorityMap(Map)

这个算法和上一个类似,上一个是Pod的Affinity,这个是Node的Affinity;我们来看代码:

pkg/scheduler/algorithm/priorities/node_affinity.go:34

func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }

    // default is the podspec.
    affinity := pod.Spec.Affinity
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        // We were able to parse metadata, use affinity from there.
        affinity = priorityMeta.affinity
    }

    var count int32
    if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
        // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
        for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
            preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
            if preferredSchedulingTerm.Weight == 0 {
                continue
            }

            nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
            if err != nil {
                return schedulerapi.HostPriority{}, err
            }
            if nodeSelector.Matches(labels.Set(node.Labels)) {
                count += preferredSchedulingTerm.Weight
            }
        }
    }

    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: int(count),
    }, nil
}

撇开具体的亲和性计算细节,我们可以发现这个的count没有特定的规则,可能会加到10以上;另外这里的返回值是HostPriority类型,前面的Function返回了HostPriorityList类型。

map函数

pkg/scheduler/algorithm/priorities/selector_spreading.go:221

func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    var firstServiceSelector labels.Selector

    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }
    priorityMeta, ok := meta.(*priorityMetadata)
    if ok {
        firstServiceSelector = priorityMeta.podFirstServiceSelector
    } else {
        firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
    }
    // 查找给定node在给定namespace下符合selector的pod,返回值是[]*v1.Pod
    matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)

    return schedulerapi.HostPriority{
        Host:  node.Name,
        // 返回值中Score设置成上面找到的pod的数量
        Score: int(len(matchedPodsOfNode)),
    }, nil
}

这个函数比较短,可以看到在指定node上查询到匹配selector的pod越多,分值就越高。假设找到了20个,那么这里的分值就是20;假设找到的是2,那这里的分值就是2.

3.3. CalculateNodeAffinityPriorityReduce(Reduce)

和上面这个Map对应的Reduce函数其实没有单独实现,通过NormalizeReduce函数做了一个通用的Reduce处理:

pkg/scheduler/algorithm/priorities/node_affinity.go:77

var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)

pkg/scheduler/algorithm/priorities/reduce.go:29

func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
    return func(
        _ *v1.Pod,
        _ interface{},
        _ map[string]*schedulercache.NodeInfo,
        // 注意到这个result是HostPriorityList,对应1个算法N个node的结果集
        result schedulerapi.HostPriorityList) error {

        var maxCount int
        // 遍历result将最高的Score赋值给maxCount;
        for i := range result {
            if result[i].Score > maxCount {
                maxCount = result[i].Score
            }
        }

        if maxCount == 0 {
            if reverse {
                for i := range result {
                    result[i].Score = maxPriority
                }
            }
            return nil
        }

        for i := range result {
            score := result[i].Score
            // 举个例子:10*(5/20)
            score = maxPriority * score / maxCount
            if reverse {
                // 如果score是3,得到7;如果score是4,得到6,结果反转;
                score = maxPriority - score
            }

            result[i].Score = score
        }
        return nil
    }
}

3.4. 小结

  • Function:一个算法一次性计算出所有node的Score,这个Score的范围是规定的[0-10];
  • Map-Reduce:一个Map算法计算1个node的Score,这个Score可以灵活处理,可能是20,可能是-3;Map过程并发进行;最终得到的结果result通过Reduce归约,将这个算法对应的所有node的分值归约为[0-10];

本节有几张图是goland debug的截图,我们目前还没有提到如何debug;不过本节内容的阅读基本是不影响的。下一节源码分析内容发出来前我会在“环境准备”这一章中增加如何开始debug的内容,大家可以选择开始debug的时机。

《k8s-1.13版本源码分析》-调度优选-LMLPHP
03-20 05:22