veridian_kernel/services/cri/
streaming.rs1#![allow(dead_code)]
7
8use alloc::{collections::BTreeMap, string::String, vec::Vec};
9use core::sync::atomic::{AtomicU64, Ordering};
10
/// Request accepted by [`StreamingService::exec_sync`] and
/// [`StreamingService::run_command`].
///
/// NOTE(review): of these fields only `container_id` is read by the service
/// code in this file; the rest are carried along untouched.
#[derive(Clone, Debug)]
pub struct ExecRequest {
    /// Identifier of the target container. The service treats `0` as
    /// "container not found".
    pub container_id: u64,
    /// Command to run, one entry per element (presumably argv-style — TODO
    /// confirm against callers).
    pub command: Vec<String>,
    /// Stdin flag; presumably asks for standard input to be streamed.
    pub stdin: bool,
    /// Stdout flag; presumably asks for standard output to be streamed.
    pub stdout: bool,
    /// Stderr flag; presumably asks for standard error to be streamed.
    pub stderr: bool,
    /// TTY flag; presumably asks for a terminal to be attached.
    pub tty: bool,
}
31
/// Outcome of a synchronous exec, as returned by
/// [`StreamingService::exec_sync`].
#[derive(Clone, Debug, PartialEq)]
pub struct ExecResponse {
    /// Exit status of the command; `exec_sync` in this file always reports `0`.
    pub exit_code: i32,
    /// Bytes attributed to standard output; `exec_sync` in this file leaves
    /// this empty.
    pub stdout_data: Vec<u8>,
    /// Bytes attributed to standard error; `exec_sync` in this file leaves
    /// this empty.
    pub stderr_data: Vec<u8>,
}
42
/// Request accepted by [`StreamingService::attach`].
///
/// NOTE(review): only `container_id` is read by the service code in this
/// file; the stream flags are carried along untouched.
#[derive(Clone, Debug)]
pub struct AttachRequest {
    /// Identifier of the container to attach to. The service treats `0` as
    /// "container not found".
    pub container_id: u64,
    /// Stdin flag; presumably asks for standard input to be streamed.
    pub stdin: bool,
    /// Stdout flag; presumably asks for standard output to be streamed.
    pub stdout: bool,
    /// Stderr flag; presumably asks for standard error to be streamed.
    pub stderr: bool,
    /// TTY flag; presumably asks for a terminal to be attached.
    pub tty: bool,
}
57
/// Request accepted by [`StreamingService::port_forward`].
#[derive(Clone, Debug)]
pub struct PortForwardRequest {
    /// Identifier of the pod sandbox to forward into. The service treats `0`
    /// as "sandbox not found".
    pub pod_sandbox_id: u64,
    /// Port number to forward. NOTE(review): not read by the service code in
    /// this file — confirm where it is consumed.
    pub port: u16,
}
66
/// Lifecycle state of a [`StreamSession`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// The session is open; every session is created in this state.
    Active,
    /// The session has been closed and is eligible for removal by
    /// `StreamingService::cleanup_closed`.
    Closed,
}

impl StreamState {
    /// Returns `true` only when the state is [`StreamState::Active`].
    pub fn is_active(self) -> bool {
        matches!(self, StreamState::Active)
    }
}
82
83#[derive(Debug, Clone)]
85pub struct StreamSession {
86 pub id: u64,
88 pub session_type: String,
90 pub target_id: u64,
92 pub state: StreamState,
94 pub buffer: Vec<u8>,
96 pub created_tick: u64,
98}
99
/// Errors reported by [`StreamingService`] operations.
///
/// Each variant carries the identifier that the failing call referred to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamError {
    /// No container exists with the given id.
    ContainerNotFound(u64),
    /// No pod sandbox exists with the given id.
    SandboxNotFound(u64),
    /// No session is registered under the given id.
    SessionNotFound(u64),
    /// The session exists but has already been closed.
    SessionClosed(u64),
}

/// Human-readable rendering so callers can log the error without falling
/// back to the `Debug` representation.
impl core::fmt::Display for StreamError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            StreamError::ContainerNotFound(id) => write!(f, "container {} not found", id),
            StreamError::SandboxNotFound(id) => write!(f, "pod sandbox {} not found", id),
            StreamError::SessionNotFound(id) => write!(f, "stream session {} not found", id),
            StreamError::SessionClosed(id) => write!(f, "stream session {} is already closed", id),
        }
    }
}
112
/// Counter backing [`alloc_session_id`]. It starts at 1, so `0` is not
/// handed out as a session id.
static NEXT_SESSION_ID: AtomicU64 = AtomicU64::new(1);

/// Returns the next session identifier and advances the global counter.
///
/// `fetch_add` yields the value held *before* the increment, so the first
/// call returns 1 and each later call returns the previous result plus one.
fn alloc_session_id() -> u64 {
    // Relaxed ordering: the counter is only used to produce distinct values
    // here; no other memory is published through it in this file.
    AtomicU64::fetch_add(&NEXT_SESSION_ID, 1, Ordering::Relaxed)
}
123
124#[derive(Debug)]
126pub struct StreamingService {
127 sessions: BTreeMap<u64, StreamSession>,
129}
130
131impl Default for StreamingService {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137impl StreamingService {
138 pub fn new() -> Self {
140 StreamingService {
141 sessions: BTreeMap::new(),
142 }
143 }
144
145 pub fn exec_sync(&self, request: &ExecRequest) -> Result<ExecResponse, StreamError> {
147 if request.container_id == 0 {
148 return Err(StreamError::ContainerNotFound(0));
149 }
150 Ok(ExecResponse {
151 exit_code: 0,
152 stdout_data: Vec::new(),
153 stderr_data: Vec::new(),
154 })
155 }
156
157 pub fn run_command(
159 &mut self,
160 request: &ExecRequest,
161 current_tick: u64,
162 ) -> Result<u64, StreamError> {
163 if request.container_id == 0 {
164 return Err(StreamError::ContainerNotFound(0));
165 }
166 let session_id = alloc_session_id();
167 let session = StreamSession {
168 id: session_id,
169 session_type: String::from("run"),
170 target_id: request.container_id,
171 state: StreamState::Active,
172 buffer: Vec::new(),
173 created_tick: current_tick,
174 };
175 self.sessions.insert(session_id, session);
176 Ok(session_id)
177 }
178
179 pub fn attach(
181 &mut self,
182 request: &AttachRequest,
183 current_tick: u64,
184 ) -> Result<u64, StreamError> {
185 if request.container_id == 0 {
186 return Err(StreamError::ContainerNotFound(0));
187 }
188 let session_id = alloc_session_id();
189 let session = StreamSession {
190 id: session_id,
191 session_type: String::from("attach"),
192 target_id: request.container_id,
193 state: StreamState::Active,
194 buffer: Vec::new(),
195 created_tick: current_tick,
196 };
197 self.sessions.insert(session_id, session);
198 Ok(session_id)
199 }
200
201 pub fn port_forward(
203 &mut self,
204 request: &PortForwardRequest,
205 current_tick: u64,
206 ) -> Result<u64, StreamError> {
207 if request.pod_sandbox_id == 0 {
208 return Err(StreamError::SandboxNotFound(0));
209 }
210 let session_id = alloc_session_id();
211 let session = StreamSession {
212 id: session_id,
213 session_type: String::from("port-forward"),
214 target_id: request.pod_sandbox_id,
215 state: StreamState::Active,
216 buffer: Vec::new(),
217 created_tick: current_tick,
218 };
219 self.sessions.insert(session_id, session);
220 Ok(session_id)
221 }
222
223 pub fn close_session(&mut self, session_id: u64) -> Result<(), StreamError> {
225 let session = self
226 .sessions
227 .get_mut(&session_id)
228 .ok_or(StreamError::SessionNotFound(session_id))?;
229 if session.state == StreamState::Closed {
230 return Err(StreamError::SessionClosed(session_id));
231 }
232 session.state = StreamState::Closed;
233 Ok(())
234 }
235
236 pub fn session_status(&self, session_id: u64) -> Option<&StreamSession> {
238 self.sessions.get(&session_id)
239 }
240
241 pub fn active_session_count(&self) -> usize {
243 self.sessions
244 .values()
245 .filter(|s| s.state == StreamState::Active)
246 .count()
247 }
248
249 pub fn cleanup_closed(&mut self) -> usize {
251 let before = self.sessions.len();
252 self.sessions.retain(|_, s| s.state != StreamState::Closed);
253 before - self.sessions.len()
254 }
255}
256
#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use alloc::string::ToString;
    #[allow(unused_imports)]
    use alloc::vec;

    use super::*;

    /// Builds an exec request with stdout/stderr on and stdin/tty off.
    /// Tests that need other flags override them with struct-update syntax.
    fn exec_request(container_id: u64, argv: &[&str]) -> ExecRequest {
        ExecRequest {
            container_id,
            command: argv.iter().map(|arg| String::from(*arg)).collect(),
            stdin: false,
            stdout: true,
            stderr: true,
            tty: false,
        }
    }

    // A non-zero container id yields a successful response with exit code 0.
    #[test]
    fn test_exec_sync_ok() {
        let service = StreamingService::new();
        let response = service
            .exec_sync(&exec_request(1, &["ls"]))
            .expect("exec_sync should succeed for a non-zero container id");
        assert_eq!(response.exit_code, 0);
    }

    // Container id 0 is reported as not found.
    #[test]
    fn test_exec_sync_not_found() {
        let service = StreamingService::new();
        let outcome = service.exec_sync(&exec_request(0, &["ls"]));
        assert_eq!(outcome, Err(StreamError::ContainerNotFound(0)));
    }

    // run_command registers an active session tagged "run".
    #[test]
    fn test_run_command_async() {
        let mut service = StreamingService::new();
        let request = ExecRequest {
            stdin: true,
            tty: true,
            ..exec_request(5, &["sh"])
        };
        let session_id = service
            .run_command(&request, 100)
            .expect("run_command should succeed for a non-zero container id");
        let session = service
            .session_status(session_id)
            .expect("session should be registered");
        assert_eq!(session.session_type, "run");
        assert_eq!(session.state, StreamState::Active);
    }

    // attach registers exactly one active session tagged "attach".
    #[test]
    fn test_attach() {
        let mut service = StreamingService::new();
        let request = AttachRequest {
            container_id: 3,
            stdin: true,
            stdout: true,
            stderr: false,
            tty: false,
        };
        let session_id = service
            .attach(&request, 200)
            .expect("attach should succeed for a non-zero container id");
        assert_eq!(service.active_session_count(), 1);
        let session = service
            .session_status(session_id)
            .expect("session should be registered");
        assert_eq!(session.session_type, "attach");
    }

    // port_forward registers a session tagged "port-forward".
    #[test]
    fn test_port_forward() {
        let mut service = StreamingService::new();
        let request = PortForwardRequest {
            pod_sandbox_id: 7,
            port: 8080,
        };
        let session_id = service
            .port_forward(&request, 300)
            .expect("port_forward should succeed for a non-zero sandbox id");
        let session = service
            .session_status(session_id)
            .expect("session should be registered");
        assert_eq!(session.session_type, "port-forward");
    }

    // Closing once succeeds; closing the same session again is an error.
    #[test]
    fn test_close_and_double_close() {
        let mut service = StreamingService::new();
        let request = ExecRequest {
            stdout: false,
            stderr: false,
            ..exec_request(1, &[])
        };
        let session_id = service
            .run_command(&request, 100)
            .expect("run_command should succeed for a non-zero container id");
        service
            .close_session(session_id)
            .expect("first close should succeed");
        assert_eq!(
            service.close_session(session_id),
            Err(StreamError::SessionClosed(session_id))
        );
    }
}