1use crate::{
2 DirEntry, FileType, FsError, Metadata, OpenOptions, OpenOptionsConfig, ReadDir, Result,
3 VirtualFile,
4};
5use bytes::{Buf, Bytes};
6use futures::future::BoxFuture;
7#[cfg(feature = "enable-serde")]
8use serde::{Deserialize, Serialize, de};
9use std::convert::TryInto;
10use std::fs;
11use std::io::{self, Seek};
12use std::path::{Component, Path, PathBuf};
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::fs as tfs;
18use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
19use tokio::runtime::Handle;
20
21#[derive(Debug, Clone)]
22#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
23pub struct FileSystem {
24 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
25 handle: Handle,
26 root: PathBuf,
27}
28
29#[allow(dead_code)]
30fn default_handle() -> Handle {
31 Handle::current()
32}
33
34pub fn canonicalize(path: &Path) -> Result<PathBuf> {
35 if !path.exists() {
36 return Err(FsError::InvalidInput);
37 }
38 dunce::canonicalize(path).map_err(Into::into)
39}
40
/// Lexically normalizes `path` without touching the file system.
///
/// `.` components are dropped and every `..` removes the last component
/// collected so far (it is a no-op when nothing can be removed, e.g. at the
/// root or at the start of a relative path). Symlinks are not resolved.
pub fn normalize_path(path: &Path) -> PathBuf {
    let mut parts = path.components().peekable();

    // A prefix (e.g. a Windows drive) can only be the first component; seed
    // the result with it so the loop below never has to deal with one.
    let mut normalized = match parts.peek().copied() {
        Some(prefix @ Component::Prefix(..)) => {
            parts.next();
            PathBuf::from(prefix.as_os_str())
        }
        _ => PathBuf::new(),
    };

    for part in parts {
        match part {
            Component::Prefix(..) => unreachable!(),
            Component::RootDir => normalized.push(part.as_os_str()),
            Component::CurDir => {}
            Component::ParentDir => {
                normalized.pop();
            }
            Component::Normal(name) => normalized.push(name),
        }
    }

    normalized
}
69
70impl FileSystem {
71 pub fn new(handle: Handle, root: impl Into<PathBuf>) -> Result<Self> {
72 let root = canonicalize(&root.into())?;
73
74 Ok(FileSystem { handle, root })
75 }
76}
77
78impl FileSystem {
79 fn prepare_path(&self, path: &Path) -> Result<PathBuf> {
80 let path = normalize_path(path);
81
82 if matches!(path.components().next(), Some(Component::Prefix(..))) {
83 return Err(FsError::InvalidInput);
84 }
85
86 if self.root != Path::new("/") && path.starts_with(&self.root) {
87 return Err(FsError::InvalidInput);
88 }
89
90 let path = path.strip_prefix("/").unwrap_or(&path);
91 let path = self.root.join(path);
92
93 debug_assert!(path.starts_with(&self.root));
94 Ok(path)
95 }
96}
97
98impl crate::FileSystem for FileSystem {
99 fn readlink(&self, path: &Path) -> Result<PathBuf> {
100 let path = self.prepare_path(path)?;
101
102 fs::read_link(path).map_err(Into::into)
103 }
104
105 fn read_dir(&self, path: &Path) -> Result<ReadDir> {
106 let path = self.prepare_path(path)?;
107
108 let read_dir = fs::read_dir(path)?;
109 let mut data = read_dir
110 .map(|entry| {
111 let entry = entry?;
112
113 let path = entry
114 .path()
115 .strip_prefix(&self.root)
116 .map_err(|_| FsError::InvalidData)?
117 .to_owned();
118 let path = Path::new("/").join(path);
119
120 let metadata = entry.metadata()?;
121
122 Ok(DirEntry {
123 path,
124 metadata: Ok(metadata.try_into()?),
125 })
126 })
127 .collect::<std::result::Result<Vec<DirEntry>, io::Error>>()
128 .map_err::<FsError, _>(Into::into)?;
129 data.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
130 Ok(ReadDir::new(data))
131 }
132
133 fn create_dir(&self, path: &Path) -> Result<()> {
134 let path = self.prepare_path(path)?;
135
136 if path.parent().is_none() {
137 return Err(FsError::BaseNotDirectory);
138 }
139
140 fs::create_dir(path).map_err(Into::into)
141 }
142
143 fn remove_dir(&self, path: &Path) -> Result<()> {
144 let path = self.prepare_path(path)?;
145
146 if path.parent().is_none() {
147 return Err(FsError::BaseNotDirectory);
148 }
149
150 if path.is_dir()
153 && fs::read_dir(&path)
154 .map(|mut s| s.next().is_some())
155 .unwrap_or(false)
156 {
157 return Err(FsError::DirectoryNotEmpty);
158 }
159 fs::remove_dir(path).map_err(Into::into)
160 }
161
162 fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, Result<()>> {
163 Box::pin(async move {
164 use filetime::{FileTime, set_file_mtime};
165 let norm_from = normalize_path(from);
166 let norm_to = normalize_path(to);
167
168 if norm_from.parent().is_none() {
169 return Err(FsError::BaseNotDirectory);
170 }
171 if norm_to.parent().is_none() {
172 return Err(FsError::BaseNotDirectory);
173 }
174
175 let from = self.prepare_path(from)?;
176 let to = self.prepare_path(to)?;
177
178 if !from.exists() {
179 return Err(FsError::EntryNotFound);
180 }
181 let from_parent = from.parent().unwrap();
182 let to_parent = to.parent().unwrap();
183 if !from_parent.exists() {
184 return Err(FsError::EntryNotFound);
185 }
186 if !to_parent.exists() {
187 return Err(FsError::EntryNotFound);
188 }
189 let result = if from_parent != to_parent {
190 let _ = std::fs::create_dir_all(to_parent);
191 if from.is_dir() {
192 fs_extra::move_items(
193 &[&from],
194 &to,
195 &fs_extra::dir::CopyOptions {
196 copy_inside: true,
197 ..Default::default()
198 },
199 )
200 .map(|_| ())
201 .map_err(|_| FsError::UnknownError)?;
202 let _ = fs_extra::remove_items(&[&from]);
203 Ok(())
204 } else {
205 fs::copy(&from, &to).map(|_| ()).map_err(FsError::from)?;
206 fs::remove_file(&from).map(|_| ()).map_err(Into::into)
207 }
208 } else {
209 fs::rename(&from, &to).map_err(Into::into)
210 };
211 let _ = set_file_mtime(&to, FileTime::now()).map(|_| ());
212 result
213 })
214 }
215
216 fn remove_file(&self, path: &Path) -> Result<()> {
217 let path = self.prepare_path(path)?;
218
219 if path.parent().is_none() {
220 return Err(FsError::BaseNotDirectory);
221 }
222
223 fs::remove_file(path).map_err(Into::into)
224 }
225
226 fn new_open_options(&self) -> OpenOptions<'_> {
227 OpenOptions::new(self)
228 }
229
230 fn metadata(&self, path: &Path) -> Result<Metadata> {
231 let path = self.prepare_path(path)?;
232
233 fs::metadata(path)
234 .and_then(TryInto::try_into)
235 .map_err(Into::into)
236 }
237
238 fn symlink_metadata(&self, path: &Path) -> Result<Metadata> {
239 let path = self.prepare_path(path)?;
240
241 fs::symlink_metadata(path)
242 .and_then(TryInto::try_into)
243 .map_err(Into::into)
244 }
245}
246
247impl TryInto<Metadata> for std::fs::Metadata {
248 type Error = io::Error;
249
250 fn try_into(self) -> std::result::Result<Metadata, Self::Error> {
251 let filetype = self.file_type();
252 let (char_device, block_device, socket, fifo) = {
253 #[cfg(unix)]
254 {
255 use std::os::unix::fs::FileTypeExt;
256 (
257 filetype.is_char_device(),
258 filetype.is_block_device(),
259 filetype.is_socket(),
260 filetype.is_fifo(),
261 )
262 }
263 #[cfg(not(unix))]
264 {
265 (false, false, false, false)
266 }
267 };
268
269 Ok(Metadata {
270 ft: FileType {
271 dir: filetype.is_dir(),
272 file: filetype.is_file(),
273 symlink: filetype.is_symlink(),
274 char_device,
275 block_device,
276 socket,
277 fifo,
278 },
279 accessed: self
280 .accessed()
281 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
282 .map_or(0, |time| time.as_nanos() as u64),
283 created: self
284 .created()
285 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
286 .map_or(0, |time| time.as_nanos() as u64),
287 modified: self
288 .modified()
289 .and_then(|time| time.duration_since(UNIX_EPOCH).map_err(io::Error::other))
290 .map_or(0, |time| time.as_nanos() as u64),
291 len: self.len(),
292 })
293 }
294}
295
296impl crate::FileOpener for FileSystem {
297 fn open(
298 &self,
299 path: &Path,
300 conf: &OpenOptionsConfig,
301 ) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {
302 let path = self.prepare_path(path)?;
303
304 let read = conf.read();
306 let write = conf.write();
307
308 let append = if conf.truncate { false } else { conf.append() };
314
315 let mut oo = fs::OpenOptions::new();
316 oo.read(conf.read())
317 .write(conf.write())
318 .create_new(conf.create_new())
319 .create(conf.create())
320 .append(append)
321 .truncate(conf.truncate())
322 .open(&path)
323 .map_err(Into::into)
324 .map(|file| {
325 Box::new(File::new(
326 self.handle.clone(),
327 file,
328 path.to_owned(),
329 read,
330 write,
331 append,
332 )) as Box<dyn VirtualFile + Send + Sync + 'static>
333 })
334 }
335}
336
337#[derive(Debug)]
339#[cfg_attr(feature = "enable-serde", derive(Serialize))]
340pub struct File {
341 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
342 handle: Handle,
343 #[cfg_attr(feature = "enable-serde", serde(skip))]
344 inner: tfs::File,
345 #[cfg_attr(feature = "enable-serde", serde(skip_serializing))]
346 inner_std: fs::File,
347 pub host_path: PathBuf,
348 #[cfg(feature = "enable-serde")]
349 flags: u16,
350}
351
#[cfg(feature = "enable-serde")]
impl<'de> Deserialize<'de> for File {
    /// Deserializes a [`File`] from its `host_path` and `flags` and re-opens
    /// the file on this host with the access mode encoded in `flags`.
    ///
    /// # Errors
    ///
    /// Besides malformed input, deserialization fails (instead of panicking)
    /// when no Tokio runtime is current, when the file cannot be opened, or
    /// when its handle cannot be duplicated.
    fn deserialize<D>(deserializer: D) -> std::result::Result<File, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[derive(Deserialize)]
        #[serde(field_identifier, rename_all = "snake_case")]
        enum Field {
            HostPath,
            Flags,
        }

        /// Re-opens `host_path` with the mode stored in `flags`; shared by the
        /// sequence and the map code paths of the visitor.
        fn reopen<E: de::Error>(host_path: PathBuf, flags: u16) -> std::result::Result<File, E> {
            // Resolve the runtime first so a failure does not leave an open
            // file descriptor behind.
            let handle = Handle::try_current().map_err(E::custom)?;
            let inner_std = fs::OpenOptions::new()
                .read(flags & File::READ != 0)
                .write(flags & File::WRITE != 0)
                .append(flags & File::APPEND != 0)
                .open(&host_path)
                .map_err(|_| E::custom("Could not open file on this system"))?;
            let async_source = inner_std
                .try_clone()
                .map_err(|_| E::custom("Could not duplicate file handle on this system"))?;

            Ok(File {
                handle,
                inner: tokio::fs::File::from_std(async_source),
                inner_std,
                host_path,
                flags,
            })
        }

        struct FileVisitor;

        impl<'de> de::Visitor<'de> for FileVisitor {
            type Value = File;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct File")
            }

            fn visit_seq<V>(self, mut seq: V) -> std::result::Result<Self::Value, V::Error>
            where
                V: de::SeqAccess<'de>,
            {
                let host_path: PathBuf = seq
                    .next_element()?
                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
                let flags: u16 = seq
                    .next_element()?
                    .ok_or_else(|| de::Error::invalid_length(1, &self))?;
                reopen(host_path, flags)
            }

            fn visit_map<V>(self, mut map: V) -> std::result::Result<Self::Value, V::Error>
            where
                V: de::MapAccess<'de>,
            {
                let mut host_path: Option<PathBuf> = None;
                let mut flags: Option<u16> = None;
                while let Some(key) = map.next_key()? {
                    match key {
                        Field::HostPath => {
                            if host_path.is_some() {
                                return Err(de::Error::duplicate_field("host_path"));
                            }
                            host_path = Some(map.next_value()?);
                        }
                        Field::Flags => {
                            if flags.is_some() {
                                return Err(de::Error::duplicate_field("flags"));
                            }
                            flags = Some(map.next_value()?);
                        }
                    }
                }
                let host_path = host_path.ok_or_else(|| de::Error::missing_field("host_path"))?;
                let flags = flags.ok_or_else(|| de::Error::missing_field("flags"))?;
                reopen(host_path, flags)
            }
        }

        const FIELDS: &[&str] = &["host_path", "flags"];
        deserializer.deserialize_struct("File", FIELDS, FileVisitor)
    }
}
443
444impl File {
445 const READ: u16 = 1;
446 const WRITE: u16 = 2;
447 const APPEND: u16 = 4;
448
449 pub fn new(
451 handle: Handle,
452 file: fs::File,
453 host_path: PathBuf,
454 read: bool,
455 write: bool,
456 append: bool,
457 ) -> Self {
458 let mut _flags = 0;
459
460 if read {
461 _flags |= Self::READ;
462 }
463
464 if write {
465 _flags |= Self::WRITE;
466 }
467
468 if append {
469 _flags |= Self::APPEND;
470 }
471
472 let async_file = tfs::File::from_std(file.try_clone().unwrap());
473 Self {
474 handle,
475 inner_std: file,
476 inner: async_file,
477 host_path,
478 #[cfg(feature = "enable-serde")]
479 flags: _flags,
480 }
481 }
482
483 fn metadata(&self) -> std::fs::Metadata {
484 self.inner_std.metadata().unwrap()
486 }
487}
488
489#[async_trait::async_trait]
491impl VirtualFile for File {
492 fn last_accessed(&self) -> u64 {
493 self.metadata()
494 .accessed()
495 .ok()
496 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
497 .map(|ct| ct.as_nanos() as u64)
498 .unwrap_or(0)
499 }
500
501 fn last_modified(&self) -> u64 {
502 self.metadata()
503 .modified()
504 .ok()
505 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
506 .map(|ct| ct.as_nanos() as u64)
507 .unwrap_or(0)
508 }
509
510 fn created_time(&self) -> u64 {
511 self.metadata()
512 .created()
513 .ok()
514 .and_then(|ct| ct.duration_since(SystemTime::UNIX_EPOCH).ok())
515 .map(|ct| ct.as_nanos() as u64)
516 .unwrap_or(0)
517 }
518
519 fn set_times(&mut self, atime: Option<u64>, mtime: Option<u64>) -> crate::Result<()> {
520 let atime = atime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
521 let mtime = mtime.map(|t| filetime::FileTime::from_unix_time(t as i64, 0));
522
523 filetime::set_file_handle_times(&self.inner_std, atime, mtime)
524 .map_err(|_| crate::FsError::IOError)
525 }
526
527 fn size(&self) -> u64 {
528 self.metadata().len()
529 }
530
531 fn set_len(&mut self, new_size: u64) -> crate::Result<()> {
532 fs::File::set_len(&self.inner_std, new_size).map_err(Into::into)
533 }
534
535 fn unlink(&mut self) -> Result<()> {
536 fs::remove_file(&self.host_path).map_err(Into::into)
537 }
538
539 fn get_special_fd(&self) -> Option<u32> {
540 None
541 }
542
543 fn poll_read_ready(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
544 let cursor = match self.inner_std.stream_position() {
545 Ok(a) => a,
546 Err(err) => return Poll::Ready(Err(err)),
547 };
548 let end = match self.inner_std.seek(io::SeekFrom::End(0)) {
549 Ok(a) => a,
550 Err(err) => return Poll::Ready(Err(err)),
551 };
552 let _ = self.inner_std.seek(io::SeekFrom::Start(cursor));
553
554 let remaining = end - cursor;
555 Poll::Ready(Ok(remaining as usize))
556 }
557
558 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
559 Poll::Ready(Ok(8192))
560 }
561}
562
563impl AsyncRead for File {
564 fn poll_read(
565 mut self: Pin<&mut Self>,
566 cx: &mut Context<'_>,
567 buf: &mut tokio::io::ReadBuf<'_>,
568 ) -> Poll<io::Result<()>> {
569 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
570 let inner = Pin::new(&mut self.inner);
571 inner.poll_read(cx, buf)
572 }
573}
574
575impl AsyncWrite for File {
576 fn poll_write(
577 mut self: Pin<&mut Self>,
578 cx: &mut Context<'_>,
579 buf: &[u8],
580 ) -> Poll<io::Result<usize>> {
581 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
582 let inner = Pin::new(&mut self.inner);
583 inner.poll_write(cx, buf)
584 }
585
586 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
587 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
588 let inner = Pin::new(&mut self.inner);
589 inner.poll_flush(cx)
590 }
591
592 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
593 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
594 let inner = Pin::new(&mut self.inner);
595 inner.poll_shutdown(cx)
596 }
597
598 fn poll_write_vectored(
599 mut self: Pin<&mut Self>,
600 cx: &mut Context<'_>,
601 bufs: &[io::IoSlice<'_>],
602 ) -> Poll<io::Result<usize>> {
603 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
604 let inner = Pin::new(&mut self.inner);
605 inner.poll_write_vectored(cx, bufs)
606 }
607
608 fn is_write_vectored(&self) -> bool {
609 self.inner.is_write_vectored()
610 }
611}
612
613impl AsyncSeek for File {
614 fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
615 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
616 let inner = Pin::new(&mut self.inner);
617 inner.start_seek(position)
618 }
619
620 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
621 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
622 let inner = Pin::new(&mut self.inner);
623 inner.poll_complete(cx)
624 }
625}
626
627impl Drop for File {
628 fn drop(&mut self) {
629 tracing::trace!(?self.host_path, "Closing host file");
630 }
631}
632
633#[derive(Debug)]
635#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
636pub struct Stdout {
637 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
638 handle: Handle,
639 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdout"))]
640 inner: tokio::io::Stdout,
641}
642#[allow(dead_code)]
643fn default_stdout() -> tokio::io::Stdout {
644 tokio::io::stdout()
645}
646impl Default for Stdout {
647 fn default() -> Self {
648 Self {
649 handle: Handle::current(),
650 inner: tokio::io::stdout(),
651 }
652 }
653}
654
/// Number of bytes (8 KiB) that `Stdout::poll_write_ready` reports as
/// writable.
const DEFAULT_BUF_SIZE_HINT: usize = 8 * 1024;
663
664#[async_trait::async_trait]
666impl VirtualFile for Stdout {
667 fn last_accessed(&self) -> u64 {
668 0
669 }
670
671 fn last_modified(&self) -> u64 {
672 0
673 }
674
675 fn created_time(&self) -> u64 {
676 0
677 }
678
679 fn size(&self) -> u64 {
680 0
681 }
682
683 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
684 Ok(())
685 }
686
687 fn unlink(&mut self) -> Result<()> {
688 Ok(())
689 }
690
691 fn get_special_fd(&self) -> Option<u32> {
692 Some(1)
693 }
694
695 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
696 Poll::Ready(Ok(0))
697 }
698
699 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
700 Poll::Ready(Ok(DEFAULT_BUF_SIZE_HINT))
701 }
702}
703
704impl AsyncRead for Stdout {
705 fn poll_read(
706 self: Pin<&mut Self>,
707 _cx: &mut Context<'_>,
708 _buf: &mut tokio::io::ReadBuf<'_>,
709 ) -> Poll<io::Result<()>> {
710 Poll::Ready(Err(io::Error::other("can not read from stdout")))
711 }
712}
713
714impl AsyncWrite for Stdout {
715 fn poll_write(
716 mut self: Pin<&mut Self>,
717 cx: &mut Context<'_>,
718 buf: &[u8],
719 ) -> Poll<io::Result<usize>> {
720 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
721 let inner = Pin::new(&mut self.inner);
722 inner.poll_write(cx, buf)
723 }
724
725 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
726 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
727 let inner = Pin::new(&mut self.inner);
728 inner.poll_flush(cx)
729 }
730
731 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
732 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
733 let inner = Pin::new(&mut self.inner);
734 inner.poll_shutdown(cx)
735 }
736
737 fn poll_write_vectored(
738 mut self: Pin<&mut Self>,
739 cx: &mut Context<'_>,
740 bufs: &[io::IoSlice<'_>],
741 ) -> Poll<io::Result<usize>> {
742 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
743 let inner = Pin::new(&mut self.inner);
744 inner.poll_write_vectored(cx, bufs)
745 }
746
747 fn is_write_vectored(&self) -> bool {
748 self.inner.is_write_vectored()
749 }
750}
751
752impl AsyncSeek for Stdout {
753 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
754 Err(io::Error::other("can not seek stdout"))
755 }
756
757 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
758 Poll::Ready(Err(io::Error::other("can not seek stdout")))
759 }
760}
761
762#[derive(Debug)]
764#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
765pub struct Stderr {
766 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
767 handle: Handle,
768 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stderr"))]
769 inner: tokio::io::Stderr,
770}
771#[allow(dead_code)]
772fn default_stderr() -> tokio::io::Stderr {
773 tokio::io::stderr()
774}
775impl Default for Stderr {
776 fn default() -> Self {
777 Self {
778 handle: Handle::current(),
779 inner: tokio::io::stderr(),
780 }
781 }
782}
783
784impl AsyncRead for Stderr {
785 fn poll_read(
786 self: Pin<&mut Self>,
787 _cx: &mut Context<'_>,
788 _buf: &mut tokio::io::ReadBuf<'_>,
789 ) -> Poll<io::Result<()>> {
790 Poll::Ready(Err(io::Error::other("can not read from stderr")))
791 }
792}
793
794impl AsyncWrite for Stderr {
795 fn poll_write(
796 mut self: Pin<&mut Self>,
797 cx: &mut Context<'_>,
798 buf: &[u8],
799 ) -> Poll<io::Result<usize>> {
800 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
801 let inner = Pin::new(&mut self.inner);
802 inner.poll_write(cx, buf)
803 }
804
805 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
806 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
807 let inner = Pin::new(&mut self.inner);
808 inner.poll_flush(cx)
809 }
810
811 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
812 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
813 let inner = Pin::new(&mut self.inner);
814 inner.poll_shutdown(cx)
815 }
816
817 fn poll_write_vectored(
818 mut self: Pin<&mut Self>,
819 cx: &mut Context<'_>,
820 bufs: &[io::IoSlice<'_>],
821 ) -> Poll<io::Result<usize>> {
822 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
823 let inner = Pin::new(&mut self.inner);
824 inner.poll_write_vectored(cx, bufs)
825 }
826
827 fn is_write_vectored(&self) -> bool {
828 self.inner.is_write_vectored()
829 }
830}
831
832impl AsyncSeek for Stderr {
833 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
834 Err(io::Error::other("can not seek stderr"))
835 }
836
837 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
838 Poll::Ready(Err(io::Error::other("can not seek stderr")))
839 }
840}
841
842#[async_trait::async_trait]
844impl VirtualFile for Stderr {
845 fn last_accessed(&self) -> u64 {
846 0
847 }
848
849 fn last_modified(&self) -> u64 {
850 0
851 }
852
853 fn created_time(&self) -> u64 {
854 0
855 }
856
857 fn size(&self) -> u64 {
858 0
859 }
860
861 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
862 Ok(())
863 }
864
865 fn unlink(&mut self) -> Result<()> {
866 Ok(())
867 }
868
869 fn get_special_fd(&self) -> Option<u32> {
870 Some(2)
871 }
872
873 fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
874 Poll::Ready(Ok(0))
875 }
876
877 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
878 Poll::Ready(Ok(8192))
879 }
880}
881
882#[derive(Debug)]
884#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
885pub struct Stdin {
886 read_buffer: Arc<std::sync::Mutex<Option<Bytes>>>,
887 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_handle"))]
888 handle: Handle,
889 #[cfg_attr(feature = "enable-serde", serde(skip, default = "default_stdin"))]
890 inner: tokio::io::Stdin,
891}
892#[allow(dead_code)]
893fn default_stdin() -> tokio::io::Stdin {
894 tokio::io::stdin()
895}
896impl Default for Stdin {
897 fn default() -> Self {
898 Self {
899 handle: Handle::current(),
900 read_buffer: Arc::new(std::sync::Mutex::new(None)),
901 inner: tokio::io::stdin(),
902 }
903 }
904}
905
906impl AsyncRead for Stdin {
907 fn poll_read(
908 mut self: Pin<&mut Self>,
909 cx: &mut Context<'_>,
910 buf: &mut tokio::io::ReadBuf<'_>,
911 ) -> Poll<io::Result<()>> {
912 let max_size = buf.remaining();
913 {
914 let mut read_buffer = self.read_buffer.lock().unwrap();
915 if let Some(read_buffer) = read_buffer.as_mut() {
916 let buf_len = read_buffer.len();
917 if buf_len > 0 {
918 let read = buf_len.min(max_size);
919 buf.put_slice(&read_buffer[..read]);
920 read_buffer.advance(read);
921 return Poll::Ready(Ok(()));
922 }
923 }
924 }
925
926 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
927 let inner = Pin::new(&mut self.inner);
928 inner.poll_read(cx, buf)
929 }
930}
931
932impl AsyncWrite for Stdin {
933 fn poll_write(
934 self: Pin<&mut Self>,
935 _cx: &mut Context<'_>,
936 _buf: &[u8],
937 ) -> Poll<io::Result<usize>> {
938 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
939 }
940
941 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
942 Poll::Ready(Err(io::Error::other("can not flush stdin")))
943 }
944
945 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
946 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
947 }
948
949 fn poll_write_vectored(
950 self: Pin<&mut Self>,
951 _cx: &mut Context<'_>,
952 _bufs: &[io::IoSlice<'_>],
953 ) -> Poll<io::Result<usize>> {
954 Poll::Ready(Err(io::Error::other("can not wrote to stdin")))
955 }
956}
957
958impl AsyncSeek for Stdin {
959 fn start_seek(self: Pin<&mut Self>, _position: io::SeekFrom) -> io::Result<()> {
960 Err(io::Error::other("can not seek stdin"))
961 }
962
963 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
964 Poll::Ready(Err(io::Error::other("can not seek stdin")))
965 }
966}
967
968#[async_trait::async_trait]
970impl VirtualFile for Stdin {
971 fn last_accessed(&self) -> u64 {
972 0
973 }
974 fn last_modified(&self) -> u64 {
975 0
976 }
977 fn created_time(&self) -> u64 {
978 0
979 }
980 fn size(&self) -> u64 {
981 0
982 }
983 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
984 Ok(())
985 }
986 fn unlink(&mut self) -> Result<()> {
987 Ok(())
988 }
989 fn get_special_fd(&self) -> Option<u32> {
990 Some(0)
991 }
992 fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
993 {
994 let read_buffer = self.read_buffer.lock().unwrap();
995 if let Some(read_buffer) = read_buffer.as_ref() {
996 let buf_len = read_buffer.len();
997 if buf_len > 0 {
998 return Poll::Ready(Ok(buf_len));
999 }
1000 }
1001 }
1002
1003 let _guard = Handle::try_current().map_err(|_| self.handle.enter());
1004 let inner = Pin::new(&mut self.inner);
1005
1006 let mut buf = [0u8; 8192];
1007 let mut read_buf = ReadBuf::new(&mut buf[..]);
1008 match inner.poll_read(cx, &mut read_buf) {
1009 Poll::Pending => Poll::Pending,
1010 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1011 Poll::Ready(Ok(())) => {
1012 let buf = read_buf.filled();
1013 let buf_len = buf.len();
1014
1015 let mut read_buffer = self.read_buffer.lock().unwrap();
1016 read_buffer.replace(Bytes::from(buf.to_vec()));
1017 Poll::Ready(Ok(buf_len))
1018 }
1019 }
1020 }
1021 fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1022 Poll::Ready(Ok(0))
1023 }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use tempfile::TempDir;
1029 use tokio::runtime::Handle;
1030
1031 use super::FileSystem;
1032 use crate::FileSystem as FileSystemTrait;
1033 use crate::FsError;
1034 use std::path::Path;
1035
1036 #[tokio::test]
1037 async fn test_new_filesystem() {
1038 let temp = TempDir::new().unwrap();
1039 std::fs::write(temp.path().join("foo2.txt"), b"").unwrap();
1040
1041 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1042 assert!(
1043 fs.read_dir(Path::new("/")).is_ok(),
1044 "NativeFS can read root"
1045 );
1046 assert!(
1047 fs.new_open_options()
1048 .read(true)
1049 .open(Path::new("/foo2.txt"))
1050 .is_ok(),
1051 "created foo2.txt"
1052 );
1053 }
1054
1055 #[tokio::test]
1056 async fn test_create_dir() {
1057 let temp: TempDir = TempDir::new().unwrap();
1058 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1059
1060 assert_eq!(
1061 fs.create_dir(Path::new("../")),
1062 Err(FsError::AlreadyExists),
1063 "creating a directory out of bounds",
1064 );
1065
1066 assert_eq!(
1067 fs.create_dir(Path::new("/foo")),
1068 Ok(()),
1069 "creating a directory",
1070 );
1071
1072 assert!(
1073 temp.path().join("foo").exists(),
1074 "foo dir exists in host_fs"
1075 );
1076
1077 let cur_dir = read_dir_names(&fs, "/");
1078
1079 if !cur_dir.contains(&"foo".to_string()) {
1080 panic!("cur_dir does not contain foo: {cur_dir:#?}");
1081 }
1082
1083 assert!(
1084 cur_dir.contains(&"foo".to_string()),
1085 "the root is updated and well-defined"
1086 );
1087
1088 assert_eq!(
1089 fs.create_dir(Path::new("foo/bar")),
1090 Ok(()),
1091 "creating a sub-directory",
1092 );
1093
1094 assert!(
1095 temp.path().join("foo").join("bar").exists(),
1096 "foo dir exists in host_fs"
1097 );
1098
1099 let foo_dir = read_dir_names(&fs, Path::new("/foo"));
1100
1101 assert!(
1102 foo_dir.contains(&"bar".to_string()),
1103 "the foo directory is updated and well-defined"
1104 );
1105
1106 let bar_dir = read_dir_names(&fs, Path::new("/foo/bar"));
1107
1108 assert!(
1109 bar_dir.is_empty(),
1110 "the foo directory is updated and well-defined"
1111 );
1112 }
1113
1114 #[tokio::test]
1115 async fn test_remove_dir() {
1116 let temp: TempDir = TempDir::new().unwrap();
1117 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1118
1119 assert_eq!(
1120 fs.remove_dir(Path::new("/foo")),
1121 Err(FsError::EntryNotFound),
1122 "cannot remove a directory that doesn't exist",
1123 );
1124
1125 assert_eq!(
1126 fs.create_dir(Path::new("foo")),
1127 Ok(()),
1128 "creating a directory",
1129 );
1130
1131 assert_eq!(
1132 fs.create_dir(Path::new("foo/bar")),
1133 Ok(()),
1134 "creating a sub-directory",
1135 );
1136
1137 assert!(temp.path().join("foo/bar").exists(), "./foo/bar exists");
1138
1139 assert_eq!(
1140 fs.remove_dir(Path::new("foo")),
1141 Err(FsError::DirectoryNotEmpty),
1142 "removing a directory that has children",
1143 );
1144
1145 assert_eq!(
1146 fs.remove_dir(Path::new("foo/bar")),
1147 Ok(()),
1148 "removing a sub-directory",
1149 );
1150
1151 assert_eq!(
1152 fs.remove_dir(Path::new("foo")),
1153 Ok(()),
1154 "removing a directory",
1155 );
1156
1157 let cur_dir = read_dir_names(&fs, "/");
1158
1159 assert!(
1160 !cur_dir.contains(&"foo".to_string()),
1161 "the foo directory still exists"
1162 );
1163 }
1164
1165 fn read_dir_names(fs: &FileSystem, path: impl AsRef<Path>) -> Vec<String> {
1166 fs.read_dir(path.as_ref())
1167 .unwrap()
1168 .filter_map(|entry| Some(entry.ok()?.file_name().to_str()?.to_string()))
1169 .collect::<Vec<_>>()
1170 }
1171
1172 #[tokio::test]
1173 async fn test_rename() {
1174 let temp: TempDir = TempDir::new().unwrap();
1175 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1176 std::fs::create_dir_all(temp.path().join("foo").join("qux")).unwrap();
1177 let foo = Path::new("foo");
1178 let bar = Path::new("bar");
1179 let foo_realpath = temp.path().join(foo);
1180 let bar_realpath = temp.path().join(bar);
1181
1182 assert_eq!(
1183 fs.rename(Path::new("/"), Path::new("/bar")).await,
1184 Err(FsError::BaseNotDirectory),
1185 "renaming a directory that has no parent",
1186 );
1187 assert_eq!(
1188 fs.rename(Path::new("/foo"), Path::new("/")).await,
1189 Err(FsError::BaseNotDirectory),
1190 "renaming to a directory that has no parent",
1191 );
1192
1193 assert_eq!(
1194 fs.rename(foo, &foo.join("bar").join("baz"),).await,
1195 Err(FsError::EntryNotFound),
1196 "renaming to a directory that has parent that doesn't exist",
1197 );
1198
1199 #[cfg(not(target_os = "windows"))]
1201 assert_eq!(fs.create_dir(bar), Ok(()));
1202
1203 assert_eq!(
1204 fs.rename(foo, bar).await,
1205 Ok(()),
1206 "renaming to a directory that has parent that exists",
1207 );
1208
1209 assert!(
1210 fs.new_open_options()
1211 .write(true)
1212 .create_new(true)
1213 .open(bar.join("hello1.txt"))
1214 .is_ok(),
1215 "creating a new file (`hello1.txt`)",
1216 );
1217 assert!(
1218 fs.new_open_options()
1219 .write(true)
1220 .create_new(true)
1221 .open(bar.join("hello2.txt"))
1222 .is_ok(),
1223 "creating a new file (`hello2.txt`)",
1224 );
1225
1226 let cur_dir = read_dir_names(&fs, Path::new("/"));
1227
1228 assert!(
1229 !cur_dir.contains(&"foo".to_string()),
1230 "the foo directory still exists"
1231 );
1232
1233 assert!(
1234 cur_dir.contains(&"bar".to_string()),
1235 "the bar directory still exists"
1236 );
1237
1238 let bar_dir = read_dir_names(&fs, bar);
1239
1240 if !bar_dir.contains(&"qux".to_string()) {
1241 println!("qux does not exist: {bar_dir:?}")
1242 }
1243
1244 let qux_dir = read_dir_names(&fs, bar.join("qux"));
1245
1246 assert!(qux_dir.is_empty(), "the qux directory is empty");
1247
1248 assert!(
1249 bar_realpath.join("hello1.txt").exists(),
1250 "the /bar/hello1.txt file exists"
1251 );
1252
1253 assert!(
1254 bar_realpath.join("hello2.txt").exists(),
1255 "the /bar/hello2.txt file exists"
1256 );
1257
1258 assert_eq!(fs.create_dir(foo), Ok(()), "create ./foo again");
1259
1260 assert_eq!(
1261 fs.rename(&bar.join("hello2.txt"), &foo.join("world2.txt"))
1262 .await,
1263 Ok(()),
1264 "renaming (and moving) a file",
1265 );
1266
1267 assert_eq!(
1268 fs.rename(foo, &bar.join("baz")).await,
1269 Ok(()),
1270 "renaming a directory",
1271 );
1272
1273 assert_eq!(
1274 fs.rename(&bar.join("hello1.txt"), &bar.join("world1.txt"))
1275 .await,
1276 Ok(()),
1277 "renaming a file (in the same directory)",
1278 );
1279
1280 assert!(bar_realpath.exists(), "./bar exists");
1281 assert!(bar_realpath.join("baz").exists(), "./bar/baz exists");
1282 assert!(!foo_realpath.exists(), "foo does not exist anymore");
1283 assert!(
1284 bar_realpath.join("baz/world2.txt").exists(),
1285 "/bar/baz/world2.txt exists"
1286 );
1287 assert!(
1288 bar_realpath.join("world1.txt").exists(),
1289 "/bar/world1.txt (ex hello1.txt) exists"
1290 );
1291 assert!(
1292 !bar_realpath.join("hello1.txt").exists(),
1293 "hello1.txt was moved"
1294 );
1295 assert!(
1296 !bar_realpath.join("hello2.txt").exists(),
1297 "hello2.txt was moved"
1298 );
1299 assert!(
1300 bar_realpath.join("baz/world2.txt").exists(),
1301 "world2.txt was moved to the correct place"
1302 );
1303 }
1304
1305 #[tokio::test]
1306 async fn test_metadata() {
1307 use std::thread::sleep;
1308 use std::time::Duration;
1309
1310 let temp = TempDir::new().unwrap();
1311
1312 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1313
1314 let root_metadata = fs.metadata(Path::new("/")).unwrap();
1315
1316 assert!(root_metadata.ft.dir);
1317 #[cfg(not(target_env = "musl"))]
1319 assert_eq!(root_metadata.accessed, root_metadata.created);
1320 #[cfg(not(target_env = "musl"))]
1321 assert_eq!(root_metadata.modified, root_metadata.created);
1322 assert!(root_metadata.modified > 0);
1323
1324 let foo = Path::new("foo");
1325
1326 assert_eq!(fs.create_dir(foo), Ok(()));
1327
1328 let foo_metadata = fs.metadata(foo);
1329 assert!(foo_metadata.is_ok());
1330 let foo_metadata = foo_metadata.unwrap();
1331
1332 assert!(foo_metadata.ft.dir);
1333 #[cfg(not(target_env = "musl"))]
1334 assert_eq!(foo_metadata.accessed, foo_metadata.created);
1335 #[cfg(not(target_env = "musl"))]
1336 assert_eq!(foo_metadata.modified, foo_metadata.created);
1337 assert!(foo_metadata.modified > 0);
1338
1339 sleep(Duration::from_secs(3));
1340
1341 let bar = Path::new("bar");
1342
1343 assert_eq!(fs.rename(foo, bar).await, Ok(()));
1344
1345 let bar_metadata = fs.metadata(bar).unwrap();
1346 assert!(bar_metadata.ft.dir);
1347 assert!(bar_metadata.accessed >= foo_metadata.accessed);
1348 assert_eq!(bar_metadata.created, foo_metadata.created);
1349 assert!(bar_metadata.modified > foo_metadata.modified);
1350
1351 let root_metadata = fs.metadata(bar).unwrap();
1352 assert!(
1353 root_metadata.modified > foo_metadata.modified,
1354 "the parent modified time was updated"
1355 );
1356 }
1357
1358 #[tokio::test]
1359 async fn test_rejects_host_absolute_paths_inside_root() {
1360 let temp = TempDir::new().unwrap();
1361 let file_path = temp.path().join("foo.txt");
1362 std::fs::write(&file_path, b"hello").unwrap();
1363
1364 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1365
1366 assert_eq!(fs.metadata(&file_path), Err(FsError::InvalidInput));
1367 assert!(matches!(
1368 fs.new_open_options().read(true).open(&file_path),
1369 Err(FsError::InvalidInput)
1370 ));
1371 }
1372
1373 #[tokio::test]
1374 async fn test_remove_file() {
1375 let temp = TempDir::new().unwrap();
1376 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1377
1378 assert!(
1379 fs.new_open_options()
1380 .write(true)
1381 .create_new(true)
1382 .open(Path::new("foo.txt"))
1383 .is_ok(),
1384 "creating a new file",
1385 );
1386
1387 assert!(read_dir_names(&fs, Path::new("/")).contains(&"foo.txt".to_string()));
1388
1389 assert!(temp.path().join("foo.txt").is_file());
1390
1391 assert_eq!(
1392 fs.remove_file(Path::new("foo.txt")),
1393 Ok(()),
1394 "removing a file that exists",
1395 );
1396
1397 assert!(!temp.path().join("foo.txt").exists());
1398
1399 assert_eq!(
1400 fs.remove_file(Path::new("foo.txt")),
1401 Err(FsError::EntryNotFound),
1402 "removing a file that doesn't exists",
1403 );
1404 }
1405
1406 #[tokio::test]
1407 async fn test_readdir() {
1408 let temp = TempDir::new().unwrap();
1409 let fs = FileSystem::new(Handle::current(), temp.path()).expect("get filesystem");
1410
1411 assert_eq!(fs.create_dir(Path::new("foo")), Ok(()), "creating `foo`");
1412 assert_eq!(
1413 fs.create_dir(Path::new("foo/sub")),
1414 Ok(()),
1415 "creating `sub`"
1416 );
1417 assert_eq!(fs.create_dir(Path::new("bar")), Ok(()), "creating `bar`");
1418 assert_eq!(fs.create_dir(Path::new("baz")), Ok(()), "creating `bar`");
1419 assert!(
1420 fs.new_open_options()
1421 .write(true)
1422 .create_new(true)
1423 .open(Path::new("a.txt"))
1424 .is_ok(),
1425 "creating `a.txt`",
1426 );
1427 assert!(
1428 fs.new_open_options()
1429 .write(true)
1430 .create_new(true)
1431 .open(Path::new("b.txt"))
1432 .is_ok(),
1433 "creating `b.txt`",
1434 );
1435
1436 let readdir = fs.read_dir(Path::new("/"));
1437
1438 assert!(
1439 readdir.is_ok(),
1440 "reading the directory `{}`",
1441 Path::new("/").display()
1442 );
1443
1444 let mut readdir = readdir.unwrap();
1445
1446 let next = readdir.next().unwrap().unwrap();
1447 assert!(next.path.ends_with("a.txt"), "checking entry #1");
1448 assert!(next.metadata().unwrap().is_file(), "checking entry #1");
1449
1450 let next = readdir.next().unwrap().unwrap();
1451 assert!(next.path.ends_with("b.txt"), "checking entry #2");
1452 assert!(next.metadata().unwrap().is_file(), "checking entry #2");
1453
1454 let next = readdir.next().unwrap().unwrap();
1455 assert!(next.path.ends_with("bar"), "checking entry #3");
1456 assert!(next.metadata().unwrap().is_dir(), "checking entry #3");
1457
1458 let next = readdir.next().unwrap().unwrap();
1459 assert!(next.path.ends_with("baz"), "checking entry #4");
1460 assert!(next.metadata().unwrap().is_dir(), "checking entry #4");
1461
1462 let next = readdir.next().unwrap().unwrap();
1463 assert!(next.path.ends_with("foo"), "checking entry #5");
1464 assert!(next.metadata().unwrap().is_dir(), "checking entry #5");
1465
1466 if let Some(s) = readdir.next() {
1467 panic!("next: {s:?}");
1468 }
1469 }
1470}