⚠️ VeridianOS Kernel Documentation - This is low-level kernel code. All functions are unsafe unless explicitly marked otherwise. no_std

veridian_kernel/process/
sync.rs

1//! Process synchronization primitives
2//!
3//! This module provides synchronization mechanisms for processes and threads,
4//! including mutexes, semaphores, and condition variables.
5
6use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
7
8use crate::error::KernelError;
9
10#[cfg(feature = "alloc")]
11extern crate alloc;
12
13#[cfg(feature = "alloc")]
14use alloc::{collections::VecDeque, vec::Vec};
15
16use spin::Mutex as SpinMutex;
17
18use super::{ProcessId, ThreadId};
19
/// Wait queue for blocking threads.
///
/// Holds the (process, thread) id pairs of threads blocked waiting for
/// some event. Wakers pop entries and transition the threads back to the
/// ready state via the scheduler.
#[cfg(feature = "alloc")]
pub struct WaitQueue {
    /// FIFO queue of waiting threads, guarded by a spinlock.
    waiters: SpinMutex<VecDeque<(ProcessId, ThreadId)>>,
}
26
#[cfg(feature = "alloc")]
impl Default for WaitQueue {
    /// Equivalent to [`WaitQueue::new`]. Delegates instead of duplicating
    /// the construction (the same style `PiMutex`'s `Default` impl uses).
    fn default() -> Self {
        Self::new()
    }
}
35
#[cfg(feature = "alloc")]
impl WaitQueue {
    /// Create a new, empty wait queue.
    pub const fn new() -> Self {
        Self {
            waiters: SpinMutex::new(VecDeque::new()),
        }
    }

    /// Block the current thread on this queue.
    ///
    /// Enqueues the calling (process, thread) pair, marks the thread
    /// `Blocked`, and yields to the scheduler. If there is no current
    /// process/thread context (e.g. early boot), this is a no-op.
    ///
    /// NOTE(review): a wake-up arriving between the enqueue and the
    /// `set_state(Blocked)` below can be lost (the waker sets `Ready`
    /// first, then we overwrite it with `Blocked`). Callers mitigate this
    /// by re-checking their condition in a retry loop.
    pub fn wait(&self) {
        let (process, thread) = match (super::current_process(), super::current_thread()) {
            (Some(p), Some(t)) => (p, t),
            _ => return,
        };

        self.waiters.lock().push_back((process.pid, thread.tid));

        // Mark ourselves blocked and hand the CPU to the scheduler.
        thread.set_state(super::thread::ThreadState::Blocked);
        crate::sched::yield_cpu();
    }

    /// Transition the given thread back to `Ready` and re-enqueue it in
    /// the scheduler's run queue.
    ///
    /// Returns `false` if the process or thread no longer exists (it may
    /// have exited while queued). Shared by `wake_one` and `wake_all`.
    fn wake_thread(pid: ProcessId, tid: ThreadId) -> bool {
        let process = match super::table::get_process(pid) {
            Some(p) => p,
            None => return false,
        };
        let thread = match process.get_thread(tid) {
            Some(t) => t,
            None => return false,
        };

        thread.set_state(super::thread::ThreadState::Ready);
        // A scheduling failure is not fatal here: the thread is already
        // `Ready` and can be picked up by a later scheduling decision.
        let _ = crate::sched::schedule_thread(pid, tid, thread);
        true
    }

    /// Wake up one thread.
    ///
    /// Returns `true` if a thread was actually woken. Entries whose thread
    /// has exited since being queued are skipped, so a stale head entry no
    /// longer swallows the wake-up (previously the wake was dropped).
    pub fn wake_one(&self) -> bool {
        while let Some((pid, tid)) = self.waiters.lock().pop_front() {
            if Self::wake_thread(pid, tid) {
                return true;
            }
        }
        false
    }

    /// Wake up all threads, returning how many were actually woken.
    pub fn wake_all(&self) -> usize {
        // Swap the queue out under the spinlock, then do the (potentially
        // slower) process-table and scheduler work with the lock released.
        let waiters = core::mem::take(&mut *self.waiters.lock());

        let mut woken = 0;
        for (pid, tid) in waiters {
            if Self::wake_thread(pid, tid) {
                woken += 1;
            }
        }
        woken
    }

    /// Check if the queue is empty (racy snapshot).
    pub fn is_empty(&self) -> bool {
        self.waiters.lock().is_empty()
    }
}
104
/// Mutex implementation.
///
/// A blocking mutex with ownership tracking: the locking thread's TID is
/// recorded so `unlock` can reject releases by non-owners.
pub struct Mutex {
    /// Lock state (false = unlocked, true = locked).
    locked: AtomicBool,
    /// TID of the owning thread (0 = unlocked or owner unknown).
    owner: AtomicU64,
    /// Wait queue for blocked threads
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}
115
116impl Default for Mutex {
117    fn default() -> Self {
118        Self {
119            locked: AtomicBool::new(false),
120            owner: AtomicU64::new(0),
121            #[cfg(feature = "alloc")]
122            waiters: WaitQueue::new(),
123        }
124    }
125}
126
impl Mutex {
    /// Create a new mutex in the unlocked state.
    pub const fn new() -> Self {
        Self {
            locked: AtomicBool::new(false),
            owner: AtomicU64::new(0),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Try to acquire the mutex without blocking.
    ///
    /// Returns `true` on success. On success the caller's TID is recorded
    /// as the owner (when a current-thread context exists) so that
    /// `unlock` can verify ownership.
    pub fn try_lock(&self) -> bool {
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            // Record ownership for the check in `unlock`. With no current
            // thread (e.g. early boot) the owner stays 0.
            if let Some(thread) = super::current_thread() {
                self.owner.store(thread.tid.0, Ordering::Relaxed);
            }
            true
        } else {
            false
        }
    }

    /// Acquire the mutex, blocking if necessary.
    ///
    /// Not recursive: re-locking from the owning thread deadlocks.
    ///
    /// NOTE(review): a wake-up arriving between the failed `try_lock` and
    /// the enqueue inside `waiters.wait()` can be lost; the loop re-checks
    /// the lock after every wake, but whether a permanently missed wake can
    /// strand a thread depends on scheduler behavior — TODO confirm.
    pub fn lock(&self) {
        while !self.try_lock() {
            #[cfg(feature = "alloc")]
            {
                // Add to wait queue and block
                self.waiters.wait();
            }

            #[cfg(not(feature = "alloc"))]
            {
                // Spin or yield
                crate::sched::yield_cpu();
            }
        }
    }

    /// Release the mutex.
    ///
    /// Returns `Err` if the calling thread does not own the lock. Callers
    /// should handle this as a programming error rather than letting the
    /// kernel panic inside a critical section.
    ///
    /// NOTE(review): when there is no current-thread context the ownership
    /// check is skipped entirely and the unlock proceeds — confirm this is
    /// intended for early-boot paths.
    pub fn unlock(&self) -> Result<(), KernelError> {
        // Verify we own the lock
        if let Some(thread) = super::current_thread() {
            if self.owner.load(Ordering::Relaxed) != thread.tid.0 {
                return Err(KernelError::PermissionDenied {
                    operation: "mutex_unlock",
                });
            }
        }

        // Clear the owner, then release with a Release store so writes in
        // the critical section are visible to the next Acquire `try_lock`.
        self.owner.store(0, Ordering::Relaxed);
        self.locked.store(false, Ordering::Release);

        // Wake up one waiter
        #[cfg(feature = "alloc")]
        self.waiters.wake_one();

        Ok(())
    }

    /// Check if mutex is locked (racy snapshot; diagnostics only).
    pub fn is_locked(&self) -> bool {
        self.locked.load(Ordering::Relaxed)
    }
}
201
/// Counting semaphore implementation.
///
/// The count is capped at `max_count`; `signal` reports an error instead
/// of exceeding the cap.
pub struct Semaphore {
    /// Current number of available permits.
    count: AtomicU32,
    /// Maximum permit count (upper bound enforced by `signal`).
    max_count: u32,
    /// Wait queue
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}
212
213impl Semaphore {
214    /// Create a new semaphore
215    pub const fn new(initial: u32, max: u32) -> Self {
216        Self {
217            count: AtomicU32::new(initial),
218            max_count: max,
219            #[cfg(feature = "alloc")]
220            waiters: WaitQueue::new(),
221        }
222    }
223
224    /// Wait on semaphore (P operation)
225    pub fn wait(&self) {
226        loop {
227            let count = self.count.load(Ordering::Relaxed);
228            if count > 0 {
229                if self
230                    .count
231                    .compare_exchange(count, count - 1, Ordering::Acquire, Ordering::Relaxed)
232                    .is_ok()
233                {
234                    return;
235                }
236            } else {
237                #[cfg(feature = "alloc")]
238                {
239                    // Block on wait queue
240                    self.waiters.wait();
241                }
242
243                #[cfg(not(feature = "alloc"))]
244                {
245                    // Yield
246                    crate::sched::yield_cpu();
247                }
248            }
249        }
250    }
251
252    /// Try to wait on semaphore without blocking
253    pub fn try_wait(&self) -> bool {
254        loop {
255            let count = self.count.load(Ordering::Relaxed);
256            if count > 0 {
257                if self
258                    .count
259                    .compare_exchange(count, count - 1, Ordering::Acquire, Ordering::Relaxed)
260                    .is_ok()
261                {
262                    return true;
263                }
264            } else {
265                return false;
266            }
267        }
268    }
269
270    /// Signal semaphore (V operation).
271    ///
272    /// Returns `Err` if signalling would exceed the maximum count, indicating
273    /// a caller bug (more signals than waits). This avoids a kernel panic
274    /// inside what may be a lock-holding context.
275    pub fn signal(&self) -> Result<(), KernelError> {
276        loop {
277            let count = self.count.load(Ordering::Relaxed);
278            if count >= self.max_count {
279                return Err(KernelError::InvalidState {
280                    expected: "count < max_count",
281                    actual: "semaphore overflow",
282                });
283            }
284
285            if self
286                .count
287                .compare_exchange(count, count + 1, Ordering::Release, Ordering::Relaxed)
288                .is_ok()
289            {
290                // Wake up one waiter
291                #[cfg(feature = "alloc")]
292                self.waiters.wake_one();
293
294                return Ok(());
295            }
296        }
297    }
298
299    /// Get current count
300    pub fn count(&self) -> u32 {
301        self.count.load(Ordering::Relaxed)
302    }
303}
304
/// Condition variable implementation.
///
/// Used together with [`Mutex`]: waiters block on the internal queue and
/// are woken by `signal`/`broadcast`.
#[cfg(feature = "alloc")]
pub struct CondVar {
    /// Threads blocked in `wait`.
    waiters: WaitQueue,
}
311
#[cfg(feature = "alloc")]
impl Default for CondVar {
    /// Equivalent to [`CondVar::new`]. Delegates instead of duplicating
    /// the construction (the same style `PiMutex`'s `Default` impl uses).
    fn default() -> Self {
        Self::new()
    }
}
320
#[cfg(feature = "alloc")]
impl CondVar {
    /// Create a new condition variable
    pub const fn new() -> Self {
        Self {
            waiters: WaitQueue::new(),
        }
    }

    /// Wait on condition variable.
    ///
    /// Returns `Err` if the mutex is not held by the caller. The caller
    /// must hold the mutex before calling `wait`, as required by the
    /// standard condition variable protocol. On `Ok` return the mutex is
    /// held again; callers must re-check their predicate in a loop.
    pub fn wait(&self, mutex: &Mutex) -> Result<(), KernelError> {
        // Must hold the mutex
        if !mutex.is_locked() {
            return Err(KernelError::InvalidState {
                expected: "mutex locked",
                actual: "mutex unlocked",
            });
        }

        // Release the mutex BEFORE blocking. The previous ordering blocked
        // first (inside `waiters.wait()`) while still holding the mutex,
        // which deadlocked: the signalling thread could never acquire the
        // mutex to reach `signal()`.
        //
        // Ignore the unlock result: we verified the mutex was locked above
        // and this thread is the one releasing it in this protocol.
        let _ = mutex.unlock();

        // Block until signalled.
        //
        // NOTE(review): a signal arriving between the unlock above and the
        // enqueue inside `wait()` can be missed; as with any condition
        // variable, waiting inside a predicate-checking loop is required.
        self.waiters.wait();

        // We've been woken up; re-acquire the mutex before returning.
        mutex.lock();

        Ok(())
    }

    /// Wake one waiting thread, if any.
    pub fn signal(&self) {
        self.waiters.wake_one();
    }

    /// Wake all waiting threads.
    pub fn broadcast(&self) {
        self.waiters.wake_all();
    }
}
369
/// Read-write lock implementation.
pub struct RwLock {
    /// Lock state: 0 = unlocked, 1..usize::MAX = number of active readers,
    /// usize::MAX = write locked.
    state: AtomicUsize,
    /// Wait queues
    #[cfg(feature = "alloc")]
    read_waiters: WaitQueue,
    #[cfg(feature = "alloc")]
    write_waiters: WaitQueue,
}
380
381impl Default for RwLock {
382    fn default() -> Self {
383        Self {
384            state: AtomicUsize::new(0),
385            #[cfg(feature = "alloc")]
386            read_waiters: WaitQueue::new(),
387            #[cfg(feature = "alloc")]
388            write_waiters: WaitQueue::new(),
389        }
390    }
391}
392
impl RwLock {
    /// Create a new read-write lock in the unlocked state.
    pub const fn new() -> Self {
        Self {
            state: AtomicUsize::new(0),
            #[cfg(feature = "alloc")]
            read_waiters: WaitQueue::new(),
            #[cfg(feature = "alloc")]
            write_waiters: WaitQueue::new(),
        }
    }

    /// Acquire a shared (read) lock, blocking while a writer holds it.
    ///
    /// NOTE(review): readers can always barge in while no writer holds the
    /// lock, so a steady stream of readers can starve writers.
    pub fn read_lock(&self) {
        loop {
            let state = self.state.load(Ordering::Relaxed);

            // Can't read if write locked (state == usize::MAX)
            if state == usize::MAX {
                #[cfg(feature = "alloc")]
                self.read_waiters.wait();

                #[cfg(not(feature = "alloc"))]
                crate::sched::yield_cpu();

                continue;
            }

            // Try to increment reader count. CAS failure means another CPU
            // changed the state under us; retry with the fresh value.
            if self
                .state
                .compare_exchange(state, state + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }
        }
    }

    /// Try to acquire a shared (read) lock without blocking.
    pub fn try_read_lock(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        if state != usize::MAX {
            self.state
                .compare_exchange(state, state + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    /// Release a shared (read) lock.
    ///
    /// NOTE(review): an unbalanced call (no matching `read_lock`) wraps
    /// the count to usize::MAX — the write-locked encoding — wedging the
    /// lock. Callers must keep lock/unlock calls balanced.
    pub fn read_unlock(&self) {
        let prev = self.state.fetch_sub(1, Ordering::Release);

        // If we were the last reader, wake up writers
        #[cfg(feature = "alloc")]
        if prev == 1 {
            self.write_waiters.wake_one();
        }
    }

    /// Acquire an exclusive (write) lock, blocking until no readers or
    /// writer hold the lock.
    pub fn write_lock(&self) {
        loop {
            if self
                .state
                .compare_exchange(0, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }

            #[cfg(feature = "alloc")]
            self.write_waiters.wait();

            #[cfg(not(feature = "alloc"))]
            crate::sched::yield_cpu();
        }
    }

    /// Try to acquire an exclusive (write) lock without blocking.
    pub fn try_write_lock(&self) -> bool {
        self.state
            .compare_exchange(0, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Release an exclusive (write) lock.
    pub fn write_unlock(&self) {
        self.state.store(0, Ordering::Release);

        // Wake up all readers and one writer
        #[cfg(feature = "alloc")]
        {
            self.read_waiters.wake_all();
            self.write_waiters.wake_one();
        }
    }
}
494
/// Barrier synchronization.
///
/// Releases all waiters once `threshold` threads have called `wait`. The
/// generation counter distinguishes successive rounds so the barrier can
/// be reused.
pub struct Barrier {
    /// Number of threads to wait for
    threshold: usize,
    /// Number of threads that have arrived in the current round.
    count: AtomicUsize,
    /// Round counter, bumped by the last arriving thread.
    generation: AtomicUsize,
    /// Wait queue
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}
507
impl Barrier {
    /// Create a new barrier that releases once `n` threads have arrived.
    pub const fn new(n: usize) -> Self {
        Self {
            threshold: n,
            count: AtomicUsize::new(0),
            generation: AtomicUsize::new(0),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Wait at the barrier until `threshold` threads have arrived.
    ///
    /// Non-final arrivers block until the generation observed on entry
    /// changes; the last arriver resets the count, bumps the generation,
    /// and wakes everyone, making the barrier reusable.
    ///
    /// NOTE(review): the count is reset *before* the generation is bumped,
    /// so a thread entering the next round in that window can observe the
    /// old generation and return prematurely once it changes — confirm
    /// whether rounds can actually overlap in practice.
    pub fn wait(&self) {
        let gen = self.generation.load(Ordering::Relaxed);
        let count = self.count.fetch_add(1, Ordering::Relaxed) + 1;

        if count == self.threshold {
            // We're the last thread, reset and wake everyone
            self.count.store(0, Ordering::Relaxed);
            self.generation.fetch_add(1, Ordering::Relaxed);

            #[cfg(feature = "alloc")]
            self.waiters.wake_all();
        } else {
            // Wait for others; re-check the generation after every wake
            // since wake-ups may be spurious or racing.
            while self.generation.load(Ordering::Relaxed) == gen {
                #[cfg(feature = "alloc")]
                self.waiters.wait();

                #[cfg(not(feature = "alloc"))]
                crate::sched::yield_cpu();
            }
        }
    }
}
544
/// Priority Inheritance Mutex
///
/// Prevents priority inversion by temporarily boosting the lock holder's
/// priority to that of the highest-priority waiter. When the lock is
/// released, the original priority is restored.
///
/// NOTE(review): ownership is tracked per *process* (PID), unlike `Mutex`
/// which records a TID — two threads of the same process are
/// indistinguishable to the ownership check.
pub struct PiMutex {
    /// Owner PID (0 = unlocked)
    owner: AtomicU64,
    /// Original priority of the owner before any boost
    original_priority: SpinMutex<Option<crate::sched::task::Priority>>,
    /// Wait queue for blocked threads
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}
559
impl Default for PiMutex {
    /// Equivalent to [`PiMutex::new`].
    fn default() -> Self {
        Self::new()
    }
}
565
impl PiMutex {
    /// Create a new priority inheritance mutex
    pub const fn new() -> Self {
        Self {
            owner: AtomicU64::new(0),
            original_priority: SpinMutex::new(None),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Try to acquire the mutex without blocking.
    ///
    /// The owner word is the caller's PID; 0 is the "unlocked" sentinel,
    /// so the protocol assumes no valid caller has PID 0.
    pub fn try_lock(&self) -> bool {
        let pid = current_pid();
        self.owner
            .compare_exchange(0, pid, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Acquire the mutex, boosting the holder's priority if necessary.
    ///
    /// If the lock is held by a lower-priority task, the holder's effective
    /// priority is boosted to prevent priority inversion.
    pub fn lock(&self) {
        if self.try_lock() {
            // We acquired the lock. Save our current priority so `unlock`
            // can restore it after any boost.
            if let Some(task) = get_current_task_ptr() {
                // SAFETY: We just acquired the lock, so we are the only
                // writer to original_priority at this point.
                let mut orig = self.original_priority.lock();
                unsafe {
                    *orig = Some((*task).priority);
                }
            }
            return;
        }

        // Lock is held. Boost the holder if our priority is higher.
        self.boost_owner_if_needed();

        // Block until the lock is available, retrying `try_lock` after
        // every wake-up (wakes may be spurious or lost to races).
        loop {
            #[cfg(feature = "alloc")]
            self.waiters.wait();

            #[cfg(not(feature = "alloc"))]
            crate::sched::yield_cpu();

            if self.try_lock() {
                // Same bookkeeping as the fast path above.
                if let Some(task) = get_current_task_ptr() {
                    let mut orig = self.original_priority.lock();
                    unsafe {
                        *orig = Some((*task).priority);
                    }
                }
                return;
            }
        }
    }

    /// Release the mutex and restore the original priority.
    ///
    /// Returns `Err` if the calling process does not own the lock.
    /// Wakes the highest-priority waiter.
    pub fn unlock(&self) -> Result<(), KernelError> {
        let pid = current_pid();
        if self.owner.load(Ordering::Relaxed) != pid {
            return Err(KernelError::PermissionDenied {
                operation: "pi_mutex_unlock",
            });
        }

        // Restore original priority and clear any inheritance boost.
        if let Some(task) = get_current_task_ptr() {
            let mut orig = self.original_priority.lock();
            if let Some(original) = orig.take() {
                // SAFETY: We are the lock owner and restoring our own priority.
                unsafe {
                    (*task).priority = original;
                    (*task).priority_boost = None;
                }
            }
        }

        // Release the lock
        self.owner.store(0, Ordering::Release);

        // Wake one waiter
        #[cfg(feature = "alloc")]
        self.waiters.wake_one();

        Ok(())
    }

    /// Boost the lock owner's priority if the current task has higher priority.
    ///
    /// NOTE(review): currently a stub — the owner lookup and the waiter's
    /// priority are computed and then discarded, so no boost is actually
    /// applied yet.
    fn boost_owner_if_needed(&self) {
        let owner_pid = self.owner.load(Ordering::Relaxed);
        // Lock may have been released between the failed try_lock and here.
        if owner_pid == 0 {
            return;
        }

        // Get current task's priority; without a task context there is
        // nothing to inherit from.
        let my_priority = if let Some(task) = get_current_task_ptr() {
            unsafe { (*task).priority }
        } else {
            return;
        };

        // Find owner task and boost if needed
        if let Some(owner) = crate::sched::find_process(crate::process::ProcessId(owner_pid)) {
            // The TaskProcessAdapter has the PID; we need the actual Task.
            // Nothing is applied yet — the values are discarded below. The
            // intended design is for the scheduler's effective_priority()
            // to pick a recorded boost up via the priority_boost field.
            let _ = (owner, my_priority);
        }
    }

    /// Check if mutex is locked
    pub fn is_locked(&self) -> bool {
        self.owner.load(Ordering::Relaxed) != 0
    }
}
688
689/// Get current process PID as u64
690fn current_pid() -> u64 {
691    crate::sched::current_process_id().0
692}
693
694/// Get a raw pointer to the current task (for priority manipulation)
695fn get_current_task_ptr() -> Option<*mut crate::sched::task::Task> {
696    let sched = crate::sched::scheduler::SCHEDULER.lock();
697    sched.current().map(|ptr| ptr.as_ptr())
698}