wasmer_wasix/fs/
inode_guard.rs

1use std::{
2    future::Future,
3    io::{IoSlice, SeekFrom},
4    ops::{Deref, DerefMut},
5    pin::Pin,
6    sync::{Arc, RwLock},
7    task::{Context, Poll},
8};
9
10use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
11use virtual_fs::{FsError, Pipe, PipeRx, PipeTx, VirtualFile};
12use wasmer_wasix_types::{
13    types::Eventtype,
14    wasi::{self, EpollType},
15    wasi::{Errno, EventFdReadwrite, Eventrwflags, Subscription},
16};
17
18use super::{InodeGuard, Kind, VirtualFileLock, notification::NotificationInner};
19use crate::{
20    net::socket::{InodeSocketInner, InodeSocketKind},
21    state::{PollEvent, PollEventSet, WasiState, iterate_poll_events},
22    syscalls::{EventResult, EventResultType, map_io_err},
23    utils::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard},
24};
25
26#[derive(Debug, Clone)]
27pub(crate) enum InodeValFilePollGuardMode {
28    File(VirtualFileLock),
29    EventNotifications(Arc<NotificationInner>),
30    Socket { inner: Arc<InodeSocketInner> },
31    PipeRx { rx: Arc<RwLock<Box<PipeRx>>> },
32    PipeTx { tx: Arc<RwLock<Box<PipeTx>>> },
33    DuplexPipe { pipe: Arc<RwLock<Box<Pipe>>> },
34}
35
36pub struct InodeValFilePollGuard {
37    pub(crate) fd: u32,
38    pub(crate) peb: PollEventSet,
39    pub(crate) subscription: Subscription,
40    pub(crate) mode: InodeValFilePollGuardMode,
41}
42
43impl InodeValFilePollGuard {
44    pub(crate) fn new(
45        fd: u32,
46        peb: PollEventSet,
47        subscription: Subscription,
48        guard: &Kind,
49    ) -> Option<Self> {
50        let mode = match guard {
51            Kind::EventNotifications { inner, .. } => {
52                InodeValFilePollGuardMode::EventNotifications(inner.clone())
53            }
54            Kind::Socket { socket, .. } => InodeValFilePollGuardMode::Socket {
55                inner: socket.inner.clone(),
56            },
57            Kind::File {
58                handle: Some(handle),
59                ..
60            } => InodeValFilePollGuardMode::File(handle.clone()),
61            Kind::PipeRx { rx } => InodeValFilePollGuardMode::PipeRx {
62                rx: Arc::new(RwLock::new(Box::new(rx.clone()))),
63            },
64            Kind::PipeTx { tx } => InodeValFilePollGuardMode::PipeTx {
65                tx: Arc::new(RwLock::new(Box::new(tx.clone()))),
66            },
67            Kind::DuplexPipe { pipe } => InodeValFilePollGuardMode::DuplexPipe {
68                pipe: Arc::new(RwLock::new(Box::new(pipe.clone()))),
69            },
70            _ => {
71                return None;
72            }
73        };
74        Some(Self {
75            fd,
76            mode,
77            peb,
78            subscription,
79        })
80    }
81}
82
83impl std::fmt::Debug for InodeValFilePollGuard {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        match &self.mode {
86            InodeValFilePollGuardMode::File(..) => {
87                write!(f, "guard-file(fd={}, peb={})", self.fd, self.peb)
88            }
89            InodeValFilePollGuardMode::EventNotifications { .. } => {
90                write!(f, "guard-notifications(fd={}, peb={})", self.fd, self.peb)
91            }
92            InodeValFilePollGuardMode::Socket { inner } => {
93                let inner = inner.protected.read().unwrap();
94                match &inner.kind {
95                    InodeSocketKind::TcpListener { .. } => {
96                        write!(f, "guard-tcp-listener(fd={}, peb={})", self.fd, self.peb)
97                    }
98                    InodeSocketKind::TcpStream { socket, .. } => {
99                        if socket.is_closed() {
100                            write!(
101                                f,
102                                "guard-tcp-stream (closed, fd={}, peb={})",
103                                self.fd, self.peb
104                            )
105                        } else {
106                            write!(f, "guard-tcp-stream(fd={}, peb={})", self.fd, self.peb)
107                        }
108                    }
109                    InodeSocketKind::UdpSocket { .. } => {
110                        write!(f, "guard-udp-socket(fd={}, peb={})", self.fd, self.peb)
111                    }
112                    InodeSocketKind::Raw(..) => {
113                        write!(f, "guard-raw-socket(fd={}, peb={})", self.fd, self.peb)
114                    }
115                    _ => write!(f, "guard-socket(fd={}), peb={})", self.fd, self.peb),
116                }
117            }
118            InodeValFilePollGuardMode::PipeRx { .. } => {
119                write!(f, "guard-pipe-rx(...)")
120            }
121            InodeValFilePollGuardMode::PipeTx { .. } => {
122                write!(f, "guard-pipe-tx(...)")
123            }
124            InodeValFilePollGuardMode::DuplexPipe { .. } => {
125                write!(f, "guard-duplex-pipe(...)")
126            }
127        }
128    }
129}
130
131#[derive(Debug)]
132pub struct InodeValFilePollGuardJoin {
133    mode: InodeValFilePollGuardMode,
134    fd: u32,
135    peb: PollEventSet,
136    subscription: Subscription,
137}
138
139impl InodeValFilePollGuardJoin {
140    pub(crate) fn new(guard: InodeValFilePollGuard) -> Self {
141        Self {
142            mode: guard.mode,
143            fd: guard.fd,
144            peb: guard.peb,
145            subscription: guard.subscription,
146        }
147    }
148    pub(crate) fn fd(&self) -> u32 {
149        self.fd
150    }
151    pub(crate) fn peb(&self) -> PollEventSet {
152        self.peb
153    }
154}
155
156pub const POLL_GUARD_MAX_RET: usize = 4;
157
158impl Future for InodeValFilePollGuardJoin {
159    type Output = heapless::Vec<(EventResult, EpollType), POLL_GUARD_MAX_RET>;
160
161    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
162        // Otherwise we need to register for the event
163        let waker = cx.waker();
164        let mut has_read = false;
165        let mut has_write = false;
166        let mut has_close = false;
167        let mut has_hangup = false;
168
169        let mut ret = heapless::Vec::new();
170        for in_event in iterate_poll_events(self.peb) {
171            match in_event {
172                PollEvent::PollIn => {
173                    has_read = true;
174                }
175                PollEvent::PollOut => {
176                    has_write = true;
177                }
178                PollEvent::PollHangUp => {
179                    has_hangup = true;
180                    has_close = true;
181                }
182                PollEvent::PollError | PollEvent::PollInvalid => {
183                    if !has_hangup {
184                        has_close = true;
185                    }
186                }
187            }
188        }
189        if has_read {
190            let poll_result = match &mut self.mode {
191                InodeValFilePollGuardMode::File(file) => {
192                    let mut guard = file.write().unwrap();
193                    let file = Pin::new(guard.as_mut());
194                    file.poll_read_ready(cx)
195                }
196                InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok),
197                InodeValFilePollGuardMode::Socket { inner } => {
198                    let mut guard = inner.protected.write().unwrap();
199                    guard.poll_read_ready(cx)
200                }
201                InodeValFilePollGuardMode::PipeRx { rx } => {
202                    let mut guard = rx.write().unwrap();
203                    let rx = Pin::new(guard.as_mut());
204                    rx.poll_read_ready(cx)
205                }
206                InodeValFilePollGuardMode::PipeTx { .. } => Poll::Ready(Err(std::io::Error::new(
207                    std::io::ErrorKind::InvalidInput,
208                    "Cannot read from a pipe write end",
209                ))),
210                InodeValFilePollGuardMode::DuplexPipe { pipe } => {
211                    let mut guard = pipe.write().unwrap();
212                    let pipe = Pin::new(guard.as_mut());
213                    pipe.poll_read_ready(cx)
214                }
215            };
216            match poll_result {
217                Poll::Ready(Err(err)) if has_close && is_err_closed(&err) => {
218                    let inner = match self.subscription.type_ {
219                        Eventtype::FdRead | Eventtype::FdWrite => {
220                            Some(EventResultType::Fd(EventFdReadwrite {
221                                nbytes: 0,
222                                flags: if has_hangup {
223                                    Eventrwflags::FD_READWRITE_HANGUP
224                                } else {
225                                    Eventrwflags::empty()
226                                },
227                            }))
228                        }
229                        Eventtype::Clock => Some(EventResultType::Clock(0)),
230                        _ => None,
231                    };
232                    if let Some(inner) = inner {
233                        ret.push((
234                            EventResult {
235                                userdata: self.subscription.userdata,
236                                error: Errno::Success,
237                                type_: self.subscription.type_,
238                                inner,
239                            },
240                            EpollType::EPOLLHUP,
241                        ))
242                        .ok();
243                    }
244                }
245                Poll::Ready(bytes_available) => {
246                    let mut error = Errno::Success;
247                    let bytes_available = match bytes_available {
248                        Ok(a) => a,
249                        Err(e) => {
250                            error = map_io_err(e);
251                            0
252                        }
253                    };
254                    let inner = match self.subscription.type_ {
255                        Eventtype::FdRead | Eventtype::FdWrite => {
256                            Some(EventResultType::Fd(EventFdReadwrite {
257                                nbytes: bytes_available as u64,
258                                flags: if bytes_available == 0 {
259                                    Eventrwflags::FD_READWRITE_HANGUP
260                                } else {
261                                    Eventrwflags::empty()
262                                },
263                            }))
264                        }
265                        Eventtype::Clock => Some(EventResultType::Clock(0)),
266                        _ => None,
267                    };
268                    if let Some(inner) = inner {
269                        ret.push((
270                            EventResult {
271                                userdata: self.subscription.userdata,
272                                error,
273                                type_: self.subscription.type_,
274                                inner,
275                            },
276                            if error == Errno::Success {
277                                EpollType::EPOLLIN
278                            } else {
279                                EpollType::EPOLLERR
280                            },
281                        ))
282                        .ok();
283                    }
284                }
285                Poll::Pending => {}
286            };
287        }
288        if has_write {
289            let poll_result = match &mut self.mode {
290                InodeValFilePollGuardMode::File(file) => {
291                    let mut guard = file.write().unwrap();
292                    let file = Pin::new(guard.as_mut());
293                    file.poll_write_ready(cx)
294                }
295                InodeValFilePollGuardMode::EventNotifications(inner) => inner.poll(waker).map(Ok),
296                InodeValFilePollGuardMode::Socket { inner } => {
297                    let mut guard = inner.protected.write().unwrap();
298                    guard.poll_write_ready(cx)
299                }
300                InodeValFilePollGuardMode::PipeRx { .. } => Poll::Ready(Err(std::io::Error::new(
301                    std::io::ErrorKind::InvalidInput,
302                    "Cannot write to a pipe read end",
303                ))),
304                InodeValFilePollGuardMode::PipeTx { tx } => {
305                    let mut guard = tx.write().unwrap();
306                    let tx = Pin::new(guard.as_mut());
307                    tx.poll_write_ready()
308                }
309                InodeValFilePollGuardMode::DuplexPipe { pipe } => {
310                    let mut guard = pipe.write().unwrap();
311                    let pipe = Pin::new(guard.as_mut());
312                    pipe.poll_write_ready(cx)
313                }
314            };
315            match poll_result {
316                Poll::Ready(Err(err)) if has_close && is_err_closed(&err) => {
317                    let inner = match self.subscription.type_ {
318                        Eventtype::FdRead | Eventtype::FdWrite => {
319                            Some(EventResultType::Fd(EventFdReadwrite {
320                                nbytes: 0,
321                                flags: if has_hangup {
322                                    Eventrwflags::FD_READWRITE_HANGUP
323                                } else {
324                                    Eventrwflags::empty()
325                                },
326                            }))
327                        }
328                        Eventtype::Clock => Some(EventResultType::Clock(0)),
329                        _ => None,
330                    };
331                    if let Some(inner) = inner {
332                        ret.push((
333                            EventResult {
334                                userdata: self.subscription.userdata,
335                                error: Errno::Success,
336                                type_: self.subscription.type_,
337                                inner,
338                            },
339                            EpollType::EPOLLHUP,
340                        ))
341                        .ok();
342                    }
343                }
344                Poll::Ready(bytes_available) => {
345                    let mut error = Errno::Success;
346                    let bytes_available = match bytes_available {
347                        Ok(a) => a,
348                        Err(e) => {
349                            error = map_io_err(e);
350                            0
351                        }
352                    };
353                    let inner = match self.subscription.type_ {
354                        Eventtype::FdRead | Eventtype::FdWrite => {
355                            Some(EventResultType::Fd(EventFdReadwrite {
356                                nbytes: bytes_available as u64,
357                                flags: if bytes_available == 0 {
358                                    Eventrwflags::FD_READWRITE_HANGUP
359                                } else {
360                                    Eventrwflags::empty()
361                                },
362                            }))
363                        }
364                        Eventtype::Clock => Some(EventResultType::Clock(0)),
365                        _ => None,
366                    };
367                    if let Some(inner) = inner {
368                        ret.push((
369                            EventResult {
370                                userdata: self.subscription.userdata,
371                                error,
372                                type_: self.subscription.type_,
373                                inner,
374                            },
375                            if error == Errno::Success {
376                                EpollType::EPOLLOUT
377                            } else {
378                                EpollType::EPOLLERR
379                            },
380                        ))
381                        .ok();
382                    }
383                }
384                Poll::Pending => {}
385            };
386        }
387        if !ret.is_empty() {
388            return Poll::Ready(ret);
389        }
390        Poll::Pending
391    }
392}
393
394#[derive(Debug)]
395pub(crate) struct InodeValFileReadGuard {
396    guard: OwnedRwLockReadGuard<Box<dyn VirtualFile + Send + Sync + 'static>>,
397}
398
399impl InodeValFileReadGuard {
400    pub(crate) fn new(file: &VirtualFileLock) -> Self {
401        Self {
402            guard: crate::utils::read_owned(file).unwrap(),
403        }
404    }
405}
406
407impl Deref for InodeValFileReadGuard {
408    type Target = dyn VirtualFile + Send + Sync + 'static;
409    fn deref(&self) -> &Self::Target {
410        self.guard.deref().deref()
411    }
412}
413
414#[derive(Debug)]
415pub struct InodeValFileWriteGuard {
416    guard: OwnedRwLockWriteGuard<Box<dyn VirtualFile + Send + Sync + 'static>>,
417}
418
419impl InodeValFileWriteGuard {
420    pub(crate) fn new(file: &VirtualFileLock) -> Self {
421        Self {
422            guard: crate::utils::write_owned(file).unwrap(),
423        }
424    }
425    pub(crate) fn swap(
426        &mut self,
427        mut file: Box<dyn VirtualFile + Send + Sync + 'static>,
428    ) -> Box<dyn VirtualFile + Send + Sync + 'static> {
429        std::mem::swap(self.guard.deref_mut(), &mut file);
430        file
431    }
432}
433
434impl Deref for InodeValFileWriteGuard {
435    type Target = dyn VirtualFile + Send + Sync + 'static;
436    fn deref(&self) -> &Self::Target {
437        self.guard.deref().deref()
438    }
439}
440impl DerefMut for InodeValFileWriteGuard {
441    fn deref_mut(&mut self) -> &mut Self::Target {
442        self.guard.deref_mut().deref_mut()
443    }
444}
445
446#[derive(Debug)]
447pub(crate) struct WasiStateFileGuard {
448    inode: InodeGuard,
449}
450
451impl WasiStateFileGuard {
452    pub fn new(state: &WasiState, fd: wasi::Fd) -> Result<Option<Self>, FsError> {
453        let fd_map = state.fs.fd_map.read().unwrap();
454        if let Some(fd) = fd_map.get(fd) {
455            Ok(Some(Self {
456                inode: fd.inode.clone(),
457            }))
458        } else {
459            Ok(None)
460        }
461    }
462
463    pub fn lock_read(&self) -> Option<InodeValFileReadGuard> {
464        let guard = self.inode.read();
465        if let Kind::File { handle, .. } = guard.deref() {
466            handle.as_ref().map(InodeValFileReadGuard::new)
467        } else {
468            // Our public API should ensure that this is not possible
469            unreachable!("Non-file found in standard device location")
470        }
471    }
472
473    pub fn lock_write(&self) -> Option<InodeValFileWriteGuard> {
474        let guard = self.inode.read();
475        if let Kind::File { handle, .. } = guard.deref() {
476            handle.as_ref().map(InodeValFileWriteGuard::new)
477        } else {
478            // Our public API should ensure that this is not possible
479            unreachable!("Non-file found in standard device location")
480        }
481    }
482}
483
484impl VirtualFile for WasiStateFileGuard {
485    fn last_accessed(&self) -> u64 {
486        let guard = self.lock_read();
487        if let Some(file) = guard.as_ref() {
488            file.last_accessed()
489        } else {
490            0
491        }
492    }
493
494    fn last_modified(&self) -> u64 {
495        let guard = self.lock_read();
496        if let Some(file) = guard.as_ref() {
497            file.last_modified()
498        } else {
499            0
500        }
501    }
502
503    fn created_time(&self) -> u64 {
504        let guard = self.lock_read();
505        if let Some(file) = guard.as_ref() {
506            file.created_time()
507        } else {
508            0
509        }
510    }
511
512    fn set_times(
513        &mut self,
514        atime: Option<u64>,
515        mtime: Option<u64>,
516    ) -> Result<(), virtual_fs::FsError> {
517        let mut guard = self.lock_write();
518        if let Some(file) = guard.as_mut() {
519            file.set_times(atime, mtime)
520        } else {
521            Err(crate::FsError::Lock)
522        }
523    }
524
525    fn size(&self) -> u64 {
526        let guard = self.lock_read();
527        if let Some(file) = guard.as_ref() {
528            file.size()
529        } else {
530            0
531        }
532    }
533
534    fn set_len(&mut self, new_size: u64) -> Result<(), FsError> {
535        let mut guard = self.lock_write();
536        if let Some(file) = guard.as_mut() {
537            file.set_len(new_size)
538        } else {
539            Err(FsError::IOError)
540        }
541    }
542
543    fn unlink(&mut self) -> Result<(), FsError> {
544        let mut guard = self.lock_write();
545        if let Some(file) = guard.as_mut() {
546            file.unlink()
547        } else {
548            Err(FsError::IOError)
549        }
550    }
551
552    fn is_open(&self) -> bool {
553        let guard = self.lock_read();
554        if let Some(file) = guard.as_ref() {
555            file.is_open()
556        } else {
557            false
558        }
559    }
560
561    fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<usize>> {
562        let mut guard = self.lock_write();
563        if let Some(file) = guard.as_mut() {
564            let file = Pin::new(file.deref_mut());
565            file.poll_read_ready(cx)
566        } else {
567            Poll::Ready(Ok(0))
568        }
569    }
570
571    fn poll_write_ready(
572        self: Pin<&mut Self>,
573        cx: &mut Context<'_>,
574    ) -> Poll<std::io::Result<usize>> {
575        let mut guard = self.lock_write();
576        if let Some(file) = guard.as_mut() {
577            let file = Pin::new(file.deref_mut());
578            file.poll_write_ready(cx)
579        } else {
580            Poll::Ready(Ok(0))
581        }
582    }
583}
584
585impl AsyncSeek for WasiStateFileGuard {
586    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
587        let mut guard = self.lock_write();
588        if let Some(guard) = guard.as_mut() {
589            let file = Pin::new(guard.deref_mut());
590            file.start_seek(position)
591        } else {
592            Err(std::io::ErrorKind::Unsupported.into())
593        }
594    }
595    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
596        let mut guard = self.lock_write();
597        if let Some(guard) = guard.as_mut() {
598            let file = Pin::new(guard.deref_mut());
599            file.poll_complete(cx)
600        } else {
601            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
602        }
603    }
604}
605
606impl AsyncWrite for WasiStateFileGuard {
607    fn poll_write(
608        self: Pin<&mut Self>,
609        cx: &mut Context<'_>,
610        buf: &[u8],
611    ) -> Poll<std::io::Result<usize>> {
612        let mut guard = self.lock_write();
613        if let Some(guard) = guard.as_mut() {
614            let file = Pin::new(guard.deref_mut());
615            file.poll_write(cx, buf)
616        } else {
617            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
618        }
619    }
620    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
621        let mut guard = self.lock_write();
622        if let Some(guard) = guard.as_mut() {
623            let file = Pin::new(guard.deref_mut());
624            file.poll_flush(cx)
625        } else {
626            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
627        }
628    }
629    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
630        let mut guard = self.lock_write();
631        if let Some(guard) = guard.as_mut() {
632            let file = Pin::new(guard.deref_mut());
633            file.poll_shutdown(cx)
634        } else {
635            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
636        }
637    }
638    fn poll_write_vectored(
639        self: Pin<&mut Self>,
640        cx: &mut Context<'_>,
641        bufs: &[IoSlice<'_>],
642    ) -> Poll<std::io::Result<usize>> {
643        let mut guard = self.lock_write();
644        if let Some(guard) = guard.as_mut() {
645            let file = Pin::new(guard.deref_mut());
646            file.poll_write_vectored(cx, bufs)
647        } else {
648            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
649        }
650    }
651    fn is_write_vectored(&self) -> bool {
652        let mut guard = self.lock_write();
653        if let Some(guard) = guard.as_mut() {
654            let file = Pin::new(guard.deref_mut());
655            file.is_write_vectored()
656        } else {
657            false
658        }
659    }
660}
661
662impl AsyncRead for WasiStateFileGuard {
663    fn poll_read(
664        self: Pin<&mut Self>,
665        cx: &mut Context<'_>,
666        buf: &mut tokio::io::ReadBuf<'_>,
667    ) -> Poll<std::io::Result<()>> {
668        let mut guard = self.lock_write();
669        if let Some(guard) = guard.as_mut() {
670            let file = Pin::new(guard.deref_mut());
671            file.poll_read(cx, buf)
672        } else {
673            Poll::Ready(Err(std::io::ErrorKind::Unsupported.into()))
674        }
675    }
676}
677
678fn is_err_closed(err: &std::io::Error) -> bool {
679    err.kind() == std::io::ErrorKind::ConnectionAborted
680        || err.kind() == std::io::ErrorKind::ConnectionRefused
681        || err.kind() == std::io::ErrorKind::ConnectionReset
682        || err.kind() == std::io::ErrorKind::BrokenPipe
683        || err.kind() == std::io::ErrorKind::NotConnected
684        || err.kind() == std::io::ErrorKind::UnexpectedEof
685}