
[Kubernetes/k8s source code analysis] Analysis of the k8s scheduler extender


1. Scheduler extender

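    There are three ways to add new scheduling rules (predicates and priorities) to Kubernetes. This article covers the third.

  • First: add the scheduling rules directly to the Kubernetes source and recompile
  • Second: implement your own scheduler and replace the k8s scheduler
  • Third: implement a scheduler extender, an external service that kube-scheduler calls to extend its scheduling decisions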

   1.1 HTTP request body: ExtenderArgs

     The configured scheduler sends an HTTP request to the extender service with a serialized ExtenderArgs as the body. It carries the pod being scheduled and the list of candidate nodes.

    // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
    // nodes for a pod.
    type ExtenderArgs struct {
        // Pod being scheduled
        Pod *v1.Pod
        // List of candidate nodes where the pod can be scheduled; to be populated
        // only if ExtenderConfig.NodeCacheCapable == false
        Nodes *v1.NodeList
        // List of candidate node names where the pod can be scheduled; to be
        // populated only if ExtenderConfig.NodeCacheCapable == true
        NodeNames *[]string
    }
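As a rough illustration of what ends up in the request body, the sketch below fills either Nodes or NodeNames depending on nodeCacheCapable and prints the JSON. Pod, Node and NodeList are simplified stand-ins for the real k8s.io/api/core/v1 types, and buildArgs is a hypothetical helper, not part of Kubernetes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for v1.Pod, v1.Node and v1.NodeList (assumptions for
// this sketch; the real types carry many more fields).
type Pod struct {
	Name string
}

type Node struct {
	Name string
}

type NodeList struct {
	Items []Node
}

// ExtenderArgs mirrors the field layout of the struct shown above.
type ExtenderArgs struct {
	Pod       *Pod
	Nodes     *NodeList
	NodeNames *[]string
}

// buildArgs (hypothetical helper) populates exactly one of Nodes or NodeNames.
func buildArgs(pod *Pod, nodes []Node, nodeCacheCapable bool) ExtenderArgs {
	args := ExtenderArgs{Pod: pod}
	if nodeCacheCapable {
		names := make([]string, 0, len(nodes))
		for _, n := range nodes {
			names = append(names, n.Name)
		}
		args.NodeNames = &names
	} else {
		args.Nodes = &NodeList{Items: nodes}
	}
	return args
}

func main() {
	pod := &Pod{Name: "web-0"}
	nodes := []Node{{Name: "node-a"}, {Name: "node-b"}}
	for _, capable := range []bool{false, true} {
		out, err := json.Marshal(buildArgs(pod, nodes, capable))
		if err != nil {
			panic(err)
		}
		fmt.Printf("nodeCacheCapable=%v: %s\n", capable, out)
	}
}
```

An extender that keeps its own node cache only needs the names, which keeps the request small on large clusters.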

     1.1.1 Reading the HTTP body

      With the github.com/emicklei/go-restful package this is straightforward: call ReadEntity to decode the body into an ExtenderArgs variable.

    func predicates(r *restful.Request, w *restful.Response) {
        var extenderArgs schedulerapi.ExtenderArgs
        if err := r.ReadEntity(&extenderArgs); err != nil {
            logrus.Errorf("predicate read entity error: %v", err)
            w.WriteErrorString(http.StatusInternalServerError, err.Error())
            return
        }
        // Assumed completion: run the custom filter from 1.1.2 and write its
        // result back as the response body.
        w.WriteEntity(handleFilter(extenderArgs))
    }

    1.1.2 Defining your own filter rule

     Iterate over all nodes and apply the custom filter rule, for example on CPU, memory, or storage metrics. Nodes that pass are added to canSchedule; nodes that fail with an error are recorded in canNotSchedule together with the reason. The result is returned as an ExtenderFilterResult.

     predicateHandler can be defined to suit your needs, for example based on CPU, memory, or storage metrics.

    func handleFilter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
        pod := args.Pod
        // args.Nodes is only populated when nodeCacheCapable is false.
        canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
        canNotSchedule := make(map[string]string)
        for _, node := range args.Nodes.Items {
            result, err := predicateHandler(*pod, node)
            if err != nil {
                canNotSchedule[node.Name] = err.Error()
            } else if result {
                canSchedule = append(canSchedule, node)
            }
            // A node with result == false and err == nil ends up in neither list.
        }
        return &schedulerapi.ExtenderFilterResult{
            Nodes: &v1.NodeList{
                Items: canSchedule,
            },
            FailedNodes: canNotSchedule,
            Error:       "",
        }
    }
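One possible shape for predicateHandler is sketched below. It uses simplified Pod and Node types with hypothetical fields (CPU in millicores, memory in MiB) rather than the real v1 types, and it returns an error for every rejection so that the reason lands in FailedNodes.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins (assumptions): the real v1.Pod and v1.Node express
// resources through resource.Quantity values instead of plain integers.
type Pod struct {
	Name      string
	CPUMilli  int64
	MemoryMiB int64
}

type Node struct {
	Name          string
	Ready         bool
	FreeCPUMilli  int64
	FreeMemoryMiB int64
}

// predicateHandler reports whether pod fits on node. Every rejection carries
// an error so the caller can record a reason for the node.
func predicateHandler(pod Pod, node Node) (bool, error) {
	if !node.Ready {
		return false, errors.New("node is not ready")
	}
	if pod.CPUMilli > node.FreeCPUMilli {
		return false, fmt.Errorf("insufficient cpu: requested %dm, free %dm", pod.CPUMilli, node.FreeCPUMilli)
	}
	if pod.MemoryMiB > node.FreeMemoryMiB {
		return false, fmt.Errorf("insufficient memory: requested %dMi, free %dMi", pod.MemoryMiB, node.FreeMemoryMiB)
	}
	return true, nil
}

func main() {
	pod := Pod{Name: "web-0", CPUMilli: 500, MemoryMiB: 256}
	nodes := []Node{
		{Name: "node-a", Ready: true, FreeCPUMilli: 2000, FreeMemoryMiB: 4096},
		{Name: "node-b", Ready: true, FreeCPUMilli: 250, FreeMemoryMiB: 4096},
		{Name: "node-c", Ready: false},
	}
	for _, n := range nodes {
		ok, err := predicateHandler(pod, n)
		fmt.Println(n.Name, ok, err)
	}
}
```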

   1.2 HTTP response body: ExtenderFilterResult

     The extender replies with an ExtenderFilterResult, which tells the scheduler which nodes can take the pod and which cannot.

    // ExtenderFilterResult represents the results of a filter call to an extender
    type ExtenderFilterResult struct {
        // Filtered set of nodes where the pod can be scheduled; to be populated
        // only if ExtenderConfig.NodeCacheCapable == false
        Nodes *v1.NodeList
        // Filtered set of nodes where the pod can be scheduled; to be populated
        // only if ExtenderConfig.NodeCacheCapable == true
        NodeNames *[]string
        // Filtered out nodes where the pod can't be scheduled and the failure messages
        FailedNodes FailedNodesMap
        // Error message indicating failure
        Error string
    }

 

2. How the scheduler extender works

    2.1 Sample extender scheduler policy file, policy.yaml

  •      urlPrefix: the URL prefix of the extender service that the scheduler calls back
  •      enableHttps: whether the extender is served over HTTPS
  •      nodeCacheCapable: if true, the scheduler sends only the list of node names; if false, it sends the complete Node objects

    {
      "kind" : "Policy",
      "apiVersion" : "v1",
      "predicates" : [
        {"name" : "PodFitsHostPorts"},
        {"name" : "PodFitsResources"},
        {"name" : "NoDiskConflict"},
        {"name" : "MatchNodeSelector"},
        {"name" : "HostName"}
      ],
      "priorities" : [
        {"name" : "LeastRequestedPriority", "weight" : 1},
        {"name" : "BalancedResourceAllocation", "weight" : 1},
        {"name" : "ServiceSpreadingPriority", "weight" : 1},
        {"name" : "EqualPriority", "weight" : 1}
      ],
      "extenders" : [{
        "urlPrefix": "http://localhost:8880",
        "filterVerb": "predicates",
        "prioritizeVerb": "priorities",
        "preemptVerb": "preemption",
        "bindVerb": "",
        "weight": 1,
        "enableHttps": false,
        "nodeCacheCapable": false
      }],
      "hardPodAffinitySymmetricWeight" : 10
    }

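  2.2 Defining the KubeSchedulerConfiguration

    This file configures the scheduler and points it at the policy file that defines the scheduling algorithm. It uses policy.yaml from section 2.1; the file below is schedulerConfig.yaml.

    Start kube-scheduler with --config=schedulerConfig.yaml to load this configuration, which in turn references the policy file. In the policy, users can assemble predicate and priority functions as needed. Choosing different filter and priority functions, changing the weights of priority functions, and reordering filter functions all affect the scheduling process.

    apiVersion: kubescheduler.config.k8s.io/v1alpha1
    kind: KubeSchedulerConfiguration
    schedulerName: my-scheduler
    algorithmSource:
      policy:
        file:
          path: policy.yaml
    leaderElection:
      leaderElect: true
      lockObjectName: my-scheduler
      lockObjectNamespace: kube-system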

 

3. k8s scheduler extender source code analysis

    3.1 Loading the policy when file or configMap is set

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: extender-policy.yaml

    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc

    3.1.1 CreateFromConfig builds the scheduler from the policy read from policy.yaml or the ConfigMap

    {
      "kind" : "Policy",
      "apiVersion" : "v1",
      "predicates" : [
        {"name" : "PodFitsHostPorts"},
        {"name" : "PodFitsResources"},
        {"name" : "NoDiskConflict"},
        {"name" : "MatchNodeSelector"},
        {"name" : "HostName"}
      ],
      "priorities" : [
        {"name" : "LeastRequestedPriority", "weight" : 1},
        {"name" : "BalancedResourceAllocation", "weight" : 1},
        {"name" : "ServiceSpreadingPriority", "weight" : 1},
        {"name" : "EqualPriority", "weight" : 1}
      ],
      "extenders" : [{
        "urlPrefix": "http://localhost:8880",
        "filterVerb": "predicates",
        "prioritizeVerb": "priorities",
        "preemptVerb": "preemption",
        "bindVerb": "",
        "weight": 1,
        "enableHttps": false,
        "nodeCacheCapable": false
      }],
      "hardPodAffinitySymmetricWeight" : 10
    }

    3.1.2 The SchedulerExtender interface

    The methods are simple and self-explanatory. Path: pkg/scheduler/algorithm/scheduler_interface.go

    type SchedulerExtender interface {
        // Filter based on extender-implemented predicate functions. The filtered list is
        // expected to be a subset of the supplied list. failedNodesMap optionally contains
        // the list of failed nodes and failure reasons.
        Filter(pod *v1.Pod,
            nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
        ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
        // Prioritize based on extender-implemented priority functions. The returned scores & weight
        // are used to compute the weighted score for an extender. The weighted scores are added to
        // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
        Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
        // Bind delegates the action of binding a pod to a node to the extender.
        Bind(binding *v1.Binding) error
        // ...
    }

    3.1.3 The ExtenderConfig struct

    When scheduling a pod, an extender lets an external process filter and prioritize nodes. An extender can also bind the pod to a node itself.

    To use an extender, create a scheduler policy file that tells the scheduler how to reach it.

  "extenders" : [{
    "urlPrefix": "http://localhost:8880",
    "filterVerb": "predicates",
    "prioritizeVerb": "priorities",
    "preemptVerb": "preemption",
    "bindVerb": "",
    "weight": 1,
    "enableHttps": false,
    "nodeCacheCapable": false
   }],

  •     URLPrefix: the URL prefix at which the extender service is available
  •     FilterVerb: the path of the filter call; with the configuration above the full URL is http://localhost:8880/predicates. Leave it empty if filtering is not supported
  •     enableHttps: whether the extender is served over HTTPS
  •     nodeCacheCapable: if true, the scheduler sends only the list of node names; if false, it sends the complete Node objects

    type ExtenderConfig struct {
        // URLPrefix at which the extender is available
        URLPrefix string
        // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender.
        FilterVerb string
        // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender.
        PreemptVerb string
        // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender.
        PrioritizeVerb string
        // The numeric multiplier for the node scores that the prioritize call generates.
        // The weight should be a positive integer
        Weight int
        // Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender.
        // If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender
        // can implement this function.
        BindVerb string
        // EnableHTTPS specifies whether https should be used to communicate with the extender
        EnableHTTPS bool
        // TLSConfig specifies the transport layer security config
        TLSConfig *ExtenderTLSConfig
        // HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize
        // timeout is ignored, k8s/other extenders priorities are used to select the node.
        HTTPTimeout time.Duration
        // NodeCacheCapable specifies that the extender is capable of caching node information,
        // so the scheduler should only send minimal information about the eligible nodes
        // assuming that the extender already cached full details of all nodes in the cluster
        NodeCacheCapable bool
        // ManagedResources is a list of extended resources that are managed by
        // this extender.
        // - A pod will be sent to the extender on the Filter, Prioritize and Bind
        //   (if the extender is the binder) phases iff the pod requests at least
        //   one of the extended resources in this list. If empty or unspecified,
        //   all pods will be sent to this extender.
        // - If IgnoredByScheduler is set to true for a resource, kube-scheduler
        //   will skip checking the resource in predicates.
        // +optional
        ManagedResources []ExtenderManagedResource
        // Ignorable specifies if the extender is ignorable, i.e. scheduling should not
        // fail when the extender returns an error or is not reachable.
        Ignorable bool
    }
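The ManagedResources comment describes when a pod is sent to the extender at all. The sketch below expresses that rule with simplified types. The pod and container types and the isInterested helper are assumptions, and the actual scheduler code may inspect more fields than container requests.

```go
package main

import "fmt"

// Simplified stand-ins (assumptions): a pod is a list of containers, each
// with a map from requested resource name to quantity.
type container struct {
	Requests map[string]int64
}

type pod struct {
	Name       string
	Containers []container
}

// isInterested reports whether the extender should see this pod.
// An empty managed set means every pod is sent to the extender.
func isInterested(managed map[string]bool, p pod) bool {
	if len(managed) == 0 {
		return true
	}
	for _, c := range p.Containers {
		for name := range c.Requests {
			if managed[name] {
				return true
			}
		}
	}
	return false
}

func main() {
	// "example.com/gpu" is a made-up extended resource name.
	managed := map[string]bool{"example.com/gpu": true}
	gpuPod := pod{Name: "train", Containers: []container{{Requests: map[string]int64{"example.com/gpu": 1}}}}
	webPod := pod{Name: "web", Containers: []container{{Requests: map[string]int64{"cpu": 500}}}}
	fmt.Println(isInterested(managed, gpuPod), isInterested(managed, webPod), isInterested(nil, webPod))
}
```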

  3.2 HTTPExtender implements the SchedulerExtender interface

    NewHTTPExtender creates an HTTPExtender, which implements the SchedulerExtender interface.

    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        var ignorableExtenders []algorithm.SchedulerExtender
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])

    So where is the extender called? Read on.

   3.3 The findNodesThatFit function

    During the filter (predicate) phase, the scheduler calls each extender to filter the nodes again with its custom rules. This is where the Filter method of HTTPExtender is invoked.

    // Filters the nodes to find the ones that fit based on the given predicate functions
    // Each node is passed through the predicate functions to determine if it is a fit
    func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}
        // ... (the built-in predicate checks that populate filtered are omitted here)
        if len(filtered) > 0 && len(g.extenders) != 0 {
            for _, extender := range g.extenders {
                if !extender.IsInterested(pod) {
                    continue
                }
                filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)

  3.3.1 The Filter method of HTTPExtender

   ExtenderArgs is the request body and ExtenderFilterResult is the response body.

    // Filter based on extender implemented predicate functions. The filtered list is
    // expected to be a subset of the supplied list. failedNodesMap optionally contains
    // the list of failed nodes and failure reasons.
    func (h *HTTPExtender) Filter(
        pod *v1.Pod,
        nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    ) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
        var (
            result     schedulerapi.ExtenderFilterResult
            nodeList   *v1.NodeList
            nodeNames  *[]string
            nodeResult []*v1.Node
            args       *schedulerapi.ExtenderArgs
        )

     3.3.1.1 If filterVerb is empty, no nodes are filtered out

    if h.filterVerb == "" {
        return nodes, schedulerapi.FailedNodesMap{}, nil
    }

     3.3.1.2 Whether nodeCacheCapable is set

     If nodeCacheCapable is set, the scheduler sends only the list of node names. Otherwise it sends the complete Node objects.

    if h.nodeCacheCapable {
        nodeNameSlice := make([]string, 0, len(nodes))
        for _, node := range nodes {
            nodeNameSlice = append(nodeNameSlice, node.Name)
        }
        nodeNames = &nodeNameSlice
    } else {
        nodeList = &v1.NodeList{}
        for _, node := range nodes {
            nodeList.Items = append(nodeList.Items, *node)
        }
    }

     3.3.1.3 Build the ExtenderArgs with the pod to schedule and the node list

    args = &schedulerapi.ExtenderArgs{
        Pod:       pod,
        Nodes:     nodeList,
        NodeNames: nodeNames,
    }
    if err := h.send(h.filterVerb, args, &result); err != nil {
        return nil, nil, err
    }

     3.3.1.4 Send the HTTP request

     ExtenderArgs is serialized to JSON and POSTed to urlPrefix + "/" + filterVerb, so the actual filtering logic lives in the custom extender service.

    // Helper function to send messages to the extender
    func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
        out, err := json.Marshal(args)
        if err != nil {
            return err
        }
        url := strings.TrimRight(h.extenderURL, "/") + "/" + action
        req, err := http.NewRequest("POST", url, bytes.NewReader(out))
        if err != nil {
            return err
        }
        req.Header.Set("Content-Type", "application/json")
        resp, err := h.client.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
        }
        return json.NewDecoder(resp.Body).Decode(result)
    }

   How is the prioritize phase handled? See section 4.

4. The PrioritizeNodes function

    // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
    // Each priority function is expected to set a score of 0-10
    // 0 is the lowest priority score (least preferred node) and 10 is the highest
    // Each priority function can also have its own weight
    // The node scores returned by the priority function are multiplied by the weights to get weighted scores
    // All scores are finally combined (added) to get the total weighted scores of all nodes
    func PrioritizeNodes(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
        meta interface{},
        priorityConfigs []priorities.PriorityConfig,
        nodes []*v1.Node,
        extenders []algorithm.SchedulerExtender,
    ) (schedulerapi.HostPriorityList, error) {

   4.1 Call all extenders concurrently

    Each interested extender is called in its own goroutine, and its scores, multiplied by the extender's weight, are added to the totals. The Prioritize method of HTTPExtender is analyzed next.

    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                defer wg.Done()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10) {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }
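The weighted sum is easier to follow with numbers. The sketch below reproduces the combining step with in-process stand-ins: the extender type, its fixed scores and the combine helper are hypothetical, while the goroutine, mutex and score * weight accumulation follow the listing above.

```go
package main

import (
	"fmt"
	"sync"
)

type hostPriority struct {
	Host  string
	Score int
}

// extender is a hypothetical in-process stand-in for an HTTP extender that
// already knows its weight and the scores it would return.
type extender struct {
	weight int
	scores []hostPriority
}

// combine adds weight*score from every extender to the scheduler's own scores.
func combine(base []hostPriority, extenders []extender) []hostPriority {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	combined := make(map[string]int, len(base))
	for i := range extenders {
		wg.Add(1)
		go func(e extender) {
			defer wg.Done()
			// The map is shared between goroutines, so guard every update.
			mu.Lock()
			defer mu.Unlock()
			for _, hp := range e.scores {
				combined[hp.Host] += hp.Score * e.weight
			}
		}(extenders[i])
	}
	wg.Wait()

	result := make([]hostPriority, len(base))
	copy(result, base)
	for i := range result {
		result[i].Score += combined[result[i].Host]
	}
	return result
}

func main() {
	base := []hostPriority{{Host: "node-a", Score: 12}, {Host: "node-b", Score: 15}}
	exts := []extender{
		{weight: 1, scores: []hostPriority{{Host: "node-a", Score: 10}, {Host: "node-b", Score: 2}}},
		{weight: 2, scores: []hostPriority{{Host: "node-a", Score: 3}}},
	}
	// node-a: 12 + 10*1 + 3*2 = 28, node-b: 15 + 2*1 = 17
	fmt.Println(combine(base, exts))
}
```

In this example the extenders flip the ranking: node-b leads on the scheduler's own scores, but node-a wins once the weighted extender scores are added.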

   4.2 The Prioritize method of HTTPExtender

    // Prioritize based on extender implemented priority functions. Weight*priority is added
    // up for each such priority function. The returned score is added to the score computed
    // by Kubernetes scheduler. The total score is used to do the host selection.
    func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
        var (
            result    schedulerapi.HostPriorityList
            nodeList  *v1.NodeList
            nodeNames *[]string
            args      *schedulerapi.ExtenderArgs
        )

     4.2.1 If prioritizeVerb is not set

     Think of it as handing in a blank exam and getting a score of 0: no custom prioritize scoring is implemented, and a score of 0 leaves the original scores unchanged.

    if h.prioritizeVerb == "" {
        result := schedulerapi.HostPriorityList{}
        for _, node := range nodes {
            result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
        }
        return &result, 0, nil
    }
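On the extender side, a prioritize endpoint only has to return one Host/Score pair per node. The sketch below uses a made-up rule, more free pod slots means a higher score, mapped onto the 0-10 range mentioned in the PrioritizeNodes comment. The nodeLoad type and the score and prioritize helpers are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hostPriority mirrors the Host/Score pair used by the scheduler code above.
type hostPriority struct {
	Host  string
	Score int
}

// nodeLoad is a hypothetical input: running pods and pod capacity of a node.
type nodeLoad struct {
	Name     string
	Running  int
	Capacity int
}

// score maps the share of free pod slots onto 0..10; emptier nodes score higher.
func score(n nodeLoad) int {
	if n.Capacity <= 0 || n.Running >= n.Capacity {
		return 0
	}
	if n.Running <= 0 {
		return 10
	}
	return 10 * (n.Capacity - n.Running) / n.Capacity
}

// prioritize builds the list an extender would send back as its response.
func prioritize(nodes []nodeLoad) []hostPriority {
	out := make([]hostPriority, 0, len(nodes))
	for _, n := range nodes {
		out = append(out, hostPriority{Host: n.Name, Score: score(n)})
	}
	return out
}

func main() {
	list := prioritize([]nodeLoad{
		{Name: "node-a", Running: 11, Capacity: 110},
		{Name: "node-b", Running: 110, Capacity: 110},
	})
	body, err := json.Marshal(list)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```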

    HTTPExtender handles the filter and prioritize phases the same way; only the result differs. One returns the filtered nodes, the other returns a score for each node.

 

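Summary:

   At deployment time, the scheduler must be pointed at a policy file that contains the extender configuration.

   The extender service runs an HTTP server that implements the filter and prioritize methods.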

 

References:

    https://developer.ibm.com/technologies/containers/articles/creating-a-custom-kube-scheduler/

    https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/scheduler_extender.md
