veridian_kernel/services/mesh/
proxy.rs1#![allow(dead_code)]
7
8use alloc::{string::String, vec::Vec};
9use core::sync::atomic::{AtomicU64, Ordering};
10
/// Configuration consumed by `SidecarProxy::new`.
#[derive(Debug, Clone)]
pub struct ProxyConfig {
    /// Port the proxy is configured to listen on (default `15001`).
    pub listen_port: u16,
    /// Upstream addresses. `SidecarProxy::new` creates one health record per
    /// entry, so an upstream's index is its position in this list.
    pub upstream_addrs: Vec<String>,
    /// Health-check interval (default `10`). The unit is not fixed by
    /// anything in this file -- confirm against the caller.
    pub health_check_interval: u64,
    /// Connect timeout (default `5`). The unit is not fixed by anything in
    /// this file -- confirm against the caller.
    pub connect_timeout: u64,
    /// Retry budget (default `3`). No code in this file reads it.
    pub max_retries: u32,
}

impl Default for ProxyConfig {
    /// Returns a configuration with port 15001, no upstreams, a health-check
    /// interval of 10, a connect timeout of 5 and 3 retries.
    fn default() -> Self {
        let upstream_addrs = Vec::new();
        Self {
            listen_port: 15001,
            upstream_addrs,
            health_check_interval: 10,
            connect_timeout: 5,
            max_retries: 3,
        }
    }
}
41
/// Lifecycle state of a pooled connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not checked out; `ConnectionPool::get_connection` may hand it out again.
    Idle,
    /// Currently checked out of the pool.
    Active,
    /// Marked by `ConnectionPool::drain_upstream`; removed by
    /// `ConnectionPool::cleanup_drained` and never picked for reuse.
    Draining,
}
56
57#[derive(Debug, Clone)]
59pub struct Connection {
60 pub id: u64,
62 pub upstream_idx: usize,
64 pub state: ConnectionState,
66 pub request_count: u64,
68 pub created_tick: u64,
70}
71
72static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
74
75#[derive(Debug)]
77pub struct ConnectionPool {
78 connections: Vec<Connection>,
80 max_per_upstream: usize,
82}
83
84impl Default for ConnectionPool {
85 fn default() -> Self {
86 Self::new(10)
87 }
88}
89
90impl ConnectionPool {
91 pub fn new(max_per_upstream: usize) -> Self {
93 ConnectionPool {
94 connections: Vec::new(),
95 max_per_upstream,
96 }
97 }
98
99 pub fn get_connection(&mut self, upstream_idx: usize, current_tick: u64) -> &mut Connection {
101 let idle_pos = self
103 .connections
104 .iter()
105 .position(|c| c.upstream_idx == upstream_idx && c.state == ConnectionState::Idle);
106
107 if let Some(pos) = idle_pos {
108 self.connections[pos].state = ConnectionState::Active;
109 return &mut self.connections[pos];
110 }
111
112 let conn = Connection {
114 id: NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed),
115 upstream_idx,
116 state: ConnectionState::Active,
117 request_count: 0,
118 created_tick: current_tick,
119 };
120 self.connections.push(conn);
121 let last = self.connections.len() - 1;
122 &mut self.connections[last]
123 }
124
125 pub fn release(&mut self, conn_id: u64) {
127 if let Some(conn) = self.connections.iter_mut().find(|c| c.id == conn_id) {
128 conn.state = ConnectionState::Idle;
129 }
130 }
131
132 pub fn drain_upstream(&mut self, upstream_idx: usize) {
134 for conn in &mut self.connections {
135 if conn.upstream_idx == upstream_idx {
136 conn.state = ConnectionState::Draining;
137 }
138 }
139 }
140
141 pub fn cleanup_drained(&mut self) -> usize {
143 let before = self.connections.len();
144 self.connections
145 .retain(|c| c.state != ConnectionState::Draining);
146 before - self.connections.len()
147 }
148
149 pub fn active_count(&self) -> usize {
151 self.connections
152 .iter()
153 .filter(|c| c.state == ConnectionState::Active)
154 .count()
155 }
156
157 pub fn total_count(&self) -> usize {
159 self.connections.len()
160 }
161}
162
/// Request and health-check counters for a proxy.
///
/// Every counter is an `AtomicU64`, so the recording methods only need
/// `&self`. All updates use `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct ProxyStats {
    /// Requests recorded, whether they succeeded or failed.
    pub total_requests: AtomicU64,
    /// Requests recorded through `record_failure`.
    pub failed_requests: AtomicU64,
    /// Sum of the `bytes_sent` values passed to `record_success`.
    pub bytes_sent: AtomicU64,
    /// Sum of the `bytes_received` values passed to `record_success`.
    pub bytes_received: AtomicU64,
    /// Health checks that found an upstream healthy.
    pub health_checks_passed: AtomicU64,
    /// Health checks that found an upstream unhealthy.
    pub health_checks_failed: AtomicU64,
}

impl ProxyStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Counts one request and adds its byte totals.
    pub fn record_success(&self, bytes_sent: u64, bytes_received: u64) {
        Self::bump(&self.total_requests, 1);
        Self::bump(&self.bytes_sent, bytes_sent);
        Self::bump(&self.bytes_received, bytes_received);
    }

    /// Counts one request and marks it as failed.
    pub fn record_failure(&self) {
        Self::bump(&self.total_requests, 1);
        Self::bump(&self.failed_requests, 1);
    }

    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }
}
204
/// Health bookkeeping for a single upstream.
#[derive(Debug, Clone)]
pub struct UpstreamHealth {
    /// Index of the upstream this record describes.
    pub idx: usize,
    /// Whether the upstream is currently eligible for selection.
    pub healthy: bool,
    /// Number of health checks in a row that found the upstream unhealthy.
    pub consecutive_failures: u32,
    /// Tick value passed to the most recent health check.
    pub last_check_tick: u64,
}
221
/// Errors returned by the proxy's request handlers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProxyError {
    /// No upstream is configured, or none of them is marked healthy.
    NoHealthyUpstream,
    /// Connecting to an upstream failed; the payload carries a description.
    ConnectionFailed(String),
    /// An upstream did not respond in time.
    UpstreamTimeout,
    /// The request was rejected as invalid; the payload carries a description.
    InvalidRequest(String),
}
238
239#[derive(Debug)]
241pub struct SidecarProxy {
242 config: ProxyConfig,
244 pool: ConnectionPool,
246 stats: ProxyStats,
248 upstream_health: Vec<UpstreamHealth>,
250 rr_counter: u64,
252 mtls_enabled: bool,
254}
255
256impl SidecarProxy {
257 pub fn new(config: ProxyConfig) -> Self {
259 let health: Vec<UpstreamHealth> = (0..config.upstream_addrs.len())
260 .map(|idx| UpstreamHealth {
261 idx,
262 healthy: true,
263 consecutive_failures: 0,
264 last_check_tick: 0,
265 })
266 .collect();
267
268 SidecarProxy {
269 config,
270 pool: ConnectionPool::default(),
271 stats: ProxyStats::new(),
272 upstream_health: health,
273 rr_counter: 0,
274 mtls_enabled: false,
275 }
276 }
277
278 pub fn set_mtls(&mut self, enabled: bool) {
280 self.mtls_enabled = enabled;
281 }
282
283 fn select_upstream(&mut self) -> Result<usize, ProxyError> {
285 let count = self.upstream_health.len();
286 if count == 0 {
287 return Err(ProxyError::NoHealthyUpstream);
288 }
289
290 for _ in 0..count {
291 let idx = (self.rr_counter as usize) % count;
292 self.rr_counter += 1;
293 if self.upstream_health[idx].healthy {
294 return Ok(idx);
295 }
296 }
297 Err(ProxyError::NoHealthyUpstream)
298 }
299
300 pub fn handle_tcp(&mut self, payload: &[u8], current_tick: u64) -> Result<Vec<u8>, ProxyError> {
302 let upstream_idx = self.select_upstream()?;
303 let conn = self.pool.get_connection(upstream_idx, current_tick);
304 conn.request_count += 1;
305 let conn_id = conn.id;
306
307 self.stats.record_success(payload.len() as u64, 0);
309 self.pool.release(conn_id);
310
311 Ok(Vec::new()) }
313
314 pub fn handle_http(
316 &mut self,
317 _method: &str,
318 _path: &str,
319 payload: &[u8],
320 current_tick: u64,
321 ) -> Result<Vec<u8>, ProxyError> {
322 let upstream_idx = self.select_upstream()?;
323 let conn = self.pool.get_connection(upstream_idx, current_tick);
324 conn.request_count += 1;
325 let conn_id = conn.id;
326
327 self.stats.record_success(payload.len() as u64, 0);
328 self.pool.release(conn_id);
329
330 Ok(Vec::new())
331 }
332
333 pub fn health_check(&mut self, current_tick: u64) {
335 for health in &mut self.upstream_health {
336 health.last_check_tick = current_tick;
337 if health.healthy {
339 health.consecutive_failures = 0;
340 self.stats
341 .health_checks_passed
342 .fetch_add(1, Ordering::Relaxed);
343 } else {
344 health.consecutive_failures += 1;
345 self.stats
346 .health_checks_failed
347 .fetch_add(1, Ordering::Relaxed);
348 }
349 }
350 }
351
352 pub fn mark_unhealthy(&mut self, idx: usize) {
354 if idx < self.upstream_health.len() {
355 self.upstream_health[idx].healthy = false;
356 }
357 }
358
359 pub fn mark_healthy(&mut self, idx: usize) {
361 if idx < self.upstream_health.len() {
362 self.upstream_health[idx].healthy = true;
363 self.upstream_health[idx].consecutive_failures = 0;
364 }
365 }
366
367 pub fn healthy_upstream_count(&self) -> usize {
369 self.upstream_health.iter().filter(|h| h.healthy).count()
370 }
371
372 pub fn stats(&self) -> &ProxyStats {
374 &self.stats
375 }
376
377 pub fn pool(&self) -> &ConnectionPool {
379 &self.pool
380 }
381}
382
/// Unit tests for the proxy, its connection pool and its statistics.
#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use alloc::string::ToString;

    use super::*;

    /// Builds a proxy with three upstreams, all initially healthy.
    fn make_proxy() -> SidecarProxy {
        let config = ProxyConfig {
            listen_port: 15001,
            upstream_addrs: alloc::vec![
                String::from("10.0.0.1:8080"),
                String::from("10.0.0.2:8080"),
                String::from("10.0.0.3:8080"),
            ],
            health_check_interval: 10,
            connect_timeout: 5,
            max_retries: 3,
        };
        SidecarProxy::new(config)
    }

    #[test]
    fn test_proxy_creation() {
        let proxy = make_proxy();
        assert_eq!(proxy.healthy_upstream_count(), 3);
    }

    #[test]
    fn test_handle_tcp() {
        let mut proxy = make_proxy();
        let result = proxy.handle_tcp(&[1, 2, 3], 100);
        assert!(result.is_ok());
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_handle_http() {
        let mut proxy = make_proxy();
        let result = proxy.handle_http("GET", "/api/v1/pods", &[], 100);
        assert!(result.is_ok());
    }

    #[test]
    fn test_round_robin_selection() {
        let mut proxy = make_proxy();
        proxy.handle_tcp(&[], 100).unwrap();
        proxy.handle_tcp(&[], 200).unwrap();
        proxy.handle_tcp(&[], 300).unwrap();
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 3);

        // The pool opens a new connection only when the chosen upstream has
        // no idle one. Three pooled connections after three requests
        // therefore prove that three different upstreams were used.
        assert_eq!(proxy.pool().total_count(), 3);
        assert_eq!(proxy.pool().active_count(), 0);

        // The fourth request wraps around and reuses an idle connection.
        proxy.handle_tcp(&[], 400).unwrap();
        assert_eq!(proxy.pool().total_count(), 3);
    }

    #[test]
    fn test_round_robin_skips_unhealthy() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(1);
        proxy.handle_tcp(&[], 100).unwrap();
        proxy.handle_tcp(&[], 200).unwrap();
        proxy.handle_tcp(&[], 300).unwrap();

        // Requests land on upstreams 0, 2 and 0 again, so only two
        // connections are ever opened.
        assert_eq!(proxy.pool().total_count(), 2);
    }

    #[test]
    fn test_no_healthy_upstream() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(0);
        proxy.mark_unhealthy(1);
        proxy.mark_unhealthy(2);
        assert_eq!(
            proxy.handle_tcp(&[], 100),
            Err(ProxyError::NoHealthyUpstream)
        );
    }

    #[test]
    fn test_health_check() {
        let mut proxy = make_proxy();
        proxy.health_check(100);
        assert_eq!(proxy.stats.health_checks_passed.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_mark_unhealthy_healthy() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(1);
        assert_eq!(proxy.healthy_upstream_count(), 2);
        proxy.mark_healthy(1);
        assert_eq!(proxy.healthy_upstream_count(), 3);
    }

    #[test]
    fn test_connection_pool_lifecycle() {
        let mut pool = ConnectionPool::new(5);
        let conn = pool.get_connection(0, 100);
        let conn_id = conn.id;
        assert_eq!(pool.active_count(), 1);
        pool.release(conn_id);
        assert_eq!(pool.active_count(), 0);
    }

    #[test]
    fn test_connection_pool_drain() {
        let mut pool = ConnectionPool::new(5);
        pool.get_connection(0, 100);
        pool.get_connection(0, 200);
        pool.drain_upstream(0);
        let removed = pool.cleanup_drained();
        assert_eq!(removed, 2);
    }

    #[test]
    fn test_mtls_toggle() {
        let mut proxy = make_proxy();
        assert!(!proxy.mtls_enabled);
        proxy.set_mtls(true);
        assert!(proxy.mtls_enabled);
    }
}
495}