veridian_kernel/ipc/
channel.rs1#[cfg(feature = "alloc")]
9extern crate alloc;
10
11#[cfg(feature = "alloc")]
12use alloc::collections::VecDeque;
13#[cfg(feature = "alloc")]
14use alloc::vec::Vec;
15use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16
17use spin::Mutex;
18
19use super::{
20 error::{IpcError, Result},
21 Message, SmallMessage,
22};
23use crate::{process::ProcessId, raii::ChannelGuard};
24
/// Upper bound on the number of messages an endpoint's receive queue may hold.
///
/// Sends that find the queue at this length are rejected with
/// `IpcError::ChannelFull`; the queue is also pre-allocated to this capacity
/// when an endpoint is constructed.
pub const MAX_CHANNEL_QUEUE_SIZE: usize = 1024;

/// Source of endpoint identifiers.
///
/// Starts at 1 and is advanced by one for every endpoint that is constructed.
static ENDPOINT_COUNTER: AtomicU64 = AtomicU64::new(1);
30
31pub struct Endpoint {
33 id: u64,
35 pub owner: ProcessId,
37 bound_to: Mutex<Option<ProcessId>>,
39 #[cfg(feature = "alloc")]
41 receive_queue: Mutex<VecDeque<Message>>,
42 #[cfg(feature = "alloc")]
44 waiting_senders: Mutex<Vec<WaitingProcess>>,
45 #[cfg(feature = "alloc")]
47 waiting_receivers: Mutex<Vec<WaitingProcess>>,
48 active: AtomicBool,
50}
51
52struct WaitingProcess {
54 pid: ProcessId,
56 #[allow(dead_code)] message: Option<Message>,
59 #[allow(dead_code)] timeout: u64,
62}
63
64impl Endpoint {
65 pub fn new(owner: ProcessId) -> Self {
67 Self {
68 id: ENDPOINT_COUNTER.fetch_add(1, Ordering::Relaxed),
69 owner,
70 bound_to: Mutex::new(None),
71 #[cfg(feature = "alloc")]
72 receive_queue: Mutex::new(VecDeque::with_capacity(MAX_CHANNEL_QUEUE_SIZE)),
73 #[cfg(feature = "alloc")]
74 waiting_senders: Mutex::new(Vec::new()),
75 #[cfg(feature = "alloc")]
76 waiting_receivers: Mutex::new(Vec::new()),
77 active: AtomicBool::new(true),
78 }
79 }
80
81 pub fn new_with_guard(owner: ProcessId) -> (Self, ChannelGuard) {
83 let endpoint = Self::new(owner);
84 let guard = ChannelGuard::new(endpoint.id);
85 (endpoint, guard)
86 }
87
88 pub fn id(&self) -> u64 {
90 self.id
91 }
92
93 pub fn bind(&self, target: ProcessId) -> Result<()> {
95 let mut bound = self.bound_to.lock();
96 if bound.is_some() {
97 return Err(IpcError::EndpointBusy);
98 }
99 *bound = Some(target);
100 Ok(())
101 }
102
103 #[cfg(feature = "alloc")]
105 pub fn send_sync(&self, msg: Message, _sender: ProcessId) -> Result<()> {
106 if !self.active.load(Ordering::Acquire) {
107 return Err(IpcError::EndpointNotFound);
108 }
109
110 let mut receivers = self.waiting_receivers.lock();
112 if let Some(receiver) = receivers.pop() {
113 drop(receivers);
114
115 let mut queue = self.receive_queue.lock();
117 queue.push_back(msg);
118 drop(queue);
119
120 crate::sched::ipc_blocking::wake_up_process(receiver.pid);
122
123 Ok(())
124 } else {
125 drop(receivers);
126
127 let mut queue = self.receive_queue.lock();
129 if queue.len() >= MAX_CHANNEL_QUEUE_SIZE {
130 return Err(IpcError::ChannelFull);
131 }
132 queue.push_back(msg);
133 Ok(())
134 }
135 }
136
137 #[cfg(not(feature = "alloc"))]
138 pub fn send_sync(&self, _msg: Message, _sender: ProcessId) -> Result<()> {
139 if !self.active.load(Ordering::Acquire) {
140 return Err(IpcError::EndpointNotFound);
141 }
142 Err(IpcError::WouldBlock)
144 }
145
146 #[cfg(feature = "alloc")]
148 pub fn receive_sync(&self, receiver: ProcessId) -> Result<Message> {
149 if !self.active.load(Ordering::Acquire) {
150 return Err(IpcError::EndpointNotFound);
151 }
152
153 let mut queue = self.receive_queue.lock();
155 if let Some(msg) = queue.pop_front() {
156 return Ok(msg);
157 }
158 drop(queue);
159
160 let mut receivers = self.waiting_receivers.lock();
162 receivers.push(WaitingProcess {
163 pid: receiver,
164 message: None,
165 timeout: 0,
166 });
167 drop(receivers);
168
169 crate::sched::ipc_blocking::block_on_ipc(self.id);
173
174 let mut queue = self.receive_queue.lock();
176 if let Some(msg) = queue.pop_front() {
177 Ok(msg)
178 } else {
179 Err(IpcError::WouldBlock)
181 }
182 }
183
184 #[cfg(not(feature = "alloc"))]
185 pub fn receive_sync(&self, _receiver: ProcessId) -> Result<Message> {
186 if !self.active.load(Ordering::Acquire) {
187 return Err(IpcError::EndpointNotFound);
188 }
189 Err(IpcError::WouldBlock)
191 }
192
193 #[cfg(feature = "alloc")]
195 pub fn send_async(&self, msg: Message) -> Result<()> {
196 if !self.active.load(Ordering::Acquire) {
197 return Err(IpcError::EndpointNotFound);
198 }
199
200 let mut queue = self.receive_queue.lock();
201 if queue.len() >= MAX_CHANNEL_QUEUE_SIZE {
202 return Err(IpcError::ChannelFull);
203 }
204 queue.push_back(msg);
205 drop(queue);
206
207 let mut receivers = self.waiting_receivers.lock();
209 if let Some(receiver) = receivers.pop() {
210 drop(receivers);
211 crate::sched::ipc_blocking::wake_up_process(receiver.pid);
212 }
213
214 Ok(())
215 }
216
217 #[cfg(not(feature = "alloc"))]
218 pub fn send_async(&self, _msg: Message) -> Result<()> {
219 if !self.active.load(Ordering::Acquire) {
220 return Err(IpcError::EndpointNotFound);
221 }
222 Err(IpcError::WouldBlock)
224 }
225
226 #[cfg(feature = "alloc")]
228 pub fn try_receive(&self) -> Result<Message> {
229 if !self.active.load(Ordering::Acquire) {
230 return Err(IpcError::EndpointNotFound);
231 }
232
233 let mut queue = self.receive_queue.lock();
234 queue.pop_front().ok_or(IpcError::ChannelEmpty)
235 }
236
237 #[cfg(not(feature = "alloc"))]
238 pub fn try_receive(&self) -> Result<Message> {
239 if !self.active.load(Ordering::Acquire) {
240 return Err(IpcError::EndpointNotFound);
241 }
242 Err(IpcError::ChannelEmpty)
244 }
245
246 #[cfg(feature = "alloc")]
248 pub fn close(&self) {
249 self.active.store(false, Ordering::Release);
250
251 let receivers: Vec<WaitingProcess> = {
253 let mut r = self.waiting_receivers.lock();
254 r.drain(..).collect()
255 };
256 let senders: Vec<WaitingProcess> = {
257 let mut s = self.waiting_senders.lock();
258 s.drain(..).collect()
259 };
260
261 for waiter in receivers.iter().chain(senders.iter()) {
262 crate::sched::ipc_blocking::wake_up_process(waiter.pid);
263 }
264
265 self.receive_queue.lock().clear();
267
268 crate::sched::ipc_blocking::wake_up_endpoint_waiters(self.id);
270 }
271
272 #[cfg(not(feature = "alloc"))]
274 pub fn close(&self) {
275 self.active.store(false, Ordering::Release);
276 }
277}
278
279pub struct Channel {
281 send_endpoint: Endpoint,
283 receive_endpoint: Endpoint,
285 #[allow(dead_code)] capacity: usize,
288}
289
290impl Channel {
291 pub fn new(owner: ProcessId, capacity: usize) -> Self {
293 Self {
294 send_endpoint: Endpoint::new(owner),
295 receive_endpoint: Endpoint::new(owner),
296 capacity: capacity.min(MAX_CHANNEL_QUEUE_SIZE),
297 }
298 }
299
300 pub fn send_id(&self) -> u64 {
302 self.send_endpoint.id()
303 }
304
305 pub fn receive_id(&self) -> u64 {
307 self.receive_endpoint.id()
308 }
309
310 pub fn send(&self, msg: Message) -> Result<()> {
312 self.receive_endpoint.send_async(msg)
313 }
314
315 pub fn receive(&self) -> Result<Message> {
317 self.receive_endpoint.try_receive()
318 }
319
320 pub fn close(self) {
322 self.send_endpoint.close();
323 self.receive_endpoint.close();
324 }
325}
326
327#[inline(always)]
332pub fn fast_ipc_send(msg: &SmallMessage, target: ProcessId) -> Result<()> {
333 if msg.capability == 0 {
338 return Err(IpcError::InvalidCapability);
339 }
340
341 crate::sched::ipc_blocking::wake_up_process(target);
344
345 Ok(())
346}
347
348#[cfg(feature = "alloc")]
354pub fn call_reply(request: Message, target: ProcessId) -> Result<Message> {
355 let caller = crate::sched::current_process_id();
357 let reply_endpoint = Endpoint::new(caller);
358 let reply_id = reply_endpoint.id();
359
360 let mut req = request;
362 req.set_flags(reply_id as u32);
364
365 crate::sched::ipc_blocking::wake_up_process(target);
367
368 crate::sched::ipc_blocking::block_on_ipc(reply_id);
370
371 let mut queue = reply_endpoint.receive_queue.lock();
373 if let Some(reply) = queue.pop_front() {
374 Ok(reply)
375 } else {
376 Err(IpcError::WouldBlock)
377 }
378}
379
380#[cfg(not(feature = "alloc"))]
382pub fn call_reply(_request: Message, _target: ProcessId) -> Result<Message> {
383 Err(IpcError::WouldBlock)
384}
385
/// Host-side unit tests (excluded on bare-metal targets).
#[cfg(all(test, not(target_os = "none")))]
mod tests {
    use super::*;
    use crate::process::ProcessId;

    /// A fresh endpoint records its owner and starts out active.
    #[test]
    fn test_endpoint_creation() {
        let endpoint = Endpoint::new(ProcessId(1));
        assert_eq!(endpoint.owner, ProcessId(1));
        assert!(endpoint.active.load(Ordering::Relaxed));
    }

    /// The two endpoints of a channel receive distinct ids.
    #[test]
    fn test_channel_creation() {
        let channel = Channel::new(ProcessId(1), 100);
        assert_ne!(channel.send_id(), channel.receive_id());
    }

    /// An endpoint can be bound once; a second attempt is rejected.
    #[test]
    fn test_bind_rejects_second_binding() {
        let endpoint = Endpoint::new(ProcessId(1));
        assert!(endpoint.bind(ProcessId(2)).is_ok());
        assert!(matches!(
            endpoint.bind(ProcessId(3)),
            Err(IpcError::EndpointBusy)
        ));
    }

    /// Receiving from an endpoint with nothing queued reports `ChannelEmpty`
    /// (true both with and without the `alloc` feature).
    #[test]
    fn test_try_receive_on_empty_queue() {
        let endpoint = Endpoint::new(ProcessId(1));
        assert!(matches!(
            endpoint.try_receive(),
            Err(IpcError::ChannelEmpty)
        ));
    }

    /// Round trip through the queue. Gated on `alloc`: without that feature
    /// `send_async` always returns `WouldBlock`, so the assertions below
    /// could never hold.
    #[cfg(feature = "alloc")]
    #[test]
    fn test_async_send_receive() {
        let endpoint = Endpoint::new(ProcessId(1));
        let msg = Message::small(0x1234, 42);

        assert!(endpoint.send_async(msg).is_ok());

        let received = endpoint.try_receive();
        assert!(received.is_ok());
        assert_eq!(received.unwrap().capability(), 0x1234);
    }
}