上篇我们分析了 hami-scheduler 工作流程,知道了 hami-webhook、hami-scheduler 是怎么配合工作的。
本文为 HAMi 原理分析的第四篇,分析 hami-scheduler 在调度时是如何选择节点的,即:Spread、Binpack 等高级调度策略是怎么实现的。
这篇文章我们解决最后一个问题:Spread、Binpack 等高级调度策略是怎么实现的
以下分析基于 HAMi v2.4.0
这里在贴一下上一篇总结的 HAMi Webhook 、Scheduler 工作流程:
-
1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源
-
2)kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook
-
3)HAMi-Webhook 检测 Pod 中的 Resource,发现是申请的由 HAMi 管理的 vGPU 资源,因此把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。
-
对于特权模式的 Pod,Webhook 会直接跳过不处理
-
对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝
-
-
4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,kube-scheduler 根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件
-
这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。
-
当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑
-
-
5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点
-
6)异步任务,包括 GPU 感知逻辑
-
devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
-
Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用
-
1. 配置调度策略
hami-scheduler 提供了两种不同级别的调度策略:
- 节点调度策略:作用于调度过程中如何为 Pod 选择节点
- GPU 调度策略:作用于选择节点后,节点存在多 GPU 时如何为 Pod 选择 GPU
根据部署文档,我们可以在部署时指定调度策略
scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy:
字符串类型,预设值为"binpack",表示 GPU 节点调度策略。- "binpack"表示尽量将任务分配到同一个 GPU 节点上
- "spread"表示尽量将任务分配到不同 GPU 节点上。
scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy:
字符串类型,预设值为"spread", 表示 GPU 调度策略。- "binpack"表示尽量将任务分配到同一个 GPU 上
- "spread"表示尽量将任务分配到不同 GPU 上。
就像这样:
helm install vgpu vgpu-charts/vgpu --set scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy=binpark --set scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy=spread
部署后,这两个配置作用域 hami-scheduler 上具体如下:
kk get deploy vgpu-hami-scheduler -oyaml apiVersion: apps/v1 kind: Deployment metadata: name: vgpu-hami-scheduler namespace: kube-system spec: template: spec: containers: - command: - scheduler - --resource-name=nvidia.com/gpu - --resource-mem=nvidia.com/gpumem - --resource-cores=nvidia.com/gpucores - --resource-mem-percentage=nvidia.com/gpumem-percentage - --resource-priority=nvidia.com/priority - --http_bind=0.0.0.0:443 - --cert_file=/tls/tls.crt - --key_file=/tls/tls.key - --scheduler-name=hami-scheduler - --metrics-bind-address=:9395 - --default-mem=0 - --default-gpu=1 - --default-cores=0 - --iluvatar-memory=iluvatar.ai/vcuda-memory - --iluvatar-cores=iluvatar.ai/vcuda-core - --cambricon-mlu-name=cambricon.com/vmlu - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore - --ascend-name=huawei.com/Ascend910 - --ascend-memory=huawei.com/Ascend910-memory - --ascend310p-name=huawei.com/Ascend310P - --ascend310p-memory=huawei.com/Ascend310P-memory - --overwrite-env=false - --node-scheduler-policy=binpack - --gpu-scheduler-policy=spread
就是这两个参数
- --node-scheduler-policy=binpack - --gpu-scheduler-policy=spread
2. Node 调度策略原理
这部分比较简单,选择节点的逻辑就在 Filter 接口中。
// pkg/scheduler/scheduler.go#L444 func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace) nums := k8sutil.Resourcereqs(args.Pod) total := 0 for _, n := range nums { for _, k := range n { total += int(k.Nums) } } if total == 0 { klog.V(1).Infof("pod %v not find resource", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource")) return &extenderv1.ExtenderFilterResult{ NodeNames: args.NodeNames, FailedNodes: nil, Error: "", }, nil } annos := args.Pod.Annotations s.delPod(args.Pod) nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len(failedNodes) != 0 { klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes) } nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) if err != nil { err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len((*nodeScores).NodeList) == 0 { klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet")) return &extenderv1.ExtenderFilterResult{ FailedNodes: failedNodes, }, nil } klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList)) sort.Sort(nodeScores) m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices) annotations := make(map[string]string) annotations[util.AssignedNodeAnnotations] = m.NodeID annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) for _, val := range device.GetDevices() { val.PatchAnnotations(&annotations, m.Devices) } //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices) //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices) //maps.Copy(annotations, InRequestDevices) //maps.Copy(annotations, supportDevices) s.addPod(args.Pod, m.NodeID, m.Devices) err = util.PatchPodAnnotations(args.Pod, annotations) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) s.delPod(args.Pod) return nil, err } s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil) res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil }
主要就是下面这几句:
//计算得分,拿到所有满足条件的节点 nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) // 排序 sort.Sort(nodeScores) // 直接选择最后一个节点 m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] // 返回结果 res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil
可以分为两个部分:
- 1)为所有节点计算得分
- 2)根据调度策略选择最合适的节点
计算得分
得分计算逻辑在 calcScore
方法里:
// pkg/scheduler/score.go#L185 func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) { userNodePolicy := config.NodeSchedulerPolicy if annos != nil { if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok { userNodePolicy = value } } res := policy.NodeScoreList{ Policy: userNodePolicy, NodeList: make([]*policy.NodeScore, 0), } //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) { // res := make(NodeScoreList, 0, len(*nodes)) for nodeID, node := range *nodes { viewStatus(*node) score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0} score.ComputeScore(node.Devices) //This loop is for different container request ctrfit := false for ctrid, n := range nums { sums := 0 for _, k := range n { sums += int(k.Nums) } if sums == 0 { for idx := range score.Devices { if len(score.Devices[idx]) <= ctrid { score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{}) } score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{}) continue } } klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID) fit, _ := fitInDevices(node, n, annos, task, &score.Devices) ctrfit = fit if !fit { klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID) break } } if ctrfit { res.NodeList = append(res.NodeList, &score) } } return &res, nil }
具体算法在 ComputeScore
中:
// pkg/scheduler/policy/node_policy.go#L53 func (ns *NodeScore) ComputeScore(devices DeviceUsageList) { // current user having request resource used, usedCore, usedMem := int32(0), int32(0), int32(0) for _, device := range devices.DeviceLists { used += device.Device.Used usedCore += device.Device.Usedcores usedMem += device.Device.Usedmem } klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem) total, totalCore, totalMem := int32(0), int32(0), int32(0) for _, deviceLists := range devices.DeviceLists { total += deviceLists.Device.Count totalCore += deviceLists.Device.Totalcore totalMem += deviceLists.Device.Totalmem } useScore := float32(used) / float32(total) coreScore := float32(usedCore) / float32(totalCore) memScore := float32(usedMem) / float32(totalMem) ns.Score = float32(Weight) * (useScore + coreScore + memScore) klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score) }
具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的 GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。
总的来说就是:
节点上 GPU Core 和 GPU Memory 资源剩余越少,得分越高
。乍一看这个逻辑有点反直觉了,不是应该资源越多得分越高吗。
问题不大,等看完后续 根据策略选择节点 章节就清楚了。
过滤节点
打分之后还需要根据 Pod 申请的 GPU 信息,过滤掉不满足条件的节点。
比如:Pod 申请 2 vGPU,Node 上只有一张卡,肯定是不行的。
解析 Pod 申请的 GPU 信息
首先是从 Pod 信息中解析出申请的 GPU 信息:
// pkg/scheduler/scheduler.go#L444 func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { nums := k8sutil.Resourcereqs(args.Pod) }
Resourcereqs
内容如下:
// pkg/k8sutil/pod.go#L27 func Resourcereqs(pod *corev1.Pod) (counts util.PodDeviceRequests) { counts = make(util.PodDeviceRequests, len(pod.Spec.Containers)) //Count Nvidia GPU for i := 0; i < len(pod.Spec.Containers); i++ { devices := device.GetDevices() counts[i] = make(util.ContainerDeviceRequests) for idx, val := range devices { request := val.GenerateResourceRequests(&pod.Spec.Containers[i]) if request.Nums > 0 { counts[i][idx] = val.GenerateResourceRequests(&pod.Spec.Containers[i]) } } } klog.InfoS("collect requestreqs", "counts", counts) return counts }
GenerateResourceRequests
是 Interface,以 NVIDIA 实现为例
// pkg/device/nvidia/device.go#L264 func (dev *NvidiaGPUDevices) GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest { resourceName := corev1.ResourceName(ResourceName) resourceMem := corev1.ResourceName(ResourceMem) resourceMemPercentage := corev1.ResourceName(ResourceMemPercentage) resourceCores := corev1.ResourceName(ResourceCores) v, ok := ctr.Resources.Limits[resourceName] if !ok { v, ok = ctr.Resources.Requests[resourceName] } if ok { if n, ok := v.AsInt64(); ok { memnum := 0 mem, ok := ctr.Resources.Limits[resourceMem] if !ok { mem, ok = ctr.Resources.Requests[resourceMem] } if ok { memnums, ok := mem.AsInt64() if ok { memnum = int(memnums) } } mempnum := int32(101) mem, ok = ctr.Resources.Limits[resourceMemPercentage] if !ok { mem, ok = ctr.Resources.Requests[resourceMemPercentage] } if ok { mempnums, ok := mem.AsInt64() if ok { mempnum = int32(mempnums) } } if mempnum == 101 && memnum == 0 { if config.DefaultMem != 0 { memnum = int(config.DefaultMem) } else { mempnum = 100 } } corenum := config.DefaultCores core, ok := ctr.Resources.Limits[resourceCores] if !ok { core, ok = ctr.Resources.Requests[resourceCores] } if ok { corenums, ok := core.AsInt64() if ok { corenum = int32(corenums) } } return util.ContainerDeviceRequest{ Nums: int32(n), Type: NvidiaGPUDevice, Memreq: int32(memnum), MemPercentagereq: int32(mempnum), Coresreq: int32(corenum), } } } return util.ContainerDeviceRequest{} }
逻辑也比较简单,就是从 Container 的 Resources 中根据名称解析拿到申请的 gpu、gpucore、gpumem 等信息。
过滤节点
逻辑同样在 calcScore
方法中,具体如下:
ctrfit := false for ctrid, n := range nums { sums := 0 for _, k := range n { sums += int(k.Nums) } if sums == 0 { for idx := range score.Devices { if len(score.Devices[idx]) <= ctrid { score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{}) } score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{}) continue } } klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID) fit, _ := fitInDevices(node, n, annos, task, &score.Devices) ctrfit = fit if !fit { klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID) break } } if ctrfit { res.NodeList = append(res.NodeList, &score) }
- nums 就是上一步解析出来的 Pod 的 GPU 申请情况
- score.Devices:就是当前节点上的 GPU 设备
具体过滤规则在这里:
fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
fitInDevices
内容如下:
内容比较多,去掉了其他无关内容,主要就是做了这几个判断,如果都满足则记录对应的 GPU 信息并返回 true,否则返回 false,表示该节点无法调度。
func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) { // .... for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- { if node.Devices.DeviceLists[i].Device.Totalmem-node.Devices.DeviceLists[i].Device.Usedmem < memreq { continue } if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq { continue } // Coresreq=100 indicates it want this card exclusively if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 { continue } // You can't allocate core=0 job to an already full GPU if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 { continue } if k.Nums > 0 { klog.InfoS("first fitted", "pod", klog.KObj(pod), "device", node.Devices.DeviceLists[i].Device.ID) k.Nums-- tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{ Idx: int(node.Devices.DeviceLists[i].Device.Index), UUID: node.Devices.DeviceLists[i].Device.ID, Type: k.Type, Usedmem: memreq, Usedcores: k.Coresreq, }) } if k.Nums == 0 { klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs) return true, tmpDevs } } return false, tmpDevs }
这样,我们就把不满足条件的节点给过滤掉了,剩下的节点都是可以正常调度 Pod 的,不过具体选择哪个节点还需要依赖于配置的调度策略。
根据策略选择节点
上一步计算出了每个节点的得分之后,就可以根据策略进行选择了。
//计算得分,拿到所有满足条件的节点 nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) // 排序 sort.Sort(nodeScores) // 直接选择最后一个节点 m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] // 返回结果 res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil
具体的选择逻辑在这里:
// 排序 sort.Sort(nodeScores) // 直接选择最后一个节点 m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
对得分数据排序后,直接就选择了最后一个节点。
初次看到这里时也有点懵,想不明白这怎么和调度策略牵扯到一起的。
实际上具体逻辑就在 sort 这里
,NodeScoreList 要实现 sort 接口才能进行排序,因此看下是怎么实现的:// pkg/scheduler/policy/node_policy.go#L32 type NodeScoreList struct { NodeList []*NodeScore Policy string } func (l NodeScoreList) Len() int { return len(l.NodeList) } func (l NodeScoreList) Swap(i, j int) { l.NodeList[i], l.NodeList[j] = l.NodeList[j], l.NodeList[i] } func (l NodeScoreList) Less(i, j int) bool { if l.Policy == NodeSchedulerPolicySpread.String() { return l.NodeList[i].Score > l.NodeList[j].Score } // default policy is Binpack return l.NodeList[i].Score < l.NodeList[j].Score }
核心部分:
func (l NodeScoreList) Less(i, j int) bool { if l.Policy == NodeSchedulerPolicySpread.String() { return l.NodeList[i].Score > l.NodeList[j].Score } // default policy is Binpack return l.NodeList[i].Score < l.NodeList[j].Score }
根据我们的 Policy 不同,有两种排序方式,而且排序正好相反。
// NodeSchedulerPolicyBinpack is node use binpack scheduler policy. NodeSchedulerPolicyBinpack SchedulerPolicyName = "binpack" // NodeSchedulerPolicySpread is node use spread scheduler policy. NodeSchedulerPolicySpread SchedulerPolicyName = "spread"
这里涉及到 sort.Sort() 的实现,简单来说:
- 如果
Less()
方法中使用大于(>
)比较,最终排序结果将是降序。 - 如果
Less()
方法中使用小于(<
)比较,最终排序结果将是升序。
对应到调度策略:
Binpack 策略
使用 小于(<
)比较,最终排序结果将是升序Spread 策略
使用 大于(>
)比较,最终排序结果将是降序
又因为前面打分时的规则是:
剩余资源越少,得分越低
,再加上我们会选择排序后的最后一个节点
。至此,逻辑就清晰了。
- Binpack 策略选择最后一个节点,因为升序排列,最后一个 Node 得分最高,即:空闲资源最少
- Spread 策略选择最后一个节点,因为降序排列,最后一个 Node 得分最低,即:空闲资源最多
正好符合了策略的原本含义:
-
Binpack 则是让所有 Pod 尽量调度到同一个节点,优先把一个节点资源用完,然后再使用其他节点。
-
Spread 则是相反,尽量让 Pod 分散到所有节点上去。
调度策略是哪儿来的
这部分逻辑实际上是在 calScore
方法里:
func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) { userNodePolicy := config.NodeSchedulerPolicy if annos != nil { if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok { userNodePolicy = value } } res := policy.NodeScoreList{ Policy: userNodePolicy, NodeList: make([]*policy.NodeScore, 0), } }
首先使用默认的调度策略,当然默认调度策略也是会被参数覆盖的:
rootCmd.Flags().StringVar(&config.NodeSchedulerPolicy, "node-scheduler-policy", policy.NodeSchedulerPolicyBinpack.String(), "node scheduler policy") rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", policy.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")
然后解析 Pod 的 Annoations,如果有指定hami.io/node-scheduler-policy
就使用 Pod 上指定的调度策略。
至此,Node 调度策略就分析完成了。
3. GPU 调度策略原理
当 Node 选好之后,Node 上有多块 GPU,Pod 还分配哪块呢? 这时候就该 GPU 调度策略生效了。
实际上选择 GPU 的逻辑也暗含在 Filter 方法里了。
// pkg/scheduler/scheduler.go#L444 func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace) nums := k8sutil.Resourcereqs(args.Pod) total := 0 for _, n := range nums { for _, k := range n { total += int(k.Nums) } } if total == 0 { klog.V(1).Infof("pod %v not find resource", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource")) return &extenderv1.ExtenderFilterResult{ NodeNames: args.NodeNames, FailedNodes: nil, Error: "", }, nil } annos := args.Pod.Annotations s.delPod(args.Pod) nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len(failedNodes) != 0 { klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes) } nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) if err != nil { err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len((*nodeScores).NodeList) == 0 { klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet")) return &extenderv1.ExtenderFilterResult{ FailedNodes: failedNodes, }, nil } klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList)) sort.Sort(nodeScores) m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices) annotations := make(map[string]string) annotations[util.AssignedNodeAnnotations] = m.NodeID annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) for _, val := range device.GetDevices() { val.PatchAnnotations(&annotations, m.Devices) } //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices) //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices) //maps.Copy(annotations, InRequestDevices) //maps.Copy(annotations, supportDevices) s.addPod(args.Pod, m.NodeID, m.Devices) err = util.PatchPodAnnotations(args.Pod, annotations) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) s.delPod(args.Pod) return nil, err } s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil) res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil }
过滤 GPU
解析 Pod 申请的 GPU 信息
首先是从 Pod 信息中解析出申请的 GPU 信息:
// pkg/scheduler/scheduler.go#L444 func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { nums := k8sutil.Resourcereqs(args.Pod) }
和 Node 调度策略中的一样,这里就不在赘述了。
过滤 GPU
这部分逻辑隐藏比较深刻,在fitInDevices
方法中
// pkg/scheduler/score.go#L185 func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) { for nodeID, node := range *nodes { fit, _ := fitInDevices(node, n, annos, task, &score.Devices) } }
fitInDevices
内容如下:
func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) { //devmap := make(map[string]util.ContainerDevices) devs := util.ContainerDevices{} total, totalCore, totalMem := int32(0), int32(0), int32(0) free, freeCore, freeMem := int32(0), int32(0), int32(0) sums := 0 // computer all device score for one node for index := range node.Devices.DeviceLists { node.Devices.DeviceLists[index].ComputeScore(requests) } //This loop is for requests for different devices for _, k := range requests { sums += int(k.Nums) if int(k.Nums) > len(node.Devices.DeviceLists) { klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists)) return false, 0 } sort.Sort(node.Devices) fit, tmpDevs := fitInCertainDevice(node, k, annos, pod) if fit { for _, val := range tmpDevs[k.Type] { total += node.Devices.DeviceLists[val.Idx].Device.Count totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem node.Devices.DeviceLists[val.Idx].Device.Used++ node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem } devs = append(devs, tmpDevs[k.Type]...) } else { return false, 0 } (*devinput)[k.Type] = append((*devinput)[k.Type], devs) } return true, 0 }
核心部分:
for _, k := range requests { sort.Sort(node.Devices) fit, tmpDevs := fitInCertainDevice(node, k, annos, pod) if fit { devs = append(devs, tmpDevs[k.Type]...) } else { return false, 0 } (*devinput)[k.Type] = append((*devinput)[k.Type], devs) }
这里又出现了 sort.Sort
是不是有点熟悉,不过暂时先不管,还是先分析怎么过滤 GPU 的。
核心部分在fitInCertainDevice
中,根据 Pod 申请的 GPU 信息找出当前节点上所有满足条件的 GPU
fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
fitInCertainDevice
在前面过滤 Node 时也分析过,这里就简单看下
func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) { for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- { continue } if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq { continue } // Coresreq=100 indicates it want this card exclusively if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 { continue } // You can't allocate core=0 job to an already full GPU if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 { continue } if k.Nums > 0 { k.Nums-- tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{ Idx: int(node.Devices.DeviceLists[i].Device.Index), UUID: node.Devices.DeviceLists[i].Device.ID, Type: k.Type, Usedmem: memreq, Usedcores: k.Coresreq, }) } if k.Nums == 0 { klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs) return true, tmpDevs } } return false, tmpDevs } }
如果某个 GPU 能满足这些条件就认为该 GPU 可以分配给对应 Container。
又回到前面的核心逻辑,对于满足条件的 GPU,这里使用了 devinput 对象进行记录。
for _, k := range requests { sort.Sort(node.Devices) fit, tmpDevs := fitInCertainDevice(node, k, annos, pod) if fit { devs = append(devs, tmpDevs[k.Type]...) } else { return false, 0 } (*devinput)[k.Type] = append((*devinput)[k.Type], devs) }
这里的 devinput 实际上就是前面传进来的 Score 对象。
type NodeScore struct { NodeID string Devices util.PodDevices // Score recode every node all device user/allocate score Score float32 }
标记到 Pod 上
hami 为了让后续的 DevicePlugin 能够知道要把哪些 GPU 分配给该 Pod,是直接将其记录到 Pod 的 Annoations 上的。
// pkg/scheduler/scheduler.go func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { sort.Sort(nodeScores) m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices) annotations := make(map[string]string) annotations[util.AssignedNodeAnnotations] = m.NodeID annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) for _, val := range device.GetDevices() { val.PatchAnnotations(&annotations, m.Devices) } }
选择到节点之后,还把 m.Devices 信息记录到了 Pod 的 Annoations 上。
annotations := make(map[string]string) annotations[util.AssignedNodeAnnotations] = m.NodeID annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) for _, val := range device.GetDevices() { val.PatchAnnotations(&annotations, m.Devices) }
这里的 m.Devices 实际上就是前面我们过滤出来的满足条件的 GPU。
Annoations 大概是这样的:
root@test:~/lixd/hami# k get po hami-30 -oyaml apiVersion: v1 kind: Pod metadata: annotations: hami.io/bind-phase: allocating hami.io/bind-time: "1732072495" hami.io/vgpu-devices-allocated: GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,NVIDIA,20000,30:; hami.io/vgpu-devices-to-allocate: GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,NVIDIA,20000,30:; hami.io/vgpu-node: test hami.io/vgpu-time: "1732072495"
- hami.io/vgpu-devices-to-allocate 是 Scheduler 为 Pod 选择的目标 GPU
- hami.io/vgpu-devices-allocated 是当前已经分配的
ps:对于已经调度的 Pod hami.io/vgpu-devices-to-allocate 会被清空
调度完成后,DevicePlugin 直接读取 hami.io/vgpu-devices-to-allocate
就知道要为该 Pod 分配哪些 GPU 了。
根据策略选择 GPU
前面都已经选出了满足条件的 GPU 甚至都记录到了 Pod 的 Annoations 上了,那么 GPU 调度策略是什么时候生效的呢?
GPU 打分
逻辑和 Node 打分逻辑基本一致:
都是剩余资源越多,得分越低
。func (ds *DeviceListsScore) ComputeScore(requests util.ContainerDeviceRequests) { request, core, mem := int32(0), int32(0), int32(0) // Here we are required to use the same type device for _, container := range requests { request += container.Nums core += container.Coresreq if container.MemPercentagereq != 0 && container.MemPercentagereq != 101 { mem += ds.Device.Totalmem * (container.MemPercentagereq / 100.0) continue } mem += container.Memreq } klog.V(2).Infof("device %s user %d, userCore %d, userMem %d,", ds.Device.ID, ds.Device.Used, ds.Device.Usedcores, ds.Device.Usedmem) usedScore := float32(request+ds.Device.Used) / float32(ds.Device.Count) coreScore := float32(core+ds.Device.Usedcores) / float32(ds.Device.Totalcore) memScore := float32(mem+ds.Device.Usedmem) / float32(ds.Device.Totalmem) ds.Score = float32(Weight) * (usedScore + coreScore + memScore) klog.V(2).Infof("device %s computer score is %f", ds.Device.ID, ds.Score) }
排序
这部分也在 fitInDevices
方法中
// pkg/scheduler/score.go#L144 func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) { for _, k := range requests { sort.Sort(node.Devices) fit, tmpDevs := fitInCertainDevice(node, k, annos, pod) if fit { devs = append(devs, tmpDevs[k.Type]...) } else { return false, 0 } (*devinput)[k.Type] = append((*devinput)[k.Type], devs) } }
核心就是这个 sort 方法
sort.Sort(node.Devices)
又出现了 sort.Sort
是不是想到了什么。
前面选择节点的时候也是这样实现的,把具体逻辑放在 sort 接口实现上。看看 GPU 的 Sort 接口怎么实现的:
func (l DeviceUsageList) Len() int { return len(l.DeviceLists) } func (l DeviceUsageList) Swap(i, j int) { l.DeviceLists[i], l.DeviceLists[j] = l.DeviceLists[j], l.DeviceLists[i] } func (l DeviceUsageList) Less(i, j int) bool { if l.Policy == GPUSchedulerPolicyBinpack.String() { if l.DeviceLists[i].Device.Numa == l.DeviceLists[j].Device.Numa { return l.DeviceLists[i].Score < l.DeviceLists[j].Score } return l.DeviceLists[i].Device.Numa > l.DeviceLists[j].Device.Numa } // default policy is spread if l.DeviceLists[i].Device.Numa == l.DeviceLists[j].Device.Numa { return l.DeviceLists[i].Score > l.DeviceLists[j].Score } return l.DeviceLists[i].Device.Numa < l.DeviceLists[j].Device.Numa }
果然又是这样的,根据不同的 GPU 调度策略,Less 方法返回不同结果以控制排序结果是降序还是升序。
选择 GPU
然后后续再选择 GPU 的时候的代码如下:
func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) { for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- { continue } if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq { continue } // Coresreq=100 indicates it want this card exclusively if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 { continue } // You can't allocate core=0 job to an already full GPU if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 { continue } if k.Nums > 0 { k.Nums-- tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{ Idx: int(node.Devices.DeviceLists[i].Device.Index), UUID: node.Devices.DeviceLists[i].Device.ID, Type: k.Type, Usedmem: memreq, Usedcores: k.Coresreq, }) } if k.Nums == 0 { klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs) return true, tmpDevs } } return false, tmpDevs } }
核心是这个 for 循环
for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- { }
也是从最后一个 GPU 开始的,也就是如果排在后面的 GPU 满足条件就会直接被选中,不会再去选择前面的了。
- Binpack 策略:结果为升序,越往后的 GPU 空闲资源越少
- Spread 策略:结果为降序,越往后的 GPU 空闲资源越多
同样也是符合对应策略含义的。
至此,GPU 调度策略也分析完了。
DevicePlugin 解析 GPU 信息
在调度时,我们把最终选择的 GPU 记录到了 Pod 的 Annoations 上,DevicePlugin 这边就不需要选择 GPU 了,从 Annoations 上解析即可
// pkg/util/util.go#L281 func GetNextDeviceRequest(dtype string, p corev1.Pod) (corev1.Container, ContainerDevices, error) { pdevices, err := DecodePodDevices(InRequestDevices, p.Annotations) if err != nil { return corev1.Container{}, ContainerDevices{}, err } klog.Infof("pod annotation decode vaule is %+v", pdevices) res := ContainerDevices{} pd, ok := pdevices[dtype] if !ok { return corev1.Container{}, res, errors.New("device request not found") } for ctridx, ctrDevice := range pd { if len(ctrDevice) > 0 { return p.Spec.Containers[ctridx], ctrDevice, nil } } return corev1.Container{}, res, errors.New("device request not found") } // pkg/util/util.go#L254 func DecodePodDevices(checklist map[string]string, annos map[string]string) (PodDevices, error) { klog.V(5).Infof("checklist is [%+v], annos is [%+v]", checklist, annos) if len(annos) == 0 { return PodDevices{}, nil } pd := make(PodDevices) for devID, devs := range checklist { str, ok := annos[devs] if !ok { continue } pd[devID] = make(PodSingleDevice, 0) for _, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) { cd, err := DecodeContainerDevices(s) if err != nil { return PodDevices{}, nil } if len(cd) == 0 { continue } pd[devID] = append(pd[devID], cd) } } klog.InfoS("Decoded pod annos", "poddevices", pd) return pd, nil }
具体的解析逻辑如下,就是按照预设规则,以冒号,逗号进行切分
// pkg/util/util.go#L223 func DecodeContainerDevices(str string) (ContainerDevices, error) { if len(str) == 0 { return ContainerDevices{}, nil } cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol) contdev := ContainerDevices{} tmpdev := ContainerDevice{} klog.V(5).Infof("Start to decode container device %s", str) if len(str) == 0 { return ContainerDevices{}, nil } for _, val := range cd { if strings.Contains(val, ",") { //fmt.Println("cd is ", val) tmpstr := strings.Split(val, ",") if len(tmpstr) < 4 { return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task") } tmpdev.UUID = tmpstr[0] tmpdev.Type = tmpstr[1] devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32) tmpdev.Usedmem = int32(devmem) devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32) tmpdev.Usedcores = int32(devcores) contdev = append(contdev, tmpdev) } } klog.V(5).Infof("Finished decoding container devices. Total devices: %d", len(contdev)) return contdev, nil }
至此,hami 提供的 Node、GPU 级别的 Spread、Binpack 高级调度策略就分析完成了。
【Kubernetes 系列】
持续更新中,搜索公众号【探索云原生
】订阅,阅读更多文章。4. 小结
调度策略配置
hami-scheduler 提供了两种不同级别的调度策略:
- 节点调度策略:作用于调度过程中如何为 Pod 选择节点
- GPU 调度策略:作用于选择节点后,节点存在多 GPU 时如何为 Pod 选择 GPU
二者都支持 Spread 和 Binpack 两种配置:
- Spread 表示尽量将任务分配到不同 Node 或 GPU 上,让集群中的 Node 或 GPU 负载尽量保持相同水位线。
- Binpack 表示尽量将任务分配到同一 Node 或者 GPU 上,尽量先占满一个 Node 或者 GPU 后再使用别的
具体 Node、GPU 调度策略实现都可以分为以下几步
:- 1)给 Node、GPU 打分
- 2)过滤掉不满足条件的 Node、GPU
- 3)根据调度策略选择出最优 Node、GPU
- 具体逻辑都在 sort.Sort 接口的 Less 方法实现的
- 对于 Spread 策略就选择剩余资源多的 Node、GPU,Binpack 策略就选择剩余资源少的 Node、GPU
- 4)对结果进行记录
- Node 则是通过 Bind 结果直接和 Pod 绑定
- GPU 则是记录到 Pod 的 Annoations 上
所有逻辑都在 Filter 方法里,Node 的调度策略还算比较清晰,除了 sort.Sort 这个点需要多看会之外,其他都还好。
GPU 调度策略就复杂了一点,所有逻辑都混在一起的,不是很容易区分,需要慢慢分析。
这一切,似未曾拥有