⚠️ VeridianOS Kernel Documentation - This is low-level kernel code. All functions are unsafe unless explicitly marked otherwise. no_std

veridian_kernel/services/mesh/
proxy.rs

//! Sidecar Proxy
//!
//! Provides a connection-pooling sidecar proxy with health checking,
//! mTLS wrapping, and request routing for service mesh communication.
5
6#![allow(dead_code)]
7
8use alloc::{string::String, vec::Vec};
9use core::sync::atomic::{AtomicU64, Ordering};
10
// ---------------------------------------------------------------------------
// Proxy Config
// ---------------------------------------------------------------------------
14
/// Settings that drive a sidecar proxy instance.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// Port the proxy listens on.
    pub listen_port: u16,
    /// Addresses of the upstream services, written as `ip:port` strings.
    pub upstream_addrs: Vec<String>,
    /// Interval between health checks, measured in ticks.
    pub health_check_interval: u64,
    /// Timeout for establishing a connection, measured in ticks.
    pub connect_timeout: u64,
    /// Upper bound on retries after an upstream failure.
    pub max_retries: u32,
}

impl Default for ProxyConfig {
    /// Returns the stock configuration: port 15001, no upstreams, a health
    /// check interval of 10 ticks, a connect timeout of 5 ticks and 3 retries.
    fn default() -> Self {
        let upstream_addrs = Vec::new();
        Self {
            listen_port: 15001,
            upstream_addrs,
            health_check_interval: 10,
            connect_timeout: 5,
            max_retries: 3,
        }
    }
}
41
// ---------------------------------------------------------------------------
// Connection Pool
// ---------------------------------------------------------------------------
45
/// Lifecycle state of a pooled connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Connection is idle and may be handed out by the pool.
    Idle,
    /// Connection is currently handed out to a caller.
    Active,
    /// Connection is draining (finishing pending requests) and will be
    /// dropped by [`ConnectionPool::cleanup_drained`].
    Draining,
}

/// A pooled connection to an upstream.
#[derive(Debug, Clone)]
pub struct Connection {
    /// Unique connection identifier, allocated from a global counter.
    pub id: u64,
    /// Index into upstream_addrs.
    pub upstream_idx: usize,
    /// Current lifecycle state.
    pub state: ConnectionState,
    /// Number of requests handled over this connection.
    pub request_count: u64,
    /// Tick at which the connection was created.
    pub created_tick: u64,
}

/// Source of connection IDs; starts at 1 and is shared by every pool.
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);

/// Connection pool for upstream services.
#[derive(Debug)]
pub struct ConnectionPool {
    /// Every connection the pool knows about, regardless of state.
    connections: Vec<Connection>,
    /// Maximum connections per upstream.
    ///
    /// NOTE(review): this limit is stored but not consulted by
    /// `get_connection`, which always creates a connection when no idle one
    /// exists. Enforcing it would require a fallible return type.
    max_per_upstream: usize,
}

impl Default for ConnectionPool {
    /// Creates a pool with a per-upstream limit of 10.
    fn default() -> Self {
        Self::new(10)
    }
}

impl ConnectionPool {
    /// Creates an empty pool with the given per-upstream limit.
    pub fn new(max_per_upstream: usize) -> Self {
        ConnectionPool {
            connections: Vec::new(),
            max_per_upstream,
        }
    }

    /// Hands out a connection to `upstream_idx`, marked [`ConnectionState::Active`].
    ///
    /// The first idle connection to that upstream is reused when one exists.
    /// Otherwise a new connection is created with a fresh ID and
    /// `created_tick` set to `current_tick`.
    pub fn get_connection(&mut self, upstream_idx: usize, current_tick: u64) -> &mut Connection {
        let reusable = self
            .connections
            .iter()
            .position(|c| c.upstream_idx == upstream_idx && c.state == ConnectionState::Idle);

        let pos = match reusable {
            Some(pos) => {
                self.connections[pos].state = ConnectionState::Active;
                pos
            }
            None => {
                self.connections.push(Connection {
                    id: NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed),
                    upstream_idx,
                    state: ConnectionState::Active,
                    request_count: 0,
                    created_tick: current_tick,
                });
                self.connections.len() - 1
            }
        };
        &mut self.connections[pos]
    }

    /// Releases a connection back to the pool.
    ///
    /// Only an [`ConnectionState::Active`] connection becomes idle again. A
    /// connection that was put into [`ConnectionState::Draining`] while in use
    /// stays draining, so it is not handed out again and is still removed by
    /// [`cleanup_drained`](Self::cleanup_drained). Unknown IDs are ignored.
    pub fn release(&mut self, conn_id: u64) {
        if let Some(conn) = self.connections.iter_mut().find(|c| c.id == conn_id) {
            if conn.state == ConnectionState::Active {
                conn.state = ConnectionState::Idle;
            }
        }
    }

    /// Marks every connection to `upstream_idx` as draining, whatever its
    /// current state.
    pub fn drain_upstream(&mut self, upstream_idx: usize) {
        self.connections
            .iter_mut()
            .filter(|c| c.upstream_idx == upstream_idx)
            .for_each(|c| c.state = ConnectionState::Draining);
    }

    /// Removes all draining connections and returns how many were removed.
    pub fn cleanup_drained(&mut self) -> usize {
        let before = self.connections.len();
        self.connections
            .retain(|c| c.state != ConnectionState::Draining);
        before - self.connections.len()
    }

    /// Returns the number of connections currently in the active state.
    pub fn active_count(&self) -> usize {
        self.connections
            .iter()
            .filter(|c| c.state == ConnectionState::Active)
            .count()
    }

    /// Returns the total number of connections in the pool, in any state.
    pub fn total_count(&self) -> usize {
        self.connections.len()
    }
}
162
// ---------------------------------------------------------------------------
// Proxy Stats
// ---------------------------------------------------------------------------
166
/// Counters describing the traffic and health checks seen by the proxy.
#[derive(Debug, Default)]
pub struct ProxyStats {
    /// Number of requests proxied, successful or not.
    pub total_requests: AtomicU64,
    /// Number of requests that failed.
    pub failed_requests: AtomicU64,
    /// Bytes sent to upstreams.
    pub bytes_sent: AtomicU64,
    /// Bytes received from upstreams.
    pub bytes_received: AtomicU64,
    /// Number of health checks that passed.
    pub health_checks_passed: AtomicU64,
    /// Number of health checks that failed.
    pub health_checks_failed: AtomicU64,
}

impl ProxyStats {
    /// Builds a stats block with every counter at zero.
    pub fn new() -> Self {
        ProxyStats {
            total_requests: AtomicU64::new(0),
            failed_requests: AtomicU64::new(0),
            bytes_sent: AtomicU64::new(0),
            bytes_received: AtomicU64::new(0),
            health_checks_passed: AtomicU64::new(0),
            health_checks_failed: AtomicU64::new(0),
        }
    }

    /// Counts one request and adds its byte totals to the traffic counters.
    pub fn record_success(&self, bytes_sent: u64, bytes_received: u64) {
        Self::bump(&self.total_requests, 1);
        Self::bump(&self.bytes_sent, bytes_sent);
        Self::bump(&self.bytes_received, bytes_received);
    }

    /// Counts one request and one failure.
    pub fn record_failure(&self) {
        Self::bump(&self.total_requests, 1);
        Self::bump(&self.failed_requests, 1);
    }

    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }
}
204
// ---------------------------------------------------------------------------
// Upstream Health
// ---------------------------------------------------------------------------
208
/// Health record kept for a single upstream.
#[derive(Clone, Debug)]
pub struct UpstreamHealth {
    /// Index of the upstream this record describes.
    pub idx: usize,
    /// `true` while the upstream is considered healthy.
    pub healthy: bool,
    /// Number of failures observed in a row.
    pub consecutive_failures: u32,
    /// Tick of the most recent health check.
    pub last_check_tick: u64,
}
221
// ---------------------------------------------------------------------------
// Sidecar Proxy
// ---------------------------------------------------------------------------
225
/// Errors the proxy can report to its callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProxyError {
    /// No upstream is currently marked healthy (or none is configured).
    NoHealthyUpstream,
    /// Connecting to an upstream failed; carries a `String` payload.
    ConnectionFailed(String),
    /// The upstream timed out.
    UpstreamTimeout,
    /// The request was invalid; carries a `String` payload.
    InvalidRequest(String),
}
238
239/// Sidecar proxy implementation.
240#[derive(Debug)]
241pub struct SidecarProxy {
242    /// Proxy configuration.
243    config: ProxyConfig,
244    /// Connection pool.
245    pool: ConnectionPool,
246    /// Proxy statistics.
247    stats: ProxyStats,
248    /// Upstream health status.
249    upstream_health: Vec<UpstreamHealth>,
250    /// Round-robin counter for upstream selection.
251    rr_counter: u64,
252    /// Whether mTLS is enabled.
253    mtls_enabled: bool,
254}
255
256impl SidecarProxy {
257    /// Create a new sidecar proxy.
258    pub fn new(config: ProxyConfig) -> Self {
259        let health: Vec<UpstreamHealth> = (0..config.upstream_addrs.len())
260            .map(|idx| UpstreamHealth {
261                idx,
262                healthy: true,
263                consecutive_failures: 0,
264                last_check_tick: 0,
265            })
266            .collect();
267
268        SidecarProxy {
269            config,
270            pool: ConnectionPool::default(),
271            stats: ProxyStats::new(),
272            upstream_health: health,
273            rr_counter: 0,
274            mtls_enabled: false,
275        }
276    }
277
278    /// Enable or disable mTLS.
279    pub fn set_mtls(&mut self, enabled: bool) {
280        self.mtls_enabled = enabled;
281    }
282
283    /// Select a healthy upstream using round-robin.
284    fn select_upstream(&mut self) -> Result<usize, ProxyError> {
285        let count = self.upstream_health.len();
286        if count == 0 {
287            return Err(ProxyError::NoHealthyUpstream);
288        }
289
290        for _ in 0..count {
291            let idx = (self.rr_counter as usize) % count;
292            self.rr_counter += 1;
293            if self.upstream_health[idx].healthy {
294                return Ok(idx);
295            }
296        }
297        Err(ProxyError::NoHealthyUpstream)
298    }
299
300    /// Handle a TCP connection by routing to an upstream.
301    pub fn handle_tcp(&mut self, payload: &[u8], current_tick: u64) -> Result<Vec<u8>, ProxyError> {
302        let upstream_idx = self.select_upstream()?;
303        let conn = self.pool.get_connection(upstream_idx, current_tick);
304        conn.request_count += 1;
305        let conn_id = conn.id;
306
307        // Simulate forwarding
308        self.stats.record_success(payload.len() as u64, 0);
309        self.pool.release(conn_id);
310
311        Ok(Vec::new()) // Response would come from actual upstream
312    }
313
314    /// Handle an HTTP request by routing to an upstream.
315    pub fn handle_http(
316        &mut self,
317        _method: &str,
318        _path: &str,
319        payload: &[u8],
320        current_tick: u64,
321    ) -> Result<Vec<u8>, ProxyError> {
322        let upstream_idx = self.select_upstream()?;
323        let conn = self.pool.get_connection(upstream_idx, current_tick);
324        conn.request_count += 1;
325        let conn_id = conn.id;
326
327        self.stats.record_success(payload.len() as u64, 0);
328        self.pool.release(conn_id);
329
330        Ok(Vec::new())
331    }
332
333    /// Run health checks on all upstreams.
334    pub fn health_check(&mut self, current_tick: u64) {
335        for health in &mut self.upstream_health {
336            health.last_check_tick = current_tick;
337            // Simulated: always passes unless manually marked unhealthy
338            if health.healthy {
339                health.consecutive_failures = 0;
340                self.stats
341                    .health_checks_passed
342                    .fetch_add(1, Ordering::Relaxed);
343            } else {
344                health.consecutive_failures += 1;
345                self.stats
346                    .health_checks_failed
347                    .fetch_add(1, Ordering::Relaxed);
348            }
349        }
350    }
351
352    /// Mark an upstream as unhealthy.
353    pub fn mark_unhealthy(&mut self, idx: usize) {
354        if idx < self.upstream_health.len() {
355            self.upstream_health[idx].healthy = false;
356        }
357    }
358
359    /// Mark an upstream as healthy.
360    pub fn mark_healthy(&mut self, idx: usize) {
361        if idx < self.upstream_health.len() {
362            self.upstream_health[idx].healthy = true;
363            self.upstream_health[idx].consecutive_failures = 0;
364        }
365    }
366
367    /// Get the number of healthy upstreams.
368    pub fn healthy_upstream_count(&self) -> usize {
369        self.upstream_health.iter().filter(|h| h.healthy).count()
370    }
371
372    /// Get proxy stats.
373    pub fn stats(&self) -> &ProxyStats {
374        &self.stats
375    }
376
377    /// Get connection pool.
378    pub fn pool(&self) -> &ConnectionPool {
379        &self.pool
380    }
381}
382
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
386
#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use alloc::string::ToString;

    use super::*;

    /// Builds a proxy with three upstreams, all initially healthy.
    fn make_proxy() -> SidecarProxy {
        let config = ProxyConfig {
            listen_port: 15001,
            upstream_addrs: alloc::vec![
                String::from("10.0.0.1:8080"),
                String::from("10.0.0.2:8080"),
                String::from("10.0.0.3:8080"),
            ],
            health_check_interval: 10,
            connect_timeout: 5,
            max_retries: 3,
        };
        SidecarProxy::new(config)
    }

    #[test]
    fn test_proxy_creation() {
        let proxy = make_proxy();
        assert_eq!(proxy.healthy_upstream_count(), 3);
    }

    #[test]
    fn test_handle_tcp() {
        let mut proxy = make_proxy();
        let result = proxy.handle_tcp(&[1, 2, 3], 100);
        assert!(result.is_ok());
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 1);
        assert_eq!(proxy.stats.failed_requests.load(Ordering::Relaxed), 0);
        // The payload length is accounted as bytes sent upstream.
        assert_eq!(proxy.stats.bytes_sent.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_handle_http() {
        let mut proxy = make_proxy();
        let result = proxy.handle_http("GET", "/api/v1/pods", &[], 100);
        assert!(result.is_ok());
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_round_robin_selection() {
        let mut proxy = make_proxy();
        // Should cycle through upstreams
        proxy.handle_tcp(&[], 100).unwrap();
        proxy.handle_tcp(&[], 200).unwrap();
        proxy.handle_tcp(&[], 300).unwrap();
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 3);
        // Each request hit a different upstream, so each needed its own
        // connection; all of them were released afterwards.
        assert_eq!(proxy.pool().total_count(), 3);
        assert_eq!(proxy.pool().active_count(), 0);
    }

    #[test]
    fn test_no_healthy_upstream() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(0);
        proxy.mark_unhealthy(1);
        proxy.mark_unhealthy(2);
        assert_eq!(
            proxy.handle_tcp(&[], 100),
            Err(ProxyError::NoHealthyUpstream)
        );
        // Selection fails before anything is recorded or pooled.
        assert_eq!(proxy.stats.total_requests.load(Ordering::Relaxed), 0);
        assert_eq!(proxy.pool().total_count(), 0);
    }

    #[test]
    fn test_health_check() {
        let mut proxy = make_proxy();
        proxy.health_check(100);
        assert_eq!(proxy.stats.health_checks_passed.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_health_check_counts_failures() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(1);
        proxy.health_check(100);
        proxy.health_check(200);
        assert_eq!(proxy.stats.health_checks_passed.load(Ordering::Relaxed), 4);
        assert_eq!(proxy.stats.health_checks_failed.load(Ordering::Relaxed), 2);
        assert_eq!(proxy.upstream_health[1].consecutive_failures, 2);
        assert_eq!(proxy.upstream_health[1].last_check_tick, 200);
    }

    #[test]
    fn test_mark_unhealthy_healthy() {
        let mut proxy = make_proxy();
        proxy.mark_unhealthy(1);
        assert_eq!(proxy.healthy_upstream_count(), 2);
        proxy.mark_healthy(1);
        assert_eq!(proxy.healthy_upstream_count(), 3);
        // Out-of-range indices are ignored.
        proxy.mark_unhealthy(42);
        assert_eq!(proxy.healthy_upstream_count(), 3);
    }

    #[test]
    fn test_connection_pool_lifecycle() {
        let mut pool = ConnectionPool::new(5);
        let conn = pool.get_connection(0, 100);
        let conn_id = conn.id;
        assert_eq!(pool.active_count(), 1);
        pool.release(conn_id);
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.total_count(), 1);
    }

    #[test]
    fn test_connection_pool_drain() {
        let mut pool = ConnectionPool::new(5);
        pool.get_connection(0, 100);
        pool.get_connection(0, 200);
        pool.drain_upstream(0);
        let removed = pool.cleanup_drained();
        assert_eq!(removed, 2);
        assert_eq!(pool.total_count(), 0);
    }

    #[test]
    fn test_mtls_toggle() {
        let mut proxy = make_proxy();
        assert!(!proxy.mtls_enabled);
        proxy.set_mtls(true);
        assert!(proxy.mtls_enabled);
    }
}