1#![allow(dead_code)]
8
9use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
10
11use spin::Mutex;
12
13use crate::error::KernelError;
14
15const PIPE_CAPACITY: usize = 64 * 1024;
17
18struct PipeInner {
20 buffer: VecDeque<u8>,
22 capacity: usize,
24 write_closed: bool,
26 read_closed: bool,
28}
29
30impl PipeInner {
31 fn new(capacity: usize) -> Self {
32 Self {
33 buffer: VecDeque::with_capacity(capacity),
34 capacity,
35 write_closed: false,
36 read_closed: false,
37 }
38 }
39}
40
41type PipeState = Arc<Mutex<PipeInner>>;
43
44pub struct PipeReader {
46 inner: PipeState,
47}
48
49pub struct PipeWriter {
51 inner: PipeState,
52}
53
54pub fn create_pipe() -> Result<(PipeReader, PipeWriter), KernelError> {
56 create_pipe_with_capacity(PIPE_CAPACITY)
57}
58
59pub fn create_pipe_with_capacity(capacity: usize) -> Result<(PipeReader, PipeWriter), KernelError> {
61 let inner = Arc::new(Mutex::new(PipeInner::new(capacity)));
62 Ok((
63 PipeReader {
64 inner: inner.clone(),
65 },
66 PipeWriter { inner },
67 ))
68}
69
70impl PipeReader {
71 pub fn read(&self, buf: &mut [u8]) -> Result<usize, KernelError> {
77 loop {
78 {
79 let mut pipe = self.inner.lock();
80 if !pipe.buffer.is_empty() {
81 let to_read = buf.len().min(pipe.buffer.len());
82 for byte in buf.iter_mut().take(to_read) {
83 *byte = pipe.buffer.pop_front().unwrap_or(0);
84 }
85 return Ok(to_read);
86 }
87 if pipe.write_closed {
88 return Ok(0); }
90 if pipe.read_closed {
91 return Ok(0);
92 }
93 }
94 core::hint::spin_loop();
96 }
97 }
98
99 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, KernelError> {
101 let mut pipe = self.inner.lock();
102 if pipe.buffer.is_empty() {
103 if pipe.write_closed {
104 return Ok(0); }
106 return Err(KernelError::WouldBlock);
107 }
108 let to_read = buf.len().min(pipe.buffer.len());
109 for byte in buf.iter_mut().take(to_read) {
110 *byte = pipe.buffer.pop_front().unwrap_or(0);
111 }
112 Ok(to_read)
113 }
114
115 pub fn close(&self) {
117 self.inner.lock().read_closed = true;
118 }
119
120 pub fn has_data(&self) -> bool {
122 !self.inner.lock().buffer.is_empty()
123 }
124}
125
126impl Drop for PipeReader {
127 fn drop(&mut self) {
128 self.close();
129 }
130}
131
132impl PipeWriter {
133 pub fn write(&self, data: &[u8]) -> Result<usize, KernelError> {
138 let mut pipe = self.inner.lock();
139 if pipe.read_closed {
140 return Err(KernelError::BrokenPipe);
141 }
142 if pipe.write_closed {
143 return Err(KernelError::BrokenPipe);
144 }
145 let available = pipe.capacity.saturating_sub(pipe.buffer.len());
146 let to_write = data.len().min(available);
147 for &byte in &data[..to_write] {
148 pipe.buffer.push_back(byte);
149 }
150 Ok(to_write)
151 }
152
153 pub fn write_all(&self, data: &[u8]) -> Result<(), KernelError> {
155 let mut offset = 0;
156 while offset < data.len() {
157 let written = self.write(&data[offset..])?;
158 if written == 0 {
159 core::hint::spin_loop();
160 }
161 offset += written;
162 }
163 Ok(())
164 }
165
166 pub fn close(&self) {
168 self.inner.lock().write_closed = true;
169 }
170}
171
172impl Drop for PipeWriter {
173 fn drop(&mut self) {
174 self.close();
175 }
176}
177
178pub fn drain_pipe(reader: &PipeReader) -> Vec<u8> {
183 let mut result = Vec::new();
184 let mut buf = [0u8; 4096];
185 loop {
186 match reader.try_read(&mut buf) {
187 Ok(0) => break,
188 Ok(n) => result.extend_from_slice(&buf[..n]),
189 Err(_) => break,
190 }
191 }
192 result
193}
194
195use super::{DirEntry, Metadata, NodeType, Permissions, VfsNode};
200
201pub struct PipeReadNode {
203 reader: Arc<Mutex<PipeReader>>,
204}
205
206impl PipeReadNode {
207 pub fn new(reader: PipeReader) -> Self {
209 Self {
210 reader: Arc::new(Mutex::new(reader)),
211 }
212 }
213}
214
215impl VfsNode for PipeReadNode {
216 fn node_type(&self) -> NodeType {
217 NodeType::CharDevice
218 }
219
220 fn read(&self, _offset: usize, buffer: &mut [u8]) -> Result<usize, KernelError> {
221 self.reader.lock().try_read(buffer)
222 }
223
224 fn write(&self, _offset: usize, _data: &[u8]) -> Result<usize, KernelError> {
225 Err(KernelError::PermissionDenied {
226 operation: "write to pipe read end",
227 })
228 }
229
230 fn poll_readiness(&self) -> u16 {
231 let reader = self.reader.lock();
232 let pipe = reader.inner.lock();
233 let mut events = 0u16;
234 if !pipe.buffer.is_empty() {
235 events |= 0x0001; }
237 if pipe.write_closed {
238 events |= 0x0010; if pipe.buffer.is_empty() {
240 events |= 0x0001; }
242 }
243 events
244 }
245
246 fn metadata(&self) -> Result<Metadata, KernelError> {
247 Ok(Metadata {
248 size: 0,
249 node_type: NodeType::CharDevice,
250 permissions: Permissions::from_mode(0o444),
251 uid: 0,
252 gid: 0,
253 created: 0,
254 modified: 0,
255 accessed: 0,
256 inode: 0,
257 })
258 }
259
260 fn readdir(&self) -> Result<Vec<DirEntry>, KernelError> {
261 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
262 }
263
264 fn lookup(&self, _name: &str) -> Result<Arc<dyn VfsNode>, KernelError> {
265 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
266 }
267
268 fn create(
269 &self,
270 _name: &str,
271 _permissions: Permissions,
272 ) -> Result<Arc<dyn VfsNode>, KernelError> {
273 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
274 }
275
276 fn mkdir(
277 &self,
278 _name: &str,
279 _permissions: Permissions,
280 ) -> Result<Arc<dyn VfsNode>, KernelError> {
281 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
282 }
283
284 fn unlink(&self, _name: &str) -> Result<(), KernelError> {
285 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
286 }
287
288 fn truncate(&self, _size: usize) -> Result<(), KernelError> {
289 Err(KernelError::PermissionDenied {
290 operation: "truncate pipe",
291 })
292 }
293}
294
295pub struct PipeWriteNode {
297 writer: Arc<Mutex<PipeWriter>>,
298}
299
300impl PipeWriteNode {
301 pub fn new(writer: PipeWriter) -> Self {
303 Self {
304 writer: Arc::new(Mutex::new(writer)),
305 }
306 }
307}
308
309impl VfsNode for PipeWriteNode {
310 fn node_type(&self) -> NodeType {
311 NodeType::CharDevice
312 }
313
314 fn read(&self, _offset: usize, _buffer: &mut [u8]) -> Result<usize, KernelError> {
315 Err(KernelError::PermissionDenied {
316 operation: "read from pipe write end",
317 })
318 }
319
320 fn write(&self, _offset: usize, data: &[u8]) -> Result<usize, KernelError> {
321 self.writer.lock().write(data)
322 }
323
324 fn poll_readiness(&self) -> u16 {
325 let writer = self.writer.lock();
326 let pipe = writer.inner.lock();
327 let mut events = 0u16;
328 if pipe.read_closed {
329 events |= 0x0008; } else if pipe.buffer.len() < pipe.capacity {
331 events |= 0x0004; }
333 events
334 }
335
336 fn metadata(&self) -> Result<Metadata, KernelError> {
337 Ok(Metadata {
338 size: 0,
339 node_type: NodeType::CharDevice,
340 permissions: Permissions::from_mode(0o222),
341 uid: 0,
342 gid: 0,
343 created: 0,
344 modified: 0,
345 accessed: 0,
346 inode: 0,
347 })
348 }
349
350 fn readdir(&self) -> Result<Vec<DirEntry>, KernelError> {
351 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
352 }
353
354 fn lookup(&self, _name: &str) -> Result<Arc<dyn VfsNode>, KernelError> {
355 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
356 }
357
358 fn create(
359 &self,
360 _name: &str,
361 _permissions: Permissions,
362 ) -> Result<Arc<dyn VfsNode>, KernelError> {
363 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
364 }
365
366 fn mkdir(
367 &self,
368 _name: &str,
369 _permissions: Permissions,
370 ) -> Result<Arc<dyn VfsNode>, KernelError> {
371 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
372 }
373
374 fn unlink(&self, _name: &str) -> Result<(), KernelError> {
375 Err(KernelError::FsError(crate::error::FsError::NotADirectory))
376 }
377
378 fn truncate(&self, _size: usize) -> Result<(), KernelError> {
379 Err(KernelError::PermissionDenied {
380 operation: "truncate pipe",
381 })
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388
389 #[test]
390 fn test_pipe_basic_read_write() {
391 let (reader, writer) = create_pipe().unwrap();
392 writer.write(b"hello").unwrap();
393 writer.close();
394 let mut buf = [0u8; 16];
395 let n = reader.read(&mut buf).unwrap();
396 assert_eq!(&buf[..n], b"hello");
397 }
398
399 #[test]
400 fn test_pipe_eof_after_close() {
401 let (reader, writer) = create_pipe().unwrap();
402 writer.close();
403 let mut buf = [0u8; 16];
404 let n = reader.read(&mut buf).unwrap();
405 assert_eq!(n, 0);
406 }
407
408 #[test]
409 fn test_pipe_broken_pipe() {
410 let (reader, writer) = create_pipe().unwrap();
411 reader.close();
412 let result = writer.write(b"data");
413 assert!(result.is_err());
414 }
415
416 #[test]
417 fn test_pipe_large_write() {
418 let (reader, writer) = create_pipe_with_capacity(16).unwrap();
419 let n = writer.write(b"this is a long string").unwrap();
421 assert_eq!(n, 16); writer.close();
423 let mut buf = [0u8; 32];
424 let n = reader.read(&mut buf).unwrap();
425 assert_eq!(n, 16);
426 }
427
428 #[test]
429 fn test_drain_pipe() {
430 let (reader, writer) = create_pipe().unwrap();
431 writer.write(b"hello ").unwrap();
432 writer.write(b"world").unwrap();
433 writer.close();
434 let data = drain_pipe(&reader);
435 assert_eq!(&data, b"hello world");
436 }
437}