⚠️ VeridianOS Kernel Documentation - This is low-level `no_std` kernel code. Functions are safe Rust unless explicitly marked `unsafe`; every `unsafe` block must carry a SAFETY comment.

veridian_kernel/ipc/
message_passing.rs

1//! Message passing implementation for IPC
2//!
3//! This module provides the core message passing functionality between
4//! processes, including message queues, delivery, and process synchronization.
5
6// Core message passing
7#[cfg(feature = "alloc")]
8extern crate alloc;
9
#[cfg(feature = "alloc")]
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
12use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13
14use spin::Mutex;
15
16use super::{
17    capability::EndpointId,
18    error::{IpcError, Result},
19    message::Message,
20};
21use crate::{process::ProcessId, sched};
22
/// Message queue for each endpoint
///
/// FIFO buffer of pending messages plus bookkeeping for processes
/// blocked on this endpoint in either direction (receivers waiting for
/// a message, senders waiting for queue space).
#[cfg(feature = "alloc")]
pub struct MessageQueue {
    /// Queued messages, oldest first (dequeued from index 0)
    messages: Vec<Message>,
    /// Maximum queue size; `enqueue` fails with `ChannelFull` beyond this
    max_size: usize,
    /// Processes waiting to receive, in arrival order
    waiting_receivers: Vec<ProcessId>,
    /// Processes waiting to send (when queue is full), each paired with
    /// the message it wants to enqueue once space frees up
    waiting_senders: Vec<(ProcessId, Message)>,
}
35
#[cfg(feature = "alloc")]
impl MessageQueue {
    /// Construct an empty queue holding at most `max_size` messages.
    pub fn new(max_size: usize) -> Self {
        Self {
            messages: Vec::new(),
            max_size,
            waiting_receivers: Vec::new(),
            waiting_senders: Vec::new(),
        }
    }

    /// Append a message, failing with `ChannelFull` at capacity.
    pub fn enqueue(&mut self, msg: Message) -> Result<()> {
        if self.messages.len() < self.max_size {
            self.messages.push(msg);
            Ok(())
        } else {
            Err(IpcError::ChannelFull)
        }
    }

    /// Pop the oldest queued message, if any (FIFO order).
    pub fn dequeue(&mut self) -> Option<Message> {
        if self.messages.is_empty() {
            return None;
        }
        Some(self.messages.remove(0))
    }

    /// Record a process blocked waiting to receive.
    pub fn add_receiver(&mut self, pid: ProcessId) {
        self.waiting_receivers.push(pid);
    }

    /// Take the longest-waiting receiver, if any.
    pub fn get_receiver(&mut self) -> Option<ProcessId> {
        if self.waiting_receivers.is_empty() {
            return None;
        }
        Some(self.waiting_receivers.remove(0))
    }

    /// Record a process blocked waiting to send, with its pending message.
    pub fn add_sender(&mut self, pid: ProcessId, msg: Message) {
        self.waiting_senders.push((pid, msg));
    }

    /// Move pending messages from blocked senders into the queue while
    /// space remains, returning the PIDs that should be woken.
    pub fn process_waiting_senders(&mut self) -> Vec<ProcessId> {
        let mut woken = Vec::new();

        loop {
            if self.messages.len() >= self.max_size || self.waiting_senders.is_empty() {
                break;
            }
            let (pid, msg) = self.waiting_senders.remove(0);
            self.messages.push(msg);
            woken.push(pid);
        }

        woken
    }

    /// True when at least one message is queued.
    pub fn has_messages(&self) -> bool {
        !self.messages.is_empty()
    }

    /// True when at least one process is blocked waiting to receive.
    pub fn has_waiting_receivers(&self) -> bool {
        !self.waiting_receivers.is_empty()
    }
}
108
/// Endpoint structure with message queue
///
/// The queue is guarded by a spinlock; `active` and `message_count`
/// are lock-free atomics so closure and statistics do not need the
/// queue lock.
pub struct Endpoint {
    /// Endpoint ID
    pub id: EndpointId,
    /// Owner process
    pub owner: ProcessId,
    /// Message queue
    #[cfg(feature = "alloc")]
    pub queue: Mutex<MessageQueue>,
    /// Active flag; cleared (Release) by `close`, read (Acquire) by `is_active`
    pub active: AtomicBool,
    /// Total messages accepted by this endpoint (Relaxed counter, stats only)
    pub message_count: AtomicU64,
}
123
124impl Endpoint {
125    /// Create a new endpoint
126    #[cfg(feature = "alloc")]
127    pub fn new(id: EndpointId, owner: ProcessId) -> Self {
128        Self {
129            id,
130            owner,
131            queue: Mutex::new(MessageQueue::new(1024)), // Default queue size
132            active: AtomicBool::new(true),
133            message_count: AtomicU64::new(0),
134        }
135    }
136
137    /// Close the endpoint
138    pub fn close(&self) {
139        self.active.store(false, Ordering::Release);
140    }
141
142    /// Check if endpoint is active
143    pub fn is_active(&self) -> bool {
144        self.active.load(Ordering::Acquire)
145    }
146}
147
/// Global endpoint registry
///
/// Endpoints are stored as `Box<Endpoint>` so each endpoint has a
/// stable heap address: `BTreeMap` rebalancing moves values between
/// tree nodes on unrelated inserts/removes, so a reference into the
/// map itself would be invalidated. Boxing makes the `'static`
/// references handed out by `get` point at memory that never moves
/// while the endpoint stays registered.
#[cfg(feature = "alloc")]
pub struct EndpointRegistry {
    /// Endpoints indexed by ID, boxed for address stability
    endpoints: Mutex<BTreeMap<EndpointId, Box<Endpoint>>>,
}

#[cfg(feature = "alloc")]
impl EndpointRegistry {
    /// Create a new, empty registry (const, usable in statics).
    pub const fn new() -> Self {
        Self {
            endpoints: Mutex::new(BTreeMap::new()),
        }
    }

    /// Register an endpoint.
    ///
    /// Returns `IpcError::EndpointBusy` if an endpoint with the same
    /// ID is already registered.
    pub fn register(&self, endpoint: Endpoint) -> Result<()> {
        let mut endpoints = self.endpoints.lock();

        if endpoints.contains_key(&endpoint.id) {
            return Err(IpcError::EndpointBusy);
        }

        endpoints.insert(endpoint.id, Box::new(endpoint));
        Ok(())
    }

    /// Unregister an endpoint, closing it and waking any receivers
    /// blocked on it.
    ///
    /// Returns `IpcError::EndpointNotFound` if no such endpoint exists.
    /// Note: dropping the box frees the endpoint, so any reference
    /// previously obtained from `get` must no longer be in use.
    pub fn unregister(&self, id: EndpointId) -> Result<()> {
        let mut endpoints = self.endpoints.lock();

        if let Some(endpoint) = endpoints.remove(&id) {
            endpoint.close();

            // Snapshot the waiters so the queue lock is not held while
            // calling into the scheduler.
            let queue = endpoint.queue.lock();
            let waiting = queue.waiting_receivers.clone();
            drop(queue);

            for pid in waiting {
                sched::wake_up_process(pid);
            }

            Ok(())
        } else {
            Err(IpcError::EndpointNotFound)
        }
    }

    /// Get a `'static` reference to a registered endpoint.
    ///
    /// The reference is only valid while the endpoint remains
    /// registered; callers must not hold it across `unregister` of
    /// the same ID.
    pub fn get(&self, id: EndpointId) -> Option<&'static Endpoint> {
        let endpoints = self.endpoints.lock();

        endpoints.get(&id).map(|ep| {
            // SAFETY: `ep` is a Box, so the Endpoint lives at a stable
            // heap address unaffected by BTreeMap node rebalancing.
            // The allocation stays alive until the Box is dropped in
            // `unregister`; callers must not outlive that removal.
            unsafe { &*(&**ep as *const Endpoint) }
        })
    }
}
211
#[cfg(feature = "alloc")]
impl Default for EndpointRegistry {
    /// Equivalent to `EndpointRegistry::new()`.
    fn default() -> Self {
        EndpointRegistry::new()
    }
}
218
/// Global endpoint registry instance
///
/// Const-initialized; no runtime setup required before use.
#[cfg(feature = "alloc")]
pub(crate) static ENDPOINT_REGISTRY: EndpointRegistry = EndpointRegistry::new();
222
/// Send a message to an endpoint
///
/// Fast path: if a receiver is already blocked on this endpoint, the
/// message (and any capability it carries) is handed directly to that
/// process and the process is woken. Slow path: the message is queued;
/// `enqueue` returns `IpcError::ChannelFull` when the queue is at
/// capacity. Returns `IpcError::EndpointNotFound` when the endpoint
/// does not exist or is closed.
#[cfg(feature = "alloc")]
pub fn send_to_endpoint(msg: Message, endpoint_id: EndpointId) -> Result<()> {
    let endpoint = ENDPOINT_REGISTRY
        .get(endpoint_id)
        .ok_or(IpcError::EndpointNotFound)?;

    // A closed endpoint is reported the same as a missing one.
    if !endpoint.is_active() {
        return Err(IpcError::EndpointNotFound);
    }

    let mut queue = endpoint.queue.lock();

    // Check if there's a waiting receiver
    if let Some(receiver_pid) = queue.get_receiver() {
        // Direct delivery: release the queue lock before doing
        // capability work and calling into the scheduler.
        drop(queue);

        // Handle capability transfer if present (capability 0 = none)
        if msg.capability() != 0 {
            if let Some(sender) = crate::process::current_process() {
                if let Some(real_sender) = crate::process::table::get_process(sender.pid) {
                    let sender_cap_space = real_sender.capability_space.lock();
                    // Transfer capability to receiver
                    if let Err(e) = crate::ipc::cap_transfer::transfer_capability(
                        &msg,
                        &sender_cap_space,
                        receiver_pid,
                    ) {
                        // Log capability transfer failure but don't fail the message send
                        #[cfg(target_arch = "x86_64")]
                        println!("[IPC] Capability transfer failed: {:?}", e);
                        #[cfg(not(target_arch = "x86_64"))]
                        let _ = e;
                    }
                }
            }
        }

        deliver_to_process(msg, receiver_pid)?;
        sched::wake_up_process(receiver_pid);
        endpoint.message_count.fetch_add(1, Ordering::Relaxed);
        Ok(())
    } else {
        // Queue the message
        match queue.enqueue(msg) {
            Ok(()) => {
                endpoint.message_count.fetch_add(1, Ordering::Relaxed);
                // NOTE(review): capability transfer for queued messages is
                // expected to happen at dequeue time, but no transfer call
                // is visible on the receive path in this file — confirm it
                // happens elsewhere, otherwise queued capabilities are
                // never transferred.
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}
278
/// Receive a message from an endpoint
///
/// Non-blocking mode (`blocking == false`) returns
/// `IpcError::WouldBlock` when the queue is empty. Blocking mode
/// registers the caller as a waiting receiver and blocks via the
/// scheduler; on wakeup it first checks the per-process
/// direct-delivery buffer — senders that find a waiting receiver
/// deliver there, bypassing the queue — and then retries the queue
/// once. Returns `IpcError::EndpointNotFound` when the endpoint is
/// missing or closed.
#[cfg(feature = "alloc")]
pub fn receive_from_endpoint(endpoint_id: EndpointId, blocking: bool) -> Result<Message> {
    let endpoint = ENDPOINT_REGISTRY
        .get(endpoint_id)
        .ok_or(IpcError::EndpointNotFound)?;

    if !endpoint.is_active() {
        return Err(IpcError::EndpointNotFound);
    }

    let mut queue = endpoint.queue.lock();

    // Try to get a message
    if let Some(msg) = queue.dequeue() {
        // Dequeuing freed a slot; promote any senders blocked on a
        // full queue before releasing the lock.
        let woken_senders = queue.process_waiting_senders();
        drop(queue);

        // Wake up senders that were waiting for space
        for pid in woken_senders {
            sched::wake_up_process(pid);
        }

        Ok(msg)
    } else if blocking {
        // No message, block if requested
        let current_pid = sched::current_process().pid;
        queue.add_receiver(current_pid);
        drop(queue);

        // Block the current process.
        // NOTE(review): assumes the scheduler tolerates a wakeup that
        // races between the lock drop above and this call — confirm
        // `block_on_ipc` re-checks pending wakeups.
        sched::block_on_ipc(endpoint_id);

        // A sender that saw us in `waiting_receivers` delivered
        // directly to our per-process buffer, not the queue, so check
        // the buffer first or the delivered message would be lost.
        if let Some(msg) = retrieve_delivered_message() {
            return Ok(msg);
        }

        // Otherwise the wakeup came from a queued message (or endpoint
        // teardown); retry once without blocking.
        receive_from_endpoint(endpoint_id, false)
    } else {
        Err(IpcError::WouldBlock)
    }
}
319
/// Per-process buffer for directly-delivered messages.
///
/// Module-level so that delivery and retrieval operate on the SAME
/// map. (Each function previously declared its own function-local
/// static, giving two distinct maps, so a delivered message could
/// never be retrieved.) A second delivery to the same process before
/// retrieval overwrites the first message.
#[cfg(feature = "alloc")]
static PROCESS_MESSAGES: Mutex<BTreeMap<ProcessId, Message>> = Mutex::new(BTreeMap::new());

/// Deliver a message directly to a process
///
/// Stores the message in the shared per-process buffer; the receiver
/// picks it up via `retrieve_delivered_message`. Always succeeds.
#[cfg(feature = "alloc")]
fn deliver_to_process(msg: Message, pid: ProcessId) -> Result<()> {
    PROCESS_MESSAGES.lock().insert(pid, msg);
    Ok(())
}

/// Retrieve a delivered message for the current process
///
/// Returns `None` when nothing has been delivered. If the message
/// carries a capability, its presence in the receiver's capability
/// space is verified; a missing capability is only logged — the
/// message is delivered either way, capability field intact.
#[cfg(feature = "alloc")]
pub fn retrieve_delivered_message() -> Option<Message> {
    let current_pid = sched::current_process().pid;
    let msg = PROCESS_MESSAGES.lock().remove(&current_pid)?;

    // Validate any capability in the message (capability 0 = none)
    if msg.capability() != 0 {
        if let Some(current) = crate::process::current_process() {
            if let Some(real_process) = crate::process::table::get_process(current.pid) {
                let cap_space = real_process.capability_space.lock();
                let cap_token = crate::cap::CapabilityToken::from_u64(msg.capability());

                // Verify the capability exists in receiver's space
                if cap_space.lookup(cap_token).is_none() {
                    println!(
                        "[IPC] Warning: Capability {} not found in receiver's space",
                        msg.capability()
                    );
                    // NOTE(review): the message is delivered with the
                    // capability field untouched; clear it here if
                    // unvalidated capabilities must not reach userspace.
                }
            }
        }
    }
    Some(msg)
}
364
/// Initialize message passing subsystem
///
/// Currently only logs readiness: the endpoint registry is a
/// const-initialized static and needs no runtime setup.
pub fn init() {
    println!("[IPC] Message passing subsystem initialized");
}