veridian_kernel/services/csi/
node.rs1#![allow(dead_code)]
7
8use alloc::{collections::BTreeMap, string::String, vec::Vec};
9
10#[derive(Debug, Clone)]
16pub struct StagedVolume {
17 pub volume_id: u64,
19 pub staging_target: String,
21 pub device_path: String,
23 pub fs_type: String,
25 pub mount_options: Vec<String>,
27}
28
29#[derive(Debug, Clone)]
31pub struct PublishedVolume {
32 pub volume_id: u64,
34 pub target_path: String,
36 pub read_only: bool,
38 pub staging_target: String,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum NodeError {
49 VolumeNotFound(u64),
51 NotStaged(u64),
53 AlreadyStaged(u64),
55 NotPublished { volume_id: u64, target: String },
57 AlreadyPublished { volume_id: u64, target: String },
59 MountFailed(String),
61}
62
63#[derive(Debug)]
69pub struct NodeService {
70 node_id: String,
72 staged: BTreeMap<u64, StagedVolume>,
74 published: BTreeMap<(u64, String), PublishedVolume>,
76 max_volumes: usize,
78}
79
80impl NodeService {
81 pub const DEFAULT_MAX_VOLUMES: usize = 128;
83
84 pub fn new(node_id: String) -> Self {
86 NodeService {
87 node_id,
88 staged: BTreeMap::new(),
89 published: BTreeMap::new(),
90 max_volumes: Self::DEFAULT_MAX_VOLUMES,
91 }
92 }
93
94 pub fn node_id(&self) -> &str {
96 &self.node_id
97 }
98
99 pub fn node_stage_volume(
101 &mut self,
102 volume_id: u64,
103 staging_target: String,
104 device_path: String,
105 fs_type: String,
106 mount_options: Vec<String>,
107 ) -> Result<(), NodeError> {
108 if self.staged.contains_key(&volume_id) {
109 return Err(NodeError::AlreadyStaged(volume_id));
110 }
111
112 let staged = StagedVolume {
113 volume_id,
114 staging_target,
115 device_path,
116 fs_type,
117 mount_options,
118 };
119 self.staged.insert(volume_id, staged);
120 Ok(())
121 }
122
123 pub fn node_unstage_volume(&mut self, volume_id: u64) -> Result<(), NodeError> {
125 let has_publishes = self.published.keys().any(|(vid, _)| *vid == volume_id);
127 if has_publishes {
128 return Err(NodeError::MountFailed(String::from(
129 "volume still has active publishes",
130 )));
131 }
132
133 self.staged
134 .remove(&volume_id)
135 .map(|_| ())
136 .ok_or(NodeError::NotStaged(volume_id))
137 }
138
139 pub fn node_publish_volume(
141 &mut self,
142 volume_id: u64,
143 target_path: String,
144 read_only: bool,
145 ) -> Result<(), NodeError> {
146 let staged = self
148 .staged
149 .get(&volume_id)
150 .ok_or(NodeError::NotStaged(volume_id))?;
151
152 let key = (volume_id, target_path.clone());
153 if self.published.contains_key(&key) {
154 return Err(NodeError::AlreadyPublished {
155 volume_id,
156 target: target_path,
157 });
158 }
159
160 let published = PublishedVolume {
161 volume_id,
162 target_path,
163 read_only,
164 staging_target: staged.staging_target.clone(),
165 };
166 self.published.insert(key, published);
167 Ok(())
168 }
169
170 pub fn node_unpublish_volume(
172 &mut self,
173 volume_id: u64,
174 target_path: String,
175 ) -> Result<(), NodeError> {
176 let key = (volume_id, target_path.clone());
177 self.published
178 .remove(&key)
179 .map(|_| ())
180 .ok_or(NodeError::NotPublished {
181 volume_id,
182 target: target_path,
183 })
184 }
185
186 pub fn list_staged(&self) -> Vec<&StagedVolume> {
188 self.staged.values().collect()
189 }
190
191 pub fn list_published(&self) -> Vec<&PublishedVolume> {
193 self.published.values().collect()
194 }
195
196 pub fn get_staged(&self, volume_id: u64) -> Option<&StagedVolume> {
198 self.staged.get(&volume_id)
199 }
200
201 pub fn staged_count(&self) -> usize {
203 self.staged.len()
204 }
205
206 pub fn published_count(&self) -> usize {
208 self.published.len()
209 }
210
211 pub fn can_accept_volume(&self) -> bool {
213 self.staged.len() < self.max_volumes
214 }
215}
216
217#[cfg(test)]
222mod tests {
223 #[allow(unused_imports)]
224 use alloc::string::ToString;
225 #[allow(unused_imports)]
226 use alloc::vec;
227
228 use super::*;
229
230 fn make_service() -> NodeService {
231 NodeService::new(String::from("node-1"))
232 }
233
234 #[test]
235 fn test_stage_volume() {
236 let mut svc = make_service();
237 svc.node_stage_volume(
238 1,
239 String::from("/staging/vol-1"),
240 String::from("/dev/vdb"),
241 String::from("ext4"),
242 Vec::new(),
243 )
244 .unwrap();
245 assert_eq!(svc.staged_count(), 1);
246 let staged = svc.get_staged(1).unwrap();
247 assert_eq!(staged.device_path, "/dev/vdb");
248 }
249
250 #[test]
251 fn test_double_stage() {
252 let mut svc = make_service();
253 svc.node_stage_volume(
254 1,
255 String::from("/s/v1"),
256 String::from("/dev/vdb"),
257 String::from("ext4"),
258 Vec::new(),
259 )
260 .unwrap();
261 assert_eq!(
262 svc.node_stage_volume(
263 1,
264 String::from("/s/v1"),
265 String::from("/dev/vdb"),
266 String::from("ext4"),
267 Vec::new()
268 ),
269 Err(NodeError::AlreadyStaged(1))
270 );
271 }
272
273 #[test]
274 fn test_unstage_volume() {
275 let mut svc = make_service();
276 svc.node_stage_volume(
277 1,
278 String::from("/s/v1"),
279 String::from("/dev/vdb"),
280 String::from("ext4"),
281 Vec::new(),
282 )
283 .unwrap();
284 svc.node_unstage_volume(1).unwrap();
285 assert_eq!(svc.staged_count(), 0);
286 }
287
288 #[test]
289 fn test_unstage_not_staged() {
290 let mut svc = make_service();
291 assert_eq!(svc.node_unstage_volume(999), Err(NodeError::NotStaged(999)));
292 }
293
294 #[test]
295 fn test_publish_volume() {
296 let mut svc = make_service();
297 svc.node_stage_volume(
298 1,
299 String::from("/s/v1"),
300 String::from("/dev/vdb"),
301 String::from("ext4"),
302 Vec::new(),
303 )
304 .unwrap();
305 svc.node_publish_volume(1, String::from("/mnt/data"), false)
306 .unwrap();
307 assert_eq!(svc.published_count(), 1);
308 }
309
310 #[test]
311 fn test_publish_not_staged() {
312 let mut svc = make_service();
313 assert_eq!(
314 svc.node_publish_volume(999, String::from("/mnt"), false),
315 Err(NodeError::NotStaged(999))
316 );
317 }
318
319 #[test]
320 fn test_unpublish_volume() {
321 let mut svc = make_service();
322 svc.node_stage_volume(
323 1,
324 String::from("/s/v1"),
325 String::from("/dev/vdb"),
326 String::from("ext4"),
327 Vec::new(),
328 )
329 .unwrap();
330 svc.node_publish_volume(1, String::from("/mnt/data"), false)
331 .unwrap();
332 svc.node_unpublish_volume(1, String::from("/mnt/data"))
333 .unwrap();
334 assert_eq!(svc.published_count(), 0);
335 }
336
337 #[test]
338 fn test_unstage_with_active_publish() {
339 let mut svc = make_service();
340 svc.node_stage_volume(
341 1,
342 String::from("/s/v1"),
343 String::from("/dev/vdb"),
344 String::from("ext4"),
345 Vec::new(),
346 )
347 .unwrap();
348 svc.node_publish_volume(1, String::from("/mnt/data"), false)
349 .unwrap();
350 assert!(svc.node_unstage_volume(1).is_err());
351 }
352}