// veridian_kernel/sync/lockfree_queue.rs
//! Lock-Free MPSC Queue
//!
//! A lock-free multi-producer, single-consumer queue built from a linked
//! list of heap-allocated nodes and atomic head/tail pointers. Designed
//! for the scheduler ready queue where multiple CPUs may enqueue tasks
//! concurrently while only the owning CPU dequeues.
//!
//! - Enqueue: lock-free, safe from multiple CPUs concurrently.
//! - Dequeue: single consumer only (the owning CPU's scheduler).
//!
//! Nodes are freed eagerly by the consumer as it advances `head`. There
//! is no deferred-reclamation scheme (hazard pointers / epochs) in this
//! module, and the `head`/`tail` fields are not cache-line padded.

15use alloc::boxed::Box;
16use core::{
17 ptr,
18 sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
19};
20
// ---------------------------------------------------------------------------
// Queue Node
// ---------------------------------------------------------------------------

/// A node in the lock-free queue.
///
/// Nodes are heap-allocated via `Box` and handed around as raw pointers;
/// ownership is tracked manually by the queue (see `pop` and `Drop`).
struct Node<T> {
    /// The value stored in this node (`None` for the sentinel dummy node,
    /// and for dequeued nodes whose value has already been taken).
    value: Option<T>,
    /// Pointer to the next node, or null if this is the last node.
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    /// Heap-allocate a node carrying `value`; `next` starts out null.
    fn new(value: T) -> *mut Self {
        Box::into_raw(Box::new(Self {
            value: Some(value),
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }

    /// Heap-allocate the sentinel (dummy) node that `head`/`tail` point
    /// at when the queue is empty; it carries no value.
    fn sentinel() -> *mut Self {
        Box::into_raw(Box::new(Self {
            value: None,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

// ---------------------------------------------------------------------------
// Lock-Free MPSC Queue
// ---------------------------------------------------------------------------

/// A lock-free multi-producer, single-consumer queue.
///
/// Multiple threads can call `push()` concurrently. Only one thread
/// should call `pop()` at a time (the owning CPU's scheduler).
///
/// Layout invariant: `head` points at the current sentinel node and
/// `tail` at the last node of the list; both point at the same sentinel
/// when the queue is empty, and neither is ever null.
pub struct LockFreeQueue<T> {
    /// Head pointer (dequeue end). Only modified by the consumer.
    head: AtomicPtr<Node<T>>,
    /// Tail pointer (enqueue end). Swapped by any producer.
    tail: AtomicPtr<Node<T>>,
    /// Number of elements in the queue (approximate, for metrics).
    len: AtomicUsize,
}

// SAFETY: All shared state is manipulated through atomic operations, and
// values of type T are moved from the pushing thread to the popping
// thread, which is sound exactly when `T: Send` (the same bound the
// standard library's channels require).
unsafe impl<T: Send> Send for LockFreeQueue<T> {}
unsafe impl<T: Send> Sync for LockFreeQueue<T> {}

impl<T> LockFreeQueue<T> {
    /// Create a new empty lock-free queue.
    ///
    /// Both `head` and `tail` start out pointing at a heap-allocated
    /// sentinel (dummy) node, so neither pointer is ever null.
    pub fn new() -> Self {
        let sentinel = Node::<T>::sentinel();
        Self {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
            len: AtomicUsize::new(0),
        }
    }

    /// Push a value onto the tail of the queue (multi-producer safe).
    ///
    /// Wait-free for producers: one atomic `swap` claims the tail slot,
    /// then a single store publishes the link — no CAS retry loop.
    ///
    /// This replaces the previous CAS-loop enqueue, which loaded `tail`
    /// and then dereferenced it; between the load and the dereference the
    /// consumer could free that node (use-after-free — this module has no
    /// hazard pointers). With the swap scheme, `prev` cannot have been
    /// freed: the consumer frees a node only after observing its `next`
    /// link, and only this producer ever writes `prev.next`.
    pub fn push(&self, value: T) {
        let new_node = Node::new(value);

        // Atomically claim the tail slot. AcqRel: Release publishes our
        // node's initialization to the next producer; Acquire orders us
        // after the previous producer's swap.
        let prev = self.tail.swap(new_node, Ordering::AcqRel);

        // SAFETY: `prev` is still alive (see method docs): the consumer
        // cannot free it until this store makes its `next` non-null.
        unsafe { (*prev).next.store(new_node, Ordering::Release) };

        self.len.fetch_add(1, Ordering::Relaxed);
    }

    /// Pop a value from the head of the queue (single-consumer only).
    ///
    /// Returns `None` if the queue is empty, or — transiently — if a
    /// producer has swapped in a node but not yet published its link;
    /// the owning scheduler simply retries on its next pass.
    pub fn pop(&self) -> Option<T> {
        // Only the consumer ever writes `head`, so this load is stable
        // for the duration of the call.
        let head = self.head.load(Ordering::Acquire);

        // SAFETY: `head` always points at a live node: only the consumer
        // frees nodes, and only after advancing `head` past them.
        let next = unsafe { (*head).next.load(Ordering::Acquire) };
        if next.is_null() {
            return None;
        }

        // SAFETY: `next` was published by a producer with Release; the
        // Acquire load above makes the node's initialization visible.
        let value = unsafe { (*next).value.take() };

        // Advance head; the dequeued node becomes the new sentinel.
        self.head.store(next, Ordering::Release);

        // SAFETY: the old sentinel is now unreachable: `head` has moved
        // past it, and no producer holds a pointer to it — a producer
        // only touches the `prev` returned by its own swap, and we
        // observed that node's `next` link already published above.
        unsafe {
            drop(Box::from_raw(head));
        }

        self.len.fetch_sub(1, Ordering::Relaxed);
        value
    }

    /// Check if the queue is empty (approximate).
    ///
    /// Based on the element counter, so it may lag a concurrent push or
    /// pop, but it is safe to call from any thread. (The previous
    /// implementation dereferenced `head`, which races with the consumer
    /// freeing that node when called from a producer CPU.)
    pub fn is_empty(&self) -> bool {
        self.len.load(Ordering::Relaxed) == 0
    }

    /// Get the approximate number of elements in the queue.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
}
189
190impl<T> Default for LockFreeQueue<T> {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl<T> Drop for LockFreeQueue<T> {
197 fn drop(&mut self) {
198 // Drain the queue to free all nodes.
199 while self.pop().is_some() {}
200
201 // Free the sentinel node.
202 let sentinel = self.head.load(Ordering::Relaxed);
203 if !sentinel.is_null() {
204 // SAFETY: After draining, head == tail == sentinel, and no other
205 // thread accesses the queue during drop.
206 unsafe {
207 let _ = Box::from_raw(sentinel);
208 }
209 }
210 }
211}