⚠️ VeridianOS Kernel Documentation — low-level `no_std` kernel code. Treat all functions as unsafe to call unless they are explicitly documented as safe.

veridian_kernel/sched/
percpu_queue.rs

1//! Per-CPU Ready Queues with Work-Stealing
2//!
3//! Eliminates the global scheduler lock by giving each CPU its own run queue.
4//! When a CPU's queue is empty, it steals work from the busiest neighbor.
5//!
6//! ## Design
7//!
8//! - Each CPU has a `PerCpuQueue` protected by a per-CPU spin lock
9//! - `AtomicU32` run_length allows lock-free queue length queries
10//! - Work stealing takes half the victim's queue (from the back)
11//! - Steal threshold prevents thrashing on lightly loaded systems
12
13use alloc::collections::VecDeque;
14use core::sync::atomic::{AtomicU32, Ordering};
15
16use spin::Mutex;
17
18use crate::process::ProcessId;
19
20/// Maximum CPUs supported (matches smp::MAX_CPUS).
21const MAX_CPUS: usize = 16;
22
23/// Minimum queue depth before stealing is attempted.
24const STEAL_THRESHOLD: u32 = 2;
25
/// Per-CPU run queue.
///
/// The lock is normally taken only by the owning CPU; other CPUs take it
/// briefly when stealing. `run_length` is kept in sync with `queue.len()`
/// (updated under the lock) so load queries never need the lock.
pub struct PerCpuQueue {
    /// Local run queue of process IDs (front = oldest, back = newest).
    queue: Mutex<VecDeque<ProcessId>>,
    /// Atomic mirror of `queue.len()` for lock-free queue length queries.
    run_length: AtomicU32,
}
33
34impl Default for PerCpuQueue {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl PerCpuQueue {
41    /// Create a new empty per-CPU queue.
42    pub const fn new() -> Self {
43        Self {
44            queue: Mutex::new(VecDeque::new()),
45            run_length: AtomicU32::new(0),
46        }
47    }
48
49    /// Push a process onto the local queue.
50    pub fn push(&self, pid: ProcessId) {
51        let mut q = self.queue.lock();
52        q.push_back(pid);
53        self.run_length.fetch_add(1, Ordering::Release);
54    }
55
56    /// Pop a process from the local queue (front = oldest).
57    pub fn pop(&self) -> Option<ProcessId> {
58        let mut q = self.queue.lock();
59        if let Some(pid) = q.pop_front() {
60            self.run_length.fetch_sub(1, Ordering::Release);
61            Some(pid)
62        } else {
63            None
64        }
65    }
66
67    /// Steal half the tasks from this queue (from the back = newest).
68    ///
69    /// Returns stolen tasks or an empty vec if queue is below threshold.
70    pub fn steal(&self) -> alloc::vec::Vec<ProcessId> {
71        let mut q = self.queue.lock();
72        let len = q.len();
73        if len < STEAL_THRESHOLD as usize {
74            return alloc::vec::Vec::new();
75        }
76
77        let steal_count = len / 2;
78        let mut stolen = alloc::vec::Vec::with_capacity(steal_count);
79
80        for _ in 0..steal_count {
81            if let Some(pid) = q.pop_back() {
82                stolen.push(pid);
83                self.run_length.fetch_sub(1, Ordering::Release);
84            }
85        }
86
87        stolen
88    }
89
90    /// Get the current queue length (lock-free).
91    pub fn queue_length(&self) -> u32 {
92        self.run_length.load(Ordering::Acquire)
93    }
94}
95
/// Per-CPU scheduler managing all CPU queues.
///
/// Fixed-size array of `MAX_CPUS` queues; only the first `cpu_count`
/// entries are consulted by the load-balancing paths.
pub struct PerCpuScheduler {
    /// One queue per CPU, indexed by CPU id.
    queues: [PerCpuQueue; MAX_CPUS],
    /// Number of CPUs actually in use (<= MAX_CPUS).
    cpu_count: AtomicU32,
}
103
104impl Default for PerCpuScheduler {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl PerCpuScheduler {
111    /// Create a new per-CPU scheduler.
112    #[allow(clippy::declare_interior_mutable_const)]
113    pub const fn new() -> Self {
114        const EMPTY_QUEUE: PerCpuQueue = PerCpuQueue::new();
115        Self {
116            queues: [EMPTY_QUEUE; MAX_CPUS],
117            cpu_count: AtomicU32::new(1),
118        }
119    }
120
121    /// Set the number of active CPUs.
122    pub fn set_cpu_count(&self, count: u32) {
123        self.cpu_count
124            .store(count.min(MAX_CPUS as u32), Ordering::Release);
125    }
126
127    /// Push a process onto a specific CPU's queue.
128    pub fn push(&self, cpu_id: usize, pid: ProcessId) {
129        if cpu_id < MAX_CPUS {
130            self.queues[cpu_id].push(pid);
131        }
132    }
133
134    /// Pop the next process from a specific CPU's queue.
135    pub fn pop(&self, cpu_id: usize) -> Option<ProcessId> {
136        if cpu_id < MAX_CPUS {
137            self.queues[cpu_id].pop()
138        } else {
139            None
140        }
141    }
142
143    /// Try to steal work from another CPU.
144    ///
145    /// Finds the busiest CPU and steals half its tasks.
146    pub fn steal_for(&self, cpu_id: usize) -> Option<ProcessId> {
147        let count = self.cpu_count.load(Ordering::Acquire) as usize;
148        let mut busiest = 0usize;
149        let mut max_len = 0u32;
150
151        for i in 0..count {
152            if i == cpu_id {
153                continue;
154            }
155            let len = self.queues[i].queue_length();
156            if len > max_len {
157                max_len = len;
158                busiest = i;
159            }
160        }
161
162        if max_len < STEAL_THRESHOLD {
163            return None;
164        }
165
166        let stolen = self.queues[busiest].steal();
167        if stolen.is_empty() {
168            return None;
169        }
170
171        // Push all but the first stolen task onto our queue
172        let mut first = None;
173        for pid in stolen {
174            if first.is_none() {
175                first = Some(pid);
176            } else {
177                self.queues[cpu_id].push(pid);
178            }
179        }
180
181        first
182    }
183
184    /// Find the least-loaded CPU.
185    pub fn find_least_loaded(&self) -> usize {
186        let count = self.cpu_count.load(Ordering::Acquire) as usize;
187        let mut min_len = u32::MAX;
188        let mut best = 0;
189
190        for i in 0..count {
191            let len = self.queues[i].queue_length();
192            if len < min_len {
193                min_len = len;
194                best = i;
195            }
196        }
197
198        best
199    }
200
201    /// Rebalance: move tasks from overloaded CPUs to underloaded ones.
202    pub fn rebalance(&self) {
203        let count = self.cpu_count.load(Ordering::Acquire) as usize;
204        if count < 2 {
205            return;
206        }
207
208        // Find min and max loaded CPUs
209        let mut min_cpu = 0;
210        let mut max_cpu = 0;
211        let mut min_len = u32::MAX;
212        let mut max_len = 0u32;
213
214        for i in 0..count {
215            let len = self.queues[i].queue_length();
216            if len < min_len {
217                min_len = len;
218                min_cpu = i;
219            }
220            if len > max_len {
221                max_len = len;
222                max_cpu = i;
223            }
224        }
225
226        // Only rebalance if imbalance exceeds threshold
227        if max_len > min_len + STEAL_THRESHOLD {
228            let stolen = self.queues[max_cpu].steal();
229            for pid in stolen {
230                self.queues[min_cpu].push(pid);
231            }
232        }
233    }
234
235    /// Get queue length for a specific CPU.
236    pub fn queue_length(&self, cpu_id: usize) -> u32 {
237        if cpu_id < MAX_CPUS {
238            self.queues[cpu_id].queue_length()
239        } else {
240            0
241        }
242    }
243}
244
/// Global per-CPU scheduler instance.
///
/// `None` until initialized — presumably during scheduler/SMP bring-up
/// elsewhere in the crate (not visible in this file; confirm at the call
/// site that installs it). The outer `Mutex` guards the `Option`; per-CPU
/// queue locking happens inside `PerCpuScheduler` itself.
pub(crate) static PERCPU_SCHED: Mutex<Option<PerCpuScheduler>> = Mutex::new(None);
247
248/// Push a process onto the appropriate CPU's queue.
249pub fn percpu_push(cpu_id: usize, pid: ProcessId) {
250    if let Some(ref sched) = *PERCPU_SCHED.lock() {
251        sched.push(cpu_id, pid);
252    }
253}
254
255/// Pop the next process from a CPU's queue, with work-stealing fallback.
256pub fn percpu_pop(cpu_id: usize) -> Option<ProcessId> {
257    if let Some(ref sched) = *PERCPU_SCHED.lock() {
258        // Try local queue first
259        if let Some(pid) = sched.pop(cpu_id) {
260            return Some(pid);
261        }
262        // Try stealing from busiest neighbor
263        sched.steal_for(cpu_id)
264    } else {
265        None
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_push_pop() {
        // FIFO order: pop returns the oldest pushed PID first.
        let queue = PerCpuQueue::new();
        queue.push(ProcessId(1));
        queue.push(ProcessId(2));
        assert_eq!(queue.queue_length(), 2);
        assert_eq!(queue.pop(), Some(ProcessId(1)));
        assert_eq!(queue.pop(), Some(ProcessId(2)));
        assert_eq!(queue.pop(), None);
        assert_eq!(queue.queue_length(), 0);
    }

    #[test]
    fn test_steal() {
        // Stealing takes exactly half of a 4-deep queue.
        let queue = PerCpuQueue::new();
        for pid in [1, 2, 3, 4] {
            queue.push(ProcessId(pid));
        }

        let stolen = queue.steal();
        assert_eq!(stolen.len(), 2);
        assert_eq!(queue.queue_length(), 2);
    }

    #[test]
    fn test_steal_empty() {
        // An empty queue yields nothing to a thief.
        let queue = PerCpuQueue::new();
        assert!(queue.steal().is_empty());
    }

    #[test]
    fn test_steal_single() {
        // One task is below STEAL_THRESHOLD, so nothing is stolen.
        let queue = PerCpuQueue::new();
        queue.push(ProcessId(1));
        assert!(queue.steal().is_empty());
    }

    #[test]
    fn test_percpu_scheduler() {
        let scheduler = PerCpuScheduler::new();
        scheduler.set_cpu_count(4);

        scheduler.push(0, ProcessId(10));
        scheduler.push(0, ProcessId(11));
        scheduler.push(1, ProcessId(20));

        assert_eq!(scheduler.queue_length(0), 2);
        assert_eq!(scheduler.queue_length(1), 1);
        // CPUs 2 and 3 are idle; the first idle CPU wins.
        assert_eq!(scheduler.find_least_loaded(), 2);
    }

    #[test]
    fn test_rebalance() {
        let scheduler = PerCpuScheduler::new();
        scheduler.set_cpu_count(2);

        // Pile all the work onto CPU 0.
        for i in 0..8 {
            scheduler.push(0, ProcessId(i));
        }

        assert_eq!(scheduler.queue_length(0), 8);
        assert_eq!(scheduler.queue_length(1), 0);

        scheduler.rebalance();

        // Rebalancing must have migrated at least one task to CPU 1.
        assert!(scheduler.queue_length(0) < 8);
        assert!(scheduler.queue_length(1) > 0);
    }

    #[test]
    fn test_invalid_cpu() {
        // Out-of-range CPU ids are handled gracefully, never panic.
        let scheduler = PerCpuScheduler::new();
        assert_eq!(scheduler.pop(999), None);
        assert_eq!(scheduler.queue_length(999), 0);
    }
}