⚠️ VeridianOS Kernel Documentation — low-level kernel code built for `no_std`. All functions are considered unsafe unless explicitly marked otherwise.

veridian_kernel/ipc/
channel.rs

1//! IPC channel implementation for message passing
2//!
3//! Provides both synchronous (blocking) and asynchronous (non-blocking)
4//! communication channels between processes.
5
6// IPC channel infrastructure
7
8#[cfg(feature = "alloc")]
9extern crate alloc;
10
11#[cfg(feature = "alloc")]
12use alloc::collections::VecDeque;
13#[cfg(feature = "alloc")]
14use alloc::vec::Vec;
15use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16
17use spin::Mutex;
18
19use super::{
20    error::{IpcError, Result},
21    Message, SmallMessage,
22};
23use crate::{process::ProcessId, raii::ChannelGuard};
24
/// Maximum number of queued messages per channel
///
/// Used both as the pre-allocated queue capacity and as the hard cap
/// enforced by the send paths (`ChannelFull` once reached).
pub const MAX_CHANNEL_QUEUE_SIZE: usize = 1024;

/// Endpoint ID generator
///
/// Global monotonically increasing counter; starts at 1, presumably so
/// that 0 can serve as an invalid/sentinel ID — TODO confirm.
static ENDPOINT_COUNTER: AtomicU64 = AtomicU64::new(1);
30
/// IPC endpoint for bidirectional communication
///
/// All mutable state is behind `spin::Mutex` or atomics, so a shared
/// `&Endpoint` can be used from multiple contexts. Queue and waiter lists
/// exist only when the `alloc` feature is enabled; without it the endpoint
/// is effectively a stub (see the `cfg(not(feature = "alloc"))` methods).
pub struct Endpoint {
    /// Unique endpoint ID (allocated from `ENDPOINT_COUNTER`)
    id: u64,
    /// Owner process ID
    pub owner: ProcessId,
    /// Bound process ID (if connected); at most one binding at a time
    bound_to: Mutex<Option<ProcessId>>,
    /// Incoming message queue, capped at `MAX_CHANNEL_QUEUE_SIZE`
    #[cfg(feature = "alloc")]
    receive_queue: Mutex<VecDeque<Message>>,
    /// Waiting senders (for synchronous IPC)
    #[cfg(feature = "alloc")]
    waiting_senders: Mutex<Vec<WaitingProcess>>,
    /// Waiting receivers (for synchronous IPC)
    #[cfg(feature = "alloc")]
    waiting_receivers: Mutex<Vec<WaitingProcess>>,
    /// Endpoint state: true while open; cleared (Release) by `close()`
    active: AtomicBool,
}
51
/// Process waiting on IPC operation
///
/// Entries are pushed onto an endpoint's `waiting_senders` /
/// `waiting_receivers` lists before the process blocks, and popped by the
/// peer that wakes it.
struct WaitingProcess {
    /// Process ID to wake via the scheduler
    pid: ProcessId,
    /// Message being sent (for senders)
    #[allow(dead_code)] // Used when sender blocks with pending message (Phase 6)
    message: Option<Message>,
    /// Timeout in ticks (0 = infinite)
    #[allow(dead_code)] // Timeout-based blocking deferred to Phase 6
    timeout: u64,
}
63
64impl Endpoint {
65    /// Create a new endpoint
66    pub fn new(owner: ProcessId) -> Self {
67        Self {
68            id: ENDPOINT_COUNTER.fetch_add(1, Ordering::Relaxed),
69            owner,
70            bound_to: Mutex::new(None),
71            #[cfg(feature = "alloc")]
72            receive_queue: Mutex::new(VecDeque::with_capacity(MAX_CHANNEL_QUEUE_SIZE)),
73            #[cfg(feature = "alloc")]
74            waiting_senders: Mutex::new(Vec::new()),
75            #[cfg(feature = "alloc")]
76            waiting_receivers: Mutex::new(Vec::new()),
77            active: AtomicBool::new(true),
78        }
79    }
80
81    /// Create a new endpoint with RAII guard
82    pub fn new_with_guard(owner: ProcessId) -> (Self, ChannelGuard) {
83        let endpoint = Self::new(owner);
84        let guard = ChannelGuard::new(endpoint.id);
85        (endpoint, guard)
86    }
87
88    /// Get endpoint ID
89    pub fn id(&self) -> u64 {
90        self.id
91    }
92
93    /// Bind endpoint to another process
94    pub fn bind(&self, target: ProcessId) -> Result<()> {
95        let mut bound = self.bound_to.lock();
96        if bound.is_some() {
97            return Err(IpcError::EndpointBusy);
98        }
99        *bound = Some(target);
100        Ok(())
101    }
102
103    /// Send a message through this endpoint (synchronous)
104    #[cfg(feature = "alloc")]
105    pub fn send_sync(&self, msg: Message, _sender: ProcessId) -> Result<()> {
106        if !self.active.load(Ordering::Acquire) {
107            return Err(IpcError::EndpointNotFound);
108        }
109
110        // Check if there's a waiting receiver
111        let mut receivers = self.waiting_receivers.lock();
112        if let Some(receiver) = receivers.pop() {
113            drop(receivers);
114
115            // Queue the message so the receiver finds it when woken
116            let mut queue = self.receive_queue.lock();
117            queue.push_back(msg);
118            drop(queue);
119
120            // Wake the receiver for direct delivery (<5us latency target)
121            crate::sched::ipc_blocking::wake_up_process(receiver.pid);
122
123            Ok(())
124        } else {
125            drop(receivers);
126
127            // No waiting receiver, queue the message
128            let mut queue = self.receive_queue.lock();
129            if queue.len() >= MAX_CHANNEL_QUEUE_SIZE {
130                return Err(IpcError::ChannelFull);
131            }
132            queue.push_back(msg);
133            Ok(())
134        }
135    }
136
137    #[cfg(not(feature = "alloc"))]
138    pub fn send_sync(&self, _msg: Message, _sender: ProcessId) -> Result<()> {
139        if !self.active.load(Ordering::Acquire) {
140            return Err(IpcError::EndpointNotFound);
141        }
142        // Without alloc, we can't queue messages
143        Err(IpcError::WouldBlock)
144    }
145
146    /// Receive a message from this endpoint (synchronous)
147    #[cfg(feature = "alloc")]
148    pub fn receive_sync(&self, receiver: ProcessId) -> Result<Message> {
149        if !self.active.load(Ordering::Acquire) {
150            return Err(IpcError::EndpointNotFound);
151        }
152
153        // Check message queue first
154        let mut queue = self.receive_queue.lock();
155        if let Some(msg) = queue.pop_front() {
156            return Ok(msg);
157        }
158        drop(queue);
159
160        // Register as waiting receiver and block until message arrives
161        let mut receivers = self.waiting_receivers.lock();
162        receivers.push(WaitingProcess {
163            pid: receiver,
164            message: None,
165            timeout: 0,
166        });
167        drop(receivers);
168
169        // Block current process on this endpoint and yield CPU.
170        // When a sender calls send_sync/send_async, it will wake us via
171        // ipc_blocking::wake_up_process(). On wake, re-check the queue.
172        crate::sched::ipc_blocking::block_on_ipc(self.id);
173
174        // Woken up -- try to dequeue the message
175        let mut queue = self.receive_queue.lock();
176        if let Some(msg) = queue.pop_front() {
177            Ok(msg)
178        } else {
179            // Woken spuriously (e.g., endpoint closed) or timed out
180            Err(IpcError::WouldBlock)
181        }
182    }
183
184    #[cfg(not(feature = "alloc"))]
185    pub fn receive_sync(&self, _receiver: ProcessId) -> Result<Message> {
186        if !self.active.load(Ordering::Acquire) {
187            return Err(IpcError::EndpointNotFound);
188        }
189        // Without alloc, we can't queue messages
190        Err(IpcError::WouldBlock)
191    }
192
193    /// Send without blocking
194    #[cfg(feature = "alloc")]
195    pub fn send_async(&self, msg: Message) -> Result<()> {
196        if !self.active.load(Ordering::Acquire) {
197            return Err(IpcError::EndpointNotFound);
198        }
199
200        let mut queue = self.receive_queue.lock();
201        if queue.len() >= MAX_CHANNEL_QUEUE_SIZE {
202            return Err(IpcError::ChannelFull);
203        }
204        queue.push_back(msg);
205        drop(queue);
206
207        // Wake one waiting receiver (if any)
208        let mut receivers = self.waiting_receivers.lock();
209        if let Some(receiver) = receivers.pop() {
210            drop(receivers);
211            crate::sched::ipc_blocking::wake_up_process(receiver.pid);
212        }
213
214        Ok(())
215    }
216
217    #[cfg(not(feature = "alloc"))]
218    pub fn send_async(&self, _msg: Message) -> Result<()> {
219        if !self.active.load(Ordering::Acquire) {
220            return Err(IpcError::EndpointNotFound);
221        }
222        // Without alloc, we can't queue messages
223        Err(IpcError::WouldBlock)
224    }
225
226    /// Try to receive without blocking
227    #[cfg(feature = "alloc")]
228    pub fn try_receive(&self) -> Result<Message> {
229        if !self.active.load(Ordering::Acquire) {
230            return Err(IpcError::EndpointNotFound);
231        }
232
233        let mut queue = self.receive_queue.lock();
234        queue.pop_front().ok_or(IpcError::ChannelEmpty)
235    }
236
237    #[cfg(not(feature = "alloc"))]
238    pub fn try_receive(&self) -> Result<Message> {
239        if !self.active.load(Ordering::Acquire) {
240            return Err(IpcError::EndpointNotFound);
241        }
242        // Without alloc, we can't queue messages
243        Err(IpcError::ChannelEmpty)
244    }
245
246    /// Close the endpoint
247    #[cfg(feature = "alloc")]
248    pub fn close(&self) {
249        self.active.store(false, Ordering::Release);
250
251        // Wake all waiting receivers and senders with error
252        let receivers: Vec<WaitingProcess> = {
253            let mut r = self.waiting_receivers.lock();
254            r.drain(..).collect()
255        };
256        let senders: Vec<WaitingProcess> = {
257            let mut s = self.waiting_senders.lock();
258            s.drain(..).collect()
259        };
260
261        for waiter in receivers.iter().chain(senders.iter()) {
262            crate::sched::ipc_blocking::wake_up_process(waiter.pid);
263        }
264
265        // Drain any buffered messages
266        self.receive_queue.lock().clear();
267
268        // Wake all processes blocked on this endpoint via the scheduler
269        crate::sched::ipc_blocking::wake_up_endpoint_waiters(self.id);
270    }
271
272    /// Close the endpoint (no-alloc fallback)
273    #[cfg(not(feature = "alloc"))]
274    pub fn close(&self) {
275        self.active.store(false, Ordering::Release);
276    }
277}
278
/// Asynchronous IPC channel
///
/// Pairs two endpoints created for the same owner. NOTE(review): as
/// implemented in `Channel::send`/`Channel::receive`, all data flows
/// through `receive_endpoint`; `send_endpoint` is only exposed via its ID
/// — confirm whether the reverse direction is wired up elsewhere.
pub struct Channel {
    /// Send endpoint
    send_endpoint: Endpoint,
    /// Receive endpoint
    receive_endpoint: Endpoint,
    /// Channel capacity (clamped to `MAX_CHANNEL_QUEUE_SIZE` at creation)
    #[allow(dead_code)] // Enforced in Endpoint, kept for introspection
    capacity: usize,
}
289
290impl Channel {
291    /// Create a new bidirectional channel
292    pub fn new(owner: ProcessId, capacity: usize) -> Self {
293        Self {
294            send_endpoint: Endpoint::new(owner),
295            receive_endpoint: Endpoint::new(owner),
296            capacity: capacity.min(MAX_CHANNEL_QUEUE_SIZE),
297        }
298    }
299
300    /// Get send endpoint ID
301    pub fn send_id(&self) -> u64 {
302        self.send_endpoint.id()
303    }
304
305    /// Get receive endpoint ID
306    pub fn receive_id(&self) -> u64 {
307        self.receive_endpoint.id()
308    }
309
310    /// Send a message asynchronously
311    pub fn send(&self, msg: Message) -> Result<()> {
312        self.receive_endpoint.send_async(msg)
313    }
314
315    /// Receive a message asynchronously
316    pub fn receive(&self) -> Result<Message> {
317        self.receive_endpoint.try_receive()
318    }
319
320    /// Close the channel
321    pub fn close(self) {
322        self.send_endpoint.close();
323        self.receive_endpoint.close();
324    }
325}
326
/// Fast-path IPC for small messages
///
/// This function implements the register-based fast path for messages
/// that fit entirely in CPU registers.
#[inline(always)]
pub fn fast_ipc_send(msg: &SmallMessage, target: ProcessId) -> Result<()> {
    // O(1) capability validation: rejects only a zero (null) capability.
    // NOTE(review): an earlier comment claimed a range check here, but the
    // code performs no range check -- full capability-table validation is
    // deferred to the slow path.
    if msg.capability == 0 {
        return Err(IpcError::InvalidCapability);
    }

    // Wake target if blocked, allowing it to receive via the slow path.
    // Direct register transfer requires per-task IpcRegs (Phase 6).
    crate::sched::ipc_blocking::wake_up_process(target);

    Ok(())
}
347
/// IPC call with reply (RPC-style)
///
/// Sends a request message to the target, blocks until a reply arrives on the
/// same endpoint, then returns the reply. This is the fundamental RPC
/// primitive.
///
/// NOTE(review): as written, the request is never actually delivered --
/// `req` is built (with the reply endpoint ID encoded in its flags) but not
/// enqueued anywhere, and `reply_endpoint` is a local value not registered
/// in any table visible in this file, so no server can enqueue a reply into
/// it. Unless delivery/registration happens elsewhere, this returns
/// `WouldBlock` after waking. Confirm against the endpoint registry
/// (Phase 6?).
#[cfg(feature = "alloc")]
pub fn call_reply(request: Message, target: ProcessId) -> Result<Message> {
    // Create a temporary reply endpoint for this call
    let caller = crate::sched::current_process_id();
    let reply_endpoint = Endpoint::new(caller);
    let reply_id = reply_endpoint.id();

    // Send the request (include reply endpoint ID in capability field)
    let mut req = request;
    // Encode reply endpoint in flags for the server to find
    // (truncating cast: endpoint IDs are u64, flags are u32)
    req.set_flags(reply_id as u32);

    // Wake target to process the request
    crate::sched::ipc_blocking::wake_up_process(target);

    // Block until reply arrives on our reply endpoint
    crate::sched::ipc_blocking::block_on_ipc(reply_id);

    // Check for reply
    let mut queue = reply_endpoint.receive_queue.lock();
    if let Some(reply) = queue.pop_front() {
        Ok(reply)
    } else {
        Err(IpcError::WouldBlock)
    }
}
379
/// IPC call with reply (no-alloc fallback)
///
/// Without `alloc` there is no reply queue, so the call always fails with
/// `WouldBlock`.
#[cfg(not(feature = "alloc"))]
pub fn call_reply(_request: Message, _target: ProcessId) -> Result<Message> {
    Err(IpcError::WouldBlock)
}
385
// Host-only unit tests (skipped when building for the bare-metal target).
#[cfg(all(test, not(target_os = "none")))]
mod tests {
    use super::*;
    use crate::process::ProcessId;

    /// A fresh endpooint records its owner and starts active.
    #[test]
    fn test_endpoint_creation() {
        let ep = Endpoint::new(ProcessId(1));
        assert!(ep.active.load(Ordering::Relaxed));
        assert_eq!(ep.owner, ProcessId(1));
    }

    /// The two endpoints of a channel get distinct IDs.
    #[test]
    fn test_channel_creation() {
        let ch = Channel::new(ProcessId(1), 100);
        assert_ne!(ch.send_id(), ch.receive_id());
    }

    /// A message sent asynchronously can be received non-blockingly.
    #[test]
    fn test_async_send_receive() {
        let ep = Endpoint::new(ProcessId(1));

        ep.send_async(Message::small(0x1234, 42))
            .expect("send_async should succeed on a fresh endpoint");

        let msg = ep
            .try_receive()
            .expect("try_receive should find the queued message");
        assert_eq!(msg.capability(), 0x1234);
    }
}
415}