赞
踩
有三种方式为 kubernetes 添加新的调度规则,包括 predicates 和 priority 功能,本文讲解第三种方式
- 第一种,直接在 kubernetes 添加调度规则,重新编译
- 第二种,实现自己的调度,替换 k8s 的 scheduler
- 第三种,实现 scheduler extender,提供扩展 k8s 调度的一个能力
配置了 extender 的 scheduler 会向 extender 服务发起 HTTP 请求,请求 body 是序列化后的 ExtenderArgs,主要包含待调度 Pod 的信息以及候选节点列表。
- // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
- // nodes for a pod.
- //
- // kube-scheduler sends this struct, JSON-encoded, as the request body of the
- // extender's filter and prioritize calls. Depending on
- // ExtenderConfig.NodeCacheCapable, either Nodes (full Node objects) or
- // NodeNames (node names only) is populated; the other field is left nil.
- type ExtenderArgs struct {
- // Pod being scheduled
- Pod *v1.Pod
- // List of candidate nodes where the pod can be scheduled; to be populated
- // only if ExtenderConfig.NodeCacheCapable == false
- Nodes *v1.NodeList
- // List of candidate node names where the pod can be scheduled; to be
- // populated only if ExtenderConfig.NodeCacheCapable == true
- NodeNames *[]string
- }
1.1.1 读取 http body
如果使用 github.com/emicklei/go-restful 包,处理会比较简单:直接调用 ReadEntity 将请求体解析到 ExtenderArgs 变量即可
- func predicates(r *restful.Request, w *restful.Response) {
- var extenderArgs schedulerapi.ExtenderArgs
-
- if err := r.ReadEntity(&extenderArgs); err != nil {
- logrus.Errorf("predicate read entity error: %v", err)
- w.WriteErrorString(http.StatusInternalServerError, err.Error())
- return
- }
1.1.2 定义自己的 filter 规则
遍历所有候选节点,使用自定义的过滤规则(比如 CPU、内存、存储等指标)进行判断:通过的节点加入 canSchedule,未通过的节点加入 canNotSchedule,最终结果通过 ExtenderFilterResult 返回。
predicateHandler 可以根据实际需求自行定义,比如基于 CPU、内存、存储等指标进行判断
- func handleFilter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
- pod := args.Pod
- canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
- canNotSchedule := make(map[string]string)
-
- for _, node := range args.Nodes.Items {
- result, err := predicateHandler(*pod, node)
- if err != nil {
- canNotSchedule[node.Name] = err.Error()
- } else if result {
- canSchedule = append(canSchedule, node)
- }
- }
- return &schedulerapi.ExtenderFilterResult{
- Nodes: &v1.NodeList{
- Items: canSchedule,
- },
- FailedNodes: canNotSchedule,
- Error: "",
- }
- }

extender 使用 ExtenderFilterResult 结构告知 scheduler 哪些节点可以调度、哪些节点不可调度
- // ExtenderFilterResult represents the results of a filter call to an extender
- //
- // The extender returns this struct, JSON-encoded, as the response body of the
- // filter call, and HTTPExtender.send decodes the response into it. Nodes or
- // NodeNames is used depending on ExtenderConfig.NodeCacheCapable.
- type ExtenderFilterResult struct {
- // Filtered set of nodes where the pod can be scheduled; to be populated
- // only if ExtenderConfig.NodeCacheCapable == false
- Nodes *v1.NodeList
- // Filtered set of nodes where the pod can be scheduled; to be populated
- // only if ExtenderConfig.NodeCacheCapable == true
- NodeNames *[]string
- // Filtered out nodes where the pod can't be scheduled and the failure messages
- // (keyed by node name; the value is the failure message)
- FailedNodes FailedNodesMap
- // Error message indicating failure
- Error string
- }
- {
- "kind" : "Policy",
- "apiVersion" : "v1",
- "predicates" : [
- {"name" : "PodFitsHostPorts"},
- {"name" : "PodFitsResources"},
- {"name" : "NoDiskConflict"},
- {"name" : "MatchNodeSelector"},
- {"name" : "HostName"}
- ],
- "priorities" : [
- {"name" : "LeastRequestedPriority", "weight" : 1},
- {"name" : "BalancedResourceAllocation", "weight" : 1},
- {"name" : "ServiceSpreadingPriority", "weight" : 1},
- {"name" : "EqualPriority", "weight" : 1}
- ],
- "extenders" : [{
- "urlPrefix": "http://localhost:8880",
- "filterVerb": "predicates",
- "prioritizeVerb": "priorities",
- "preemptVerb": "preemption",
- "bindVerb": "",
- "weight": 1,
- "enableHttps": false,
- "nodeCacheCapable": false
- }],
- "hardPodAffinitySymmetricWeight" : 10
- }

配置调度器以及调度算法的 policy 文件:policy 使用上文 2.1 节的 policy.yaml 文件,调度器本身的配置见下文的 schedulerConfig.yaml
kube-scheduler 启动时可以通过 --config=schedulerConfig.yaml 参数指定调度器配置文件(其中引用调度策略 policy 文件),用户可以根据需要组装 Predicates 和 Priority 函数。选择不同的过滤函数和优先级函数、控制优先级函数的权重、调整过滤函数的顺序都会影响调度过程。
# kube-scheduler configuration (passed via --config) that loads the scheduling
# policy, including the "extenders" section, from a file.
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      # Policy file with predicates, priorities and the extender definition.
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
# Minimal kube-scheduler configuration that reads its policy from a file.
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: extender-policy.yaml
- source := schedulerAlgorithmSource
- switch {
- case source.Provider != nil:
- // Create the config from a named algorithm provider.
- sc, err := configurator.CreateFromProvider(*source.Provider)
- if err != nil {
- return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
- }
- config = sc
- case source.Policy != nil:
- // Create the config from a user specified policy source.
- policy := &schedulerapi.Policy{}
- switch {
- case source.Policy.File != nil:
- if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
- return nil, err
- }
- case source.Policy.ConfigMap != nil:
- if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
- return nil, err
- }
- }
- sc, err := configurator.CreateFromConfig(*policy)
- if err != nil {
- return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
- }
- config = sc

3.1.1 CreateFromConfig 使用从 policy.yaml 文件或 ConfigMap 中读取到的 policy 配置创建调度器配置
- {
- "kind" : "Policy",
- "apiVersion" : "v1",
- "predicates" : [
- {"name" : "PodFitsHostPorts"},
- {"name" : "PodFitsResources"},
- {"name" : "NoDiskConflict"},
- {"name" : "MatchNodeSelector"},
- {"name" : "HostName"}
- ],
- "priorities" : [
- {"name" : "LeastRequestedPriority", "weight" : 1},
- {"name" : "BalancedResourceAllocation", "weight" : 1},
- {"name" : "ServiceSpreadingPriority", "weight" : 1},
- {"name" : "EqualPriority", "weight" : 1}
- ],
- "extenders" : [{
- "urlPrefix": "http://localhost:8880",
- "filterVerb": "predicates",
- "prioritizeVerb": "priorities",
- "preemptVerb": "preemption",
- "bindVerb": "",
- "weight": 1,
- "enableHttps": false,
- "nodeCacheCapable": false
- }],
- "hardPodAffinitySymmetricWeight" : 10
- }

3.1.2 SchedulerExtender 接口
接口方法简单明了,源码路径为 pkg/scheduler/algorithm/scheduler_interface.go
- type SchedulerExtender interface {
-
- // Filter based on extender-implemented predicate functions. The filtered list is
- // expected to be a subset of the supplied list. failedNodesMap optionally contains
- // the list of failed nodes and failure reasons.
- Filter(pod *v1.Pod,
- nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
-
- // Prioritize based on extender-implemented priority functions. The returned scores & weight
- // are used to compute the weighted score for an extender. The weighted scores are added to
- // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
- Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
-
- // Bind delegates the action of binding a pod to a node to the extender.
- Bind(binding *v1.Binding) error
- ..................................
- }

3.1.3 ExtenderConfig 结构体
调度 Pod 时,extender 允许由外部进程对节点进行预选(filter)和优选(prioritize);extender 也可以直接负责把 Pod 绑定(bind)到节点
使用 extender 功能时,需要创建 scheduler policy 配置文件,并在其中指明如何访问 extender:
"extenders" : [{
"urlPrefix": "http://localhost:8880",
"filterVerb": "predicates",
"prioritizeVerb": "priorities",
"preemptVerb": "preemption",
"bindVerb": "",
"weight": 1,
"enableHttps": false,
"nodeCacheCapable": false
}],
- // ExtenderConfig holds the parameters for communicating with one scheduler
- // extender. kube-scheduler creates one extender per entry in
- // policy.ExtenderConfigs (via core.NewHTTPExtender); each supported verb is
- // appended to URLPrefix to form the URL of the corresponding call.
- // NOTE(review): presumably decoded from one entry of the policy file's
- // "extenders" array (urlPrefix, filterVerb, ...); JSON tags are not shown here.
- type ExtenderConfig struct {
- // URLPrefix at which the extender is available
- URLPrefix string
- // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender.
- FilterVerb string
- // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender.
- PreemptVerb string
- // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender.
- PrioritizeVerb string
- // The numeric multiplier for the node scores that the prioritize call generates.
- // The weight should be a positive integer
- Weight int
- // Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender.
- // If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender
- // can implement this function.
- BindVerb string
- // EnableHTTPS specifies whether https should be used to communicate with the extender
- EnableHTTPS bool
- // TLSConfig specifies the transport layer security config
- TLSConfig *ExtenderTLSConfig
- // HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize
- // timeout is ignored, k8s/other extenders priorities are used to select the node.
- HTTPTimeout time.Duration
- // NodeCacheCapable specifies that the extender is capable of caching node information,
- // so the scheduler should only send minimal information about the eligible nodes
- // assuming that the extender already cached full details of all nodes in the cluster
- NodeCacheCapable bool
- // ManagedResources is a list of extended resources that are managed by
- // this extender.
- // - A pod will be sent to the extender on the Filter, Prioritize and Bind
- // (if the extender is the binder) phases iff the pod requests at least
- // one of the extended resources in this list. If empty or unspecified,
- // all pods will be sent to this extender.
- // - If IgnoredByScheduler is set to true for a resource, kube-scheduler
- // will skip checking the resource in predicates.
- // +optional
- ManagedResources []ExtenderManagedResource
- // Ignorable specifies if the extender is ignorable, i.e. scheduling should not
- // fail when the extender returns an error or is not reachable.
- Ignorable bool
- }

NewHTTPExtender 根据 policy 中的每个 ExtenderConfig 实例化一个 HTTPExtender 对象,HTTPExtender 实现了 SchedulerExtender 接口
- var extenders []algorithm.SchedulerExtender
- if len(policy.ExtenderConfigs) != 0 {
- ignoredExtendedResources := sets.NewString()
- var ignorableExtenders []algorithm.SchedulerExtender
- for ii := range policy.ExtenderConfigs {
- klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
- extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
哪里会调用 Extender 呢?请看下文分解
在预选阶段,调度器会在已经过滤出的节点(filtered)基础上,再调用 extender 根据自定义规则进一步过滤,这里调用的就是 HTTPExtender 的 Filter 方法
- // Filters the nodes to find the ones that fit based on the given predicate functions
- // Each node is passed through the predicate functions to determine if it is a fit
- func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
- var filtered []*v1.Node
- failedPredicateMap := FailedPredicateMap{}
-
- if len(filtered) > 0 && len(g.extenders) != 0 {
- for _, extender := range g.extenders {
- if !extender.IsInterested(pod) {
- continue
- }
- filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
3.3.1 HTTPExtender 的 Filter 方法
这里 ExtenderArgs 是请求 body 的结构,ExtenderFilterResult 是响应(response)的结构
- // Filter based on extender implemented predicate functions. The filtered list is
- // expected to be a subset of the supplied list. failedNodesMap optionally contains
- // the list of failed nodes and failure reasons.
- func (h *HTTPExtender) Filter(
- pod *v1.Pod,
- nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- ) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
- var (
- result schedulerapi.ExtenderFilterResult
- nodeList *v1.NodeList
- nodeNames *[]string
- nodeResult []*v1.Node
- args *schedulerapi.ExtenderArgs
- )
3.3.1.1 如果 filterVerb 为空,则不过滤任何节点(原样返回全部节点)
- if h.filterVerb == "" {
- return nodes, schedulerapi.FailedNodesMap{}, nil
- }
3.3.1.2 是否设置 nodeCacheCapable
如果开启了 nodeCacheCapable,调度器只会传递节点名称列表(NodeNames);如果没有开启,调度器会把完整的 Node 对象列表(Nodes)都传递过去。
- if h.nodeCacheCapable {
- nodeNameSlice := make([]string, 0, len(nodes))
- for _, node := range nodes {
- nodeNameSlice = append(nodeNameSlice, node.Name)
- }
- nodeNames = &nodeNameSlice
- } else {
- nodeList = &v1.NodeList{}
- for _, node := range nodes {
- nodeList.Items = append(nodeList.Items, *node)
- }
- }
3.3.1.3 封装 ExtenderArgs 结构,包括需要调度的 Pod 与节点列表
- args = &schedulerapi.ExtenderArgs{
- Pod: pod,
- Nodes: nodeList,
- NodeNames: nodeNames,
- }
-
- if err := h.send(h.filterVerb, args, &result); err != nil {
- return nil, nil, err
- }
3.3.1.4 发送 http 请求
将 ExtenderArgs 序列化为 JSON,以 POST 方式发送,请求地址为 urlPrefix + "/" + filterVerb(urlPrefix 末尾的 "/" 会被去掉)。因此真正的过滤逻辑实现在自定义的 extender 服务中
- // Helper function to send messages to the extender
- func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
- out, err := json.Marshal(args)
- if err != nil {
- return err
- }
-
- url := strings.TrimRight(h.extenderURL, "/") + "/" + action
-
- req, err := http.NewRequest("POST", url, bytes.NewReader(out))
- if err != nil {
- return err
- }
-
- req.Header.Set("Content-Type", "application/json")
-
- resp, err := h.client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
- }
-
- return json.NewDecoder(resp.Body).Decode(result)
- }

对于优选阶段怎么处理的呢?请看第 4 章节
- // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
- // Each priority function is expected to set a score of 0-10
- // 0 is the lowest priority score (least preferred node) and 10 is the highest
- // Each priority function can also have its own weight
- // The node scores returned by the priority function are multiplied by the weights to get weighted scores
- // All scores are finally combined (added) to get the total weighted scores of all nodes
- func PrioritizeNodes(
- pod *v1.Pod,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- meta interface{},
- priorityConfigs []priorities.PriorityConfig,
- nodes []*v1.Node,
- extenders []algorithm.SchedulerExtender,
- ) (schedulerapi.HostPriorityList, error) {
每个 extender 返回的节点分数乘以该 extender 的 weight 后,累加到对应节点的总分上。下面分析 HTTPExtender 的 Prioritize 方法
- if len(extenders) != 0 && nodes != nil {
- combinedScores := make(map[string]int, len(nodeNameToInfo))
- for i := range extenders {
- if !extenders[i].IsInterested(pod) {
- continue
- }
- wg.Add(1)
- go func(extIndex int) {
- defer wg.Done()
- prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
- if err != nil {
- // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
- return
- }
- mu.Lock()
- for i := range *prioritizedList {
- host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
- if klog.V(10) {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
- }
- combinedScores[host] += score * weight
- }
- mu.Unlock()
- }(i)
- }
- // wait for all go routines to finish
- wg.Wait()
- for i := range result {
- result[i].Score += combinedScores[result[i].Host]
- }
- }

- // Prioritize based on extender implemented priority functions. Weight*priority is added
- // up for each such priority function. The returned score is added to the score computed
- // by Kubernetes scheduler. The total score is used to do the host selection.
- func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
- var (
- result schedulerapi.HostPriorityList
- nodeList *v1.NodeList
- nodeNames *[]string
- args *schedulerapi.ExtenderArgs
- )
4.2.1 如果未设置 prioritizeVerb
打个比方,就像交白卷得 0 分:extender 不做自定义打分,对每个节点都返回 0 分(weight 也返回 0),因此不会影响节点原有的得分
- if h.prioritizeVerb == "" {
- result := schedulerapi.HostPriorityList{}
- for _, node := range nodes {
- result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
- }
- return &result, 0, nil
- }
预选与优选阶段 HTTPExtender 的处理方式相同,只是返回结果不同:一个返回过滤后的节点,一个返回各节点的得分
部署时,需要在 scheduler 的 policy 配置中指定 Extender 配置
Extender 服务需要实现一个 HTTP server,提供预选(filter)和优选(prioritize)接口
https://developer.ibm.com/technologies/containers/articles/creating-a-custom-kube-scheduler/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。