veridian_kernel/services/mesh/
discovery.rs1#![allow(dead_code)]
7
8use alloc::{collections::BTreeMap, string::String, vec::Vec};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum Protocol {
17 Tcp,
19 Udp,
21 Http,
23 Grpc,
25}
26
27#[derive(Debug, Clone)]
29pub struct ServiceEndpoint {
30 pub address: String,
32 pub port: u16,
34 pub protocol: Protocol,
36 pub healthy: bool,
38}
39
40#[derive(Debug, Clone)]
42pub struct ServiceEntry {
43 pub name: String,
45 pub namespace: String,
47 pub endpoints: Vec<ServiceEndpoint>,
49 pub labels: BTreeMap<String, String>,
51 pub cluster_ip: Option<String>,
53 pub service_port: u16,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum DiscoveryError {
64 NotFound(String),
66 AlreadyRegistered(String),
68 NoHealthyEndpoints(String),
70}
71
72type ServiceKey = (String, String);
78
79#[derive(Debug)]
81pub struct ServiceRegistry {
82 services: BTreeMap<ServiceKey, ServiceEntry>,
84 domain: String,
86}
87
88impl Default for ServiceRegistry {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl ServiceRegistry {
95 pub fn new() -> Self {
97 ServiceRegistry {
98 services: BTreeMap::new(),
99 domain: String::from("cluster.local"),
100 }
101 }
102
103 pub fn with_domain(domain: String) -> Self {
105 ServiceRegistry {
106 services: BTreeMap::new(),
107 domain,
108 }
109 }
110
111 pub fn register(&mut self, entry: ServiceEntry) -> Result<(), DiscoveryError> {
113 let key = (entry.namespace.clone(), entry.name.clone());
114 if self.services.contains_key(&key) {
115 return Err(DiscoveryError::AlreadyRegistered(entry.name));
116 }
117 self.services.insert(key, entry);
118 Ok(())
119 }
120
121 pub fn deregister(
123 &mut self,
124 name: &str,
125 namespace: &str,
126 ) -> Result<ServiceEntry, DiscoveryError> {
127 let key = (String::from(namespace), String::from(name));
128 self.services
129 .remove(&key)
130 .ok_or_else(|| DiscoveryError::NotFound(String::from(name)))
131 }
132
133 pub fn lookup(&self, name: &str, namespace: &str) -> Result<&ServiceEntry, DiscoveryError> {
135 let key = (String::from(namespace), String::from(name));
136 self.services
137 .get(&key)
138 .ok_or_else(|| DiscoveryError::NotFound(String::from(name)))
139 }
140
141 pub fn list(&self, namespace_filter: Option<&str>) -> Vec<&ServiceEntry> {
143 self.services
144 .values()
145 .filter(|s| {
146 namespace_filter.is_none() || namespace_filter == Some(s.namespace.as_str())
147 })
148 .collect()
149 }
150
151 pub fn resolve_service_dns(&self, dns_name: &str) -> Result<&ServiceEntry, DiscoveryError> {
155 let parts: Vec<&str> = dns_name.splitn(4, '.').collect();
157 if parts.len() < 2 {
158 return Err(DiscoveryError::NotFound(String::from(dns_name)));
159 }
160
161 let name = parts[0];
162 let namespace = parts[1];
163 self.lookup(name, namespace)
164 }
165
166 pub fn fqdn(&self, name: &str, namespace: &str) -> String {
168 alloc::format!("{}.{}.svc.{}", name, namespace, self.domain)
169 }
170
171 pub fn healthy_endpoints(
173 &self,
174 name: &str,
175 namespace: &str,
176 ) -> Result<Vec<&ServiceEndpoint>, DiscoveryError> {
177 let entry = self.lookup(name, namespace)?;
178 let healthy: Vec<&ServiceEndpoint> = entry.endpoints.iter().filter(|e| e.healthy).collect();
179 if healthy.is_empty() {
180 return Err(DiscoveryError::NoHealthyEndpoints(String::from(name)));
181 }
182 Ok(healthy)
183 }
184
185 pub fn update_endpoint_health(
187 &mut self,
188 name: &str,
189 namespace: &str,
190 address: &str,
191 healthy: bool,
192 ) -> Result<(), DiscoveryError> {
193 let key = (String::from(namespace), String::from(name));
194 let entry = self
195 .services
196 .get_mut(&key)
197 .ok_or_else(|| DiscoveryError::NotFound(String::from(name)))?;
198
199 for ep in &mut entry.endpoints {
200 if ep.address == address {
201 ep.healthy = healthy;
202 return Ok(());
203 }
204 }
205 Ok(())
206 }
207
208 pub fn service_count(&self) -> usize {
210 self.services.len()
211 }
212}
213
214#[cfg(test)]
219mod tests {
220 #[allow(unused_imports)]
221 use alloc::string::ToString;
222 #[allow(unused_imports)]
223 use alloc::vec;
224
225 use super::*;
226
227 fn make_entry(name: &str, namespace: &str) -> ServiceEntry {
228 ServiceEntry {
229 name: String::from(name),
230 namespace: String::from(namespace),
231 endpoints: vec![
232 ServiceEndpoint {
233 address: String::from("10.0.0.1"),
234 port: 8080,
235 protocol: Protocol::Http,
236 healthy: true,
237 },
238 ServiceEndpoint {
239 address: String::from("10.0.0.2"),
240 port: 8080,
241 protocol: Protocol::Http,
242 healthy: true,
243 },
244 ],
245 labels: BTreeMap::new(),
246 cluster_ip: Some(String::from("10.96.0.10")),
247 service_port: 80,
248 }
249 }
250
251 #[test]
252 fn test_register_and_lookup() {
253 let mut registry = ServiceRegistry::new();
254 registry.register(make_entry("nginx", "default")).unwrap();
255 let entry = registry.lookup("nginx", "default").unwrap();
256 assert_eq!(entry.name, "nginx");
257 assert_eq!(entry.endpoints.len(), 2);
258 }
259
260 #[test]
261 fn test_register_duplicate() {
262 let mut registry = ServiceRegistry::new();
263 registry.register(make_entry("nginx", "default")).unwrap();
264 assert!(registry.register(make_entry("nginx", "default")).is_err());
265 }
266
267 #[test]
268 fn test_deregister() {
269 let mut registry = ServiceRegistry::new();
270 registry.register(make_entry("nginx", "default")).unwrap();
271 let entry = registry.deregister("nginx", "default").unwrap();
272 assert_eq!(entry.name, "nginx");
273 assert_eq!(registry.service_count(), 0);
274 }
275
276 #[test]
277 fn test_list_with_filter() {
278 let mut registry = ServiceRegistry::new();
279 registry.register(make_entry("svc1", "default")).unwrap();
280 registry
281 .register(make_entry("svc2", "kube-system"))
282 .unwrap();
283
284 let all = registry.list(None);
285 assert_eq!(all.len(), 2);
286
287 let filtered = registry.list(Some("default"));
288 assert_eq!(filtered.len(), 1);
289 }
290
291 #[test]
292 fn test_dns_resolution() {
293 let mut registry = ServiceRegistry::new();
294 registry.register(make_entry("nginx", "default")).unwrap();
295 let entry = registry
296 .resolve_service_dns("nginx.default.svc.cluster.local")
297 .unwrap();
298 assert_eq!(entry.name, "nginx");
299 }
300
301 #[test]
302 fn test_fqdn() {
303 let registry = ServiceRegistry::new();
304 assert_eq!(
305 registry.fqdn("nginx", "default"),
306 "nginx.default.svc.cluster.local"
307 );
308 }
309
310 #[test]
311 fn test_healthy_endpoints() {
312 let mut registry = ServiceRegistry::new();
313 registry.register(make_entry("nginx", "default")).unwrap();
314 let healthy = registry.healthy_endpoints("nginx", "default").unwrap();
315 assert_eq!(healthy.len(), 2);
316 }
317
318 #[test]
319 fn test_update_endpoint_health() {
320 let mut registry = ServiceRegistry::new();
321 registry.register(make_entry("nginx", "default")).unwrap();
322 registry
323 .update_endpoint_health("nginx", "default", "10.0.0.1", false)
324 .unwrap();
325 let healthy = registry.healthy_endpoints("nginx", "default").unwrap();
326 assert_eq!(healthy.len(), 1);
327 }
328}