use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};

use crate::error::KernelError;

#[cfg(feature = "alloc")]
extern crate alloc;

#[cfg(feature = "alloc")]
use alloc::{collections::VecDeque, vec::Vec};

use spin::Mutex as SpinMutex;

use super::{ProcessId, ThreadId};

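/// A FIFO queue of threads blocked on a synchronization primitive.
///
/// Waiters are recorded as `(ProcessId, ThreadId)` pairs; waking moves them
/// back to the `Ready` state and hands them to the scheduler.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test); it assumes a running scheduler and a current thread:
///
/// ```ignore
/// static QUEUE: WaitQueue = WaitQueue::new();
///
/// // Consumer side: block until a producer wakes us.
/// QUEUE.wait();
///
/// // Producer side: wake one blocked consumer, if any.
/// let _woke = QUEUE.wake_one();
/// ```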
#[cfg(feature = "alloc")]
pub struct WaitQueue {
    /// FIFO list of blocked `(process, thread)` pairs.
    waiters: SpinMutex<VecDeque<(ProcessId, ThreadId)>>,
}

#[cfg(feature = "alloc")]
impl Default for WaitQueue {
    fn default() -> Self {
        Self {
            waiters: SpinMutex::new(VecDeque::new()),
        }
    }
}

#[cfg(feature = "alloc")]
impl WaitQueue {
    /// Creates an empty wait queue.
    pub const fn new() -> Self {
        Self {
            waiters: SpinMutex::new(VecDeque::new()),
        }
    }

    /// Enqueues the current thread, marks it `Blocked`, and yields the CPU.
    ///
    /// Does nothing if there is no current process or thread (e.g. during
    /// early boot).
    pub fn wait(&self) {
        if let (Some(process), Some(thread)) = (super::current_process(), super::current_thread()) {
            self.waiters.lock().push_back((process.pid, thread.tid));

            thread.set_state(super::thread::ThreadState::Blocked);

            crate::sched::yield_cpu();
        }
    }

    /// Wakes the oldest waiter, if any. Returns `true` if a thread was woken.
    pub fn wake_one(&self) -> bool {
        if let Some((pid, tid)) = self.waiters.lock().pop_front() {
            if let Some(process) = super::table::get_process(pid) {
                if let Some(thread) = process.get_thread(tid) {
                    thread.set_state(super::thread::ThreadState::Ready);
                    #[cfg(feature = "alloc")]
                    {
                        let _ = crate::sched::schedule_thread(pid, tid, thread);
                    }
                    return true;
                }
            }
        }
        false
    }

    /// Wakes every queued waiter and returns how many threads were woken.
    pub fn wake_all(&self) -> usize {
        let mut count = 0;
        let waiters = self.waiters.lock().drain(..).collect::<Vec<_>>();

        for (pid, tid) in waiters {
            if let Some(process) = super::table::get_process(pid) {
                if let Some(thread) = process.get_thread(tid) {
                    thread.set_state(super::thread::ThreadState::Ready);
                    #[cfg(feature = "alloc")]
                    {
                        let _ = crate::sched::schedule_thread(pid, tid, thread);
                    }
                    count += 1;
                }
            }
        }

        count
    }

    /// Returns `true` if no threads are currently queued.
    pub fn is_empty(&self) -> bool {
        self.waiters.lock().is_empty()
    }
}

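/// A blocking mutual-exclusion lock that records its owning thread.
///
/// With the `alloc` feature, contended threads park on a [`WaitQueue`];
/// without it, `lock` falls back to yielding the CPU in a loop.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test); it assumes a running scheduler and a current thread:
///
/// ```ignore
/// static LOCK: Mutex = Mutex::new();
///
/// LOCK.lock();
/// // ... critical section ...
/// LOCK.unlock().expect("unlock called by the owning thread");
/// ```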
pub struct Mutex {
    /// Whether the mutex is currently held.
    locked: AtomicBool,
    /// Thread id of the current owner (`0` = unowned).
    owner: AtomicU64,
    /// Threads blocked waiting for the lock.
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}

impl Default for Mutex {
    fn default() -> Self {
        Self {
            locked: AtomicBool::new(false),
            owner: AtomicU64::new(0),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }
}

impl Mutex {
    /// Creates a new, unlocked mutex.
    pub const fn new() -> Self {
        Self {
            locked: AtomicBool::new(false),
            owner: AtomicU64::new(0),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Attempts to acquire the mutex without blocking. Returns `true` on
    /// success and records the calling thread as the owner.
    pub fn try_lock(&self) -> bool {
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            if let Some(thread) = super::current_thread() {
                self.owner.store(thread.tid.0, Ordering::Relaxed);
            }
            true
        } else {
            false
        }
    }

    /// Acquires the mutex, blocking on the wait queue when `alloc` is
    /// enabled and spinning with `yield_cpu` otherwise.
    pub fn lock(&self) {
        while !self.try_lock() {
            #[cfg(feature = "alloc")]
            {
                self.waiters.wait();
            }

            #[cfg(not(feature = "alloc"))]
            {
                crate::sched::yield_cpu();
            }
        }
    }

    /// Releases the mutex and wakes one waiter.
    ///
    /// Returns `PermissionDenied` if the calling thread is not the owner
    /// (the check is skipped when there is no current thread).
    pub fn unlock(&self) -> Result<(), KernelError> {
        if let Some(thread) = super::current_thread() {
            if self.owner.load(Ordering::Relaxed) != thread.tid.0 {
                return Err(KernelError::PermissionDenied {
                    operation: "mutex_unlock",
                });
            }
        }

        self.owner.store(0, Ordering::Relaxed);
        self.locked.store(false, Ordering::Release);

        #[cfg(feature = "alloc")]
        self.waiters.wake_one();

        Ok(())
    }

    /// Returns `true` if the mutex is currently held.
    pub fn is_locked(&self) -> bool {
        self.locked.load(Ordering::Relaxed)
    }
}

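/// A counting semaphore with an upper bound on the number of permits.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test): a semaphore limiting access to four identical resource slots.
///
/// ```ignore
/// static SLOTS: Semaphore = Semaphore::new(4, 4);
///
/// SLOTS.wait(); // take a slot, blocking if none are free
/// // ... use the resource ...
/// SLOTS.signal().expect("count stays within max_count");
/// ```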
pub struct Semaphore {
    /// Number of currently available permits.
    count: AtomicU32,
    /// Upper bound on the permit count.
    max_count: u32,
    /// Threads blocked waiting for a permit.
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}

impl Semaphore {
    /// Creates a semaphore with `initial` permits and a maximum of `max`.
    pub const fn new(initial: u32, max: u32) -> Self {
        Self {
            count: AtomicU32::new(initial),
            max_count: max,
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Acquires one permit, blocking until a permit becomes available.
    pub fn wait(&self) {
        loop {
            let count = self.count.load(Ordering::Relaxed);
            if count > 0 {
                if self
                    .count
                    .compare_exchange(count, count - 1, Ordering::Acquire, Ordering::Relaxed)
                    .is_ok()
                {
                    return;
                }
            } else {
                #[cfg(feature = "alloc")]
                {
                    self.waiters.wait();
                }

                #[cfg(not(feature = "alloc"))]
                {
                    crate::sched::yield_cpu();
                }
            }
        }
    }

    /// Attempts to acquire one permit without blocking. Returns `true` on
    /// success.
    pub fn try_wait(&self) -> bool {
        loop {
            let count = self.count.load(Ordering::Relaxed);
            if count > 0 {
                if self
                    .count
                    .compare_exchange(count, count - 1, Ordering::Acquire, Ordering::Relaxed)
                    .is_ok()
                {
                    return true;
                }
            } else {
                return false;
            }
        }
    }

    /// Releases one permit and wakes one waiter.
    ///
    /// Fails with `InvalidState` if the count is already at `max_count`.
    pub fn signal(&self) -> Result<(), KernelError> {
        loop {
            let count = self.count.load(Ordering::Relaxed);
            if count >= self.max_count {
                return Err(KernelError::InvalidState {
                    expected: "count < max_count",
                    actual: "semaphore overflow",
                });
            }

            if self
                .count
                .compare_exchange(count, count + 1, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                #[cfg(feature = "alloc")]
                self.waiters.wake_one();

                return Ok(());
            }
        }
    }

    /// Returns the number of currently available permits.
    pub fn count(&self) -> u32 {
        self.count.load(Ordering::Relaxed)
    }
}

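/// A condition variable used together with [`Mutex`].
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test). `condition_holds` and `make_condition_hold` are hypothetical
/// helpers for the shared state guarded by the mutex; waiters must re-check
/// the predicate in a loop:
///
/// ```ignore
/// static STATE: Mutex = Mutex::new();
/// static READY: CondVar = CondVar::new();
///
/// // Waiting side
/// STATE.lock();
/// while !condition_holds() {
///     READY.wait(&STATE)?;
/// }
/// STATE.unlock()?;
///
/// // Signalling side
/// STATE.lock();
/// make_condition_hold();
/// STATE.unlock()?;
/// READY.signal();
/// ```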
#[cfg(feature = "alloc")]
pub struct CondVar {
    /// Threads blocked waiting for a signal.
    waiters: WaitQueue,
}

#[cfg(feature = "alloc")]
impl Default for CondVar {
    fn default() -> Self {
        Self {
            waiters: WaitQueue::new(),
        }
    }
}

#[cfg(feature = "alloc")]
impl CondVar {
    /// Creates a condition variable with no waiters.
    pub const fn new() -> Self {
        Self {
            waiters: WaitQueue::new(),
        }
    }

    /// Releases `mutex`, blocks until signalled, and reacquires `mutex`
    /// before returning.
    ///
    /// The release and the block are not a single atomic step, so callers
    /// must re-check their predicate in a loop after `wait` returns.
    /// Returns `InvalidState` if `mutex` is not locked when called.
    pub fn wait(&self, mutex: &Mutex) -> Result<(), KernelError> {
        if !mutex.is_locked() {
            return Err(KernelError::InvalidState {
                expected: "mutex locked",
                actual: "mutex unlocked",
            });
        }

        // Release the mutex before blocking so the signalling thread can
        // acquire it; otherwise any signaller that needs the mutex would
        // deadlock against a sleeping waiter.
        mutex.unlock()?;

        self.waiters.wait();

        mutex.lock();

        Ok(())
    }

    /// Wakes one waiting thread.
    pub fn signal(&self) {
        self.waiters.wake_one();
    }

    /// Wakes all waiting threads.
    pub fn broadcast(&self) {
        self.waiters.wake_all();
    }
}

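/// A readers-writer lock: many concurrent readers or one exclusive writer.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test):
///
/// ```ignore
/// static TABLE_LOCK: RwLock = RwLock::new();
///
/// TABLE_LOCK.read_lock();
/// // ... read the shared structure ...
/// TABLE_LOCK.read_unlock();
///
/// TABLE_LOCK.write_lock();
/// // ... mutate the shared structure ...
/// TABLE_LOCK.write_unlock();
/// ```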
pub struct RwLock {
    /// `0` = unlocked, `usize::MAX` = write-locked, any other value is the
    /// number of active readers.
    state: AtomicUsize,
    /// Threads blocked waiting for a read lock.
    #[cfg(feature = "alloc")]
    read_waiters: WaitQueue,
    /// Threads blocked waiting for the write lock.
    #[cfg(feature = "alloc")]
    write_waiters: WaitQueue,
}

impl Default for RwLock {
    fn default() -> Self {
        Self {
            state: AtomicUsize::new(0),
            #[cfg(feature = "alloc")]
            read_waiters: WaitQueue::new(),
            #[cfg(feature = "alloc")]
            write_waiters: WaitQueue::new(),
        }
    }
}

impl RwLock {
    /// Creates a new, unlocked readers-writer lock.
    pub const fn new() -> Self {
        Self {
            state: AtomicUsize::new(0),
            #[cfg(feature = "alloc")]
            read_waiters: WaitQueue::new(),
            #[cfg(feature = "alloc")]
            write_waiters: WaitQueue::new(),
        }
    }

    /// Acquires a shared read lock, blocking while a writer holds the lock.
    pub fn read_lock(&self) {
        loop {
            let state = self.state.load(Ordering::Relaxed);

            if state == usize::MAX {
                #[cfg(feature = "alloc")]
                self.read_waiters.wait();

                #[cfg(not(feature = "alloc"))]
                crate::sched::yield_cpu();

                continue;
            }

            if self
                .state
                .compare_exchange(state, state + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }
        }
    }

    /// Attempts to acquire a read lock without blocking.
    pub fn try_read_lock(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        if state != usize::MAX {
            self.state
                .compare_exchange(state, state + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    /// Releases a read lock; the last reader out wakes one waiting writer.
    pub fn read_unlock(&self) {
        let prev = self.state.fetch_sub(1, Ordering::Release);

        #[cfg(feature = "alloc")]
        if prev == 1 {
            self.write_waiters.wake_one();
        }

        // Without `alloc` there is nothing to wake; silence the unused warning.
        #[cfg(not(feature = "alloc"))]
        let _ = prev;
    }

    /// Acquires the exclusive write lock, blocking until there are no
    /// readers and no writer.
    pub fn write_lock(&self) {
        loop {
            if self
                .state
                .compare_exchange(0, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }

            #[cfg(feature = "alloc")]
            self.write_waiters.wait();

            #[cfg(not(feature = "alloc"))]
            crate::sched::yield_cpu();
        }
    }

    /// Attempts to acquire the write lock without blocking.
    pub fn try_write_lock(&self) -> bool {
        self.state
            .compare_exchange(0, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Releases the write lock, then wakes all queued readers and one writer.
    pub fn write_unlock(&self) {
        self.state.store(0, Ordering::Release);

        #[cfg(feature = "alloc")]
        {
            self.read_waiters.wake_all();
            self.write_waiters.wake_one();
        }
    }
}

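/// A reusable barrier that releases its waiters once a fixed number of
/// threads have arrived.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test). `do_phase_one` and `do_phase_two` are hypothetical per-thread work:
///
/// ```ignore
/// static PHASE_BARRIER: Barrier = Barrier::new(3);
///
/// // In each of the three worker threads:
/// do_phase_one();
/// PHASE_BARRIER.wait(); // the last arrival wakes the other two
/// do_phase_two();
/// ```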
pub struct Barrier {
    /// Number of threads that must arrive before the barrier opens.
    threshold: usize,
    /// Threads that have arrived in the current generation.
    count: AtomicUsize,
    /// Incremented each time the barrier opens; waiters watch for the change.
    generation: AtomicUsize,
    /// Threads blocked at the barrier.
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}

impl Barrier {
    /// Creates a barrier that opens once `n` threads have called [`Barrier::wait`].
    pub const fn new(n: usize) -> Self {
        Self {
            threshold: n,
            count: AtomicUsize::new(0),
            generation: AtomicUsize::new(0),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Blocks until `threshold` threads have arrived; the last arrival
    /// resets the barrier for the next generation and releases the rest.
    pub fn wait(&self) {
        let gen = self.generation.load(Ordering::Relaxed);
        let count = self.count.fetch_add(1, Ordering::Relaxed) + 1;

        if count == self.threshold {
            self.count.store(0, Ordering::Relaxed);
            self.generation.fetch_add(1, Ordering::Relaxed);

            #[cfg(feature = "alloc")]
            self.waiters.wake_all();
        } else {
            while self.generation.load(Ordering::Relaxed) == gen {
                #[cfg(feature = "alloc")]
                self.waiters.wait();

                #[cfg(not(feature = "alloc"))]
                crate::sched::yield_cpu();
            }
        }
    }
}

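/// A mutex intended for priority inheritance: when a high-priority task
/// blocks on a lock held by a lower-priority task, the owner's priority
/// should be boosted until it releases the lock.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`, so it is not compiled as a doc
/// test). Note that the boost itself is currently a stub; see
/// `boost_owner_if_needed`:
///
/// ```ignore
/// static DEVICE_LOCK: PiMutex = PiMutex::new();
///
/// DEVICE_LOCK.lock();
/// // ... access the shared device ...
/// DEVICE_LOCK.unlock().expect("unlock called by the owning process");
/// ```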
pub struct PiMutex {
    /// Process id of the current owner (`0` = unowned).
    owner: AtomicU64,
    /// Priority the owner had when it took the lock, restored on unlock.
    original_priority: SpinMutex<Option<crate::sched::task::Priority>>,
    /// Threads blocked waiting for the lock.
    #[cfg(feature = "alloc")]
    waiters: WaitQueue,
}

impl Default for PiMutex {
    fn default() -> Self {
        Self::new()
    }
}

impl PiMutex {
    /// Creates a new, unlocked priority-inheritance mutex.
    pub const fn new() -> Self {
        Self {
            owner: AtomicU64::new(0),
            original_priority: SpinMutex::new(None),
            #[cfg(feature = "alloc")]
            waiters: WaitQueue::new(),
        }
    }

    /// Attempts to acquire the lock without blocking, recording the calling
    /// process as the owner on success.
    pub fn try_lock(&self) -> bool {
        let pid = current_pid();
        self.owner
            .compare_exchange(0, pid, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Acquires the lock, blocking until it becomes available. On contention
    /// it calls [`Self::boost_owner_if_needed`] before waiting.
    pub fn lock(&self) {
        if self.try_lock() {
            // Record the acquiring task's base priority so `unlock` can
            // restore it after any boost received while holding the lock.
            if let Some(task) = get_current_task_ptr() {
                let mut orig = self.original_priority.lock();
                unsafe {
                    *orig = Some((*task).priority);
                }
            }
            return;
        }

        self.boost_owner_if_needed();

        loop {
            #[cfg(feature = "alloc")]
            self.waiters.wait();

            #[cfg(not(feature = "alloc"))]
            crate::sched::yield_cpu();

            if self.try_lock() {
                if let Some(task) = get_current_task_ptr() {
                    let mut orig = self.original_priority.lock();
                    unsafe {
                        *orig = Some((*task).priority);
                    }
                }
                return;
            }
        }
    }

    /// Releases the lock, restoring the owner's original priority and waking
    /// one waiter.
    ///
    /// Returns `PermissionDenied` if the calling process is not the owner.
    pub fn unlock(&self) -> Result<(), KernelError> {
        let pid = current_pid();
        if self.owner.load(Ordering::Relaxed) != pid {
            return Err(KernelError::PermissionDenied {
                operation: "pi_mutex_unlock",
            });
        }

        if let Some(task) = get_current_task_ptr() {
            let mut orig = self.original_priority.lock();
            if let Some(original) = orig.take() {
                unsafe {
                    (*task).priority = original;
                    (*task).priority_boost = None;
                }
            }
        }

        self.owner.store(0, Ordering::Release);

        #[cfg(feature = "alloc")]
        self.waiters.wake_one();

        Ok(())
    }

    /// Intended to raise the owner's priority to at least the caller's
    /// priority (priority inheritance). The boost itself is not implemented
    /// yet: the looked-up owner and the caller's priority are discarded.
    fn boost_owner_if_needed(&self) {
        let owner_pid = self.owner.load(Ordering::Relaxed);
        if owner_pid == 0 {
            return;
        }

        let my_priority = if let Some(task) = get_current_task_ptr() {
            unsafe { (*task).priority }
        } else {
            return;
        };

        if let Some(owner) = crate::sched::find_process(crate::process::ProcessId(owner_pid)) {
            // TODO: apply the priority boost to the owner.
            let _ = (owner, my_priority);
        }
    }

    /// Returns `true` if the lock is currently held.
    pub fn is_locked(&self) -> bool {
        self.owner.load(Ordering::Relaxed) != 0
    }
}

/// Pid of the current process as a raw `u64`.
fn current_pid() -> u64 {
    crate::sched::current_process_id().0
}

/// Raw pointer to the scheduler's current task, if any.
///
/// The scheduler lock is dropped before the pointer is used, so callers
/// dereference it inside `unsafe` blocks.
fn get_current_task_ptr() -> Option<*mut crate::sched::task::Task> {
    let sched = crate::sched::scheduler::SCHEDULER.lock();
    sched.current().map(|ptr| ptr.as_ptr())
}