1#[cfg(feature = "alloc")]
9extern crate alloc;
10
11use core::{
12 ptr,
13 sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
14};
15
16use super::{
17 capability::ProcessId,
18 error::{IpcError, Result},
19 message::Message,
20};
21use crate::arch::entropy::read_timestamp;
22
/// Default number of message slots for an async channel's ring buffer.
pub const ASYNC_CHANNEL_SIZE: usize = 256;
25
/// Asynchronous IPC channel backed by a fixed-capacity ring buffer of
/// [`Message`]s, with per-channel statistics and optional subscriber wakeups.
pub struct AsyncChannel {
    /// Channel identifier; also used to wake endpoint waiters on send.
    id: u64,
    /// Process that created the channel (currently informational only).
    #[allow(dead_code)] owner: ProcessId,
    /// Bounded message queue shared between senders and receivers.
    buffer: RingBuffer<Message>,
    /// Processes to wake whenever a message is enqueued (or on close).
    #[cfg(feature = "alloc")]
    subscribers: spin::Mutex<alloc::vec::Vec<ProcessId>>,
    /// Atomic counters for sent/received/dropped messages and queue depth.
    stats: ChannelStats,
    /// Cleared by `close`; all operations fail once this is false.
    active: AtomicBool,
}
43
/// Internal atomic counters tracking a channel's activity.
struct ChannelStats {
    /// Total messages successfully enqueued.
    messages_sent: AtomicU64,
    /// Total messages successfully dequeued.
    messages_received: AtomicU64,
    /// Messages rejected because the ring buffer was full.
    messages_dropped: AtomicU64,
    /// Largest queue depth observed after a send (high-water mark).
    max_queue_depth: AtomicUsize,
}
51
52impl AsyncChannel {
53 pub fn new(id: u64, owner: ProcessId, capacity: usize) -> Self {
55 Self {
56 id,
57 owner,
58 buffer: RingBuffer::new(capacity),
59 #[cfg(feature = "alloc")]
60 subscribers: spin::Mutex::new(alloc::vec::Vec::new()),
61 stats: ChannelStats {
62 messages_sent: AtomicU64::new(0),
63 messages_received: AtomicU64::new(0),
64 messages_dropped: AtomicU64::new(0),
65 max_queue_depth: AtomicUsize::new(0),
66 },
67 active: AtomicBool::new(true),
68 }
69 }
70
71 pub fn send_async(&self, msg: Message) -> Result<()> {
73 if !self.active.load(Ordering::Acquire) {
74 return Err(IpcError::EndpointNotFound);
75 }
76
77 let cap_id = msg.capability();
79 if cap_id != 0 {
80 if let Some(current_process) = crate::process::current_process() {
82 if let Some(real_process) = crate::process::table::get_process(current_process.pid)
83 {
84 let cap_space = real_process.capability_space.lock();
85 let cap_token = crate::cap::CapabilityToken::from_u64(cap_id);
86
87 crate::cap::ipc_integration::check_send_permission(cap_token, &cap_space)?;
89 }
90 }
91 }
92
93 match self.buffer.push(msg) {
95 Ok(()) => {
96 self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
97
98 let current_size = self.buffer.size();
100 let mut max_depth = self.stats.max_queue_depth.load(Ordering::Relaxed);
101 while current_size > max_depth {
102 match self.stats.max_queue_depth.compare_exchange_weak(
103 max_depth,
104 current_size,
105 Ordering::Relaxed,
106 Ordering::Relaxed,
107 ) {
108 Ok(_) => break,
109 Err(old) => max_depth = old,
110 }
111 }
112
113 #[cfg(feature = "alloc")]
115 {
116 let subscribers = self.subscribers.lock();
117 for &pid in subscribers.iter() {
118 wake_process(pid);
119 }
120 }
121
122 crate::sched::ipc_blocking::wake_up_endpoint_waiters(self.id);
124
125 Ok(())
126 }
127 Err(_) => {
128 self.stats.messages_dropped.fetch_add(1, Ordering::Relaxed);
129 Err(IpcError::ChannelFull)
130 }
131 }
132 }
133
134 pub fn receive_async(&self) -> Result<Message> {
136 if !self.active.load(Ordering::Acquire) {
137 return Err(IpcError::EndpointNotFound);
138 }
139
140 match self.buffer.pop() {
145 Some(msg) => {
146 self.stats.messages_received.fetch_add(1, Ordering::Relaxed);
147 Ok(msg)
148 }
149 None => Err(IpcError::ChannelEmpty),
150 }
151 }
152
153 pub fn poll(&self, timeout_ns: u64) -> Result<Option<Message>> {
155 let start = read_timestamp();
156
157 loop {
158 match self.receive_async() {
160 Ok(msg) => return Ok(Some(msg)),
161 Err(IpcError::ChannelEmpty) => {
162 if timeout_ns > 0 {
164 let elapsed = timestamp_to_ns(read_timestamp() - start);
165 if elapsed >= timeout_ns {
166 return Ok(None);
167 }
168 }
169
170 core::hint::spin_loop();
172 }
173 Err(e) => return Err(e),
174 }
175 }
176 }
177
178 #[cfg(feature = "alloc")]
180 pub fn subscribe(&self, pid: ProcessId) -> Result<()> {
181 if !self.active.load(Ordering::Acquire) {
182 return Err(IpcError::EndpointNotFound);
183 }
184
185 let mut subscribers = self.subscribers.lock();
186 if !subscribers.contains(&pid) {
187 subscribers.push(pid);
188 }
189 Ok(())
190 }
191
192 #[cfg(not(feature = "alloc"))]
193 pub fn subscribe(&self, _pid: ProcessId) -> Result<()> {
194 Ok(())
195 }
196
197 pub fn get_stats(&self) -> AsyncChannelStats {
199 AsyncChannelStats {
200 messages_sent: self.stats.messages_sent.load(Ordering::Relaxed),
201 messages_received: self.stats.messages_received.load(Ordering::Relaxed),
202 messages_dropped: self.stats.messages_dropped.load(Ordering::Relaxed),
203 max_queue_depth: self.stats.max_queue_depth.load(Ordering::Relaxed),
204 current_size: self.buffer.size(),
205 capacity: self.buffer.capacity(),
206 }
207 }
208
209 pub fn close(&self) {
211 self.active.store(false, Ordering::Release);
212
213 #[cfg(feature = "alloc")]
215 {
216 let subscribers = self.subscribers.lock();
217 for &pid in subscribers.iter() {
218 wake_process(pid);
219 }
220 }
221 }
222}
223
/// Fixed-capacity ring buffer with atomic cursors.
///
/// Cursors grow monotonically and are reduced modulo `capacity` at the
/// point of access; `size` tracks how many published elements are live.
struct RingBuffer<T> {
    /// Heap allocation of `capacity` slots; slots outside the live window
    /// are uninitialized.
    buffer: *mut T,
    /// Number of slots allocated.
    capacity: usize,
    /// Monotonically increasing producer cursor (used modulo `capacity`).
    write_pos: AtomicUsize,
    /// Monotonically increasing consumer cursor (used modulo `capacity`).
    read_pos: AtomicUsize,
    /// Number of initialized elements currently in the buffer.
    size: AtomicUsize,
}
237
238impl<T> RingBuffer<T> {
239 fn new(capacity: usize) -> Self {
241 let layout =
242 core::alloc::Layout::array::<T>(capacity).expect("ring buffer capacity overflow");
243 let buffer = unsafe { alloc::alloc::alloc(layout) as *mut T };
251
252 Self {
253 buffer,
254 capacity,
255 write_pos: AtomicUsize::new(0),
256 read_pos: AtomicUsize::new(0),
257 size: AtomicUsize::new(0),
258 }
259 }
260
261 fn push(&self, item: T) -> core::result::Result<(), T> {
263 let current_size = self.size.load(Ordering::Acquire);
264 if current_size >= self.capacity {
265 return Err(item);
266 }
267
268 let write_pos = self.write_pos.fetch_add(1, Ordering::Relaxed) % self.capacity;
270
271 unsafe {
279 ptr::write(self.buffer.add(write_pos), item);
280 }
281
282 self.size.fetch_add(1, Ordering::Release);
284
285 Ok(())
286 }
287
288 fn pop(&self) -> Option<T> {
290 loop {
291 let current_size = self.size.load(Ordering::Acquire);
292 if current_size == 0 {
293 return None;
294 }
295
296 match self.size.compare_exchange_weak(
298 current_size,
299 current_size - 1,
300 Ordering::Acquire,
301 Ordering::Relaxed,
302 ) {
303 Ok(_) => {
304 let read_pos = self.read_pos.fetch_add(1, Ordering::Relaxed) % self.capacity;
306
307 let item = unsafe { ptr::read(self.buffer.add(read_pos)) };
314
315 return Some(item);
316 }
317 Err(_) => {
318 core::hint::spin_loop();
320 }
321 }
322 }
323 }
324
325 fn size(&self) -> usize {
327 self.size.load(Ordering::Relaxed)
328 }
329
330 fn capacity(&self) -> usize {
332 self.capacity
333 }
334}
335
336impl<T> Drop for RingBuffer<T> {
337 fn drop(&mut self) {
338 while self.pop().is_some() {}
340
341 let layout = core::alloc::Layout::array::<T>(self.capacity)
343 .expect("ring buffer layout error in drop");
344 unsafe {
350 alloc::alloc::dealloc(self.buffer as *mut u8, layout);
351 }
352 }
353}
354
// SAFETY: the ring buffer owns its heap allocation and moves `T` values in
// and out by value, so transferring the whole buffer across threads is
// sound whenever `T: Send`.
unsafe impl<T: Send> Send for RingBuffer<T> {}
// SAFETY: shared access goes through the atomic cursors and `size` counter.
// NOTE(review): the push path's check-then-claim is not fully MPMC-safe
// (see `push`); confirm the intended producer/consumer topology before
// relying on unrestricted `Sync` use.
unsafe impl<T: Send> Sync for RingBuffer<T> {}
369
/// Point-in-time snapshot of a channel's counters, as returned by
/// `AsyncChannel::get_stats`.
pub struct AsyncChannelStats {
    /// Total messages successfully enqueued.
    pub messages_sent: u64,
    /// Total messages successfully dequeued.
    pub messages_received: u64,
    /// Messages rejected because the buffer was full.
    pub messages_dropped: u64,
    /// Largest queue depth observed after a send.
    pub max_queue_depth: usize,
    /// Messages queued at the moment of the snapshot.
    pub current_size: usize,
    /// Ring buffer slot count.
    pub capacity: usize,
}
379
/// Fixed-size batch of up to 16 messages collected for bulk processing.
pub struct MessageBatch {
    /// Slots `0..count` hold queued messages; the remainder are `None`.
    messages: [Option<Message>; 16],
    /// Number of occupied slots.
    count: usize,
}
385
386impl MessageBatch {
387 pub fn new() -> Self {
389 Self {
390 messages: [None; 16],
391 count: 0,
392 }
393 }
394
395 pub fn add(&mut self, msg: Message) -> bool {
397 if self.count < 16 {
398 self.messages[self.count] = Some(msg);
399 self.count += 1;
400 true
401 } else {
402 false
403 }
404 }
405
406 pub fn process<F>(self, mut f: F)
408 where
409 F: FnMut(Message),
410 {
411 for i in 0..self.count {
412 if let Some(msg) = self.messages[i] {
413 f(msg);
414 }
415 }
416 }
417}
418
impl Default for MessageBatch {
    /// Equivalent to `MessageBatch::new`: an empty batch.
    fn default() -> Self {
        Self::new()
    }
}
424
/// Wakes a process blocked on IPC by delegating to the scheduler's
/// blocking-IPC support.
fn wake_process(pid: ProcessId) {
    crate::sched::ipc_blocking::wake_up_process(pid);
}
429
/// Converts raw timestamp-counter cycles to nanoseconds.
///
/// NOTE(review): this assumes a fixed 2 cycles per nanosecond (a 2 GHz
/// counter). The real frequency should be calibrated at boot — confirm
/// against the platform's timer setup.
fn timestamp_to_ns(cycles: u64) -> u64 {
    // Assumed timestamp-counter frequency in cycles per nanosecond; names
    // the former magic divisor `2`.
    const CYCLES_PER_NS: u64 = 2;
    cycles / CYCLES_PER_NS
}
434
#[cfg(all(test, not(target_os = "none")))]
mod tests {
    use super::*;
    use crate::process::ProcessId;

    /// FIFO order and empty-buffer behavior of the raw ring buffer.
    #[test]
    fn test_ring_buffer() {
        let rb = RingBuffer::<u64>::new(4);

        assert!(rb.push(1).is_ok());
        assert!(rb.push(2).is_ok());
        assert_eq!(rb.pop(), Some(1));
        assert_eq!(rb.pop(), Some(2));
        assert_eq!(rb.pop(), None);
    }

    /// Round-trip of a single message through a channel.
    #[test]
    fn test_async_channel() {
        let ch = AsyncChannel::new(1, ProcessId(1), 10);
        let msg = Message::small(0x1234, 42);

        assert!(ch.send_async(msg).is_ok());
        let got = ch.receive_async().expect("message should be queued");
        assert_eq!(got.capability(), 0x1234);
    }

    /// A channel of capacity 2 rejects the third send with `ChannelFull`.
    #[test]
    fn test_channel_full() {
        let ch = AsyncChannel::new(1, ProcessId(1), 2);
        let msg = Message::small(0x1234, 42);

        assert!(ch.send_async(msg).is_ok());
        assert!(ch.send_async(msg).is_ok());
        assert_eq!(ch.send_async(msg), Err(IpcError::ChannelFull));
    }
}