⚠️ VeridianOS Kernel Documentation — This is low-level kernel code (`no_std`). All functions are unsafe unless explicitly marked otherwise.

veridian_kernel/services/cri/
streaming.rs

1//! CRI Streaming Service
2//!
3//! Provides exec, attach, and port-forward operations for container
4//! runtime interaction.
5
6#![allow(dead_code)]
7
8use alloc::{collections::BTreeMap, string::String, vec::Vec};
9use core::sync::atomic::{AtomicU64, Ordering};
10
11// ---------------------------------------------------------------------------
12// Types
13// ---------------------------------------------------------------------------
14
/// Request to run a command in a container.
///
/// Accepted by both `StreamingService::exec_sync` and
/// `StreamingService::run_command`. The service code in this module only
/// inspects `container_id`; the remaining fields are carried along unchanged.
///
/// The type is comparable (`PartialEq`/`Eq`) so requests can be checked in
/// assertions the same way responses and errors already can.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecRequest {
    /// Target container ID. The service in this module rejects `0` as
    /// "container not found".
    pub container_id: u64,
    /// Command to run, as a list of strings.
    pub command: Vec<String>,
    /// Whether to attach stdin.
    pub stdin: bool,
    /// Whether to attach stdout.
    pub stdout: bool,
    /// Whether to attach stderr.
    pub stderr: bool,
    /// Whether to allocate a TTY.
    pub tty: bool,
}
31
/// Result of a synchronous command run.
///
/// Every field has total equality, so the type derives `Eq` alongside
/// `PartialEq`; that lets it be embedded in other `Eq` types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecResponse {
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Bytes captured from stdout.
    pub stdout_data: Vec<u8>,
    /// Bytes captured from stderr.
    pub stderr_data: Vec<u8>,
}
42
/// Request to attach to a running container.
///
/// Accepted by `StreamingService::attach`, which in this module only
/// inspects `container_id`.
///
/// The type is comparable (`PartialEq`/`Eq`) so requests can be checked in
/// assertions the same way responses and errors already can.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttachRequest {
    /// Target container ID. The service in this module rejects `0` as
    /// "container not found".
    pub container_id: u64,
    /// Whether to attach stdin.
    pub stdin: bool,
    /// Whether to attach stdout.
    pub stdout: bool,
    /// Whether to attach stderr.
    pub stderr: bool,
    /// Whether to allocate a TTY.
    pub tty: bool,
}
57
/// Request to forward a port from a pod sandbox.
///
/// Accepted by `StreamingService::port_forward`, which in this module only
/// inspects `pod_sandbox_id`.
///
/// The type is comparable (`PartialEq`/`Eq`) so requests can be checked in
/// assertions the same way responses and errors already can.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PortForwardRequest {
    /// Target pod sandbox ID. The service in this module rejects `0` as
    /// "sandbox not found".
    pub pod_sandbox_id: u64,
    /// Port number to forward.
    pub port: u16,
}
66
/// Lifecycle state of a stream session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// The session is open.
    Active,
    /// The session has been closed.
    Closed,
}

impl StreamState {
    /// Returns `true` only for [`StreamState::Active`].
    pub fn is_active(self) -> bool {
        matches!(self, StreamState::Active)
    }
}
82
83/// A streaming session (for running commands, attaching, or port-forwarding).
84#[derive(Debug, Clone)]
85pub struct StreamSession {
86    /// Unique session identifier.
87    pub id: u64,
88    /// Session type description.
89    pub session_type: String,
90    /// Target container or pod ID.
91    pub target_id: u64,
92    /// Current state.
93    pub state: StreamState,
94    /// Data buffered for this session.
95    pub buffer: Vec<u8>,
96    /// Tick when session was created.
97    pub created_tick: u64,
98}
99
/// Errors reported by the streaming service.
///
/// Each variant carries the ID that the failing operation was given.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamError {
    /// No container exists for the given container ID.
    ContainerNotFound(u64),
    /// No pod sandbox exists for the given sandbox ID.
    SandboxNotFound(u64),
    /// No session is registered under the given session ID.
    SessionNotFound(u64),
    /// The session with the given ID was already closed.
    SessionClosed(u64),
}
112
113// ---------------------------------------------------------------------------
114// Streaming Service
115// ---------------------------------------------------------------------------
116
/// Counter holding the next session ID to hand out; the first ID is 1.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(1);

/// Reserve and return the next session ID.
///
/// `fetch_add` returns the value stored *before* the increment, so each call
/// yields the current counter value and advances the counter by one.
fn alloc_session_id() -> u64 {
    const STEP: u64 = 1;
    // Relaxed: only the atomicity of the increment itself is requested here.
    NEXT_SESSION_ID.fetch_add(STEP, Ordering::Relaxed)
}
123
124/// CRI Streaming Service implementation.
125#[derive(Debug)]
126pub struct StreamingService {
127    /// Active streaming sessions.
128    sessions: BTreeMap<u64, StreamSession>,
129}
130
131impl Default for StreamingService {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137impl StreamingService {
138    /// Create a new streaming service.
139    pub fn new() -> Self {
140        StreamingService {
141            sessions: BTreeMap::new(),
142        }
143    }
144
145    /// Run a command synchronously in a container.
146    pub fn exec_sync(&self, request: &ExecRequest) -> Result<ExecResponse, StreamError> {
147        if request.container_id == 0 {
148            return Err(StreamError::ContainerNotFound(0));
149        }
150        Ok(ExecResponse {
151            exit_code: 0,
152            stdout_data: Vec::new(),
153            stderr_data: Vec::new(),
154        })
155    }
156
157    /// Start an asynchronous command session.
158    pub fn run_command(
159        &mut self,
160        request: &ExecRequest,
161        current_tick: u64,
162    ) -> Result<u64, StreamError> {
163        if request.container_id == 0 {
164            return Err(StreamError::ContainerNotFound(0));
165        }
166        let session_id = alloc_session_id();
167        let session = StreamSession {
168            id: session_id,
169            session_type: String::from("run"),
170            target_id: request.container_id,
171            state: StreamState::Active,
172            buffer: Vec::new(),
173            created_tick: current_tick,
174        };
175        self.sessions.insert(session_id, session);
176        Ok(session_id)
177    }
178
179    /// Attach to a running container's I/O streams.
180    pub fn attach(
181        &mut self,
182        request: &AttachRequest,
183        current_tick: u64,
184    ) -> Result<u64, StreamError> {
185        if request.container_id == 0 {
186            return Err(StreamError::ContainerNotFound(0));
187        }
188        let session_id = alloc_session_id();
189        let session = StreamSession {
190            id: session_id,
191            session_type: String::from("attach"),
192            target_id: request.container_id,
193            state: StreamState::Active,
194            buffer: Vec::new(),
195            created_tick: current_tick,
196        };
197        self.sessions.insert(session_id, session);
198        Ok(session_id)
199    }
200
201    /// Set up port forwarding for a pod sandbox.
202    pub fn port_forward(
203        &mut self,
204        request: &PortForwardRequest,
205        current_tick: u64,
206    ) -> Result<u64, StreamError> {
207        if request.pod_sandbox_id == 0 {
208            return Err(StreamError::SandboxNotFound(0));
209        }
210        let session_id = alloc_session_id();
211        let session = StreamSession {
212            id: session_id,
213            session_type: String::from("port-forward"),
214            target_id: request.pod_sandbox_id,
215            state: StreamState::Active,
216            buffer: Vec::new(),
217            created_tick: current_tick,
218        };
219        self.sessions.insert(session_id, session);
220        Ok(session_id)
221    }
222
223    /// Close a streaming session.
224    pub fn close_session(&mut self, session_id: u64) -> Result<(), StreamError> {
225        let session = self
226            .sessions
227            .get_mut(&session_id)
228            .ok_or(StreamError::SessionNotFound(session_id))?;
229        if session.state == StreamState::Closed {
230            return Err(StreamError::SessionClosed(session_id));
231        }
232        session.state = StreamState::Closed;
233        Ok(())
234    }
235
236    /// Get session status.
237    pub fn session_status(&self, session_id: u64) -> Option<&StreamSession> {
238        self.sessions.get(&session_id)
239    }
240
241    /// Count active sessions.
242    pub fn active_session_count(&self) -> usize {
243        self.sessions
244            .values()
245            .filter(|s| s.state == StreamState::Active)
246            .count()
247    }
248
249    /// Remove closed sessions.
250    pub fn cleanup_closed(&mut self) -> usize {
251        let before = self.sessions.len();
252        self.sessions.retain(|_, s| s.state != StreamState::Closed);
253        before - self.sessions.len()
254    }
255}
256
257// ---------------------------------------------------------------------------
258// Tests
259// ---------------------------------------------------------------------------
260
#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use alloc::string::ToString;
    #[allow(unused_imports)]
    use alloc::vec;

    use super::*;

    /// Build an exec request for `container_id` with a one-word command and
    /// stdout/stderr attached.
    fn exec_request(container_id: u64) -> ExecRequest {
        ExecRequest {
            container_id,
            command: vec![String::from("ls")],
            stdin: false,
            stdout: true,
            stderr: true,
            tty: false,
        }
    }

    #[test]
    fn test_exec_sync_ok() {
        let svc = StreamingService::new();
        let req = ExecRequest {
            container_id: 1,
            command: vec![String::from("ls")],
            stdin: false,
            stdout: true,
            stderr: true,
            tty: false,
        };
        let resp = svc.exec_sync(&req).unwrap();
        assert_eq!(resp.exit_code, 0);
    }

    #[test]
    fn test_exec_sync_not_found() {
        let svc = StreamingService::new();
        let req = ExecRequest {
            container_id: 0,
            command: vec![String::from("ls")],
            stdin: false,
            stdout: true,
            stderr: true,
            tty: false,
        };
        assert_eq!(svc.exec_sync(&req), Err(StreamError::ContainerNotFound(0)));
    }

    #[test]
    fn test_run_command_async() {
        let mut svc = StreamingService::new();
        let req = ExecRequest {
            container_id: 5,
            command: vec![String::from("sh")],
            stdin: true,
            stdout: true,
            stderr: true,
            tty: true,
        };
        let sid = svc.run_command(&req, 100).unwrap();
        let session = svc.session_status(sid).unwrap();
        assert_eq!(session.session_type, "run");
        assert_eq!(session.state, StreamState::Active);
    }

    #[test]
    fn test_attach() {
        let mut svc = StreamingService::new();
        let req = AttachRequest {
            container_id: 3,
            stdin: true,
            stdout: true,
            stderr: false,
            tty: false,
        };
        let sid = svc.attach(&req, 200).unwrap();
        assert_eq!(svc.active_session_count(), 1);
        let session = svc.session_status(sid).unwrap();
        assert_eq!(session.session_type, "attach");
    }

    #[test]
    fn test_port_forward() {
        let mut svc = StreamingService::new();
        let req = PortForwardRequest {
            pod_sandbox_id: 7,
            port: 8080,
        };
        let sid = svc.port_forward(&req, 300).unwrap();
        let session = svc.session_status(sid).unwrap();
        assert_eq!(session.session_type, "port-forward");
    }

    #[test]
    fn test_close_and_double_close() {
        let mut svc = StreamingService::new();
        let req = ExecRequest {
            container_id: 1,
            command: Vec::new(),
            stdin: false,
            stdout: false,
            stderr: false,
            tty: false,
        };
        let sid = svc.run_command(&req, 100).unwrap();
        svc.close_session(sid).unwrap();
        assert_eq!(svc.close_session(sid), Err(StreamError::SessionClosed(sid)));
    }

    // --- Error paths and cleanup that the tests above do not reach ---------

    #[test]
    fn test_session_records_target_and_tick() {
        let mut svc = StreamingService::new();
        let sid = svc.run_command(&exec_request(5), 100).unwrap();
        let session = svc.session_status(sid).unwrap();
        assert_eq!(session.id, sid);
        assert_eq!(session.target_id, 5);
        assert_eq!(session.created_tick, 100);
        assert!(session.buffer.is_empty());
        assert!(session.state.is_active());
    }

    #[test]
    fn test_rejected_requests_register_nothing() {
        let mut svc = StreamingService::new();
        assert_eq!(
            svc.run_command(&exec_request(0), 1),
            Err(StreamError::ContainerNotFound(0))
        );
        let attach = AttachRequest {
            container_id: 0,
            stdin: false,
            stdout: true,
            stderr: true,
            tty: false,
        };
        assert_eq!(
            svc.attach(&attach, 1),
            Err(StreamError::ContainerNotFound(0))
        );
        let forward = PortForwardRequest {
            pod_sandbox_id: 0,
            port: 8080,
        };
        assert_eq!(
            svc.port_forward(&forward, 1),
            Err(StreamError::SandboxNotFound(0))
        );
        assert_eq!(svc.active_session_count(), 0);
    }

    #[test]
    fn test_close_unknown_session() {
        let mut svc = StreamingService::new();
        assert_eq!(
            svc.close_session(u64::MAX),
            Err(StreamError::SessionNotFound(u64::MAX))
        );
        assert!(svc.session_status(u64::MAX).is_none());
    }

    #[test]
    fn test_cleanup_closed_removes_only_closed() {
        let mut svc = StreamingService::new();
        let keep = svc.run_command(&exec_request(1), 10).unwrap();
        let closed = svc.run_command(&exec_request(2), 11).unwrap();
        svc.close_session(closed).unwrap();

        // A closed session stays visible until it is cleaned up.
        assert_eq!(
            svc.session_status(closed).unwrap().state,
            StreamState::Closed
        );
        assert_eq!(svc.active_session_count(), 1);

        assert_eq!(svc.cleanup_closed(), 1);
        assert!(svc.session_status(closed).is_none());
        assert!(svc.session_status(keep).is_some());
        assert_eq!(svc.cleanup_closed(), 0);
    }
}
359}