1use crate::{
2 DirEntry, FileType, FsError, Metadata, OpenOptions, OpenOptionsConfig, ReadDir, Result,
3 VirtualFile,
4};
5use bytes::{Buf, Bytes};
6use futures::future::BoxFuture;
7#[cfg(feature = "enable-serde")]
8use serde::{Deserialize, Serialize, de};
9use std::convert::TryInto;
10use std::fs;
11use std::io::{self, Seek};
12use std::path::{Component, Path, PathBuf};
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::fs as tfs;
18use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
19use tokio::runtime::Handle;
20
21#[derive(Debug, Clone)]
22#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
23pub struct FileSystem {
24 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
25 handle: Handle,
26 root: PathBuf,
27}
28
29#[allow(dead_code)]
30fn default_handle() -> Handle {
31 Handle::current()
32}
33
34pub fn canonicalize(path: &Path) -> Result<PathBuf> {
35 if !path.exists() {
36 return Err(FsError::InvalidInput);
37 }
38 dunce::canonicalize(path).map_err(Into::into)
39}
40
41pub fn normalize_path(path: &Path) -> PathBuf {
44 let mut components = path.components().peekable();
45 let mut ret = if let Some(c @ Component::Prefix(..)) = components.peek().cloned() {
46 components.next();
47 PathBuf::from(c.as_os_str())
48 } else {
49 PathBuf::new()
50 };
51
52 for component in components {
53 match component {
54 Component::Prefix(..) => unreachable!(),
55 Component::RootDir => {
56 ret.push(component.as_os_str());
57 }
58 Component::CurDir => {}
59 Component::ParentDir => {
60 ret.pop();
61 }
62 Component::Normal(c) => {
63 ret.push(c);
64 }
65 }
66 }
67 ret
68}
69
70impl FileSystem {
71 pub fn new(handle: Handle, root: impl Into<PathBuf>) -> Result<Self> {
72 let root = canonicalize(&root.into())?;
73
74 Ok(FileSystem { handle, root })
75 }
76}
77
78impl FileSystem {
79 fn prepare_path(&self, path: &Path) -> PathBuf {
80 let path = normalize_path(path);
81
82 let path = if !path.starts_with(&self.root) {
83 let path = path.strip_prefix("/").unwrap_or(&path);
84
85 self.root.join(path)
86 } else {
87 path.to_owned()
88 };
89
90 debug_assert!(path.starts_with(&self.root));
91 path
92 }
93}
94
95impl crate::FileSystem for FileSystem {
96 fn readlink(&self, path: &Path) -> Result<PathBuf> {
97 let path = self.prepare_path(path);
98
99 fs::read_link(path).map_err(Into::into)
100 }
101
102 fn read_dir(&self, path: &Path) -> Result<ReadDir> {
103 let path = self.prepare_path(path);
104
105 let read_dir = fs::read_dir(path)?;
106 let mut data = read_dir
107 .map(|entry| {
108 let entry = entry?;
109
110 let path = entry
111 .path()
112 .strip_prefix(&self.root)
113 .map_err(|_| FsError::InvalidData)?
114 .to_owned();
115 let path = Path::new("/").join(path);
116
117 let metadata = entry.metadata()?;
118
119 Ok(DirEntry {
120 path,
121 metadata: Ok(metadata.try_into()?),
122 })
123 })
124 .collect::<std::result::Result<Vec<DirEntry>, io::Error>>()
125 .map_err::<FsError, _>(Into::into)?;
126 data.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
127 Ok(ReadDir::new(data))
128 }
129
130 fn create_dir(&self, path: &Path) -> Result<()> {
131 let path = self.prepare_path(path);
132
133 if path.parent().is_none() {
134 return Err(FsError::BaseNotDirectory);
135 }
136
137 fs::create_dir(path).map_err(Into::into)
138 }
139
140 fn remove_dir(&self, path: &Path) -> Result<()> {
141 let path = self.prepare_path(path);
142
143 if path.parent().is_none() {
144 return Err(FsError::BaseNotDirectory);
145 }
146
147 if path.is_dir() && self.read_dir(&path).map(|s| !s.is_empty()).unwrap_or(false) {
150 return Err(FsError::DirectoryNotEmpty);
151 }
152 fs::remove_dir(path).map_err(Into::into)
153 }
154
155 fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, Result<()>> {
156 Box::pin(async move {
157 use filetime::{FileTime, set_file_mtime};
158 let norm_from = normalize_path(from);
159 let norm_to = normalize_path(to);
160
161 if norm_from.parent().is_none() {
162 return Err(FsError::BaseNotDirectory);
163 }
164 if norm_to.parent().is_none() {
165 return Err(FsError::BaseNotDirectory);
166 }
167
168 let from = self.prepare_path(from);
169 let to = self.prepare_path(to);
170
171 if !from.exists() {
172 return Err(FsError::EntryNotFound);
173 }
174 let from_parent = from.parent().unwrap();
175 let to_parent = to.parent().unwrap();
176 if !from_parent.exists() {
177 return Err(FsError::EntryNotFound);
178 }
179 if !to_parent.exists() {
180 return Err(FsError::EntryNotFound);
181 }
182 let result = if from_parent != to_parent {
183 let _ = std::fs::create_dir_all(to_parent);
184 if from.is_dir() {
185 fs_extra::move_items(
186 &[&from],
187 &to,
188 &fs_extra::dir::CopyOptions {
189 copy_inside: true,
190 ..Default::default()
191 },
192 )
193 .map(|_| ())
194 .map_err(|_| FsError::UnknownError)?;
195 let _ = fs_extra::remove_items(&[&from]);
196 Ok(())
197 } else {
198 fs::copy(&from, &to).map(|_| ()).map_err(FsError::from)?;
199 fs::remove_file(&from).map(|_| ()).map_err(Into::into)
200 }
201 } else {
202 fs::rename(&from, &to).map_err(Into::into)
203 };
204 let _ = set_file_mtime(&to, FileTime::now()).map(|_| ());
205 result
206 })
207 }
208
209 fn remove_file(&self, path: &Path) -> Result<()> {
210 let path = self.prepare_path(path);
211
212 if path.parent().is_none() {
213 return Err(FsError::BaseNotDirectory);
214 }
215
216 fs::remove_file(path).map_err(Into::into)
217 }
218
219 fn new_open_options(&self) -> OpenOptions<'_> {
220 OpenOptions::new(self)
221 }
222
223 fn metadata(&self, path: &Path) -> Result<Metadata> {
224 let path = self.prepare_path(path);
225
226 fs::metadata(path)
227 .and_then(TryInto::try_into)
228 .map_err(Into::into)
229 }
230
231 fn symlink_metadata(&self, path: &Path) -> Result<Metadata> {
232 let path = self.prepare_path(path);
233
234 fs::symlink_metadata(path)
235 .and_then(TryInto::try_into)
236 .map_err(Into::into)
237 }
238
239 fn mount(
240 &self,
241 _name: String,
242 _path: &Path,
243 _fs: Box<dyn crate::FileSystem + Send + Sync>,
244 ) -> Result<()> {
245 Err(FsError::Unsupported)
246 }
247}
248
249impl TryInto<Metadata> for std::fs::Metadata {
250 type Error = io::Error;
251
252 fn try_into(self) -> std::result::Result<Metadata, Self::Error> {
253 let filetype = self.file_type();
254 let (char_device, block_device, socket, fifo) = {
255 #[cfg(unix)]
256 {
257 use std::os::unix::fs::FileTypeExt;
258 (
259 filetype.is_char_device(),
260 filetype.is_block_device(),
261 filetype.is_socket(),
262 filetype.is_fifo(),
263 )
264 }
265 #[cfg(not(unix))]
266 {
267 (false, false, false, false)
268 }
269 };
270
271 Ok(Metadata {
272 ft: FileType {
273 dir: filetype.is_dir(),
274 file: filetype.is_file(),
275 symlink: filetype.is_symlink(),
276 char_device,
277 block_device,
278 socket,
279 fifo,
280 },
281 accessed: self
282 .accessed()
283 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
284 .map_or(0, |time| time.as_nanos() as u64),
285 created: self
286 .created()
287 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
288 .map_or(0, |time| time.as_nanos() as u64),
289 modified: self
290 .modified()
291 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
292 .map_or(0, |time| time.as_nanos() as u64),
293 len: self.len(),
294 })
295 }
296}
297
298impl crate::FileOpener for FileSystem {
299 fn open(
300 &self,
301 path: &Path,
302 conf: &OpenOptionsConfig,
303 ) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {
304 let path = self.prepare_path(path);
305
306 let read = conf.read();
308 let write = conf.write();
309
310 let append = if conf.truncate { false } else { conf.append() };
316
317 let mut oo = fs::OpenOptions::new();
318 oo.read(conf.read())
319 .write(conf.write())
320 .create_new(conf.create_new())
321 .create(conf.create())
322 .append(append)
323 .truncate(conf.truncate())
324 .open(&path)
325 .map_err(Into::into)
326 .map(|file| {
327 Box::new(File::new(
328 self.handle.clone(),
329 file,
330 path.to_owned(),
331 read,
332 write,
333 append,
334 )) as Box<dyn VirtualFile + Send + Sync + 'static>
335 })
336 }
337}
338
339#[derive(Debug)]
341#[cfg_attr(feature = "enable-serde", derive(Serialize))]
342pub struct File {
343 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
344 handle: Handle,
345 #[cfg_attr(feature = "enable-serde", serde(skip))]
346 inner: tfs::File,
347 #[cfg_attr(feature = "enable-serde", serde(skip_serializing))]
348 inner_std: fs::File,
349 pub host_path: PathBuf,
350 #[cfg(feature = "enable-serde")]
351 flags: u16,
352}
353
354#[cfg(feature = "enable-serde")]
355impl<'de> Deserialize<'de> for File {
356 fn deserialize<D>(deserializer: D) -> std::result::Result<File, D::Error>
357 where
358 D: serde::Deserializer<'de>,
359 {
360 #[derive(Deserialize)]
361 #[serde(field_identifier, rename_all = "snake_case")]
362 enum Field {
363 HostPath,
364 Flags,
365 }
366
367 struct FileVisitor;
368
369 impl<'de> de::Visitor<'de> for FileVisitor {
370 type Value = File;
371
372 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
373 formatter.write_str("struct File")
374 }
375
376 fn visit_seq<V>(self, mut seq: V) -> std::result::Result<Self::Value, V::Error>
377 where
378 V: de::SeqAccess<'de>,
379 {
380 let host_path = seq
381 .next_element()?
382 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
383 let flags = seq
384 .next_element()?
385 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
386 let inner = fs::OpenOptions::new()
387 .read(flags & File::READ != 0)
388 .write(flags & File::WRITE != 0)
389 .append(flags & File::APPEND != 0)
390 .open(&host_path)
391 .map_err(|_| de::Error::custom("Could not open file on this system"))?;
392 Ok(File {
393 handle: Handle::current(),
394 inner: tokio::fs::File::from_std(inner.try_clone().unwrap()),
395 inner_std: inner,
396 host_path,
397 flags,
398 })
399 }
400
401 fn visit_map<V>(self, mut map: V) -> std::result::Result<Self::Value, V::Error>
402 where
403 V: de::MapAccess<'de>,
404 {
405 let mut host_path = None;
406 let mut flags = None;
407 while let Some(key) = map.next_key()? {
408 match key {
409 Field::HostPath => {
410 if host_path.is_some() {
411 return Err(de::Error::duplicate_field("host_path"));
412 }
413 host_path = Some(map.next_value()?);
414 }
415 Field::Flags => {
416 if flags.is_some() {
417 return Err(de::Error::duplicate_field("flags"));
418 }
419 flags = Some(map.next_value()?);
420 }
421 }
422 }
423 let host_path = host_path.ok_or_else(|| de::Error::missing_field("host_path"))?;
424 let flags = flags.ok_or_else(|| de::Error::missing_field("flags"))?;
425 let inner = fs::OpenOptions::new()
426 .read(flags & File::READ != 0)
427 .write(flags & File::WRITE != 0)
428 .append(flags & File::APPEND != 0)
429 .open(&host_path)
430 .map_err(|_| de::Error::custom("Could not open file on this system"))?;
431 Ok(File {
432 handle: Handle::current(),
433 inner: tokio::fs::File::from_std(inner.try_clone().unwrap()),
434 inner_std: inner,
435 host_path,
436 flags,
437 })
438 }
439 }
440
441 const FIELDS: &[&str] = &["host_path", "flags"];
442 deserializer.deserialize_struct("File", FIELDS, FileVisitor)
443 }
444}
445
446impl File {
447 const READ: u16 = 1;
448 const WRITE: u16 = 2;
449 const APPEND: u16 = 4;
450
451 pub fn new(
453 handle: Handle,
454 file: fs::File,
455 host_path: PathBuf,
456 read: bool,
457 write: bool,
458 append: bool,
459 ) -> Self {
460 let mut _flags = 0;
461
462 if read {
463 _flags |= Self::READ;
464 }
465
466 if write {
467 _flags |= Self::WRITE;
468 }
469
470 if append {
471 _flags |= Self::APPEND;
472 }
473
474 let async_file = tfs::File::from_std(file.try_clone().unwrap());
475 Self {
476 handle,
477 inner_std: file,
478 inner: async_file,
479 host_path,
480 #[cfg(feature = "enable-serde")]
481 flags: _flags,
482 }
483 }
484
485 fn metadata(&self) -> std::fs::Metadata {
486 self.inner_std.metadata().unwrap()
488 }
489}
490
491#[async_trait::async_trait]
493impl VirtualFile for File {
494 fn last_accessed(&self) -> u64 {
495 self.metadata()
496 .accessed()
497 .ok()
498 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
499 .map(|ct| ct.as_nanos() as u64)
500 .unwrap_or(0)
501 }
502
503 fn last_modified(&self) -> u64 {
504 self.metadata()
505 .modified()
506 .ok()
507 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
508 .map(|ct| ct.as_nanos() as u64)
509 .unwrap_or(0)
510 }
511
512 fn created_time(&self) -> u64 {
513 self.metadata()
514 .created()
515 .ok()
516 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
517 .map(|ct| ct.as_nanos() as u64)
518 .unwrap_or(0)
519 }
520
521 fn set_times(&mut self, atime: Option<u64>, mtime: Option<u64>) -> crate::Result<()> {
522 let atime = atime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
523 let mtime = mtime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
524
525 filetime::set_file_handle_times(&self.inner_std, atime, mtime)
526 .map_err(|_| crate::FsError::IOError)
527 }
528
529 fn size(&self) -> u64 {
530 self.metadata().len()
531 }
532
533 fn set_len(&mut self, new_size: u64) -> crate::Result<()> {
534 fs::File::set_len(&self.inner_std, new_size).map_err(Into::into)
535 }
536
537 fn unlink(&mut self) -> Result<()> {
538 fs::remove_file(&self.host_path).map_err(Into::into)
539 }
540
541 fn get_special_fd(&self) -> Option<u32> {
542 None
543 }
544
545 fn poll_read_ready(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
546 let cursor = match self.inner_std.stream_position() {
547 Ok(a) => a,
548 Err(err) => return Poll::Ready(Err(err)),
549 };
550 let end = match self.inner_std.seek(io::SeekFrom::End(0)) {
551 Ok(a) => a,
552 Err(err) => return Poll::Ready(Err(err)),
553 };
554 let _ = self.inner_std.seek(io::SeekFrom::Start(cursor));
555
556 let remaining = end - cursor;
557 Poll::Ready(Ok(remaining as usize))
558 }
559
560 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
561 Poll::Ready(Ok(8192))
562 }
563}
564
565impl AsyncRead for File {
566 fn poll_read(
567 mut self: Pin<&mut Self>,
568 cx: &mut Context<'_>,
569 buf: &mut tokio::io::ReadBuf<'_>,
570 ) -> Poll<io::Result<()>> {
571 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
572 let inner = Pin::new(&mut self.inner);
573 inner.poll_read(cx, buf)
574 }
575}
576
577impl AsyncWrite for File {
578 fn poll_write(
579 mut self: Pin<&mut Self>,
580 cx: &mut Context<'_>,
581 buf: &[u8],
582 ) -> Poll<io::Result<usize>> {
583 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
584 let inner = Pin::new(&mut self.inner);
585 inner.poll_write(cx, buf)
586 }
587
588 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
589 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
590 let inner = Pin::new(&mut self.inner);
591 inner.poll_flush(cx)
592 }
593
594 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
595 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
596 let inner = Pin::new(&mut self.inner);
597 inner.poll_shutdown(cx)
598 }
599
600 fn poll_write_vectored(
601 mut self: Pin<&mut Self>,
602 cx: &mut Context<'_>,
603 bufs: &[io::IoSlice<'_>],
604 ) -> Poll<io::Result<usize>> {
605 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
606 let inner = Pin::new(&mut self.inner);
607 inner.poll_write_vectored(cx, bufs)
608 }
609
610 fn is_write_vectored(&self) -> bool {
611 self.inner.is_write_vectored()
612 }
613}
614
615impl AsyncSeek for File {
616 fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
617 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
618 let inner = Pin::new(&mut self.inner);
619 inner.start_seek(position)
620 }
621
622 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
623 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
624 let inner = Pin::new(&mut self.inner);
625 inner.poll_complete(cx)
626 }
627}
628
629impl Drop for File {
630 fn drop(&mut self) {
631 tracing::trace!(?self.host_path, "Closing host file");
632 }
633}
634
635#[derive(Debug)]
637#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
638pub struct Stdout {
639 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
640 handle: Handle,
641 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdout"))]
642 inner: tokio::io::Stdout,
643}
644#[allow(dead_code)]
645fn default_stdout() -> tokio::io::Stdout {
646 tokio::io::stdout()
647}
648impl Default for Stdout {
649 fn default() -> Self {
650 Self {
651 handle: Handle::current(),
652 inner: tokio::io::stdout(),
653 }
654 }
655}
656
657const DEFAULT_BUF_SIZE_HINT: usize = 8 * 1024;
665
666#[async_trait::async_trait]
668impl VirtualFile for Stdout {
669 fn last_accessed(&self) -> u64 {
670 0
671 }
672
673 fn last_modified(&self) -> u64 {
674 0
675 }
676
677 fn created_time(&self) -> u64 {
678 0
679 }
680
681 fn size(&self) -> u64 {
682 0
683 }
684
685 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
686 Ok(())
687 }
688
689 fn unlink(&mut self) -> Result<()> {
690 Ok(())
691 }
692
693 fn get_special_fd(&self) -> Option<u32> {
694 Some(1)
695 }
696
697 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
698 Poll::Ready(Ok(0))
699 }
700
701 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
702 Poll::Ready(Ok(DEFAULT_BUF_SIZE_HINT))
703 }
704}
705
706impl AsyncRead for Stdout {
707 fn poll_read(
708 self: Pin<&mut Self>,
709 _cx: &mut Context<'_>,
710 _buf: &mut tokio::io::ReadBuf<'_>,
711 ) -> Poll<io::Result<()>> {
712 Poll::Ready(Err(io::Error::other("can not read from stdout")))
713 }
714}
715
716impl AsyncWrite for Stdout {
717 fn poll_write(
718 mut self: Pin<&mut Self>,
719 cx: &mut Context<'_>,
720 buf: &[u8],
721 ) -> Poll<io::Result<usize>> {
722 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
723 let inner = Pin::new(&mut self.inner);
724 inner.poll_write(cx, buf)
725 }
726
727 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
728 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
729 let inner = Pin::new(&mut self.inner);
730 inner.poll_flush(cx)
731 }
732
733 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
734 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
735 let inner = Pin::new(&mut self.inner);
736 inner.poll_shutdown(cx)
737 }
738
739 fn poll_write_vectored(
740 mut self: Pin<&mut Self>,
741 cx: &mut Context<'_>,
742 bufs: &[io::IoSlice<'_>],
743 ) -> Poll<io::Result<usize>> {
744 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
745 let inner = Pin::new(&mut self.inner);
746 inner.poll_write_vectored(cx, bufs)
747 }
748
749 fn is_write_vectored(&self) -> bool {
750 self.inner.is_write_vectored()
751 }
752}
753
754impl AsyncSeek for Stdout {
755 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
756 Err(io::Error::other("can not seek stdout"))
757 }
758
759 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
760 Poll::Ready(Err(io::Error::other("can not seek stdout")))
761 }
762}
763
764#[derive(Debug)]
766#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
767pub struct Stderr {
768 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
769 handle: Handle,
770 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stderr"))]
771 inner: tokio::io::Stderr,
772}
773#[allow(dead_code)]
774fn default_stderr() -> tokio::io::Stderr {
775 tokio::io::stderr()
776}
777impl Default for Stderr {
778 fn default() -> Self {
779 Self {
780 handle: Handle::current(),
781 inner: tokio::io::stderr(),
782 }
783 }
784}
785
786impl AsyncRead for Stderr {
787 fn poll_read(
788 self: Pin<&mut Self>,
789 _cx: &mut Context<'_>,
790 _buf: &mut tokio::io::ReadBuf<'_>,
791 ) -> Poll<io::Result<()>> {
792 Poll::Ready(Err(io::Error::other("can not read from stderr")))
793 }
794}
795
796impl AsyncWrite for Stderr {
797 fn poll_write(
798 mut self: Pin<&mut Self>,
799 cx: &mut Context<'_>,
800 buf: &[u8],
801 ) -> Poll<io::Result<usize>> {
802 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
803 let inner = Pin::new(&mut self.inner);
804 inner.poll_write(cx, buf)
805 }
806
807 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
808 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
809 let inner = Pin::new(&mut self.inner);
810 inner.poll_flush(cx)
811 }
812
813 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
814 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
815 let inner = Pin::new(&mut self.inner);
816 inner.poll_shutdown(cx)
817 }
818
819 fn poll_write_vectored(
820 mut self: Pin<&mut Self>,
821 cx: &mut Context<'_>,
822 bufs: &[io::IoSlice<'_>],
823 ) -> Poll<io::Result<usize>> {
824 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
825 let inner = Pin::new(&mut self.inner);
826 inner.poll_write_vectored(cx, bufs)
827 }
828
829 fn is_write_vectored(&self) -> bool {
830 self.inner.is_write_vectored()
831 }
832}
833
834impl AsyncSeek for Stderr {
835 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
836 Err(io::Error::other("can not seek stderr"))
837 }
838
839 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
840 Poll::Ready(Err(io::Error::other("can not seek stderr")))
841 }
842}
843
844#[async_trait::async_trait]
846impl VirtualFile for Stderr {
847 fn last_accessed(&self) -> u64 {
848 0
849 }
850
851 fn last_modified(&self) -> u64 {
852 0
853 }
854
855 fn created_time(&self) -> u64 {
856 0
857 }
858
859 fn size(&self) -> u64 {
860 0
861 }
862
863 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
864 Ok(())
865 }
866
867 fn unlink(&mut self) -> Result<()> {
868 Ok(())
869 }
870
871 fn get_special_fd(&self) -> Option<u32> {
872 Some(2)
873 }
874
875 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
876 Poll::Ready(Ok(0))
877 }
878
879 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
880 Poll::Ready(Ok(8192))
881 }
882}
883
884#[derive(Debug)]
886#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
887pub struct Stdin {
888 read_buffer: Arc<std::sync::Mutex<Option<Bytes>>>,
889 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
890 handle: Handle,
891 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdin"))]
892 inner: tokio::io::Stdin,
893}
894#[allow(dead_code)]
895fn default_stdin() -> tokio::io::Stdin {
896 tokio::io::stdin()
897}
898impl Default for Stdin {
899 fn default() -> Self {
900 Self {
901 handle: Handle::current(),
902 read_buffer: Arc::new(std::sync::Mutex::new(None)),
903 inner: tokio::io::stdin(),
904 }
905 }
906}
907
908impl AsyncRead for Stdin {
909 fn poll_read(
910 mut self: Pin<&mut Self>,
911 cx: &mut Context<'_>,
912 buf: &mut tokio::io::ReadBuf<'_>,
913 ) -> Poll<io::Result<()>> {
914 let max_size = buf.remaining();
915 {
916 let mut read_buffer = self.read_buffer.lock().unwrap();
917 if let Some(read_buffer) = read_buffer.as_mut() {
918 let buf_len = read_buffer.len();
919 if buf_len > 0 {
920 let read = buf_len.min(max_size);
921 buf.put_slice(&read_buffer[..read]);
922 read_buffer.advance(read);
923 return Poll::Ready(Ok(()));
924 }
925 }
926 }
927
928 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
929 let inner = Pin::new(&mut self.inner);
930 inner.poll_read(cx, buf)
931 }
932}
933
934impl AsyncWrite for Stdin {
935 fn poll_write(
936 self: Pin<&mut Self>,
937 _cx: &mut Context<'_>,
938 _buf: &[u8],
939 ) -> Poll<io::Result<usize>> {
940 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
941 }
942
943 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
944 Poll::Ready(Err(io::Error::other("can not flush stdin")))
945 }
946
947 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
948 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
949 }
950
951 fn poll_write_vectored(
952 self: Pin<&mut Self>,
953 _cx: &mut Context<'_>,
954 _bufs: &[io::IoSlice<'_>],
955 ) -> Poll<io::Result<usize>> {
956 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
957 }
958}
959
960impl AsyncSeek for Stdin {
961 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
962 Err(io::Error::other("can not seek stdin"))
963 }
964
965 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
966 Poll::Ready(Err(io::Error::other("can not seek stdin")))
967 }
968}
969
970#[async_trait::async_trait]
972impl VirtualFile for Stdin {
973 fn last_accessed(&self) -> u64 {
974 0
975 }
976 fn last_modified(&self) -> u64 {
977 0
978 }
979 fn created_time(&self) -> u64 {
980 0
981 }
982 fn size(&self) -> u64 {
983 0
984 }
985 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
986 Ok(())
987 }
988 fn unlink(&mut self) -> Result<()> {
989 Ok(())
990 }
991 fn get_special_fd(&self) -> Option<u32> {
992 Some(0)
993 }
994 fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
995 {
996 let read_buffer = self.read_buffer.lock().unwrap();
997 if let Some(read_buffer) = read_buffer.as_ref() {
998 let buf_len = read_buffer.len();
999 if buf_len > 0 {
1000 return Poll::Ready(Ok(buf_len));
1001 }
1002 }
1003 }
1004
1005 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
1006 let inner = Pin::new(&mut self.inner);
1007
1008 let mut buf = [0u8; 8192];
1009 let mut read_buf = ReadBuf::new(&mut buf[..]);
1010 match inner.poll_read(cx, &mut read_buf) {
1011 Poll::Pending => Poll::Pending,
1012 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1013 Poll::Ready(Ok(())) => {
1014 let buf = read_buf.filled();
1015 let buf_len = buf.len();
1016
1017 let mut read_buffer = self.read_buffer.lock().unwrap();
1018 read_buffer.replace(Bytes::from(buf.to_vec()));
1019 Poll::Ready(Ok(buf_len))
1020 }
1021 }
1022 }
1023 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1024 Poll::Ready(Ok(0))
1025 }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use tempfile::TempDir;
1031 use tokio::runtime::Handle;
1032
1033 use super::FileSystem;
1034 use crate::FileSystem as FileSystemTrait;
1035 use crate::FsError;
1036 use std::path::Path;
1037
1038 #[tokio::test]
1039 async fn test_new_filesystem() {
1040 let temp = TempDir::new().unwrap();
1041 std::fs::write(temp.path().join("foo2.txt"), b"").unwrap();
1042
1043 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1044 assert!(
1045 fs.read_dir(Path::new("/")).is_ok(),
1046 "NativeFS can read root"
1047 );
1048 assert!(
1049 fs.new_open_options()
1050 .read(true)
1051 .open(Path::new("/foo2.txt"))
1052 .is_ok(),
1053 "created foo2.txt"
1054 );
1055 }
1056
1057 #[tokio::test]
1058 async fn test_create_dir() {
1059 let temp: TempDir = TempDir::new().unwrap();
1060 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1061
1062 assert_eq!(
1063 fs.create_dir(Path::new("../")),
1064 Err(FsError::AlreadyExists),
1065 "creating a directory out of bounds",
1066 );
1067
1068 assert_eq!(
1069 fs.create_dir(Path::new("/foo")),
1070 Ok(()),
1071 "creating a directory",
1072 );
1073
1074 assert!(
1075 temp.path().join("foo").exists(),
1076 "foo dir exists in host_fs"
1077 );
1078
1079 let cur_dir = read_dir_names(&fs, "/");
1080
1081 if !cur_dir.contains(&"foo".to_string()) {
1082 panic!("cur_dir does not contain foo: {cur_dir:#?}");
1083 }
1084
1085 assert!(
1086 cur_dir.contains(&"foo".to_string()),
1087 "the root is updated and well-defined"
1088 );
1089
1090 assert_eq!(
1091 fs.create_dir(Path::new("foo/bar")),
1092 Ok(()),
1093 "creating a sub-directory",
1094 );
1095
1096 assert!(
1097 temp.path().join("foo").join("bar").exists(),
1098 "foo dir exists in host_fs"
1099 );
1100
1101 let foo_dir = read_dir_names(&fs, Path::new("/foo"));
1102
1103 assert!(
1104 foo_dir.contains(&"bar".to_string()),
1105 "the foo directory is updated and well-defined"
1106 );
1107
1108 let bar_dir = read_dir_names(&fs, Path::new("/foo/bar"));
1109
1110 assert!(
1111 bar_dir.is_empty(),
1112 "the foo directory is updated and well-defined"
1113 );
1114 }
1115
1116 #[tokio::test]
1117 async fn test_remove_dir() {
1118 let temp: TempDir = TempDir::new().unwrap();
1119 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1120
1121 assert_eq!(
1122 fs.remove_dir(Path::new("/foo")),
1123 Err(FsError::EntryNotFound),
1124 "cannot remove a directory that doesn't exist",
1125 );
1126
1127 assert_eq!(
1128 fs.create_dir(Path::new("foo")),
1129 Ok(()),
1130 "creating a directory",
1131 );
1132
1133 assert_eq!(
1134 fs.create_dir(Path::new("foo/bar")),
1135 Ok(()),
1136 "creating a sub-directory",
1137 );
1138
1139 assert!(temp.path().join("foo/bar").exists(), "./foo/bar exists");
1140
1141 assert_eq!(
1142 fs.remove_dir(Path::new("foo")),
1143 Err(FsError::DirectoryNotEmpty),
1144 "removing a directory that has children",
1145 );
1146
1147 assert_eq!(
1148 fs.remove_dir(Path::new("foo/bar")),
1149 Ok(()),
1150 "removing a sub-directory",
1151 );
1152
1153 assert_eq!(
1154 fs.remove_dir(Path::new("foo")),
1155 Ok(()),
1156 "removing a directory",
1157 );
1158
1159 let cur_dir = read_dir_names(&fs, "/");
1160
1161 assert!(
1162 !cur_dir.contains(&"foo".to_string()),
1163 "the foo directory still exists"
1164 );
1165 }
1166
1167 fn read_dir_names(fs: &FileSystem, path: impl AsRef<Path>) -> Vec<String> {
1168 fs.read_dir(path.as_ref())
1169 .unwrap()
1170 .filter_map(|entry| Some(entry.ok()?.file_name().to_str()?.to_string()))
1171 .collect::<Vec<_>>()
1172 }
1173
1174 #[tokio::test]
1175 async fn test_rename() {
1176 let temp: TempDir = TempDir::new().unwrap();
1177 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1178 std::fs::create_dir_all(temp.path().join("foo").join("qux")).unwrap();
1179 let foo = Path::new("foo");
1180 let bar = Path::new("bar");
1181 let foo_realpath = temp.path().join(foo);
1182 let bar_realpath = temp.path().join(bar);
1183
1184 assert_eq!(
1185 fs.rename(Path::new("/"), Path::new("/bar")).await,
1186 Err(FsError::BaseNotDirectory),
1187 "renaming a directory that has no parent",
1188 );
1189 assert_eq!(
1190 fs.rename(Path::new("/foo"), Path::new("/")).await,
1191 Err(FsError::BaseNotDirectory),
1192 "renaming to a directory that has no parent",
1193 );
1194
1195 assert_eq!(
1196 fs.rename(foo, &foo.join("bar").join("baz"),).await,
1197 Err(FsError::EntryNotFound),
1198 "renaming to a directory that has parent that doesn't exist",
1199 );
1200
1201 #[cfg(not(target_os = "windows"))]
1203 assert_eq!(fs.create_dir(bar), Ok(()));
1204
1205 assert_eq!(
1206 fs.rename(foo, bar).await,
1207 Ok(()),
1208 "renaming to a directory that has parent that exists",
1209 );
1210
1211 assert!(
1212 fs.new_open_options()
1213 .write(true)
1214 .create_new(true)
1215 .open(bar.join("hello1.txt"))
1216 .is_ok(),
1217 "creating a new file (`hello1.txt`)",
1218 );
1219 assert!(
1220 fs.new_open_options()
1221 .write(true)
1222 .create_new(true)
1223 .open(bar.join("hello2.txt"))
1224 .is_ok(),
1225 "creating a new file (`hello2.txt`)",
1226 );
1227
1228 let cur_dir = read_dir_names(&fs, Path::new("/"));
1229
1230 assert!(
1231 !cur_dir.contains(&"foo".to_string()),
1232 "the foo directory still exists"
1233 );
1234
1235 assert!(
1236 cur_dir.contains(&"bar".to_string()),
1237 "the bar directory still exists"
1238 );
1239
1240 let bar_dir = read_dir_names(&fs, bar);
1241
1242 if !bar_dir.contains(&"qux".to_string()) {
1243 println!("qux does not exist: {bar_dir:?}")
1244 }
1245
1246 let qux_dir = read_dir_names(&fs, bar.join("qux"));
1247
1248 assert!(qux_dir.is_empty(), "the qux directory is empty");
1249
1250 assert!(
1251 bar_realpath.join("hello1.txt").exists(),
1252 "the /bar/hello1.txt file exists"
1253 );
1254
1255 assert!(
1256 bar_realpath.join("hello2.txt").exists(),
1257 "the /bar/hello2.txt file exists"
1258 );
1259
1260 assert_eq!(fs.create_dir(foo), Ok(()), "create ./foo again");
1261
1262 assert_eq!(
1263 fs.rename(&bar.join("hello2.txt"), &foo.join("world2.txt"))
1264 .await,
1265 Ok(()),
1266 "renaming (and moving) a file",
1267 );
1268
1269 assert_eq!(
1270 fs.rename(foo, &bar.join("baz")).await,
1271 Ok(()),
1272 "renaming a directory",
1273 );
1274
1275 assert_eq!(
1276 fs.rename(&bar.join("hello1.txt"), &bar.join("world1.txt"))
1277 .await,
1278 Ok(()),
1279 "renaming a file (in the same directory)",
1280 );
1281
1282 assert!(bar_realpath.exists(), "./bar exists");
1283 assert!(bar_realpath.join("baz").exists(), "./bar/baz exists");
1284 assert!(!foo_realpath.exists(), "foo does not exist anymore");
1285 assert!(
1286 bar_realpath.join("baz/world2.txt").exists(),
1287 "/bar/baz/world2.txt exists"
1288 );
1289 assert!(
1290 bar_realpath.join("world1.txt").exists(),
1291 "/bar/world1.txt (ex hello1.txt) exists"
1292 );
1293 assert!(
1294 !bar_realpath.join("hello1.txt").exists(),
1295 "hello1.txt was moved"
1296 );
1297 assert!(
1298 !bar_realpath.join("hello2.txt").exists(),
1299 "hello2.txt was moved"
1300 );
1301 assert!(
1302 bar_realpath.join("baz/world2.txt").exists(),
1303 "world2.txt was moved to the correct place"
1304 );
1305 }
1306
1307 #[tokio::test]
1308 async fn test_metadata() {
1309 use std::thread::sleep;
1310 use std::time::Duration;
1311
1312 let temp = TempDir::new().unwrap();
1313
1314 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1315
1316 let root_metadata = fs.metadata(Path::new("/")).unwrap();
1317
1318 assert!(root_metadata.ft.dir);
1319 #[cfg(not(target_env = "musl"))]
1321 assert_eq!(root_metadata.accessed, root_metadata.created);
1322 #[cfg(not(target_env = "musl"))]
1323 assert_eq!(root_metadata.modified, root_metadata.created);
1324 assert!(root_metadata.modified > 0);
1325
1326 let foo = Path::new("foo");
1327
1328 assert_eq!(fs.create_dir(foo), Ok(()));
1329
1330 let foo_metadata = fs.metadata(foo);
1331 assert!(foo_metadata.is_ok());
1332 let foo_metadata = foo_metadata.unwrap();
1333
1334 assert!(foo_metadata.ft.dir);
1335 #[cfg(not(target_env = "musl"))]
1336 assert_eq!(foo_metadata.accessed, foo_metadata.created);
1337 #[cfg(not(target_env = "musl"))]
1338 assert_eq!(foo_metadata.modified, foo_metadata.created);
1339 assert!(foo_metadata.modified > 0);
1340
1341 sleep(Duration::from_secs(3));
1342
1343 let bar = Path::new("bar");
1344
1345 assert_eq!(fs.rename(foo, bar).await, Ok(()));
1346
1347 let bar_metadata = fs.metadata(bar).unwrap();
1348 assert!(bar_metadata.ft.dir);
1349 assert!(bar_metadata.accessed >= foo_metadata.accessed);
1350 assert_eq!(bar_metadata.created, foo_metadata.created);
1351 assert!(bar_metadata.modified > foo_metadata.modified);
1352
1353 let root_metadata = fs.metadata(bar).unwrap();
1354 assert!(
1355 root_metadata.modified > foo_metadata.modified,
1356 "the parent modified time was updated"
1357 );
1358 }
1359
1360 #[tokio::test]
1361 async fn test_remove_file() {
1362 let temp = TempDir::new().unwrap();
1363 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1364
1365 assert!(
1366 fs.new_open_options()
1367 .write(true)
1368 .create_new(true)
1369 .open(Path::new("foo.txt"))
1370 .is_ok(),
1371 "creating a new file",
1372 );
1373
1374 assert!(read_dir_names(&fs, Path::new("/")).contains(&"foo.txt".to_string()));
1375
1376 assert!(temp.path().join("foo.txt").is_file());
1377
1378 assert_eq!(
1379 fs.remove_file(Path::new("foo.txt")),
1380 Ok(()),
1381 "removing a file that exists",
1382 );
1383
1384 assert!(!temp.path().join("foo.txt").exists());
1385
1386 assert_eq!(
1387 fs.remove_file(Path::new("foo.txt")),
1388 Err(FsError::EntryNotFound),
1389 "removing a file that doesn't exists",
1390 );
1391 }
1392
1393 #[tokio::test]
1394 async fn test_readdir() {
1395 let temp = TempDir::new().unwrap();
1396 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1397
1398 assert_eq!(fs.create_dir(Path::new("foo")), Ok(()), "creating `foo`");
1399 assert_eq!(
1400 fs.create_dir(Path::new("foo/sub")),
1401 Ok(()),
1402 "creating `sub`"
1403 );
1404 assert_eq!(fs.create_dir(Path::new("bar")), Ok(()), "creating `bar`");
1405 assert_eq!(fs.create_dir(Path::new("baz")), Ok(()), "creating `bar`");
1406 assert!(
1407 fs.new_open_options()
1408 .write(true)
1409 .create_new(true)
1410 .open(Path::new("a.txt"))
1411 .is_ok(),
1412 "creating `a.txt`",
1413 );
1414 assert!(
1415 fs.new_open_options()
1416 .write(true)
1417 .create_new(true)
1418 .open(Path::new("b.txt"))
1419 .is_ok(),
1420 "creating `b.txt`",
1421 );
1422
1423 let readdir = fs.read_dir(Path::new("/"));
1424
1425 assert!(
1426 readdir.is_ok(),
1427 "reading the directory `{}`",
1428 Path::new("/").display()
1429 );
1430
1431 let mut readdir = readdir.unwrap();
1432
1433 let next = readdir.next().unwrap().unwrap();
1434 assert!(next.path.ends_with("a.txt"), "checking entry #1");
1435 assert!(next.metadata().unwrap().is_file(), "checking entry #1");
1436
1437 let next = readdir.next().unwrap().unwrap();
1438 assert!(next.path.ends_with("b.txt"), "checking entry #2");
1439 assert!(next.metadata().unwrap().is_file(), "checking entry #2");
1440
1441 let next = readdir.next().unwrap().unwrap();
1442 assert!(next.path.ends_with("bar"), "checking entry #3");
1443 assert!(next.metadata().unwrap().is_dir(), "checking entry #3");
1444
1445 let next = readdir.next().unwrap().unwrap();
1446 assert!(next.path.ends_with("baz"), "checking entry #4");
1447 assert!(next.metadata().unwrap().is_dir(), "checking entry #4");
1448
1449 let next = readdir.next().unwrap().unwrap();
1450 assert!(next.path.ends_with("foo"), "checking entry #5");
1451 assert!(next.metadata().unwrap().is_dir(), "checking entry #5");
1452
1453 if let Some(s) = readdir.next() {
1454 panic!("next: {s:?}");
1455 }
1456 }
1457}