Scheduling Queue
By Li Xiang
An introduction to the scheduling queue and an analysis of how it works.
Introduction
The scheduling queue is where the scheduler stores Pods that are waiting to be scheduled. In version 1.18 the scheduling queue is a PriorityQueue, defined as follows:
```go
type PriorityQueue struct {
	stop  chan struct{}
	clock util.Clock
	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration
	lock                  sync.RWMutex
	cond                  sync.Cond
	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap // pods waiting to be scheduled; each scheduling attempt pops one
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap // pods currently in backoff
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap // pods marked Unschedulable
	// nominatedPods is a structures that stores pods which are nominated to run
	// on nodes.
	nominatedPods *nominatedPodMap
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64 // monotonically increasing, similar to etcd's index; +1 on every pop
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64 // schedulingCycle at the last move of unschedulableQ pods to activeQ/podBackoffQ
	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool
}
```
activeQ
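Data structure
activeQ is the structure PriorityQueue uses to order and store pods. It is a Heap that holds the pods waiting to be scheduled in priority order, and all of the Heap's functionality is implemented by its data field.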
```go
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
	// data stores objects and has a queue that keeps their ordering according
	// to the heap invariant.
	data *data
	// metricRecorder updates the counter when elements of a heap get added or
	// removed, and it does nothing if it's nil
	metricRecorder metrics.MetricRecorder
}
```
Heap implementation
The Heap's data field implements Go's heap interface (container/heap.Interface).
```go
type data struct {
	// items is a map from key of the objects to the objects and their index.
	// We depend on the property that items in the map are in the queue and vice versa.
	items map[string]*heapItem
	// queue implements a heap data structure and keeps the order of elements
	// according to the heap invariant. The queue keeps the keys of objects stored
	// in "items".
	queue []string
	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc
	// lessFunc is used to compare two objects in the heap.
	lessFunc lessFunc
}
```
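- items is a map holding every pod's information: the key is a string that keyFunc generates from the pod, and the value is the pod information together with its index in queue.
- queue is the heap itself and stores the keys of all pods. It maintains the heap invariant rather than a fully sorted order: queue[0] is always the highest-priority pod, and every element ranks ahead of its children. How two pods compare is decided by lessFunc.
- keyFunc generates a pod's key.
- lessFunc compares the priority of two pods. It takes two pods and returns true when the first should be placed ahead of the second; the heap calls it whenever it restores the invariant, for example when a pod is added. (activeQ's lessFunc can be customized through a QueueSort plugin.)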
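Initializing lessFunc:
At initialization, activeQ's lessFunc is taken from the first QueueSort plugin of the first scheduler profile. In other words, all schedulers (profiles) share the same QueueSort plugin, and if several QueueSort plugins are configured, only the first one takes effect. (A scheduler process has only one scheduling queue, so it can use only one QueueSort plugin.)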
```go
// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
	lessFn,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
)
```
```go
// QueueSortFunc returns the function to sort pods in scheduling queue
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
	if f == nil {
		// If frameworkImpl is nil, simply keep their order unchanged.
		// NOTE: this is primarily for tests.
		return func(_, _ *framework.QueuedPodInfo) bool { return false }
	}

	if len(f.queueSortPlugins) == 0 {
		panic("No QueueSort plugin is registered in the frameworkImpl.")
	}

	// Only one QueueSort plugin can be enabled.
	return f.queueSortPlugins[0].Less
}
```
The lessFunc of the default QueueSort plugin (PrioritySort):
```go
// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodQueueInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := corev1helpers.PodPriority(pInfo1.Pod)
	p2 := corev1helpers.PodPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
```
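Pods are ranked by Spec.Priority; if the field is unset, it is treated as 0. Between two pods with the same Priority, the one with the earlier Timestamp (the time it was added to the scheduling queue) ranks higher, i.e. FIFO.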
podBackoffQ
Data structure
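podBackoffQ is the queue PriorityQueue uses to implement backoff for pods that failed scheduling. A pod's backoff duration is $\min(\text{podInitialBackoffDuration} \cdot 2^{\text{attempts}-1},\ \text{podMaxBackoffDuration})$, where attempts is the number of times the pod has been tried; with the defaults of 1 s and 10 s this gives 1 s, 2 s, 4 s, 8 s and then 10 s. The backoff is counted from the last time the pod was added to the scheduling queue, and podBackoffQ is ordered by backoff expiry time, earliest first. podBackoffQ uses the same data structure as activeQ, a Heap, with the following lessFunc: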
```go
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}
```
Pods whose backoff expires earlier have higher priority.
unschedulableQ
Data structure
unschedulableQ is an UnschedulablePodsMap:
```go
type UnschedulablePodsMap struct {
	// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
	podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
	// metricRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil
	metricRecorder metrics.MetricRecorder
}
```
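PriorityQueue uses unschedulableQ to store pods that failed scheduling. A move operation, which puts pods from unschedulableQ into podBackoffQ (if their backoff has not yet expired) or into activeQ (if it has), is triggered when a specific cluster event is received, or when more than 60 seconds have passed since a pod's last scheduling attempt (checked by a poll every 30 seconds).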
How it works
Pod flow
The figure above shows how a pod flows through PriorityQueue. The steps in the figure are:
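- The informer observes a Pod Add event and calls PriorityQueue's Add, which puts the Pod into activeQ.
- After the Scheduler finishes scheduling a pod, it Pops the next pod from activeQ via NextPodFunc and schedules it.
- If scheduling the Pod fails, the Scheduler records the current schedulingCycle and handles the Pod as follows:
  - If the recorded schedulingCycle <= moveRequestCycle, a move of pods out of unschedulableQ happened during this Pod's scheduling cycle, so cluster state may have changed while it was being scheduled; the Pod goes straight into podBackoffQ.
  - If the recorded schedulingCycle > moveRequestCycle, no move has happened since then, so the Pod goes into unschedulableQ.
- When a Pod has been in unschedulableQ for more than 60 seconds (checked every 30 seconds), or when a relevant event occurs (Pod bound, Node add, Pod delete, CSINode add, PV add, PVC add, and events related to Services and StorageClasses), a move of unschedulableQ is triggered. The periodic check moves only the pods that have exceeded 60 seconds. Each moved pod is placed as follows:
  - if its backoff has already expired, it goes into activeQ;
  - if its backoff has not yet expired, it goes into podBackoffQ.
- When a Pod's backoff expires, it is taken out of podBackoffQ and put into activeQ (checked every second).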
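The schedulingCycle/moveRequestCycle check in the failure step can be modeled in isolation. `miniQueue` and its methods are hypothetical: the sketch tracks only the two counters and which queue a failed pod lands in, and, for brevity, its moveAll sends pods to podBackoffQ without the backoff-expiry check.

```go
package main

import "fmt"

// miniQueue is a hypothetical model of only the cycle bookkeeping;
// real queues are heaps and a real move also checks backoff expiry.
type miniQueue struct {
	schedulingCycle  int64
	moveRequestCycle int64
	active           []string
	backoff          []string
	unschedulable    []string
}

// newMiniQueue starts moveRequestCycle below any cycle a pop can produce,
// so no pod initially counts as having seen a move.
func newMiniQueue() *miniQueue {
	return &miniQueue{moveRequestCycle: -1}
}

// pop takes the head of activeQ and increments schedulingCycle.
func (q *miniQueue) pop() string {
	p := q.active[0]
	q.active = q.active[1:]
	q.schedulingCycle++
	return p
}

// moveAll models a cluster event: it empties unschedulableQ (into
// podBackoffQ here, for brevity) and records the cycle in moveRequestCycle.
func (q *miniQueue) moveAll() {
	q.backoff = append(q.backoff, q.unschedulable...)
	q.unschedulable = nil
	q.moveRequestCycle = q.schedulingCycle
}

// addUnschedulable handles a failed pod: if a move happened in or after the
// pod's scheduling cycle, retry via podBackoffQ; otherwise park it in unschedulableQ.
func (q *miniQueue) addUnschedulable(pod string, podCycle int64) {
	if podCycle <= q.moveRequestCycle {
		q.backoff = append(q.backoff, pod)
	} else {
		q.unschedulable = append(q.unschedulable, pod)
	}
}

func main() {
	q := newMiniQueue()
	q.active = []string{"a", "b"}

	a := q.pop()                             // cycle 1
	q.addUnschedulable(a, q.schedulingCycle) // no move yet: a -> unschedulableQ

	b := q.pop()                             // cycle 2
	q.moveAll()                              // event while b is scheduled: a -> podBackoffQ, moveRequestCycle = 2
	q.addUnschedulable(b, q.schedulingCycle) // 2 <= 2: b -> podBackoffQ

	fmt.Println(q.unschedulable, q.backoff) // [] [a b]
}
```

Pod b shows why the check exists: the event that might make it schedulable arrived while it was being scheduled, so parking it in unschedulableQ could leave it waiting for an event that has already happened.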