// veridian_kernel/ipc/message_passing.rs
#[cfg(feature = "alloc")]
8extern crate alloc;
9
10#[cfg(feature = "alloc")]
11use alloc::{collections::BTreeMap, vec::Vec};
12use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13
14use spin::Mutex;
15
16use super::{
17 capability::EndpointId,
18 error::{IpcError, Result},
19 message::Message,
20};
21use crate::{process::ProcessId, sched};
22
#[cfg(feature = "alloc")]
/// Bounded FIFO queue of IPC messages, plus bookkeeping for the processes
/// blocked on it (both directions).
pub struct MessageQueue {
    /// Queued messages, oldest first.
    messages: Vec<Message>,
    /// Maximum number of queued messages before `enqueue` refuses.
    max_size: usize,
    /// Processes blocked waiting for a message to arrive.
    waiting_receivers: Vec<ProcessId>,
    /// Processes blocked waiting for queue space, with their pending message.
    waiting_senders: Vec<(ProcessId, Message)>,
}
35
#[cfg(feature = "alloc")]
impl MessageQueue {
    /// Create an empty queue holding at most `max_size` messages.
    pub fn new(max_size: usize) -> Self {
        Self {
            messages: Vec::new(),
            max_size,
            waiting_receivers: Vec::new(),
            waiting_senders: Vec::new(),
        }
    }

    /// Append `msg` to the queue.
    ///
    /// # Errors
    /// `ChannelFull` when the queue is already at capacity.
    pub fn enqueue(&mut self, msg: Message) -> Result<()> {
        if self.messages.len() < self.max_size {
            self.messages.push(msg);
            Ok(())
        } else {
            Err(IpcError::ChannelFull)
        }
    }

    /// Remove and return the oldest message, if any (FIFO order).
    pub fn dequeue(&mut self) -> Option<Message> {
        let has_any = !self.messages.is_empty();
        has_any.then(|| self.messages.remove(0))
    }

    /// Record `pid` as blocked waiting for a message on this queue.
    pub fn add_receiver(&mut self, pid: ProcessId) {
        self.waiting_receivers.push(pid);
    }

    /// Remove and return the longest-waiting blocked receiver, if any.
    pub fn get_receiver(&mut self) -> Option<ProcessId> {
        let has_any = !self.waiting_receivers.is_empty();
        has_any.then(|| self.waiting_receivers.remove(0))
    }

    /// Record `pid` as blocked waiting for space, along with the message it
    /// wants to send.
    pub fn add_sender(&mut self, pid: ProcessId, msg: Message) {
        self.waiting_senders.push((pid, msg));
    }

    /// Move as many blocked senders' messages as will fit into the queue
    /// (oldest senders first) and return the pids that should be woken.
    pub fn process_waiting_senders(&mut self) -> Vec<ProcessId> {
        let free = self.max_size.saturating_sub(self.messages.len());
        let admit = free.min(self.waiting_senders.len());

        let mut woken = Vec::with_capacity(admit);
        for (pid, msg) in self.waiting_senders.drain(..admit) {
            self.messages.push(msg);
            woken.push(pid);
        }
        woken
    }

    /// True when at least one message is queued.
    pub fn has_messages(&self) -> bool {
        !self.messages.is_empty()
    }

    /// True when at least one receiver is blocked on this queue.
    pub fn has_waiting_receivers(&self) -> bool {
        !self.waiting_receivers.is_empty()
    }
}
108
/// A kernel IPC endpoint: an identified message queue owned by one process.
pub struct Endpoint {
    /// Unique identifier of this endpoint.
    pub id: EndpointId,
    /// Process that owns the endpoint.
    pub owner: ProcessId,
    /// The bounded message queue plus its blocked senders/receivers.
    #[cfg(feature = "alloc")]
    pub queue: Mutex<MessageQueue>,
    /// Set at creation, cleared by `close()`; checked before send/receive.
    pub active: AtomicBool,
    /// Running count of messages accepted by this endpoint.
    pub message_count: AtomicU64,
}
123
124impl Endpoint {
125 #[cfg(feature = "alloc")]
127 pub fn new(id: EndpointId, owner: ProcessId) -> Self {
128 Self {
129 id,
130 owner,
131 queue: Mutex::new(MessageQueue::new(1024)), active: AtomicBool::new(true),
133 message_count: AtomicU64::new(0),
134 }
135 }
136
137 pub fn close(&self) {
139 self.active.store(false, Ordering::Release);
140 }
141
142 pub fn is_active(&self) -> bool {
144 self.active.load(Ordering::Acquire)
145 }
146}
147
#[cfg(feature = "alloc")]
/// System-wide table of live IPC endpoints, keyed by endpoint id.
pub struct EndpointRegistry {
    /// All registered endpoints. Guarded by a spin lock so the registry can
    /// live in a `static`.
    endpoints: Mutex<BTreeMap<EndpointId, Endpoint>>,
}
154
#[cfg(feature = "alloc")]
impl EndpointRegistry {
    /// Create an empty registry; `const` so it can initialize a `static`.
    pub const fn new() -> Self {
        Self {
            endpoints: Mutex::new(BTreeMap::new()),
        }
    }

    /// Add `endpoint` to the registry.
    ///
    /// # Errors
    /// `EndpointBusy` if an endpoint with the same id is already registered.
    pub fn register(&self, endpoint: Endpoint) -> Result<()> {
        let mut endpoints = self.endpoints.lock();

        if endpoints.contains_key(&endpoint.id) {
            return Err(IpcError::EndpointBusy);
        }

        endpoints.insert(endpoint.id, endpoint);
        Ok(())
    }

    /// Remove endpoint `id`, mark it closed, and wake every process blocked
    /// receiving on it so they can observe the closure.
    ///
    /// # Errors
    /// `EndpointNotFound` if no endpoint with that id is registered.
    pub fn unregister(&self, id: EndpointId) -> Result<()> {
        let mut endpoints = self.endpoints.lock();

        if let Some(endpoint) = endpoints.remove(&id) {
            endpoint.close();

            // Snapshot the waiting receivers, then release the queue lock
            // before calling into the scheduler.
            let queue = endpoint.queue.lock();
            let waiting = queue.waiting_receivers.clone();
            drop(queue);

            for pid in waiting {
                sched::wake_up_process(pid);
            }

            Ok(())
        } else {
            Err(IpcError::EndpointNotFound)
        }
    }

    /// Look up endpoint `id`.
    pub fn get(&self, id: EndpointId) -> Option<&'static Endpoint> {
        let endpoints = self.endpoints.lock();

        endpoints.get(&id).map(|ep| {
            // SAFETY-ISSUE(review): this launders a reference that lives
            // inside the locked BTreeMap into `&'static`, after which the
            // lock guard is dropped. `unregister` can remove and drop the
            // Endpoint while such a reference is still in use — a
            // use-after-free hazard. A sound fix (e.g. storing
            // `Arc<Endpoint>` and returning a clone) changes the return
            // type, so it is only flagged here; confirm all callers'
            // lifetimes before relying on this.
            unsafe { &*(ep as *const Endpoint) }
        })
    }
}
211
#[cfg(feature = "alloc")]
impl Default for EndpointRegistry {
    /// Equivalent to [`EndpointRegistry::new`].
    fn default() -> Self {
        EndpointRegistry::new()
    }
}
218
#[cfg(feature = "alloc")]
/// Global registry of all live IPC endpoints for this kernel instance.
pub(crate) static ENDPOINT_REGISTRY: EndpointRegistry = EndpointRegistry::new();
222
#[cfg(feature = "alloc")]
/// Send `msg` to endpoint `endpoint_id`.
///
/// If a receiver is already blocked on the endpoint, the message is handed
/// to it directly (after attempting any capability transfer) and the
/// receiver is woken; otherwise the message is queued.
///
/// # Errors
/// `EndpointNotFound` if the endpoint does not exist or has been closed;
/// `ChannelFull` if no receiver is waiting and the queue is at capacity.
pub fn send_to_endpoint(msg: Message, endpoint_id: EndpointId) -> Result<()> {
    let endpoint = ENDPOINT_REGISTRY
        .get(endpoint_id)
        .ok_or(IpcError::EndpointNotFound)?;

    // A closed endpoint is reported the same as a missing one.
    if !endpoint.is_active() {
        return Err(IpcError::EndpointNotFound);
    }

    let mut queue = endpoint.queue.lock();

    if let Some(receiver_pid) = queue.get_receiver() {
        // NOTE(review): the receiver is popped and the lock released before
        // delivery completes; if `deliver_to_process` ever returned an
        // error, both the receiver and the message would be lost.
        drop(queue);

        // A non-zero capability field means the message carries a
        // capability that should be moved into the receiver's space.
        if msg.capability() != 0 {
            if let Some(sender) = crate::process::current_process() {
                if let Some(real_sender) = crate::process::table::get_process(sender.pid) {
                    let sender_cap_space = real_sender.capability_space.lock();
                    if let Err(e) = crate::ipc::cap_transfer::transfer_capability(
                        &msg,
                        &sender_cap_space,
                        receiver_pid,
                    ) {
                        // NOTE(review): a failed transfer is only logged;
                        // the message is still delivered referencing a
                        // capability the receiver may not actually hold.
                        #[cfg(target_arch = "x86_64")]
                        println!("[IPC] Capability transfer failed: {:?}", e);
                        #[cfg(not(target_arch = "x86_64"))]
                        let _ = e;
                    }
                }
            }
        }

        deliver_to_process(msg, receiver_pid)?;
        sched::wake_up_process(receiver_pid);
        endpoint.message_count.fetch_add(1, Ordering::Relaxed);
        Ok(())
    } else {
        // No receiver waiting: queue the message for a later receive.
        match queue.enqueue(msg) {
            Ok(()) => {
                endpoint.message_count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}
278
#[cfg(feature = "alloc")]
/// Receive a message from endpoint `endpoint_id`.
///
/// Returns a queued message immediately when one is available, then admits
/// and wakes any blocked senders whose messages now fit. With `blocking`
/// set, the caller is parked on the endpoint and the receive is retried
/// once (non-blocking) after wake-up.
///
/// # Errors
/// `EndpointNotFound` if the endpoint is missing or closed; `WouldBlock`
/// when the queue is empty and `blocking` is false.
pub fn receive_from_endpoint(endpoint_id: EndpointId, blocking: bool) -> Result<Message> {
    let endpoint = ENDPOINT_REGISTRY
        .get(endpoint_id)
        .ok_or(IpcError::EndpointNotFound)?;

    if !endpoint.is_active() {
        return Err(IpcError::EndpointNotFound);
    }

    let mut queue = endpoint.queue.lock();

    if let Some(msg) = queue.dequeue() {
        // Dequeuing freed a slot: admit blocked senders, then wake them
        // after the queue lock is released.
        let woken_senders = queue.process_waiting_senders();
        drop(queue);

        for pid in woken_senders {
            sched::wake_up_process(pid);
        }

        Ok(msg)
    } else if blocking {
        let current_pid = sched::current_process().pid;
        queue.add_receiver(current_pid);
        drop(queue);

        // NOTE(review): there is a window between releasing the queue lock
        // and parking in block_on_ipc; a sender may issue the wake-up
        // first. Verify block_on_ipc tolerates that (lost-wakeup hazard).
        sched::block_on_ipc(endpoint_id);

        // Retry non-blocking after wake-up. NOTE(review): a direct
        // hand-off (`deliver_to_process` in the send path) does not
        // enqueue, so this retry can return WouldBlock even though a
        // message was delivered for this process via the mailbox path.
        receive_from_endpoint(endpoint_id, false)
    } else {
        Err(IpcError::WouldBlock)
    }
}
319
/// Mailbox of messages handed directly to a process (bypassing the
/// endpoint queue), keyed by receiver pid, oldest first.
///
/// NOTE(review): the original code declared two *separate* function-local
/// statics with this name — one inside `deliver_to_process`, one inside
/// `retrieve_delivered_message` — so messages were stored in one map and
/// looked up in a different, always-empty one, and could never be
/// retrieved. The map now lives at module scope and is shared. It also
/// held a single `Message` per pid, silently overwriting any undelivered
/// message; a per-pid `Vec` preserves all pending messages in FIFO order.
#[cfg(feature = "alloc")]
static PROCESS_MESSAGES: Mutex<BTreeMap<ProcessId, Vec<Message>>> = Mutex::new(BTreeMap::new());

#[cfg(feature = "alloc")]
/// Queue `msg` for direct retrieval by process `pid`.
fn deliver_to_process(msg: Message, pid: ProcessId) -> Result<()> {
    PROCESS_MESSAGES
        .lock()
        .entry(pid)
        .or_insert_with(Vec::new)
        .push(msg);
    Ok(())
}

#[cfg(feature = "alloc")]
/// Take the oldest message delivered directly to the current process.
///
/// If the message carries a capability, warn when that capability is not
/// present in the receiver's capability space (the transfer attempted in
/// `send_to_endpoint` may have failed).
pub fn retrieve_delivered_message() -> Option<Message> {
    let current_pid = sched::current_process().pid;

    let msg = {
        let mut messages = PROCESS_MESSAGES.lock();
        let queue = messages.get_mut(&current_pid)?;
        // Entries are removed when drained, so `queue` is never empty here.
        let msg = queue.remove(0);
        if queue.is_empty() {
            messages.remove(&current_pid);
        }
        msg
    };

    if msg.capability() != 0 {
        if let Some(current) = crate::process::current_process() {
            if let Some(real_process) = crate::process::table::get_process(current.pid) {
                let cap_space = real_process.capability_space.lock();
                let cap_token = crate::cap::CapabilityToken::from_u64(msg.capability());

                if cap_space.lookup(cap_token).is_none() {
                    println!(
                        "[IPC] Warning: Capability {} not found in receiver's space",
                        msg.capability()
                    );
                }
            }
        }
    }

    Some(msg)
}
364
/// Initialize the message-passing subsystem (currently just logs a banner).
pub fn init() {
    println!("[IPC] Message passing subsystem initialized");
}