⚠️ VeridianOS Kernel Documentation. This is low-level kernel code (`no_std`). All functions are unsafe unless explicitly marked otherwise.

veridian_kernel/ipc/async_channel.rs

//! Asynchronous IPC channels with lock-free implementation
//!
//! This module provides high-performance async channels using lock-free
//! ring buffers and event notification for efficient message passing.
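//!
//! A minimal usage sketch (assumes an `owner_pid: ProcessId` in scope;
//! `Message::small` is the constructor used in this module's tests):
//!
//! ```ignore
//! let channel = AsyncChannel::new(1, owner_pid, ASYNC_CHANNEL_SIZE);
//! channel.send_async(Message::small(0, 42))?;
//! if let Ok(msg) = channel.receive_async() {
//!     // handle msg
//! }
//! ```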
#[cfg(feature = "alloc")]
extern crate alloc;

use core::{
    ptr,
    sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
};

use super::{
    capability::ProcessId,
    error::{IpcError, Result},
    message::Message,
};
use crate::arch::entropy::read_timestamp;

/// Maximum messages in async channel
pub const ASYNC_CHANNEL_SIZE: usize = 256;

/// Asynchronous channel backed by a lock-free ring buffer
pub struct AsyncChannel {
    /// Channel ID
    id: u64,
    /// Owner process
    #[allow(dead_code)] // Needed for ownership checks in Phase 6
    owner: ProcessId,
    /// Ring buffer for messages
    buffer: RingBuffer<Message>,
    /// Subscribers waiting for messages
    #[cfg(feature = "alloc")]
    subscribers: spin::Mutex<alloc::vec::Vec<ProcessId>>,
    /// Channel statistics
    stats: ChannelStats,
    /// Channel active flag
    active: AtomicBool,
}

/// Channel statistics
struct ChannelStats {
    messages_sent: AtomicU64,
    messages_received: AtomicU64,
    messages_dropped: AtomicU64,
    max_queue_depth: AtomicUsize,
}

impl AsyncChannel {
    /// Create a new async channel
    pub fn new(id: u64, owner: ProcessId, capacity: usize) -> Self {
        Self {
            id,
            owner,
            buffer: RingBuffer::new(capacity),
            #[cfg(feature = "alloc")]
            subscribers: spin::Mutex::new(alloc::vec::Vec::new()),
            stats: ChannelStats {
                messages_sent: AtomicU64::new(0),
                messages_received: AtomicU64::new(0),
                messages_dropped: AtomicU64::new(0),
                max_queue_depth: AtomicUsize::new(0),
            },
            active: AtomicBool::new(true),
        }
    }

    /// Send a message without blocking
    pub fn send_async(&self, msg: Message) -> Result<()> {
        if !self.active.load(Ordering::Acquire) {
            return Err(IpcError::EndpointNotFound);
        }

        // Validate capability if provided
        let cap_id = msg.capability();
        if cap_id != 0 {
            // Get current process's capability space
            if let Some(current_process) = crate::process::current_process() {
                if let Some(real_process) = crate::process::table::get_process(current_process.pid)
                {
                    let cap_space = real_process.capability_space.lock();
                    let cap_token = crate::cap::CapabilityToken::from_u64(cap_id);

                    // Check send permission
                    crate::cap::ipc_integration::check_send_permission(cap_token, &cap_space)?;
                }
            }
        }

        // Try to enqueue message
        match self.buffer.push(msg) {
            Ok(()) => {
                self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);

                // Update max queue depth
                let current_size = self.buffer.size();
                let mut max_depth = self.stats.max_queue_depth.load(Ordering::Relaxed);
                while current_size > max_depth {
                    match self.stats.max_queue_depth.compare_exchange_weak(
                        max_depth,
                        current_size,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(old) => max_depth = old,
                    }
                }

                // Wake up subscribers
                #[cfg(feature = "alloc")]
                {
                    let subscribers = self.subscribers.lock();
                    for &pid in subscribers.iter() {
                        wake_process(pid);
                    }
                }

                // Also wake any processes blocked on this channel's endpoint
                crate::sched::ipc_blocking::wake_up_endpoint_waiters(self.id);

                Ok(())
            }
            Err(_) => {
                self.stats.messages_dropped.fetch_add(1, Ordering::Relaxed);
                Err(IpcError::ChannelFull)
            }
        }
    }

    /// Receive a message without blocking
    pub fn receive_async(&self) -> Result<Message> {
        if !self.active.load(Ordering::Acquire) {
            return Err(IpcError::EndpointNotFound);
        }

        // Receive permission would typically be checked at channel
        // subscription time; for now, receives are allowed for any process
        // with access to the channel.

        match self.buffer.pop() {
            Some(msg) => {
                self.stats.messages_received.fetch_add(1, Ordering::Relaxed);
                Ok(msg)
            }
            None => Err(IpcError::ChannelEmpty),
        }
    }

    /// Poll for messages with timeout.
    ///
    /// A `timeout_ns` of 0 disables the timeout: the call spins until a
    /// message arrives.
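    ///
    /// ```ignore
    /// // Sketch: wait up to 1 ms for a message.
    /// match channel.poll(1_000_000)? {
    ///     Some(msg) => { /* handle msg */ }
    ///     None => { /* timed out */ }
    /// }
    /// ```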
    pub fn poll(&self, timeout_ns: u64) -> Result<Option<Message>> {
        let start = read_timestamp();

        loop {
            // Try to receive
            match self.receive_async() {
                Ok(msg) => return Ok(Some(msg)),
                Err(IpcError::ChannelEmpty) => {
                    // Check timeout
                    if timeout_ns > 0 {
                        let elapsed = timestamp_to_ns(read_timestamp() - start);
                        if elapsed >= timeout_ns {
                            return Ok(None);
                        }
                    }

                    // Yield CPU and retry
                    core::hint::spin_loop();
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Subscribe to channel notifications
    #[cfg(feature = "alloc")]
    pub fn subscribe(&self, pid: ProcessId) -> Result<()> {
        if !self.active.load(Ordering::Acquire) {
            return Err(IpcError::EndpointNotFound);
        }

        let mut subscribers = self.subscribers.lock();
        if !subscribers.contains(&pid) {
            subscribers.push(pid);
        }
        Ok(())
    }

    /// Subscribe to channel notifications (no-op without `alloc`)
    #[cfg(not(feature = "alloc"))]
    pub fn subscribe(&self, _pid: ProcessId) -> Result<()> {
        Ok(())
    }

    /// Get channel statistics
    pub fn get_stats(&self) -> AsyncChannelStats {
        AsyncChannelStats {
            messages_sent: self.stats.messages_sent.load(Ordering::Relaxed),
            messages_received: self.stats.messages_received.load(Ordering::Relaxed),
            messages_dropped: self.stats.messages_dropped.load(Ordering::Relaxed),
            max_queue_depth: self.stats.max_queue_depth.load(Ordering::Relaxed),
            current_size: self.buffer.size(),
            capacity: self.buffer.capacity(),
        }
    }

    /// Close the channel
    pub fn close(&self) {
        self.active.store(false, Ordering::Release);

        // Wake all subscribers
        #[cfg(feature = "alloc")]
        {
            let subscribers = self.subscribers.lock();
            for &pid in subscribers.iter() {
                wake_process(pid);
            }
        }
    }
}

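// Example (sketch): a consumer subscribes for wakeups, then drains the
// channel each time it runs. `current_pid` and `handle` are hypothetical
// helpers, not part of this module.
//
//     channel.subscribe(current_pid())?;
//     while let Ok(msg) = channel.receive_async() {
//         handle(msg);
//     }
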
/// Lock-free ring buffer implementation
struct RingBuffer<T> {
    /// Buffer storage
    buffer: *mut T,
    /// Buffer capacity
    capacity: usize,
    /// Write position
    write_pos: AtomicUsize,
    /// Read position
    read_pos: AtomicUsize,
    /// Number of items in buffer
    size: AtomicUsize,
}

impl<T> RingBuffer<T> {
    /// Create a new ring buffer
    fn new(capacity: usize) -> Self {
        let layout =
            core::alloc::Layout::array::<T>(capacity).expect("ring buffer capacity overflow");
        // SAFETY: The layout is computed from Layout::array::<T>(capacity), which
        // ensures proper size and alignment for an array of T elements. The alloc()
        // call returns a pointer to uninitialized memory of the requested layout,
        // or null on allocation failure; the null case is diverted to
        // handle_alloc_error below, so push/pop never dereference a null pointer.
        // The returned pointer is cast to *mut T, which is valid because the
        // layout guarantees correct alignment for T.
        let buffer = unsafe { alloc::alloc::alloc(layout) as *mut T };
        if buffer.is_null() {
            alloc::alloc::handle_alloc_error(layout);
        }

        Self {
            buffer,
            capacity,
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

    /// Push an item into the buffer
    fn push(&self, item: T) -> core::result::Result<(), T> {
        let current_size = self.size.load(Ordering::Acquire);
        if current_size >= self.capacity {
            return Err(item);
        }

        // Reserve a slot
        let write_pos = self.write_pos.fetch_add(1, Ordering::Relaxed) % self.capacity;

        // SAFETY: `write_pos` is computed modulo `self.capacity`, so it is always
        // within bounds of the allocated buffer (0..capacity-1). The size check above
        // ensures we are not writing past the buffer's logical capacity. `ptr::write`
        // is used instead of assignment because the slot may contain uninitialized
        // memory (never written) or previously-read memory (already consumed by pop).
        // In either case, we must not run Drop on the old value, which ptr::write
        // avoids. The buffer pointer is valid because it was allocated in `new()`.
        unsafe {
            ptr::write(self.buffer.add(write_pos), item);
        }

        // Update size
        self.size.fetch_add(1, Ordering::Release);

        Ok(())
    }

    /// Pop an item from the buffer
    fn pop(&self) -> Option<T> {
        loop {
            let current_size = self.size.load(Ordering::Acquire);
            if current_size == 0 {
                return None;
            }

            // Try to decrement size
            match self.size.compare_exchange_weak(
                current_size,
                current_size - 1,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Successfully reserved an item
                    let read_pos = self.read_pos.fetch_add(1, Ordering::Relaxed) % self.capacity;

                    // SAFETY: `read_pos` is computed modulo `self.capacity`, so it is
                    // within bounds. The compare_exchange above successfully decremented
                    // the size, guaranteeing a valid item exists at this slot (placed by
                    // a prior push()). `ptr::read` is used to move the value out of the
                    // buffer without dropping it in place -- ownership transfers to the
                    // caller. The buffer pointer is valid from the allocation in `new()`.
                    let item = unsafe { ptr::read(self.buffer.add(read_pos)) };

                    return Some(item);
                }
                Err(_) => {
                    // Retry
                    core::hint::spin_loop();
                }
            }
        }
    }

    /// Get current size
    fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// Get capacity
    fn capacity(&self) -> usize {
        self.capacity
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        // Clean up remaining items
        while self.pop().is_some() {}

        // Deallocate buffer
        let layout = core::alloc::Layout::array::<T>(self.capacity)
            .expect("ring buffer layout error in drop");
        // SAFETY: The buffer was allocated in `new()` using `alloc::alloc::alloc`
        // with the same layout (same capacity and type T). All remaining items have
        // been drained by the pop() loop above, so no live T values remain in the
        // buffer. The pointer has not been deallocated elsewhere. We have exclusive
        // access via `&mut self` in the Drop impl.
        unsafe {
            alloc::alloc::dealloc(self.buffer as *mut u8, layout);
        }
    }
}

// SAFETY: RingBuffer<T> can be sent across threads if T: Send. The buffer is a
// raw heap allocation owned entirely by the RingBuffer, and all T values stored
// in it are owned by the buffer. Transferring the RingBuffer transfers
// ownership of the contained T values.
unsafe impl<T: Send> Send for RingBuffer<T> {}
// SAFETY: RingBuffer<T> can be shared across threads if T: Send. Thread safety
// is provided by atomic operations on write_pos, read_pos, and size, which
// coordinate concurrent push/pop access. The size atomic with Acquire/Release
// ordering ensures that a consumer sees fully written data from a producer.
// NOTE: This implementation has a subtle race between concurrent pushers (or
// concurrent poppers), since fetch_add on a position does not coordinate with
// the size check. In practice, this is used with single-producer/
// single-consumer patterns.
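// For example, with capacity 1 and two simultaneous pushers, both can pass
// the size check while size == 0, both advance write_pos, and both map to
// slot 0, so the second write clobbers the first item before any pop
// observes it.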
unsafe impl<T: Send> Sync for RingBuffer<T> {}

/// Async channel statistics
pub struct AsyncChannelStats {
    pub messages_sent: u64,
    pub messages_received: u64,
    pub messages_dropped: u64,
    pub max_queue_depth: usize,
    pub current_size: usize,
    pub capacity: usize,
}

/// Batch message processing for efficiency
pub struct MessageBatch {
    messages: [Option<Message>; 16],
    count: usize,
}

impl MessageBatch {
    /// Create a new batch
    pub fn new() -> Self {
        Self {
            messages: [None; 16],
            count: 0,
        }
    }

    /// Add a message to the batch
    pub fn add(&mut self, msg: Message) -> bool {
        if self.count < 16 {
            self.messages[self.count] = Some(msg);
            self.count += 1;
            true
        } else {
            false
        }
    }

    /// Process the batch
    pub fn process<F>(self, mut f: F)
    where
        F: FnMut(Message),
    {
        for i in 0..self.count {
            if let Some(msg) = self.messages[i] {
                f(msg);
            }
        }
    }
}

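// Example (sketch): drain up to one batch of messages from a channel and
// process them in a single pass. `handle` is a hypothetical helper, not part
// of this module.
//
//     let mut batch = MessageBatch::new();
//     while let Ok(msg) = channel.receive_async() {
//         if !batch.add(msg) {
//             break;
//         }
//     }
//     batch.process(|msg| handle(msg));
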
impl Default for MessageBatch {
    fn default() -> Self {
        Self::new()
    }
}

// Process wakeup via scheduler
fn wake_process(pid: ProcessId) {
    crate::sched::ipc_blocking::wake_up_process(pid);
}

fn timestamp_to_ns(cycles: u64) -> u64 {
    // Assume a 2 GHz CPU for now: two cycles elapse per nanosecond, so
    // ns = cycles / 2.
    cycles / 2
}

#[cfg(all(test, not(target_os = "none")))]
mod tests {
    use super::*;
    use crate::process::ProcessId;

    #[test]
    fn test_ring_buffer() {
        let buffer = RingBuffer::<u64>::new(4);

        // Test push/pop
        assert!(buffer.push(1).is_ok());
        assert!(buffer.push(2).is_ok());
        assert_eq!(buffer.pop(), Some(1));
        assert_eq!(buffer.pop(), Some(2));
        assert_eq!(buffer.pop(), None);
    }
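
    // Exercise wrap-around: write_pos and read_pos advance modulo capacity,
    // so slots are reused once the buffer is drained. Uses only the APIs
    // defined in this file.
    #[test]
    fn test_ring_buffer_wraparound() {
        let buffer = RingBuffer::<u64>::new(2);
        for round in 0..3 {
            assert!(buffer.push(round).is_ok());
            assert!(buffer.push(round + 10).is_ok());
            // Full: a third push must be rejected.
            assert!(buffer.push(99).is_err());
            assert_eq!(buffer.pop(), Some(round));
            assert_eq!(buffer.pop(), Some(round + 10));
            assert_eq!(buffer.pop(), None);
        }
    }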

    #[test]
    fn test_async_channel() {
        let channel = AsyncChannel::new(1, ProcessId(1), 10);
        let msg = Message::small(0x1234, 42);

        // Test send/receive
        assert!(channel.send_async(msg).is_ok());
        let received = channel.receive_async();
        assert!(received.is_ok());
        assert_eq!(received.unwrap().capability(), 0x1234);
    }

    #[test]
    fn test_channel_full() {
        let channel = AsyncChannel::new(1, ProcessId(1), 2);
        let msg = Message::small(0x1234, 42);

        // Fill channel
        assert!(channel.send_async(msg).is_ok());
        assert!(channel.send_async(msg).is_ok());

        // Should be full
        assert_eq!(channel.send_async(msg), Err(IpcError::ChannelFull));
    }
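
    // A batch accepts at most 16 messages; `add` reports whether the message
    // was taken. `Message::small` is the same constructor used in the tests
    // above.
    #[test]
    fn test_message_batch() {
        let mut batch = MessageBatch::new();
        for _ in 0..16 {
            assert!(batch.add(Message::small(0, 7)));
        }
        // The 17th message is rejected.
        assert!(!batch.add(Message::small(0, 8)));

        let mut processed = 0;
        batch.process(|_msg| processed += 1);
        assert_eq!(processed, 16);
    }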
}