上篇我们分析了 hami-webhook,该 Webhook 将申请了 vGPU 资源的 Pod 的调度器修改为 hami-scheduler,后续使用 hami-scheduler 进行调度。
本文为 HAMi 原理分析的第三篇,分析 hami-scheduler 工作流程。
上篇主要分析了 hami-webhook,解决了:Pod 是如何使用到 hami-scheduler,创建 Pod 时我们未指定 SchedulerName 默认会使用 default-scheduler 进行调度才对 问题。
这篇开始分析 hami-scheduler,解决另一个问题:hami-scheduler 逻辑,spread & binpark 等 高级调度策略是如何实现的
写完发现内容还是很多,spread & binpark 调度策略下一篇在分析吧,这篇主要分析调度流程。
以下分析基于 HAMi v2.4.0
省流:
HAMi Webhook 、Scheduler 工作流程如下:
-
1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源
-
2)
kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook
-
3)HAMi-Webhook 检测 Pod 中的 Resource,如果申请的由 HAMi 管理的 vGPU 资源,就会把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。
-
对于特权模式的 Pod,Webhook 会直接跳过不处理
-
对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝
-
4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,
但是 kube-scheduler 还会根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件
-
这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。
-
当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑
-
5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点
- 这里就包含了 spread & binpark 等 高级调度策略的实现
-
6)异步任务,包括 GPU 感知逻辑
- devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
- 除了 DevicePlugin 之外,还使用异步任务以 Patch Annotation 方式提交更多信息
- Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用
1. 概述
Hami-scheduler 主要是 Pod 的调度逻辑,从集群节点中为当前 Pod 选择最合适的节点。
Hami-scheduler 也是通过 Scheduler Extender 方式实现的,可以参考上一篇文章 K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 手把手实现一个自定义调度器。
但是 HAMi 并没有直接扩展 default-scheduler,而是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。
然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个 Container 启动的一个 http 服务。
ps:后续说的 hami-scheduler 一般只这部分 Extender 实现的调度插件
2. 具体部署
Deployment
Hami-scheduler 使用 Deployment 进行部署,该 Deployment 中有两个 Container,其中一个是原生的 kube-scheduler,另一个则是 HAMi 的 Scheduler 服务。
完整 yaml 如下:
apiVersion: apps/v1 kind: Deployment metadata: name: vgpu-hami-scheduler namespace: kube-system spec: template: spec: containers: - command: - kube-scheduler - --config=/config/config.yaml - -v=4 - --leader-elect=true - --leader-elect-resource-name=hami-scheduler - --leader-elect-resource-namespace=kube-system image: 192.168.116.54:5000/kube-scheduler:v1.23.17 imagePullPolicy: IfNotPresent name: kube-scheduler resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /config name: scheduler-config - command: - scheduler - --resource-name=nvidia.com/vgpu - --resource-mem=nvidia.com/gpumem - --resource-cores=nvidia.com/gpucores - --resource-mem-percentage=nvidia.com/gpumem-percentage - --resource-priority=nvidia.com/priority - --http_bind=0.0.0.0:443 - --cert_file=/tls/tls.crt - --key_file=/tls/tls.key - --scheduler-name=hami-scheduler - --metrics-bind-address=:9395 - --default-mem=0 - --default-gpu=1 - --default-cores=0 - --iluvatar-memory=iluvatar.ai/vcuda-memory - --iluvatar-cores=iluvatar.ai/vcuda-core - --cambricon-mlu-name=cambricon.com/vmlu - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore - --ascend-name=huawei.com/Ascend910 - --ascend-memory=huawei.com/Ascend910-memory - --ascend310p-name=huawei.com/Ascend310P - --ascend310p-memory=huawei.com/Ascend310P-memory - --overwrite-env=false - --node-scheduler-policy=binpack - --gpu-scheduler-policy=spread - --debug - -v=4 image: projecthami/hami:v2.3.13 imagePullPolicy: IfNotPresent name: vgpu-scheduler-extender ports: - containerPort: 443 name: http protocol: TCP volumeMounts: - mountPath: /tls name: tls-config dnsPolicy: ClusterFirst priorityClassName: system-node-critical restartPolicy: Always schedulerName: default-scheduler serviceAccount: vgpu-hami-scheduler serviceAccountName: vgpu-hami-scheduler terminationGracePeriodSeconds: 30 volumes: - name: tls-config secret: defaultMode: 420 secretName: vgpu-hami-scheduler-tls - configMap: defaultMode: 420 name: vgpu-hami-scheduler-newversion name: scheduler-config
KubeSchedulerConfiguration
对应的 Scheduler 的配置文件存储在 Configmap 中,具体内容如下:
apiVersion: v1 data: config.yaml: | apiVersion: kubescheduler.config.k8s.io/v1beta2 kind: KubeSchedulerConfiguration leaderElection: leaderElect: false profiles: - schedulerName: hami-scheduler extenders: - urlPrefix: "https://127.0.0.1:443" filterVerb: filter bindVerb: bind nodeCacheCapable: true weight: 1 httpTimeout: 30s enableHTTPS: true tlsConfig: insecure: true managedResources: - name: nvidia.com/vgpu ignoredByScheduler: true - name: nvidia.com/gpumem ignoredByScheduler: true - name: nvidia.com/gpucores ignoredByScheduler: true - name: nvidia.com/gpumem-percentage ignoredByScheduler: true - name: nvidia.com/priority ignoredByScheduler: true - name: cambricon.com/vmlu ignoredByScheduler: true - name: hygon.com/dcunum ignoredByScheduler: true - name: hygon.com/dcumem ignoredByScheduler: true - name: hygon.com/dcucores ignoredByScheduler: true - name: iluvatar.ai/vgpu ignoredByScheduler: true - name: huawei.com/Ascend910-memory ignoredByScheduler: true - name: huawei.com/Ascend910 ignoredByScheduler: true - name: huawei.com/Ascend310P-memory ignoredByScheduler: true - name: huawei.com/Ascend310P ignoredByScheduler: true kind: ConfigMap metadata: name: vgpu-hami-scheduler-newversion namespace: kube-system
SchedulerName
首先是指定了 Scheduler 名字叫做 hami-scheduler,k8s 默认的调度器叫做 default-scheduler。
profiles: - schedulerName: hami-scheduler
创建 Pod 的时候我们是没有指定 schedulerName 的,所以默认都会使用 default-scheduler,也就是默认 kube-scheduler 进行调度。
之前 hami-webhook 修改 SchedulerName 时就需要和这里配置的名称对应。
Extenders
调度器核心配置如下:
extenders: - urlPrefix: "https://127.0.0.1:443" filterVerb: filter bindVerb: bind
参数解释:
-
urlPrefix
:"https://127.0.0.1:443"
:这是一个调度器扩展器的服务地址,Kubernetes 调度器会调用这个地址来请求外部调度逻辑。可以通过 HTTPS 访问。- External Scheduler 因为是和 kube-scheduler 部署在一个 Pod 里的,因此使用 127.0.0.1 进行访问
-
filterVerb
:filter
:这个动词指示了调度器会调用这个扩展器服务来过滤节点,即决定哪些节点适合调度 Pod。- Filter 接口对应这个 http 服务的 url 就是
/filter
- Filter 接口对应这个 http 服务的 url 就是
-
bindVerb
:bind
:调度器扩展器可以执行绑定操作,即将 Pod 绑定到特定节点。- 同上,bind 就要对应
/bind
这个接口
- 同上,bind 就要对应
managedResources
managedResources 这部分指定这个扩展调度器 hami-cheduler 管理的资源,只有 Pod Resource 中申请了 managedResources 中指定的资源时,Scheduler 才会请求我们配置的 Extender,也就是 hami-scheduler。
即:只要没申请 vGPU 资源,就是指定使用 hami-scheduler 调度,也是由名为 hami-scheduler 的 kube-scheduler 进行调度,不会请求 Extender,真正的 HAMi 调度插件不会生效。
managedResources: - name: nvidia.com/vgpu ignoredByScheduler: true - name: nvidia.com/gpumem ignoredByScheduler: true ...
-
name: nvidia.com/vgpu:资源名称
-
ignoredByScheduler:当设置为
true
时,调度器在做节点资源匹配和资源分配时,会忽略这个资源。这些资源都由扩展的 hami-scheduler 进行调度即可。
这样配置之后,对于 nvidia.com/vgpu、nvidia.com/gpumem 等等 managedResources 中指定的资源,调度器在做节点资源匹配和资源分配时,会忽略这个资源,不会因为 Node 上没有这些虚拟资源,就直接调度失败了。
当调度器请求扩展的 hami-scheduler 进行调度时,hami-scheduler 就能够正常处理这些资源,根据 Pod 申请的 Resource 配置找到对应的节点。
接下来则分析 hami-scheduler 的具体实现,包括两个问题:
-
1)hami-scheduler 如何感知 Node 上的 GPU 信息的,因为前面提到 gpucore、gpumem 这些都是虚拟资源, DevicePlugin 也是没有直接上报到 Node 上的
-
2)hami-scheduler 是如何选择最合适的节点的,spark & binpark 等高级调度策略是如何实现的
3. hami 如何感知 Nod 上的 GPU 资源情况的
分为两部分:
-
1)感知 Node 上的 GPU 资源信息
-
2)感知 Node 上 GPU 资源使用情况
因为 gpucore、gpumem 这些都是虚拟资源,因此不能像 DevicePlugin 上报的标准第三方资源一样,由 K8s 直接维护,而是需要 hami 自行维护。
为什么需要自定义感知逻辑
到这里大家可能会有疑问,
上一篇文章中介绍了 hami-device-plugin-nvidia,这里的 devicePlugin 不就已经感知了节点上的 GPU 并上报到 kube-apiserver 了吗,怎么还需要实现一个感知逻辑?
在节点上都可以看到了,例如下面节点上就有 20 个 GPU
root@j99cloudvm:~# k describe node j99cloudvm |grep Capa -A 7 Capacity: cpu: 64 ephemeral-storage: 2097139692Ki hugepages-1Gi: 0 hugepages-2Mi: 0 memory: 131966216Ki nvidia.com/gpu: 20
因为:hami-scheduler 做了细粒度的 gpucore、gpumem 切分,那么就要知道节点上的 GPU 具体数量、每张卡的显存大小等信息,不然如果把 Pod 分配到一个所有 GPU 显存都已经消耗完的 Node 上不就出问题了。
感知节点上的 GPU 资源
Hami 是如何感知节点上的 GPU 情况的呢?也就是之前 start 方法中的这个 Goroutine 在维护,核心就是在这个 RegisterFromNodeAnnotations 方法中
go sher.RegisterFromNodeAnnotations()
精简后代码如下:
调用 kube-apiserver 获取到节点列表,然后从 node 的 Annoations 中解析出 Device 信息,并保存到内存中。
func (s *Scheduler) RegisterFromNodeAnnotations() { klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations") ticker := time.NewTicker(time.Second * 15) for { select { case <-s.nodeNotify: case <-ticker.C: case <-s.stopCh: return } // kube-apiserver 查询节点列表 labelSelector := labels.Everything() if config.NodeLabelSelector != nil && len(config.NodeLabelSelector) > 0 { labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector() } rawNodes, err := s.nodeLister.List(labelSelector) if err != nil { klog.Errorln("nodes list failed", err.Error()) continue } var nodeNames []string for _, val := range rawNodes { // 从 node 信息中解析出 GPU 信息 nodedevices, err := devInstance.GetNodeDevices(*val) for _, deviceinfo := range nodedevices { found := false _, ok := s.nodes[val.Name] if ok { for i1, val1 := range s.nodes[val.Name].Devices { if strings.Compare(val1.ID, deviceinfo.ID) == 0 { found = true s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore break } } } if !found { nodeInfo.Devices = append(nodeInfo.Devices, util.DeviceInfo{ ID: deviceinfo.ID, Index: uint(deviceinfo.Index), Count: deviceinfo.Count, Devmem: deviceinfo.Devmem, Devcore: deviceinfo.Devcore, Type: deviceinfo.Type, Numa: deviceinfo.Numa, Health: deviceinfo.Health, DeviceVendor: devhandsk, }) } } // 最终将节点信息添加到缓存里 s.addNode(val.Name, nodeInfo) } // .....
会将最新的 Node 数据存到内存中,便于调度时使用,就是 nodeManager 中的 nodes 这个 map 对象
type Scheduler struct { nodeManager podManager stopCh chan struct{} kubeClient kubernetes.Interface podLister listerscorev1.PodLister nodeLister listerscorev1.NodeLister //Node status returned by filter cachedstatus map[string]*NodeUsage nodeNotify chan struct{} //Node Overview overviewstatus map[string]*NodeUsage eventRecorder record.EventRecorder } type nodeManager struct { nodes map[string]*util.NodeInfo mutex sync.RWMutex }
重点来了:这里的数据来源是 node 的 Annoations,而这个 Annoations 就是上一篇中提到的 hami device plugin nvidia 中的一个后台 goroutine 在维护。
// returns all nodes and its device memory usage, and we filter it with nodeSelector, taints, nodeAffinity // unschedulerable and nodeName. func (s *Scheduler) getNodesUsage(nodes *[]string, task *corev1.Pod) (*map[string]*NodeUsage, map[string]string, error) { overallnodeMap := make(map[string]*NodeUsage) cachenodeMap := make(map[string]*NodeUsage) failedNodes := make(map[string]string) //for _, nodeID := range *nodes { allNodes, err := s.ListNodes() if err != nil { return &overallnodeMap, failedNodes, err } for _, node := range allNodes { nodeInfo := &NodeUsage{} userGPUPolicy := config.GPUSchedulerPolicy if task != nil && task.Annotations != nil { if value, ok := task.Annotations[policy.GPUSchedulerPolicyAnnotationKey]; ok { userGPUPolicy = value } } nodeInfo.Devices = policy.DeviceUsageList{ Policy: userGPUPolicy, DeviceLists: make([]*policy.DeviceListsScore, 0), } for _, d := range node.Devices { nodeInfo.Devices.DeviceLists = append(nodeInfo.Devices.DeviceLists, &policy.DeviceListsScore{ Score: 0, Device: &util.DeviceUsage{ ID: d.ID, Index: d.Index, Used: 0, Count: d.Count, Usedmem: 0, Totalmem: d.Devmem, Totalcore: d.Devcore, Usedcores: 0, Type: d.Type, Numa: d.Numa, Health: d.Health, }, }) } overallnodeMap[node.ID] = nodeInfo } podsInfo := s.ListPodsInfo() for _, p := range podsInfo { node, ok := overallnodeMap[p.NodeID] if !ok { continue } for _, podsingleds := range p.Devices { for _, ctrdevs := range podsingleds { for _, udevice := range ctrdevs { for _, d := range node.Devices.DeviceLists { if d.Device.ID == udevice.UUID { d.Device.Used++ d.Device.Usedmem += udevice.Usedmem d.Device.Usedcores += udevice.Usedcores } } } } } klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices) } s.overviewstatus = overallnodeMap for _, nodeID := range *nodes { node, err := s.GetNode(nodeID) if err != nil { // The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority. klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err) failedNodes[nodeID] = "node unregistered" continue } cachenodeMap[node.ID] = overallnodeMap[node.ID] } s.cachedstatus = cachenodeMap return &cachenodeMap, failedNodes, nil }
大概长这样:
root@test:~# k get node test -oyaml apiVersion: v1 kind: Node metadata: annotations: csi.volume.kubernetes.io/nodeid: '{"nfs.csi.k8s.io":"j99cloudvm","rbd.csi.ceph.com":"j99cloudvm"}' hami.io/node-handshake: Requesting_2024.11.19 03:10:32 hami.io/node-handshake-dcu: Deleted_2024.09.13 06:42:44 hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA A40,0,true:' kubeadm.alpha.kubernetes.io/cri-socket: /run/containerd/containerd.sock
其中下面这个就是 GPU 的具体信息,包括 ID、型号、内存等信息。
hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA A40,0,true:'
感知节点上 GPU 使用情况
除此之外,
hami 还需要感知到 GPU 的使用情况,才能计算出还有多少 gpucore、gpumem 可以使用
。这里使用的 client-go 中提供的 Informer 机制,Watch Pod 和 Node 的变化情况,获取 Node 上运行的 Pod,根据 Pod 申请的资源和 Node 上的总资源计算出剩余资源。
// pkg/scheduler/scheduler.go#L127 func (s *Scheduler) Start() { kubeClient, err := k8sutil.NewClient() check(err) s.kubeClient = kubeClient informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1) s.podLister = informerFactory.Core().V1().Pods().Lister() s.nodeLister = informerFactory.Core().V1().Nodes().Lister() informer := informerFactory.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, UpdateFunc: s.onUpdatePod, DeleteFunc: s.onDelPod, }) informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddNode, UpdateFunc: s.onUpdateNode, DeleteFunc: s.onDelNode, }) informerFactory.Start(s.stopCh) informerFactory.WaitForCacheSync(s.stopCh) s.addAllEventHandlers() }
其中 Pod Create、Update 时都会 j 进入下面这个 onAddPod 逻辑:
AssignedNodeAnnotations = "hami.io/vgpu-node" // pkg/scheduler/scheduler.go#L92 func (s *Scheduler) onAddPod(obj interface{}) { pod, ok := obj.(*corev1.Pod) if !ok { klog.Errorf("unknown add object type") return } nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations] if !ok { return } if k8sutil.IsPodInTerminatedState(pod) { s.delPod(pod) return } podDev, _ := util.DecodePodDevices(util.SupportDevices, pod.Annotations) s.addPod(pod, nodeID, podDev) }
这边限制了只会处理 hami.io/vgpu-node
annoations 的 Pod,过滤掉其他 Pod,从 Pod Annoations 中解析出该 Pod 使用的 GPU UUID 以及 memory 和 core 等信息。
Pod 上的 Annoations 大概是这样的:
$ k get po gpu-pod -oyaml apiVersion: v1 kind: Pod metadata: annotations: hami.io/bind-phase: success hami.io/bind-time: "1727251686" hami.io/vgpu-devices-allocated: GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:; hami.io/vgpu-devices-to-allocate: ; hami.io/vgpu-node: test hami.io/vgpu-time: "1727251686"
其中hami.io/vgpu-devices-allocated
对应的 value 就是 GPU 信息,格式化后如下
GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;
-
GPU-03f69c50-207a-2038-9b45-23cac89cb67d:设备 UUID
-
NVIDIA:设备类型
-
3000:使用 3000M memory
-
30:使用 30% core
至于 Pod 删除则是直接删除内存中缓存的 Pod 信息即可。
Node 的变化则比较简单,就是往 nodeNotify channel 发送通知
func (s *Scheduler) onUpdateNode(_, newObj interface{}) { s.nodeNotify <- struct{}{} } func (s *Scheduler) onDelNode(obj interface{}) { s.nodeNotify <- struct{}{} } func (s *Scheduler) onAddNode(obj interface{}) { s.nodeNotify <- struct{}{} }
这个消息最终就会立即触发一次上面的 RegisterFromNodeAnnotations 逻辑
默认是定时的,每 15s 触发一次,增加 notify 可以在节点变化时更快感知到
func (s *Scheduler) RegisterFromNodeAnnotations() { klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations") ticker := time.NewTicker(time.Second * 15) for { select { case <-s.nodeNotify: case <-ticker.C: case <-s.stopCh: return } // .....
通过这个 Informer HAMi 可以知道以下信息:
-
集群中的 GPU 节点情况,每个节点上的 GPU 设备信息
-
Pod 对 GPU 的具体使用情况,包括 memory、core 使用量等
通过这些信息,就可以完成后续的 Scheduler 逻辑了。
4. 调度实现
上一步拿到集群中的 GPU 信息之后,就可以开始调度了,具体实现分为两个接口:
-
Filter:根据各种条件进行过滤,为当前 Pod 选择合适的节点
-
Bind:将 Pod 最终后某一个节点进行绑定,完成调度
Filter 接口
看下看 Filter 接口是怎么进行节点过滤的
// pkg/scheduler/scheduler.go#L444 func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) { klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace) nums := k8sutil.Resourcereqs(args.Pod) total := 0 for _, n := range nums { for _, k := range n { total += int(k.Nums) } } if total == 0 { klog.V(1).Infof("pod %v not find resource", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource")) return &extenderv1.ExtenderFilterResult{ NodeNames: args.NodeNames, FailedNodes: nil, Error: "", }, nil } annos := args.Pod.Annotations s.delPod(args.Pod) nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len(failedNodes) != 0 { klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes) } nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) if err != nil { err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) return nil, err } if len((*nodeScores).NodeList) == 0 { klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet")) return &extenderv1.ExtenderFilterResult{ FailedNodes: failedNodes, }, nil } klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList)) sort.Sort(nodeScores) m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices) annotations := make(map[string]string) annotations[util.AssignedNodeAnnotations] = m.NodeID annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) for _, val := range device.GetDevices() { val.PatchAnnotations(&annotations, m.Devices) } //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices) //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices) //maps.Copy(annotations, InRequestDevices) //maps.Copy(annotations, supportDevices) s.addPod(args.Pod, m.NodeID, m.Devices) err = util.PatchPodAnnotations(args.Pod, annotations) if err != nil { s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err) s.delPod(args.Pod) return nil, err } s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil) res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil }
对于没有申请特殊资源的 Pod 直接返回全部 Node 都可以调度,不做处理
nums := k8sutil.Resourcereqs(args.Pod) total := 0 for _, n := range nums { for _, k := range n { total += int(k.Nums) } } if total == 0 { klog.V(1).Infof("pod %v not find resource", args.Pod.Name) s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource")) return &extenderv1.ExtenderFilterResult{ NodeNames: args.NodeNames, FailedNodes: nil, Error: "", }, nil }
否则就根据上一步中获取到的 Node 信息进行打分,具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的 GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。
总的来说就是:
节点上 GPU Core 和 GPU Memory 资源剩余越少,得分越高
。// pkg/scheduler/policy/node_policy.go#L52 func (ns *NodeScore) ComputeScore(devices DeviceUsageList) { // current user having request resource used, usedCore, usedMem := int32(0), int32(0), int32(0) for _, device := range devices.DeviceLists { used += device.Device.Used usedCore += device.Device.Usedcores usedMem += device.Device.Usedmem } klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem) total, totalCore, totalMem := int32(0), int32(0), int32(0) for _, deviceLists := range devices.DeviceLists { total += deviceLists.Device.Count totalCore += deviceLists.Device.Totalcore totalMem += deviceLists.Device.Totalmem } useScore := float32(used) / float32(total) coreScore := float32(usedCore) / float32(totalCore) memScore := float32(usedMem) / float32(totalMem) ns.Score = float32(Weight) * (useScore + coreScore + memScore) klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score) }
除了计算得分还要判断节点剩余资源是否能满足当前 Pod,如果不满足则直接忽略掉
// pkg/scheduler/score.go#L185 func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) { userNodePolicy := config.NodeSchedulerPolicy if annos != nil { if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok { userNodePolicy = value } } res := policy.NodeScoreList{ Policy: userNodePolicy, NodeList: make([]*policy.NodeScore, 0), } //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) { // res := make(NodeScoreList, 0, len(*nodes)) for nodeID, node := range *nodes { viewStatus(*node) score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0} score.ComputeScore(node.Devices) //This loop is for different container request ctrfit := false for ctrid, n := range nums { sums := 0 for _, k := range n { sums += int(k.Nums) } if sums == 0 { for idx := range score.Devices { if len(score.Devices[idx]) <= ctrid { score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{}) } score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{}) continue } } klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID) fit, _ := fitInDevices(node, n, annos, task, &score.Devices) ctrfit = fit if !fit { klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID) break } } if ctrfit { res.NodeList = append(res.NodeList, &score) } } return &res, nil }
计算完成之后从中选了一个节点进行调度。
//计算得分,拿到所有满足条件的节点 nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod) // 排序 sort.Sort(nodeScores) // 直接选择最后一个节点 m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1] // 返回结果 res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}} return &res, nil
到这里,我们已经拿到了最终要调度的 Node 了,调度逻辑就结束了。
这里大家可能会有疑问:
为什么 Filter 方法就只返回了一个节点,甚至还融合了打分的逻辑在里面。
这个问题在上一篇K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 中已经有解释了,如果按照正常逻辑实现 Filter、Score 等方法,最终 Scheduler 会汇总多个插件的打分,然后根据最终结果选择一个节点,
但是 HAMi 这边是要完全控制调度结果的,因此直接将 Filter、Score 逻辑融合到一起,最终就只返回一个目标节点
,这样最后肯定会调度到该节点。Bind 接口
很简单,直接根据 Filter 的返回结果,将 Pod 和 Node 绑定即可完成调度。
func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) { klog.InfoS("Bind", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node) var err error var res *extenderv1.ExtenderBindingResult binding := &corev1.Binding{ ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID}, Target: corev1.ObjectReference{Kind: "Node", Name: args.Node}, } current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{}) if err != nil { klog.ErrorS(err, "Get pod failed") } node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{}) if err != nil { klog.ErrorS(err, "Failed to get node", "node", args.Node) s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %v", args.Node)) res = &extenderv1.ExtenderBindingResult{ Error: err.Error(), } return res, nil } tmppatch := make(map[string]string) for _, val := range device.GetDevices() { err = val.LockNode(node, current) if err != nil { goto ReleaseNodeLocks } } /* err = nodelock.LockNode(args.Node) if err != nil { klog.ErrorS(err, "Failed to lock node", "node", args.Node) res = &extenderv1.ExtenderBindingResult{ Error: err.Error(), } return res, nil }*/ //defer util.ReleaseNodeLock(args.Node) tmppatch[util.DeviceBindPhase] = "allocating" tmppatch[util.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) err = util.PatchPodAnnotations(current, tmppatch) if err != nil { klog.ErrorS(err, "patch pod annotation failed") } if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil { klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node) } if err == nil { s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil) res = &extenderv1.ExtenderBindingResult{ Error: "", } klog.Infoln("After Binding Process") return res, nil } ReleaseNodeLocks: klog.InfoS("bind failed", "err", err.Error()) for _, val := range device.GetDevices() { val.ReleaseNodeLock(node, current) } s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err) return &extenderv1.ExtenderBindingResult{ Error: err.Error(), }, nil }
核心部分:
binding := &corev1.Binding{ ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID}, Target: corev1.ObjectReference{Kind: "Node", Name: args.Node}, } if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil { klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node) }
调用 api 创建了一个 binding 对象,将 Pod 调度到指定节点即可。
至此,Scheduler 的逻辑我们就分析完了,调度完成 Kubelet 就开始启动 Pod 了,然后 hami 的 device plugin 也要开始发挥作用了。
小结
这里 HAMi 是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。
然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个 Container 启动的一个 http 服务。
ps:我们说的 hami-scheduler 一般只这部分 Extender 实现的调度插件
然后在调度可以分为两部分:
-
1)获取 GPU 信息
-
从 Node Annoations 中获取节点上的 GPU 资源信息
-
从 Pod Annoations 中获取 GPU 的使用情况
-
-
2)按配置策略进行节点选择并完成调度
- 直接在 Filter 接口按照得分排序后返回最推荐的一个节点,以实现完全控制调度结果
-
按照 GPU memory、core 剩余情况计算得分,剩余资源越多得分越低
【Kubernetes 系列】
持续更新中,搜索公众号【探索云原生
】订阅,阅读更多文章。5. 小结
本文主要分析了 hami-scheduler 的实现原理,其中包含两个组件:
-
Webhook
:根据 Pod Resource 中的 ResourceName 判断该 Pod 是否使用的 HAMi vGPU,如果是则修改 Pod 的 SchedulerName 为 hami-scheduler,让 hami-scheduler 进行调度。 -
Scheduler
:以 kube-shceduler 为镜像启动服务并改名为 hami-scheduler,然后通过配置 extender 接入真正的 hami-scheduler 逻辑。-
从 Node 的 Annoations 上解析拿到 GPU 资源信息,从已经运行的 Pod Annoations 上解析拿到 Pod 消耗的 GPU 资源计算出每个 Node 上真实可用的 GPU 资源
-
根据节点剩余资源进行打分,然后根据配置的 Spread、Binpack 调度策略选择得分最高或最低的节点,将 Pod 进行调度。
-
HAMi Webhook 、Scheduler 工作流程如下:
-
1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源
-
2)
kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook
-
3)HAMi-Webhook 检测 Pod 中的 Resource,如果申请的由 HAMi 管理的 vGPU 资源,就会把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。
-
对于特权模式的 Pod,Webhook 会直接跳过不处理
-
对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝
-
-
4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,
但是 kube-scheduler 还会根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件
-
这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。
-
当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑
-
-
5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点
- 这里就包含了 spread & binpark 等 高级调度策略的实现
-
6)异步任务,包括 GPU 感知逻辑
- devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
- 除了 DevicePlugin 之外,还使用异步任务以 Patch Annotation 方式提交更多信息
- Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用
至此,HAMi Webhook、Scheduler 就分析完了,spread & binpark 等 高级调度策略是如何实现的留着下篇分析~。
这一切,似未曾拥有