⚠️ VeridianOS Kernel Documentation - This is low-level kernel code. All functions are unsafe unless explicitly marked otherwise. This code is `no_std`.

veridian_kernel/fs/
pipe.rs

1//! Kernel pipe objects for inter-process and shell pipeline communication.
2//!
3//! Provides a unidirectional byte stream between a writer and a reader.
4//! Used by the shell's `|` operator and the `pipe` syscall.
5
6// Kernel pipe objects -- exercised via shell pipe operator and pipe syscall
7#![allow(dead_code)]
8
9use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
10
11use spin::Mutex;
12
13use crate::error::KernelError;
14
15/// Default pipe capacity (64 KB).
16const PIPE_CAPACITY: usize = 64 * 1024;
17
/// State shared by both ends of a pipe.
///
/// It is only ever reached through the mutex in `PipeState`.
struct PipeInner {
    /// Bytes that have been written but not yet read, oldest first.
    buffer: VecDeque<u8>,
    /// Upper bound, in bytes, on how much `buffer` may hold.
    capacity: usize,
    /// Set once the write end has been closed.
    write_closed: bool,
    /// Set once the read end has been closed.
    read_closed: bool,
}
29
30impl PipeInner {
31    fn new(capacity: usize) -> Self {
32        Self {
33            buffer: VecDeque::with_capacity(capacity),
34            capacity,
35            write_closed: false,
36            read_closed: false,
37        }
38    }
39}
40
41/// A handle to the shared pipe state.
42type PipeState = Arc<Mutex<PipeInner>>;
43
44/// The read end of a kernel pipe.
45pub struct PipeReader {
46    inner: PipeState,
47}
48
49/// The write end of a kernel pipe.
50pub struct PipeWriter {
51    inner: PipeState,
52}
53
54/// Create a new pipe pair `(reader, writer)`.
55pub fn create_pipe() -> Result<(PipeReader, PipeWriter), KernelError> {
56    create_pipe_with_capacity(PIPE_CAPACITY)
57}
58
59/// Create a pipe pair with a custom capacity.
60pub fn create_pipe_with_capacity(capacity: usize) -> Result<(PipeReader, PipeWriter), KernelError> {
61    let inner = Arc::new(Mutex::new(PipeInner::new(capacity)));
62    Ok((
63        PipeReader {
64            inner: inner.clone(),
65        },
66        PipeWriter { inner },
67    ))
68}
69
70impl PipeReader {
71    /// Read up to `buf.len()` bytes from the pipe.
72    ///
73    /// Returns the number of bytes read. Returns 0 when the write end is
74    /// closed and the buffer is empty (EOF). Spins briefly if the buffer is
75    /// empty but the write end is still open.
76    pub fn read(&self, buf: &mut [u8]) -> Result<usize, KernelError> {
77        loop {
78            {
79                let mut pipe = self.inner.lock();
80                if !pipe.buffer.is_empty() {
81                    let to_read = buf.len().min(pipe.buffer.len());
82                    for byte in buf.iter_mut().take(to_read) {
83                        *byte = pipe.buffer.pop_front().unwrap_or(0);
84                    }
85                    return Ok(to_read);
86                }
87                if pipe.write_closed {
88                    return Ok(0); // EOF
89                }
90                if pipe.read_closed {
91                    return Ok(0);
92                }
93            }
94            // Buffer empty, write end still open — spin wait
95            core::hint::spin_loop();
96        }
97    }
98
99    /// Non-blocking read: return immediately if no data available.
100    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, KernelError> {
101        let mut pipe = self.inner.lock();
102        if pipe.buffer.is_empty() {
103            if pipe.write_closed {
104                return Ok(0); // EOF
105            }
106            return Err(KernelError::WouldBlock);
107        }
108        let to_read = buf.len().min(pipe.buffer.len());
109        for byte in buf.iter_mut().take(to_read) {
110            *byte = pipe.buffer.pop_front().unwrap_or(0);
111        }
112        Ok(to_read)
113    }
114
115    /// Close the read end.
116    pub fn close(&self) {
117        self.inner.lock().read_closed = true;
118    }
119
120    /// Check if there is data available to read.
121    pub fn has_data(&self) -> bool {
122        !self.inner.lock().buffer.is_empty()
123    }
124}
125
126impl Drop for PipeReader {
127    fn drop(&mut self) {
128        self.close();
129    }
130}
131
132impl PipeWriter {
133    /// Write data to the pipe.
134    ///
135    /// Returns the number of bytes written. Returns an error if the read
136    /// end has been closed (broken pipe).
137    pub fn write(&self, data: &[u8]) -> Result<usize, KernelError> {
138        let mut pipe = self.inner.lock();
139        if pipe.read_closed {
140            return Err(KernelError::BrokenPipe);
141        }
142        if pipe.write_closed {
143            return Err(KernelError::BrokenPipe);
144        }
145        let available = pipe.capacity.saturating_sub(pipe.buffer.len());
146        let to_write = data.len().min(available);
147        for &byte in &data[..to_write] {
148            pipe.buffer.push_back(byte);
149        }
150        Ok(to_write)
151    }
152
153    /// Write all data, blocking until complete.
154    pub fn write_all(&self, data: &[u8]) -> Result<(), KernelError> {
155        let mut offset = 0;
156        while offset < data.len() {
157            let written = self.write(&data[offset..])?;
158            if written == 0 {
159                core::hint::spin_loop();
160            }
161            offset += written;
162        }
163        Ok(())
164    }
165
166    /// Close the write end.
167    pub fn close(&self) {
168        self.inner.lock().write_closed = true;
169    }
170}
171
172impl Drop for PipeWriter {
173    fn drop(&mut self) {
174        self.close();
175    }
176}
177
178/// Capture all output written to a pipe writer and return it as bytes.
179///
180/// This is a helper for the shell to capture command output for piping
181/// and command substitution. The writer should already be closed.
182pub fn drain_pipe(reader: &PipeReader) -> Vec<u8> {
183    let mut result = Vec::new();
184    let mut buf = [0u8; 4096];
185    loop {
186        match reader.try_read(&mut buf) {
187            Ok(0) => break,
188            Ok(n) => result.extend_from_slice(&buf[..n]),
189            Err(_) => break,
190        }
191    }
192    result
193}
194
195// ============================================================================
196// VfsNode adapters for pipe ends (used by pipe2 syscall)
197// ============================================================================
198
199use super::{DirEntry, Metadata, NodeType, Permissions, VfsNode};
200
201/// VfsNode adapter wrapping the read end of a pipe.
202pub struct PipeReadNode {
203    reader: Arc<Mutex<PipeReader>>,
204}
205
206impl PipeReadNode {
207    /// Create a new PipeReadNode from a PipeReader.
208    pub fn new(reader: PipeReader) -> Self {
209        Self {
210            reader: Arc::new(Mutex::new(reader)),
211        }
212    }
213}
214
215impl VfsNode for PipeReadNode {
216    fn node_type(&self) -> NodeType {
217        NodeType::CharDevice
218    }
219
220    fn read(&self, _offset: usize, buffer: &mut [u8]) -> Result<usize, KernelError> {
221        self.reader.lock().try_read(buffer)
222    }
223
224    fn write(&self, _offset: usize, _data: &[u8]) -> Result<usize, KernelError> {
225        Err(KernelError::PermissionDenied {
226            operation: "write to pipe read end",
227        })
228    }
229
230    fn poll_readiness(&self) -> u16 {
231        let reader = self.reader.lock();
232        let pipe = reader.inner.lock();
233        let mut events = 0u16;
234        if !pipe.buffer.is_empty() {
235            events |= 0x0001; // POLLIN
236        }
237        if pipe.write_closed {
238            events |= 0x0010; // POLLHUP -- write end closed (EOF pending)
239            if pipe.buffer.is_empty() {
240                events |= 0x0001; // POLLIN -- read will return 0 (EOF)
241            }
242        }
243        events
244    }
245
246    fn metadata(&self) -> Result<Metadata, KernelError> {
247        Ok(Metadata {
248            size: 0,
249            node_type: NodeType::CharDevice,
250            permissions: Permissions::from_mode(0o444),
251            uid: 0,
252            gid: 0,
253            created: 0,
254            modified: 0,
255            accessed: 0,
256            inode: 0,
257        })
258    }
259
260    fn readdir(&self) -> Result<Vec<DirEntry>, KernelError> {
261        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
262    }
263
264    fn lookup(&self, _name: &str) -> Result<Arc<dyn VfsNode>, KernelError> {
265        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
266    }
267
268    fn create(
269        &self,
270        _name: &str,
271        _permissions: Permissions,
272    ) -> Result<Arc<dyn VfsNode>, KernelError> {
273        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
274    }
275
276    fn mkdir(
277        &self,
278        _name: &str,
279        _permissions: Permissions,
280    ) -> Result<Arc<dyn VfsNode>, KernelError> {
281        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
282    }
283
284    fn unlink(&self, _name: &str) -> Result<(), KernelError> {
285        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
286    }
287
288    fn truncate(&self, _size: usize) -> Result<(), KernelError> {
289        Err(KernelError::PermissionDenied {
290            operation: "truncate pipe",
291        })
292    }
293}
294
295/// VfsNode adapter wrapping the write end of a pipe.
296pub struct PipeWriteNode {
297    writer: Arc<Mutex<PipeWriter>>,
298}
299
300impl PipeWriteNode {
301    /// Create a new PipeWriteNode from a PipeWriter.
302    pub fn new(writer: PipeWriter) -> Self {
303        Self {
304            writer: Arc::new(Mutex::new(writer)),
305        }
306    }
307}
308
309impl VfsNode for PipeWriteNode {
310    fn node_type(&self) -> NodeType {
311        NodeType::CharDevice
312    }
313
314    fn read(&self, _offset: usize, _buffer: &mut [u8]) -> Result<usize, KernelError> {
315        Err(KernelError::PermissionDenied {
316            operation: "read from pipe write end",
317        })
318    }
319
320    fn write(&self, _offset: usize, data: &[u8]) -> Result<usize, KernelError> {
321        self.writer.lock().write(data)
322    }
323
324    fn poll_readiness(&self) -> u16 {
325        let writer = self.writer.lock();
326        let pipe = writer.inner.lock();
327        let mut events = 0u16;
328        if pipe.read_closed {
329            events |= 0x0008; // POLLERR -- broken pipe
330        } else if pipe.buffer.len() < pipe.capacity {
331            events |= 0x0004; // POLLOUT -- space available
332        }
333        events
334    }
335
336    fn metadata(&self) -> Result<Metadata, KernelError> {
337        Ok(Metadata {
338            size: 0,
339            node_type: NodeType::CharDevice,
340            permissions: Permissions::from_mode(0o222),
341            uid: 0,
342            gid: 0,
343            created: 0,
344            modified: 0,
345            accessed: 0,
346            inode: 0,
347        })
348    }
349
350    fn readdir(&self) -> Result<Vec<DirEntry>, KernelError> {
351        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
352    }
353
354    fn lookup(&self, _name: &str) -> Result<Arc<dyn VfsNode>, KernelError> {
355        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
356    }
357
358    fn create(
359        &self,
360        _name: &str,
361        _permissions: Permissions,
362    ) -> Result<Arc<dyn VfsNode>, KernelError> {
363        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
364    }
365
366    fn mkdir(
367        &self,
368        _name: &str,
369        _permissions: Permissions,
370    ) -> Result<Arc<dyn VfsNode>, KernelError> {
371        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
372    }
373
374    fn unlink(&self, _name: &str) -> Result<(), KernelError> {
375        Err(KernelError::FsError(crate::error::FsError::NotADirectory))
376    }
377
378    fn truncate(&self, _size: usize) -> Result<(), KernelError> {
379        Err(KernelError::PermissionDenied {
380            operation: "truncate pipe",
381        })
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes written before the writer closes come back from a blocking read.
    #[test]
    fn test_pipe_basic_read_write() {
        let (rx, tx) = create_pipe().unwrap();
        tx.write(b"hello").unwrap();
        tx.close();

        let mut out = [0u8; 16];
        let len = rx.read(&mut out).unwrap();
        assert_eq!(&out[..len], b"hello");
    }

    /// Reading an empty pipe whose writer is closed reports EOF (0 bytes).
    #[test]
    fn test_pipe_eof_after_close() {
        let (rx, tx) = create_pipe().unwrap();
        tx.close();

        let mut out = [0u8; 16];
        let len = rx.read(&mut out).unwrap();
        assert_eq!(len, 0);
    }

    /// Writing after the reader has closed fails.
    #[test]
    fn test_pipe_broken_pipe() {
        let (rx, tx) = create_pipe().unwrap();
        rx.close();
        assert!(tx.write(b"data").is_err());
    }

    /// A write larger than the pipe's capacity is truncated to the capacity.
    #[test]
    fn test_pipe_large_write() {
        let (rx, tx) = create_pipe_with_capacity(16).unwrap();

        // The payload is longer than the 16-byte capacity.
        let accepted = tx.write(b"this is a long string").unwrap();
        assert_eq!(accepted, 16); // Only capacity bytes written
        tx.close();

        let mut out = [0u8; 32];
        let len = rx.read(&mut out).unwrap();
        assert_eq!(len, 16);
    }

    /// `drain_pipe` concatenates everything written before the close.
    #[test]
    fn test_drain_pipe() {
        let (rx, tx) = create_pipe().unwrap();
        tx.write(b"hello ").unwrap();
        tx.write(b"world").unwrap();
        tx.close();

        let data = drain_pipe(&rx);
        assert_eq!(&data, b"hello world");
    }
}