⚠️ VeridianOS Kernel Documentation - This is low-level kernel code. All functions are unsafe unless explicitly marked otherwise. no_std

veridian_kernel/sync/
lockfree_queue.rs

//! Lock-Free MPSC Queue
//!
//! A lock-free multi-producer, single-consumer queue built on atomic operations.
//! Designed for the scheduler ready queue, where multiple CPUs may enqueue
//! tasks concurrently while only the owning CPU dequeues.
//!
//! The implementation is a singly linked list of heap-allocated nodes behind a
//! sentinel (dummy) node, with `AtomicPtr` head and tail pointers:
//! - Enqueue: atomic update of the tail pointer (safe for concurrent producers)
//! - Dequeue: atomic update of the head pointer (single consumer only)
//! - Memory reclamation: the consumer frees each retired node directly with
//!   `Box::from_raw`; no hazard-pointer scheme is implemented in this file
//!
//! Note: `head` and `tail` are declared as adjacent fields with no cache-line
//! padding or alignment, so they may share a cache line (false sharing).
14
15use alloc::boxed::Box;
16use core::{
17    ptr,
18    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
19};
20
21// ---------------------------------------------------------------------------
22// Queue Node
23// ---------------------------------------------------------------------------
24
/// One heap-allocated link of the queue's singly linked list.
///
/// Nodes are always handled through raw pointers obtained from
/// `Box::into_raw`; whoever unlinks a node is responsible for turning it back
/// into a `Box` to free it.
struct Node<T> {
    /// Payload carried by this node. `None` for the sentinel (dummy) node.
    value: Option<T>,
    /// Link to the following node; null while this node is the last one.
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    /// Allocate a node holding `value` with a null `next` link and leak it
    /// as a raw pointer.
    fn boxed(value: Option<T>) -> *mut Self {
        let node = Box::new(Self {
            value,
            next: AtomicPtr::new(ptr::null_mut()),
        });
        Box::into_raw(node)
    }

    /// Allocate a node that carries a real value.
    fn new(value: T) -> *mut Self {
        Self::boxed(Some(value))
    }

    /// Allocate the value-less sentinel (dummy) node.
    fn sentinel() -> *mut Self {
        Self::boxed(None)
    }
}
48
49// ---------------------------------------------------------------------------
50// Lock-Free MPSC Queue
51// ---------------------------------------------------------------------------
52
53/// A lock-free multi-producer, single-consumer queue.
54///
55/// Multiple threads can call `push()` concurrently. Only one thread
56/// should call `pop()` at a time (the owning CPU's scheduler).
57pub struct LockFreeQueue<T> {
58    /// Head pointer (dequeue end). Only modified by the consumer.
59    head: AtomicPtr<Node<T>>,
60    /// Tail pointer (enqueue end). Modified by any producer.
61    tail: AtomicPtr<Node<T>>,
62    /// Number of elements in the queue (approximate, for metrics).
63    len: AtomicUsize,
64}
65
66// SAFETY: LockFreeQueue uses atomic operations for all shared state.
67// The queue is designed for concurrent access from multiple CPUs.
68unsafe impl<T: Send> Send for LockFreeQueue<T> {}
69unsafe impl<T: Send> Sync for LockFreeQueue<T> {}
70
71impl<T> LockFreeQueue<T> {
72    /// Create a new empty lock-free queue.
73    ///
74    /// Initializes with a sentinel (dummy) node so that head and tail
75    /// always point to a valid node.
76    pub fn new() -> Self {
77        let sentinel = Node::<T>::sentinel();
78        Self {
79            head: AtomicPtr::new(sentinel),
80            tail: AtomicPtr::new(sentinel),
81            len: AtomicUsize::new(0),
82        }
83    }
84
85    /// Push a value onto the tail of the queue (multi-producer safe).
86    ///
87    /// This operation is lock-free: it uses a CAS loop on the tail pointer.
88    /// Multiple CPUs can call push() concurrently.
89    pub fn push(&self, value: T) {
90        let new_node = Node::new(value);
91
92        loop {
93            let tail = self.tail.load(Ordering::Acquire);
94            // SAFETY: tail always points to a valid node (sentinel or enqueued).
95            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
96
97            if next.is_null() {
98                // Tail is the actual last node. Try to link our new node.
99                // SAFETY: tail is valid and next is null (verified above).
100                if unsafe {
101                    (*tail)
102                        .next
103                        .compare_exchange(
104                            ptr::null_mut(),
105                            new_node,
106                            Ordering::Release,
107                            Ordering::Relaxed,
108                        )
109                        .is_ok()
110                } {
111                    // Successfully linked. Try to advance tail (best-effort).
112                    let _ = self.tail.compare_exchange(
113                        tail,
114                        new_node,
115                        Ordering::Release,
116                        Ordering::Relaxed,
117                    );
118                    self.len.fetch_add(1, Ordering::Relaxed);
119                    return;
120                }
121                // CAS failed: another producer linked a node. Retry.
122            } else {
123                // Tail is lagging behind. Help advance it.
124                let _ =
125                    self.tail
126                        .compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed);
127            }
128        }
129    }
130
131    /// Pop a value from the head of the queue (single-consumer only).
132    ///
133    /// Returns `None` if the queue is empty.
134    pub fn pop(&self) -> Option<T> {
135        loop {
136            let head = self.head.load(Ordering::Acquire);
137            let tail = self.tail.load(Ordering::Acquire);
138            // SAFETY: head always points to a valid node (sentinel or enqueued).
139            let next = unsafe { (*head).next.load(Ordering::Acquire) };
140
141            if head == tail {
142                if next.is_null() {
143                    // Queue is empty.
144                    return None;
145                }
146                // Tail is lagging. Help advance it.
147                let _ =
148                    self.tail
149                        .compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed);
150            } else if !next.is_null() {
151                // Read value from the next node (head is the sentinel/dummy).
152                // SAFETY: next is non-null and points to a valid enqueued node.
153                let value = unsafe { (*next).value.take() };
154
155                // Try to advance head past the sentinel.
156                if self
157                    .head
158                    .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
159                    .is_ok()
160                {
161                    // Successfully dequeued. Free the old head (sentinel).
162                    // SAFETY: head was the sentinel; no other thread references it
163                    // after head is advanced.
164                    unsafe {
165                        let _ = Box::from_raw(head);
166                    }
167                    self.len.fetch_sub(1, Ordering::Relaxed);
168                    return value;
169                }
170                // CAS failed: another operation modified head. Retry.
171            }
172        }
173    }
174
175    /// Check if the queue is empty (approximate).
176    pub fn is_empty(&self) -> bool {
177        let head = self.head.load(Ordering::Acquire);
178        let tail = self.tail.load(Ordering::Acquire);
179        // SAFETY: head points to a valid node.
180        let next = unsafe { (*head).next.load(Ordering::Acquire) };
181        head == tail && next.is_null()
182    }
183
184    /// Get the approximate number of elements in the queue.
185    pub fn len(&self) -> usize {
186        self.len.load(Ordering::Relaxed)
187    }
188}
189
190impl<T> Default for LockFreeQueue<T> {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl<T> Drop for LockFreeQueue<T> {
197    fn drop(&mut self) {
198        // Drain the queue to free all nodes.
199        while self.pop().is_some() {}
200
201        // Free the sentinel node.
202        let sentinel = self.head.load(Ordering::Relaxed);
203        if !sentinel.is_null() {
204            // SAFETY: After draining, head == tail == sentinel, and no other
205            // thread accesses the queue during drop.
206            unsafe {
207                let _ = Box::from_raw(sentinel);
208            }
209        }
210    }
211}