1use crate::{
2 DirEntry, FileType, FsError, Metadata, OpenOptions, OpenOptionsConfig, ReadDir, Result,
3 VirtualFile,
4};
5use bytes::{Buf, Bytes};
6use futures::future::BoxFuture;
7#[cfg(feature = "enable-serde")]
8use serde::{Deserialize, Serialize, de};
9use std::convert::TryInto;
10use std::fs;
11use std::io::{self, Seek};
12use std::path::{Component, Path, PathBuf};
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::fs as tfs;
18use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
19use tokio::runtime::Handle;
20
21#[derive(Debug, Clone)]
22#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
23pub struct FileSystem {
24 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
25 handle: Handle,
26 root: PathBuf,
27}
28
29#[allow(dead_code)]
30fn default_handle() -> Handle {
31 Handle::current()
32}
33
34pub fn canonicalize(path: &Path) -> Result<PathBuf> {
35 if !path.exists() {
36 return Err(FsError::InvalidInput);
37 }
38 dunce::canonicalize(path).map_err(Into::into)
39}
40
41pub fn normalize_path(path: &Path) -> PathBuf {
44 let mut components = path.components().peekable();
45 let mut ret = if let Some(c @ Component::Prefix(..)) = components.peek().cloned() {
46 components.next();
47 PathBuf::from(c.as_os_str())
48 } else {
49 PathBuf::new()
50 };
51
52 for component in components {
53 match component {
54 Component::Prefix(..) => unreachable!(),
55 Component::RootDir => {
56 ret.push(component.as_os_str());
57 }
58 Component::CurDir => {}
59 Component::ParentDir => {
60 ret.pop();
61 }
62 Component::Normal(c) => {
63 ret.push(c);
64 }
65 }
66 }
67 ret
68}
69
70impl FileSystem {
71 pub fn new(handle: Handle, root: impl Into<PathBuf>) -> Result<Self> {
72 let root = canonicalize(&root.into())?;
73
74 Ok(FileSystem { handle, root })
75 }
76
77 fn prepare_path(&self, path: &Path) -> Result<PathBuf> {
78 let path = normalize_path(path);
79
80 if matches!(path.components().next(), Some(Component::Prefix(..))) {
81 return Err(FsError::InvalidInput);
82 }
83
84 if self.root != Path::new("/") && path.starts_with(&self.root) {
85 return Err(FsError::InvalidInput);
86 }
87
88 let path = path.strip_prefix("/").unwrap_or(&path);
89 let path = self.root.join(path);
90
91 debug_assert!(path.starts_with(&self.root));
92 Ok(path)
93 }
94}
95
96impl crate::FileSystem for FileSystem {
97 fn readlink(&self, path: &Path) -> Result<PathBuf> {
98 let path = self.prepare_path(path)?;
99
100 fs::read_link(path).map_err(Into::into)
101 }
102
103 fn read_dir(&self, path: &Path) -> Result<ReadDir> {
104 let path = self.prepare_path(path)?;
105
106 let read_dir = fs::read_dir(path)?;
107 let mut data = read_dir
108 .map(|entry| {
109 let entry = entry?;
110
111 let path = entry
112 .path()
113 .strip_prefix(&self.root)
114 .map_err(|_| FsError::InvalidData)?
115 .to_owned();
116 let path = Path::new("/").join(path);
117
118 let metadata = entry.metadata()?;
119
120 Ok(DirEntry {
121 path,
122 metadata: Ok(metadata.try_into()?),
123 })
124 })
125 .collect::<std::result::Result<Vec<DirEntry>, io::Error>>()
126 .map_err::<FsError, _>(Into::into)?;
127 data.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
128 Ok(ReadDir::new(data))
129 }
130
131 fn create_dir(&self, path: &Path) -> Result<()> {
132 let path = self.prepare_path(path)?;
133
134 if path.parent().is_none() {
135 return Err(FsError::BaseNotDirectory);
136 }
137
138 fs::create_dir(path).map_err(Into::into)
139 }
140
141 fn remove_dir(&self, path: &Path) -> Result<()> {
142 let path = self.prepare_path(path)?;
143
144 if path.parent().is_none() {
145 return Err(FsError::BaseNotDirectory);
146 }
147
148 if path.is_dir()
151 && fs::read_dir(&path)
152 .map(|mut s| s.next().is_some())
153 .unwrap_or(false)
154 {
155 return Err(FsError::DirectoryNotEmpty);
156 }
157 fs::remove_dir(path).map_err(Into::into)
158 }
159
160 fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, Result<()>> {
161 Box::pin(async move {
162 use filetime::{FileTime, set_file_mtime};
163 let norm_from = normalize_path(from);
164 let norm_to = normalize_path(to);
165
166 if norm_from.parent().is_none() {
167 return Err(FsError::BaseNotDirectory);
168 }
169 if norm_to.parent().is_none() {
170 return Err(FsError::BaseNotDirectory);
171 }
172
173 let from = self.prepare_path(from)?;
174 let to = self.prepare_path(to)?;
175
176 if !from.exists() {
177 return Err(FsError::EntryNotFound);
178 }
179 let from_parent = from.parent().unwrap();
180 let to_parent = to.parent().unwrap();
181 if !from_parent.exists() {
182 return Err(FsError::EntryNotFound);
183 }
184 if !to_parent.exists() {
185 return Err(FsError::EntryNotFound);
186 }
187 let result = if from_parent != to_parent {
188 let _ = std::fs::create_dir_all(to_parent);
189 if from.is_dir() {
190 fs_extra::move_items(
191 &[&from],
192 &to,
193 &fs_extra::dir::CopyOptions {
194 copy_inside: true,
195 ..Default::default()
196 },
197 )
198 .map(|_| ())
199 .map_err(|_| FsError::UnknownError)?;
200 let _ = fs_extra::remove_items(&[&from]);
201 Ok(())
202 } else {
203 fs::copy(&from, &to).map(|_| ()).map_err(FsError::from)?;
204 fs::remove_file(&from).map(|_| ()).map_err(Into::into)
205 }
206 } else {
207 fs::rename(&from, &to).map_err(Into::into)
208 };
209 let _ = set_file_mtime(&to, FileTime::now()).map(|_| ());
210 result
211 })
212 }
213
214 fn remove_file(&self, path: &Path) -> Result<()> {
215 let path = self.prepare_path(path)?;
216
217 if path.parent().is_none() {
218 return Err(FsError::BaseNotDirectory);
219 }
220
221 fs::remove_file(path).map_err(Into::into)
222 }
223
224 fn new_open_options(&self) -> OpenOptions<'_> {
225 OpenOptions::new(self)
226 }
227
228 fn metadata(&self, path: &Path) -> Result<Metadata> {
229 let path = self.prepare_path(path)?;
230
231 fs::metadata(path)
232 .and_then(TryInto::try_into)
233 .map_err(Into::into)
234 }
235
236 fn symlink_metadata(&self, path: &Path) -> Result<Metadata> {
237 let path = self.prepare_path(path)?;
238
239 fs::symlink_metadata(path)
240 .and_then(TryInto::try_into)
241 .map_err(Into::into)
242 }
243}
244
245impl TryInto<Metadata> for std::fs::Metadata {
246 type Error = io::Error;
247
248 fn try_into(self) -> std::result::Result<Metadata, Self::Error> {
249 let filetype = self.file_type();
250 let (char_device, block_device, socket, fifo) = {
251 #[cfg(unix)]
252 {
253 use std::os::unix::fs::FileTypeExt;
254 (
255 filetype.is_char_device(),
256 filetype.is_block_device(),
257 filetype.is_socket(),
258 filetype.is_fifo(),
259 )
260 }
261 #[cfg(not(unix))]
262 {
263 (false, false, false, false)
264 }
265 };
266
267 Ok(Metadata {
268 ft: FileType {
269 dir: filetype.is_dir(),
270 file: filetype.is_file(),
271 symlink: filetype.is_symlink(),
272 char_device,
273 block_device,
274 socket,
275 fifo,
276 },
277 accessed: self
278 .accessed()
279 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
280 .map_or(0, |time| time.as_nanos() as u64),
281 created: self
282 .created()
283 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
284 .map_or(0, |time| time.as_nanos() as u64),
285 modified: self
286 .modified()
287 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
288 .map_or(0, |time| time.as_nanos() as u64),
289 len: self.len(),
290 })
291 }
292}
293
294impl crate::FileOpener for FileSystem {
295 fn open(
296 &self,
297 path: &Path,
298 conf: &OpenOptionsConfig,
299 ) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {
300 let path = self.prepare_path(path)?;
301
302 let read = conf.read();
304 let write = conf.write();
305
306 let append = if conf.truncate { false } else { conf.append() };
312
313 let mut oo = fs::OpenOptions::new();
314 oo.read(conf.read())
315 .write(conf.write())
316 .create_new(conf.create_new())
317 .create(conf.create())
318 .append(append)
319 .truncate(conf.truncate())
320 .open(&path)
321 .map_err(Into::into)
322 .map(|file| {
323 Box::new(File::new(
324 self.handle.clone(),
325 file,
326 path.to_owned(),
327 read,
328 write,
329 append,
330 )) as Box<dyn VirtualFile + Send + Sync + 'static>
331 })
332 }
333}
334
335#[derive(Debug)]
337#[cfg_attr(feature = "enable-serde", derive(Serialize))]
338pub struct File {
339 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
340 handle: Handle,
341 #[cfg_attr(feature = "enable-serde", serde(skip))]
342 inner: tfs::File,
343 #[cfg_attr(feature = "enable-serde", serde(skip_serializing))]
344 inner_std: fs::File,
345 pub host_path: PathBuf,
346 #[cfg(feature = "enable-serde")]
347 flags: u16,
348}
349
350#[cfg(feature = "enable-serde")]
351impl<'de> Deserialize<'de> for File {
352 fn deserialize<D>(deserializer: D) -> std::result::Result<File, D::Error>
353 where
354 D: serde::Deserializer<'de>,
355 {
356 #[derive(Deserialize)]
357 #[serde(field_identifier, rename_all = "snake_case")]
358 enum Field {
359 HostPath,
360 Flags,
361 }
362
363 struct FileVisitor;
364
365 impl<'de> de::Visitor<'de> for FileVisitor {
366 type Value = File;
367
368 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
369 formatter.write_str("struct File")
370 }
371
372 fn visit_seq<V>(self, mut seq: V) -> std::result::Result<Self::Value, V::Error>
373 where
374 V: de::SeqAccess<'de>,
375 {
376 let host_path = seq
377 .next_element()?
378 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
379 let flags = seq
380 .next_element()?
381 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
382 let inner = fs::OpenOptions::new()
383 .read(flags & File::READ != 0)
384 .write(flags & File::WRITE != 0)
385 .append(flags & File::APPEND != 0)
386 .open(&host_path)
387 .map_err(|_| de::Error::custom("Could not open file on this system"))?;
388 Ok(File {
389 handle: Handle::current(),
390 inner: tokio::fs::File::from_std(inner.try_clone().unwrap()),
391 inner_std: inner,
392 host_path,
393 flags,
394 })
395 }
396
397 fn visit_map<V>(self, mut map: V) -> std::result::Result<Self::Value, V::Error>
398 where
399 V: de::MapAccess<'de>,
400 {
401 let mut host_path = None;
402 let mut flags = None;
403 while let Some(key) = map.next_key()? {
404 match key {
405 Field::HostPath => {
406 if host_path.is_some() {
407 return Err(de::Error::duplicate_field("host_path"));
408 }
409 host_path = Some(map.next_value()?);
410 }
411 Field::Flags => {
412 if flags.is_some() {
413 return Err(de::Error::duplicate_field("flags"));
414 }
415 flags = Some(map.next_value()?);
416 }
417 }
418 }
419 let host_path = host_path.ok_or_else(|| de::Error::missing_field("host_path"))?;
420 let flags = flags.ok_or_else(|| de::Error::missing_field("flags"))?;
421 let inner = fs::OpenOptions::new()
422 .read(flags & File::READ != 0)
423 .write(flags & File::WRITE != 0)
424 .append(flags & File::APPEND != 0)
425 .open(&host_path)
426 .map_err(|_| de::Error::custom("Could not open file on this system"))?;
427 Ok(File {
428 handle: Handle::current(),
429 inner: tokio::fs::File::from_std(inner.try_clone().unwrap()),
430 inner_std: inner,
431 host_path,
432 flags,
433 })
434 }
435 }
436
437 const FIELDS: &[&str] = &["host_path", "flags"];
438 deserializer.deserialize_struct("File", FIELDS, FileVisitor)
439 }
440}
441
442impl File {
443 const READ: u16 = 1;
444 const WRITE: u16 = 2;
445 const APPEND: u16 = 4;
446
447 pub fn new(
449 handle: Handle,
450 file: fs::File,
451 host_path: PathBuf,
452 read: bool,
453 write: bool,
454 append: bool,
455 ) -> Self {
456 let mut _flags = 0;
457
458 if read {
459 _flags |= Self::READ;
460 }
461
462 if write {
463 _flags |= Self::WRITE;
464 }
465
466 if append {
467 _flags |= Self::APPEND;
468 }
469
470 let async_file = tfs::File::from_std(file.try_clone().unwrap());
471 Self {
472 handle,
473 inner_std: file,
474 inner: async_file,
475 host_path,
476 #[cfg(feature = "enable-serde")]
477 flags: _flags,
478 }
479 }
480
481 fn metadata(&self) -> std::fs::Metadata {
482 self.inner_std.metadata().unwrap()
484 }
485}
486
487#[async_trait::async_trait]
489impl VirtualFile for File {
490 fn last_accessed(&self) -> u64 {
491 self.metadata()
492 .accessed()
493 .ok()
494 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
495 .map(|ct| ct.as_nanos() as u64)
496 .unwrap_or(0)
497 }
498
499 fn last_modified(&self) -> u64 {
500 self.metadata()
501 .modified()
502 .ok()
503 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
504 .map(|ct| ct.as_nanos() as u64)
505 .unwrap_or(0)
506 }
507
508 fn created_time(&self) -> u64 {
509 self.metadata()
510 .created()
511 .ok()
512 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
513 .map(|ct| ct.as_nanos() as u64)
514 .unwrap_or(0)
515 }
516
517 fn set_times(&mut self, atime: Option<u64>, mtime: Option<u64>) -> crate::Result<()> {
518 let atime = atime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
519 let mtime = mtime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
520
521 filetime::set_file_handle_times(&self.inner_std, atime, mtime)
522 .map_err(|_| crate::FsError::IOError)
523 }
524
525 fn size(&self) -> u64 {
526 self.metadata().len()
527 }
528
529 fn set_len(&mut self, new_size: u64) -> crate::Result<()> {
530 fs::File::set_len(&self.inner_std, new_size).map_err(Into::into)
531 }
532
533 fn unlink(&mut self) -> Result<()> {
534 fs::remove_file(&self.host_path).map_err(Into::into)
535 }
536
537 fn get_special_fd(&self) -> Option<u32> {
538 None
539 }
540
541 fn poll_read_ready(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
542 let cursor = match self.inner_std.stream_position() {
543 Ok(a) => a,
544 Err(err) => return Poll::Ready(Err(err)),
545 };
546 let end = match self.inner_std.seek(io::SeekFrom::End(0)) {
547 Ok(a) => a,
548 Err(err) => return Poll::Ready(Err(err)),
549 };
550 let _ = self.inner_std.seek(io::SeekFrom::Start(cursor));
551
552 let remaining = end - cursor;
553 Poll::Ready(Ok(remaining as usize))
554 }
555
556 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
557 Poll::Ready(Ok(8192))
558 }
559}
560
561impl AsyncRead for File {
562 fn poll_read(
563 mut self: Pin<&mut Self>,
564 cx: &mut Context<'_>,
565 buf: &mut tokio::io::ReadBuf<'_>,
566 ) -> Poll<io::Result<()>> {
567 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
568 let inner = Pin::new(&mut self.inner);
569 inner.poll_read(cx, buf)
570 }
571}
572
573impl AsyncWrite for File {
574 fn poll_write(
575 mut self: Pin<&mut Self>,
576 cx: &mut Context<'_>,
577 buf: &[u8],
578 ) -> Poll<io::Result<usize>> {
579 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
580 let inner = Pin::new(&mut self.inner);
581 inner.poll_write(cx, buf)
582 }
583
584 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
585 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
586 let inner = Pin::new(&mut self.inner);
587 inner.poll_flush(cx)
588 }
589
590 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
591 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
592 let inner = Pin::new(&mut self.inner);
593 inner.poll_shutdown(cx)
594 }
595
596 fn poll_write_vectored(
597 mut self: Pin<&mut Self>,
598 cx: &mut Context<'_>,
599 bufs: &[io::IoSlice<'_>],
600 ) -> Poll<io::Result<usize>> {
601 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
602 let inner = Pin::new(&mut self.inner);
603 inner.poll_write_vectored(cx, bufs)
604 }
605
606 fn is_write_vectored(&self) -> bool {
607 self.inner.is_write_vectored()
608 }
609}
610
611impl AsyncSeek for File {
612 fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
613 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
614 let inner = Pin::new(&mut self.inner);
615 inner.start_seek(position)
616 }
617
618 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
619 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
620 let inner = Pin::new(&mut self.inner);
621 inner.poll_complete(cx)
622 }
623}
624
625impl Drop for File {
626 fn drop(&mut self) {
627 tracing::trace!(?self.host_path, "Closing host file");
628 }
629}
630
631#[derive(Debug)]
633#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
634pub struct Stdout {
635 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
636 handle: Handle,
637 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdout"))]
638 inner: tokio::io::Stdout,
639}
640#[allow(dead_code)]
641fn default_stdout() -> tokio::io::Stdout {
642 tokio::io::stdout()
643}
644impl Default for Stdout {
645 fn default() -> Self {
646 Self {
647 handle: Handle::current(),
648 inner: tokio::io::stdout(),
649 }
650 }
651}
652
653const DEFAULT_BUF_SIZE_HINT: usize = 8 * 1024;
661
662#[async_trait::async_trait]
664impl VirtualFile for Stdout {
665 fn last_accessed(&self) -> u64 {
666 0
667 }
668
669 fn last_modified(&self) -> u64 {
670 0
671 }
672
673 fn created_time(&self) -> u64 {
674 0
675 }
676
677 fn size(&self) -> u64 {
678 0
679 }
680
681 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
682 Ok(())
683 }
684
685 fn unlink(&mut self) -> Result<()> {
686 Ok(())
687 }
688
689 fn get_special_fd(&self) -> Option<u32> {
690 Some(1)
691 }
692
693 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
694 Poll::Ready(Ok(0))
695 }
696
697 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
698 Poll::Ready(Ok(DEFAULT_BUF_SIZE_HINT))
699 }
700}
701
702impl AsyncRead for Stdout {
703 fn poll_read(
704 self: Pin<&mut Self>,
705 _cx: &mut Context<'_>,
706 _buf: &mut tokio::io::ReadBuf<'_>,
707 ) -> Poll<io::Result<()>> {
708 Poll::Ready(Err(io::Error::other("can not read from stdout")))
709 }
710}
711
712impl AsyncWrite for Stdout {
713 fn poll_write(
714 mut self: Pin<&mut Self>,
715 cx: &mut Context<'_>,
716 buf: &[u8],
717 ) -> Poll<io::Result<usize>> {
718 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
719 let inner = Pin::new(&mut self.inner);
720 inner.poll_write(cx, buf)
721 }
722
723 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
724 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
725 let inner = Pin::new(&mut self.inner);
726 inner.poll_flush(cx)
727 }
728
729 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
730 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
731 let inner = Pin::new(&mut self.inner);
732 inner.poll_shutdown(cx)
733 }
734
735 fn poll_write_vectored(
736 mut self: Pin<&mut Self>,
737 cx: &mut Context<'_>,
738 bufs: &[io::IoSlice<'_>],
739 ) -> Poll<io::Result<usize>> {
740 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
741 let inner = Pin::new(&mut self.inner);
742 inner.poll_write_vectored(cx, bufs)
743 }
744
745 fn is_write_vectored(&self) -> bool {
746 self.inner.is_write_vectored()
747 }
748}
749
750impl AsyncSeek for Stdout {
751 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
752 Err(io::Error::other("can not seek stdout"))
753 }
754
755 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
756 Poll::Ready(Err(io::Error::other("can not seek stdout")))
757 }
758}
759
760#[derive(Debug)]
762#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
763pub struct Stderr {
764 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
765 handle: Handle,
766 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stderr"))]
767 inner: tokio::io::Stderr,
768}
769#[allow(dead_code)]
770fn default_stderr() -> tokio::io::Stderr {
771 tokio::io::stderr()
772}
773impl Default for Stderr {
774 fn default() -> Self {
775 Self {
776 handle: Handle::current(),
777 inner: tokio::io::stderr(),
778 }
779 }
780}
781
782impl AsyncRead for Stderr {
783 fn poll_read(
784 self: Pin<&mut Self>,
785 _cx: &mut Context<'_>,
786 _buf: &mut tokio::io::ReadBuf<'_>,
787 ) -> Poll<io::Result<()>> {
788 Poll::Ready(Err(io::Error::other("can not read from stderr")))
789 }
790}
791
792impl AsyncWrite for Stderr {
793 fn poll_write(
794 mut self: Pin<&mut Self>,
795 cx: &mut Context<'_>,
796 buf: &[u8],
797 ) -> Poll<io::Result<usize>> {
798 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
799 let inner = Pin::new(&mut self.inner);
800 inner.poll_write(cx, buf)
801 }
802
803 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
804 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
805 let inner = Pin::new(&mut self.inner);
806 inner.poll_flush(cx)
807 }
808
809 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
810 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
811 let inner = Pin::new(&mut self.inner);
812 inner.poll_shutdown(cx)
813 }
814
815 fn poll_write_vectored(
816 mut self: Pin<&mut Self>,
817 cx: &mut Context<'_>,
818 bufs: &[io::IoSlice<'_>],
819 ) -> Poll<io::Result<usize>> {
820 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
821 let inner = Pin::new(&mut self.inner);
822 inner.poll_write_vectored(cx, bufs)
823 }
824
825 fn is_write_vectored(&self) -> bool {
826 self.inner.is_write_vectored()
827 }
828}
829
830impl AsyncSeek for Stderr {
831 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
832 Err(io::Error::other("can not seek stderr"))
833 }
834
835 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
836 Poll::Ready(Err(io::Error::other("can not seek stderr")))
837 }
838}
839
840#[async_trait::async_trait]
842impl VirtualFile for Stderr {
843 fn last_accessed(&self) -> u64 {
844 0
845 }
846
847 fn last_modified(&self) -> u64 {
848 0
849 }
850
851 fn created_time(&self) -> u64 {
852 0
853 }
854
855 fn size(&self) -> u64 {
856 0
857 }
858
859 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
860 Ok(())
861 }
862
863 fn unlink(&mut self) -> Result<()> {
864 Ok(())
865 }
866
867 fn get_special_fd(&self) -> Option<u32> {
868 Some(2)
869 }
870
871 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
872 Poll::Ready(Ok(0))
873 }
874
875 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
876 Poll::Ready(Ok(8192))
877 }
878}
879
880#[derive(Debug)]
882#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
883pub struct Stdin {
884 read_buffer: Arc<std::sync::Mutex<Option<Bytes>>>,
885 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
886 handle: Handle,
887 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdin"))]
888 inner: tokio::io::Stdin,
889}
890#[allow(dead_code)]
891fn default_stdin() -> tokio::io::Stdin {
892 tokio::io::stdin()
893}
894impl Default for Stdin {
895 fn default() -> Self {
896 Self {
897 handle: Handle::current(),
898 read_buffer: Arc::new(std::sync::Mutex::new(None)),
899 inner: tokio::io::stdin(),
900 }
901 }
902}
903
904impl AsyncRead for Stdin {
905 fn poll_read(
906 mut self: Pin<&mut Self>,
907 cx: &mut Context<'_>,
908 buf: &mut tokio::io::ReadBuf<'_>,
909 ) -> Poll<io::Result<()>> {
910 let max_size = buf.remaining();
911 {
912 let mut read_buffer = self.read_buffer.lock().unwrap();
913 if let Some(read_buffer) = read_buffer.as_mut() {
914 let buf_len = read_buffer.len();
915 if buf_len > 0 {
916 let read = buf_len.min(max_size);
917 buf.put_slice(&read_buffer[..read]);
918 read_buffer.advance(read);
919 return Poll::Ready(Ok(()));
920 }
921 }
922 }
923
924 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
925 let inner = Pin::new(&mut self.inner);
926 inner.poll_read(cx, buf)
927 }
928}
929
930impl AsyncWrite for Stdin {
931 fn poll_write(
932 self: Pin<&mut Self>,
933 _cx: &mut Context<'_>,
934 _buf: &[u8],
935 ) -> Poll<io::Result<usize>> {
936 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
937 }
938
939 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
940 Poll::Ready(Err(io::Error::other("can not flush stdin")))
941 }
942
943 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
944 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
945 }
946
947 fn poll_write_vectored(
948 self: Pin<&mut Self>,
949 _cx: &mut Context<'_>,
950 _bufs: &[io::IoSlice<'_>],
951 ) -> Poll<io::Result<usize>> {
952 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
953 }
954}
955
956impl AsyncSeek for Stdin {
957 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
958 Err(io::Error::other("can not seek stdin"))
959 }
960
961 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
962 Poll::Ready(Err(io::Error::other("can not seek stdin")))
963 }
964}
965
966#[async_trait::async_trait]
968impl VirtualFile for Stdin {
969 fn last_accessed(&self) -> u64 {
970 0
971 }
972 fn last_modified(&self) -> u64 {
973 0
974 }
975 fn created_time(&self) -> u64 {
976 0
977 }
978 fn size(&self) -> u64 {
979 0
980 }
981 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
982 Ok(())
983 }
984 fn unlink(&mut self) -> Result<()> {
985 Ok(())
986 }
987 fn get_special_fd(&self) -> Option<u32> {
988 Some(0)
989 }
990 fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
991 {
992 let read_buffer = self.read_buffer.lock().unwrap();
993 if let Some(read_buffer) = read_buffer.as_ref() {
994 let buf_len = read_buffer.len();
995 if buf_len > 0 {
996 return Poll::Ready(Ok(buf_len));
997 }
998 }
999 }
1000
1001 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
1002 let inner = Pin::new(&mut self.inner);
1003
1004 let mut buf = [0u8; 8192];
1005 let mut read_buf = ReadBuf::new(&mut buf[..]);
1006 match inner.poll_read(cx, &mut read_buf) {
1007 Poll::Pending => Poll::Pending,
1008 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1009 Poll::Ready(Ok(())) => {
1010 let buf = read_buf.filled();
1011 let buf_len = buf.len();
1012
1013 let mut read_buffer = self.read_buffer.lock().unwrap();
1014 read_buffer.replace(Bytes::from(buf.to_vec()));
1015 Poll::Ready(Ok(buf_len))
1016 }
1017 }
1018 }
1019 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1020 Poll::Ready(Ok(0))
1021 }
1022}
1023
1024#[cfg(test)]
1025mod tests {
1026 use tempfile::TempDir;
1027 use tokio::runtime::Handle;
1028
1029 use super::FileSystem;
1030 use crate::FileSystem as FileSystemTrait;
1031 use crate::FsError;
1032 use std::path::Path;
1033
1034 #[tokio::test]
1035 async fn test_new_filesystem() {
1036 let temp = TempDir::new().unwrap();
1037 std::fs::write(temp.path().join("foo2.txt"), b"").unwrap();
1038
1039 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1040 assert!(
1041 fs.read_dir(Path::new("/")).is_ok(),
1042 "NativeFS can read root"
1043 );
1044 assert!(
1045 fs.new_open_options()
1046 .read(true)
1047 .open(Path::new("/foo2.txt"))
1048 .is_ok(),
1049 "created foo2.txt"
1050 );
1051 }
1052
1053 #[tokio::test]
1054 async fn test_create_dir() {
1055 let temp: TempDir = TempDir::new().unwrap();
1056 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1057
1058 assert_eq!(
1059 fs.create_dir(Path::new("../")),
1060 Err(FsError::AlreadyExists),
1061 "creating a directory out of bounds",
1062 );
1063
1064 assert_eq!(
1065 fs.create_dir(Path::new("/foo")),
1066 Ok(()),
1067 "creating a directory",
1068 );
1069
1070 assert!(
1071 temp.path().join("foo").exists(),
1072 "foo dir exists in host_fs"
1073 );
1074
1075 let cur_dir = read_dir_names(&fs, "/");
1076
1077 if !cur_dir.contains(&"foo".to_string()) {
1078 panic!("cur_dir does not contain foo: {cur_dir:#?}");
1079 }
1080
1081 assert!(
1082 cur_dir.contains(&"foo".to_string()),
1083 "the root is updated and well-defined"
1084 );
1085
1086 assert_eq!(
1087 fs.create_dir(Path::new("foo/bar")),
1088 Ok(()),
1089 "creating a sub-directory",
1090 );
1091
1092 assert!(
1093 temp.path().join("foo").join("bar").exists(),
1094 "foo dir exists in host_fs"
1095 );
1096
1097 let foo_dir = read_dir_names(&fs, Path::new("/foo"));
1098
1099 assert!(
1100 foo_dir.contains(&"bar".to_string()),
1101 "the foo directory is updated and well-defined"
1102 );
1103
1104 let bar_dir = read_dir_names(&fs, Path::new("/foo/bar"));
1105
1106 assert!(
1107 bar_dir.is_empty(),
1108 "the foo directory is updated and well-defined"
1109 );
1110 }
1111
1112 #[tokio::test]
1113 async fn test_remove_dir() {
1114 let temp: TempDir = TempDir::new().unwrap();
1115 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1116
1117 assert_eq!(
1118 fs.remove_dir(Path::new("/foo")),
1119 Err(FsError::EntryNotFound),
1120 "cannot remove a directory that doesn't exist",
1121 );
1122
1123 assert_eq!(
1124 fs.create_dir(Path::new("foo")),
1125 Ok(()),
1126 "creating a directory",
1127 );
1128
1129 assert_eq!(
1130 fs.create_dir(Path::new("foo/bar")),
1131 Ok(()),
1132 "creating a sub-directory",
1133 );
1134
1135 assert!(temp.path().join("foo/bar").exists(), "./foo/bar exists");
1136
1137 assert_eq!(
1138 fs.remove_dir(Path::new("foo")),
1139 Err(FsError::DirectoryNotEmpty),
1140 "removing a directory that has children",
1141 );
1142
1143 assert_eq!(
1144 fs.remove_dir(Path::new("foo/bar")),
1145 Ok(()),
1146 "removing a sub-directory",
1147 );
1148
1149 assert_eq!(
1150 fs.remove_dir(Path::new("foo")),
1151 Ok(()),
1152 "removing a directory",
1153 );
1154
1155 let cur_dir = read_dir_names(&fs, "/");
1156
1157 assert!(
1158 !cur_dir.contains(&"foo".to_string()),
1159 "the foo directory still exists"
1160 );
1161 }
1162
1163 fn read_dir_names(fs: &FileSystem, path: impl AsRef<Path>) -> Vec<String> {
1164 fs.read_dir(path.as_ref())
1165 .unwrap()
1166 .filter_map(|entry| Some(entry.ok()?.file_name().to_str()?.to_string()))
1167 .collect::<Vec<_>>()
1168 }
1169
1170 #[tokio::test]
1171 async fn test_rename() {
1172 let temp: TempDir = TempDir::new().unwrap();
1173 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1174 std::fs::create_dir_all(temp.path().join("foo").join("qux")).unwrap();
1175 let foo = Path::new("foo");
1176 let bar = Path::new("bar");
1177 let foo_realpath = temp.path().join(foo);
1178 let bar_realpath = temp.path().join(bar);
1179
1180 assert_eq!(
1181 fs.rename(Path::new("/"), Path::new("/bar")).await,
1182 Err(FsError::BaseNotDirectory),
1183 "renaming a directory that has no parent",
1184 );
1185 assert_eq!(
1186 fs.rename(Path::new("/foo"), Path::new("/")).await,
1187 Err(FsError::BaseNotDirectory),
1188 "renaming to a directory that has no parent",
1189 );
1190
1191 assert_eq!(
1192 fs.rename(foo, &foo.join("bar").join("baz"),).await,
1193 Err(FsError::EntryNotFound),
1194 "renaming to a directory that has parent that doesn't exist",
1195 );
1196
1197 #[cfg(not(target_os = "windows"))]
1199 assert_eq!(fs.create_dir(bar), Ok(()));
1200
1201 assert_eq!(
1202 fs.rename(foo, bar).await,
1203 Ok(()),
1204 "renaming to a directory that has parent that exists",
1205 );
1206
1207 assert!(
1208 fs.new_open_options()
1209 .write(true)
1210 .create_new(true)
1211 .open(bar.join("hello1.txt"))
1212 .is_ok(),
1213 "creating a new file (`hello1.txt`)",
1214 );
1215 assert!(
1216 fs.new_open_options()
1217 .write(true)
1218 .create_new(true)
1219 .open(bar.join("hello2.txt"))
1220 .is_ok(),
1221 "creating a new file (`hello2.txt`)",
1222 );
1223
1224 let cur_dir = read_dir_names(&fs, Path::new("/"));
1225
1226 assert!(
1227 !cur_dir.contains(&"foo".to_string()),
1228 "the foo directory still exists"
1229 );
1230
1231 assert!(
1232 cur_dir.contains(&"bar".to_string()),
1233 "the bar directory still exists"
1234 );
1235
1236 let bar_dir = read_dir_names(&fs, bar);
1237
1238 if !bar_dir.contains(&"qux".to_string()) {
1239 println!("qux does not exist: {bar_dir:?}")
1240 }
1241
1242 let qux_dir = read_dir_names(&fs, bar.join("qux"));
1243
1244 assert!(qux_dir.is_empty(), "the qux directory is empty");
1245
1246 assert!(
1247 bar_realpath.join("hello1.txt").exists(),
1248 "the /bar/hello1.txt file exists"
1249 );
1250
1251 assert!(
1252 bar_realpath.join("hello2.txt").exists(),
1253 "the /bar/hello2.txt file exists"
1254 );
1255
1256 assert_eq!(fs.create_dir(foo), Ok(()), "create ./foo again");
1257
1258 assert_eq!(
1259 fs.rename(&bar.join("hello2.txt"), &foo.join("world2.txt"))
1260 .await,
1261 Ok(()),
1262 "renaming (and moving) a file",
1263 );
1264
1265 assert_eq!(
1266 fs.rename(foo, &bar.join("baz")).await,
1267 Ok(()),
1268 "renaming a directory",
1269 );
1270
1271 assert_eq!(
1272 fs.rename(&bar.join("hello1.txt"), &bar.join("world1.txt"))
1273 .await,
1274 Ok(()),
1275 "renaming a file (in the same directory)",
1276 );
1277
1278 assert!(bar_realpath.exists(), "./bar exists");
1279 assert!(bar_realpath.join("baz").exists(), "./bar/baz exists");
1280 assert!(!foo_realpath.exists(), "foo does not exist anymore");
1281 assert!(
1282 bar_realpath.join("baz/world2.txt").exists(),
1283 "/bar/baz/world2.txt exists"
1284 );
1285 assert!(
1286 bar_realpath.join("world1.txt").exists(),
1287 "/bar/world1.txt (ex hello1.txt) exists"
1288 );
1289 assert!(
1290 !bar_realpath.join("hello1.txt").exists(),
1291 "hello1.txt was moved"
1292 );
1293 assert!(
1294 !bar_realpath.join("hello2.txt").exists(),
1295 "hello2.txt was moved"
1296 );
1297 assert!(
1298 bar_realpath.join("baz/world2.txt").exists(),
1299 "world2.txt was moved to the correct place"
1300 );
1301 }
1302
1303 #[tokio::test]
1304 async fn test_metadata() {
1305 use std::thread::sleep;
1306 use std::time::Duration;
1307
1308 let temp = TempDir::new().unwrap();
1309
1310 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1311
1312 let root_metadata = fs.metadata(Path::new("/")).unwrap();
1313
1314 assert!(root_metadata.ft.dir);
1315 #[cfg(not(target_env = "musl"))]
1317 assert_eq!(root_metadata.accessed, root_metadata.created);
1318 #[cfg(not(target_env = "musl"))]
1319 assert_eq!(root_metadata.modified, root_metadata.created);
1320 assert!(root_metadata.modified > 0);
1321
1322 let foo = Path::new("foo");
1323
1324 assert_eq!(fs.create_dir(foo), Ok(()));
1325
1326 let foo_metadata = fs.metadata(foo);
1327 assert!(foo_metadata.is_ok());
1328 let foo_metadata = foo_metadata.unwrap();
1329
1330 assert!(foo_metadata.ft.dir);
1331 #[cfg(not(target_env = "musl"))]
1332 assert_eq!(foo_metadata.accessed, foo_metadata.created);
1333 #[cfg(not(target_env = "musl"))]
1334 assert_eq!(foo_metadata.modified, foo_metadata.created);
1335 assert!(foo_metadata.modified > 0);
1336
1337 sleep(Duration::from_secs(3));
1338
1339 let bar = Path::new("bar");
1340
1341 assert_eq!(fs.rename(foo, bar).await, Ok(()));
1342
1343 let bar_metadata = fs.metadata(bar).unwrap();
1344 assert!(bar_metadata.ft.dir);
1345 assert!(bar_metadata.accessed >= foo_metadata.accessed);
1346 assert_eq!(bar_metadata.created, foo_metadata.created);
1347 assert!(bar_metadata.modified > foo_metadata.modified);
1348
1349 let root_metadata = fs.metadata(bar).unwrap();
1350 assert!(
1351 root_metadata.modified > foo_metadata.modified,
1352 "the parent modified time was updated"
1353 );
1354 }
1355
1356 #[tokio::test]
1357 async fn test_rejects_host_absolute_paths_inside_root() {
1358 let temp = TempDir::new().unwrap();
1359 let temp_canon = super::canonicalize(temp.path()).expect("canonicalize temp dir");
1362
1363 let file_path = temp_canon.join("foo.txt");
1364 std::fs::write(&file_path, b"hello").unwrap();
1365
1366 let fs = FileSystem::new(Handle::current(), &temp_canon).expect("get filesystem");
1367
1368 assert_eq!(fs.metadata(&file_path), Err(FsError::InvalidInput));
1369 assert!(matches!(
1370 fs.new_open_options().read(true).open(&file_path),
1371 Err(FsError::InvalidInput)
1372 ));
1373 }
1374
1375 #[tokio::test]
1376 async fn test_remove_file() {
1377 let temp = TempDir::new().unwrap();
1378 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1379
1380 assert!(
1381 fs.new_open_options()
1382 .write(true)
1383 .create_new(true)
1384 .open(Path::new("foo.txt"))
1385 .is_ok(),
1386 "creating a new file",
1387 );
1388
1389 assert!(read_dir_names(&fs, Path::new("/")).contains(&"foo.txt".to_string()));
1390
1391 assert!(temp.path().join("foo.txt").is_file());
1392
1393 assert_eq!(
1394 fs.remove_file(Path::new("foo.txt")),
1395 Ok(()),
1396 "removing a file that exists",
1397 );
1398
1399 assert!(!temp.path().join("foo.txt").exists());
1400
1401 assert_eq!(
1402 fs.remove_file(Path::new("foo.txt")),
1403 Err(FsError::EntryNotFound),
1404 "removing a file that doesn't exists",
1405 );
1406 }
1407
1408 #[tokio::test]
1409 async fn test_readdir() {
1410 let temp = TempDir::new().unwrap();
1411 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1412
1413 assert_eq!(fs.create_dir(Path::new("foo")), Ok(()), "creating `foo`");
1414 assert_eq!(
1415 fs.create_dir(Path::new("foo/sub")),
1416 Ok(()),
1417 "creating `sub`"
1418 );
1419 assert_eq!(fs.create_dir(Path::new("bar")), Ok(()), "creating `bar`");
1420 assert_eq!(fs.create_dir(Path::new("baz")), Ok(()), "creating `bar`");
1421 assert!(
1422 fs.new_open_options()
1423 .write(true)
1424 .create_new(true)
1425 .open(Path::new("a.txt"))
1426 .is_ok(),
1427 "creating `a.txt`",
1428 );
1429 assert!(
1430 fs.new_open_options()
1431 .write(true)
1432 .create_new(true)
1433 .open(Path::new("b.txt"))
1434 .is_ok(),
1435 "creating `b.txt`",
1436 );
1437
1438 let readdir = fs.read_dir(Path::new("/"));
1439
1440 assert!(
1441 readdir.is_ok(),
1442 "reading the directory `{}`",
1443 Path::new("/").display()
1444 );
1445
1446 let mut readdir = readdir.unwrap();
1447
1448 let next = readdir.next().unwrap().unwrap();
1449 assert!(next.path.ends_with("a.txt"), "checking entry #1");
1450 assert!(next.metadata().unwrap().is_file(), "checking entry #1");
1451
1452 let next = readdir.next().unwrap().unwrap();
1453 assert!(next.path.ends_with("b.txt"), "checking entry #2");
1454 assert!(next.metadata().unwrap().is_file(), "checking entry #2");
1455
1456 let next = readdir.next().unwrap().unwrap();
1457 assert!(next.path.ends_with("bar"), "checking entry #3");
1458 assert!(next.metadata().unwrap().is_dir(), "checking entry #3");
1459
1460 let next = readdir.next().unwrap().unwrap();
1461 assert!(next.path.ends_with("baz"), "checking entry #4");
1462 assert!(next.metadata().unwrap().is_dir(), "checking entry #4");
1463
1464 let next = readdir.next().unwrap().unwrap();
1465 assert!(next.path.ends_with("foo"), "checking entry #5");
1466 assert!(next.metadata().unwrap().is_dir(), "checking entry #5");
1467
1468 if let Some(s) = readdir.next() {
1469 panic!("next: {s:?}");
1470 }
1471 }
1472}