virtual_fs/
pipe.rs

1use bytes::{Buf, Bytes};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::task::Context;
6use std::task::Poll;
7use std::{io::IoSlice, sync::MutexGuard};
8use std::{
9    io::{self, Read, Seek, SeekFrom},
10    sync::Weak,
11};
12use tokio::sync::mpsc;
13use tokio::{
14    io::{AsyncRead, AsyncSeek, AsyncWrite},
15    sync::mpsc::error::TryRecvError,
16};
17use virtual_mio::{InterestHandler, InterestType};
18
19use crate::{ArcFile, FsError, VirtualFile};
20
21// Each pipe end is separately cloneable. The overall pipe
22// remains open as long as at least one tx end and one rx
23// end are still alive.
24// As such, closing a pipe isn't a well-defined operation,
25// since more references to the ends may still be alive.
26#[derive(Debug, Clone)]
27pub struct Pipe {
28    /// Transmit side of the pipe
29    send: PipeTx,
30    /// Receive side of the pipe
31    recv: PipeRx,
32}
33
34#[derive(Debug, Clone)]
35pub struct PipeTx {
36    /// Sends bytes down the pipe
37    tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
38    rx_end: Weak<Mutex<PipeReceiver>>,
39}
40
41#[derive(Debug, Clone)]
42pub struct PipeRx {
43    /// Receives bytes from the pipe
44    /// Also, buffers the last read message from the pipe while its being consumed
45    rx: Option<Arc<Mutex<PipeReceiver>>>,
46}
47
48impl PipeRx {
49    // Tries to read from the internal buffer if data is available.
50    fn try_read_from_buffer(
51        rx: &mut MutexGuard<'_, PipeReceiver>,
52        max_len: usize,
53        // Should return how much actual data was read from the provided slice
54        write: impl FnOnce(&[u8]) -> Option<usize>,
55    ) -> Option<usize> {
56        rx.buffer.as_mut().and_then(|read_buffer| {
57            let buf_len = read_buffer.len();
58            if buf_len > 0 {
59                let mut read = buf_len.min(max_len);
60                let inner_buf = &read_buffer[..read];
61                // read = ?;
62                read = write(inner_buf)?;
63                read_buffer.advance(read);
64                Some(read)
65            } else {
66                None
67            }
68        })
69    }
70
71    pub fn close(&mut self) {
72        _ = self.rx.take();
73    }
74
75    pub fn try_read(&mut self, buf: &mut [u8]) -> Option<usize> {
76        let Some(ref mut rx) = self.rx else {
77            return Some(0);
78        };
79
80        let mut rx = rx.lock().unwrap();
81        loop {
82            if let Some(read) = Self::try_read_from_buffer(&mut rx, buf.len(), |mut read_buf| {
83                Read::read(&mut read_buf, buf).ok()
84            }) {
85                return Some(read);
86            };
87
88            let data = {
89                match rx.chan.try_recv() {
90                    Ok(a) => a,
91                    Err(TryRecvError::Empty) => {
92                        return None;
93                    }
94                    Err(TryRecvError::Disconnected) => {
95                        return Some(0);
96                    }
97                }
98            };
99            rx.buffer.replace(Bytes::from(data));
100        }
101    }
102
103    pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
104        let Some(ref rx) = self.rx else {
105            return Poll::Ready(Err(std::io::Error::new(
106                std::io::ErrorKind::BrokenPipe,
107                "PipeRx is closed",
108            )));
109        };
110
111        let mut rx = rx.lock().unwrap();
112        loop {
113            {
114                if let Some(inner_buf) = rx.buffer.as_mut() {
115                    let buf_len = inner_buf.len();
116                    if buf_len > 0 {
117                        return Poll::Ready(Ok(buf_len));
118                    }
119                }
120            }
121
122            let mut pinned_rx = Pin::new(&mut rx.chan);
123            let data = match pinned_rx.poll_recv(cx) {
124                Poll::Ready(Some(a)) => a,
125                Poll::Ready(None) => return Poll::Ready(Ok(0)),
126                Poll::Pending => return Poll::Pending,
127            };
128
129            rx.buffer.replace(Bytes::from(data));
130        }
131    }
132
133    pub fn set_interest_handler(&self, interest_handler: Box<dyn InterestHandler>) {
134        let Some(ref rx) = self.rx else {
135            return;
136        };
137        let mut rx = rx.lock().unwrap();
138        rx.interest_handler.replace(interest_handler);
139    }
140
141    pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
142        let rx = self.rx.as_ref()?;
143        let mut rx = rx.lock().unwrap();
144        rx.interest_handler.take()
145    }
146}
147
148#[derive(Debug)]
149struct PipeReceiver {
150    // Note: Since we need to store the buffer alongside the
151    // actual receiver, we can't make use of an mpmc channel
152    chan: mpsc::UnboundedReceiver<Vec<u8>>,
153    buffer: Option<Bytes>,
154    interest_handler: Option<Box<dyn InterestHandler>>,
155}
156
157impl Pipe {
158    pub fn new() -> Self {
159        let (tx, rx) = mpsc::unbounded_channel();
160
161        let recv = Arc::new(Mutex::new(PipeReceiver {
162            chan: rx,
163            buffer: None,
164            interest_handler: None,
165        }));
166        Pipe {
167            send: PipeTx {
168                tx: Some(tx),
169                rx_end: Arc::downgrade(&recv),
170            },
171            recv: PipeRx { rx: Some(recv) },
172        }
173    }
174
175    pub fn channel() -> (Pipe, Pipe) {
176        let (tx1, rx1) = Pipe::new().split();
177        let (tx2, rx2) = Pipe::new().split();
178
179        let end1 = Pipe::combine(tx1, rx2);
180        let end2 = Pipe::combine(tx2, rx1);
181        (end1, end2)
182    }
183
184    pub fn split(self) -> (PipeTx, PipeRx) {
185        (self.send, self.recv)
186    }
187
188    pub fn combine(tx: PipeTx, rx: PipeRx) -> Self {
189        Self { send: tx, recv: rx }
190    }
191
192    pub fn try_read(&mut self, buf: &mut [u8]) -> Option<usize> {
193        self.recv.try_read(buf)
194    }
195
196    pub fn close(&mut self) {
197        self.send.close();
198        self.recv.close();
199    }
200
201    pub fn set_interest_handler(&self, interest_handler: Box<dyn InterestHandler>) {
202        self.recv.set_interest_handler(interest_handler);
203    }
204
205    pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
206        self.recv.remove_interest_handler()
207    }
208}
209
210impl Default for Pipe {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216impl PipeTx {
217    pub fn close(&mut self) {
218        _ = self.tx.take();
219    }
220
221    pub fn poll_write_ready(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
222        let Some(ref tx) = self.tx else {
223            return Poll::Ready(Err(std::io::Error::new(
224                std::io::ErrorKind::BrokenPipe,
225                "PipeTx is closed",
226            )));
227        };
228
229        if tx.is_closed() {
230            Poll::Ready(Ok(0))
231        } else {
232            Poll::Ready(Ok(8192))
233        }
234    }
235
236    fn mark_other_end_readable(&self) {
237        if let Some(rx_end) = self.rx_end.upgrade() {
238            let mut guard = rx_end.lock().unwrap();
239            if let Some(interest_handler) = guard.interest_handler.as_mut() {
240                interest_handler.push_interest(InterestType::Readable);
241            }
242        }
243    }
244}
245
246impl Seek for Pipe {
247    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
248        self.recv.seek(from)
249    }
250}
251
252impl Seek for PipeRx {
253    fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
254        Ok(0)
255    }
256}
257
258impl Seek for PipeTx {
259    fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
260        Ok(0)
261    }
262}
263
264impl Read for Pipe {
265    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
266        self.recv.read(buf)
267    }
268}
269
270impl Read for PipeRx {
271    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
272        let Some(ref mut rx) = self.rx else {
273            return Err(std::io::Error::new(
274                std::io::ErrorKind::BrokenPipe,
275                "PipeRx is closed",
276            ));
277        };
278
279        let mut rx = rx.lock().unwrap();
280        loop {
281            if let Some(read) = Self::try_read_from_buffer(&mut rx, buf.len(), |mut read_buf| {
282                Read::read(&mut read_buf, buf).ok()
283            }) {
284                return Ok(read);
285            }
286
287            let data = {
288                match rx.chan.blocking_recv() {
289                    Some(a) => a,
290                    None => {
291                        return Ok(0);
292                    }
293                }
294            };
295            rx.buffer.replace(Bytes::from(data));
296        }
297    }
298}
299
300impl std::io::Write for Pipe {
301    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
302        self.send.write(buf)
303    }
304
305    fn flush(&mut self) -> std::io::Result<()> {
306        self.send.flush()
307    }
308
309    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
310        self.send.write_all(buf)
311    }
312
313    fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
314        self.send.write_fmt(fmt)
315    }
316
317    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
318        self.send.write_vectored(bufs)
319    }
320}
321
322impl std::io::Write for PipeTx {
323    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
324        let Some(ref tx) = self.tx else {
325            return Err(std::io::Error::new(
326                std::io::ErrorKind::BrokenPipe,
327                "PipeTx is closed",
328            ));
329        };
330
331        tx.send(buf.to_vec())
332            .map_err(|_| Into::<std::io::Error>::into(std::io::ErrorKind::BrokenPipe))?;
333        self.mark_other_end_readable();
334        Ok(buf.len())
335    }
336
337    fn flush(&mut self) -> std::io::Result<()> {
338        Ok(())
339    }
340}
341
342impl AsyncSeek for Pipe {
343    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
344        let this = Pin::new(&mut self.recv);
345        this.start_seek(position)
346    }
347
348    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
349        let this = Pin::new(&mut self.recv);
350        this.poll_complete(cx)
351    }
352}
353
354impl AsyncSeek for PipeRx {
355    fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
356        Ok(())
357    }
358    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
359        Poll::Ready(Ok(0))
360    }
361}
362
363impl AsyncSeek for PipeTx {
364    fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
365        Ok(())
366    }
367    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
368        Poll::Ready(Ok(0))
369    }
370}
371
372impl AsyncWrite for Pipe {
373    fn poll_write(
374        mut self: Pin<&mut Self>,
375        cx: &mut Context<'_>,
376        buf: &[u8],
377    ) -> Poll<io::Result<usize>> {
378        let this = Pin::new(&mut self.send);
379        this.poll_write(cx, buf)
380    }
381
382    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
383        let this = Pin::new(&mut self.send);
384        this.poll_flush(cx)
385    }
386
387    fn poll_shutdown(
388        mut self: Pin<&mut Self>,
389        cx: &mut Context<'_>,
390    ) -> Poll<Result<(), io::Error>> {
391        let this = Pin::new(&mut self.send);
392        this.poll_shutdown(cx)
393    }
394
395    fn poll_write_vectored(
396        mut self: Pin<&mut Self>,
397        cx: &mut Context<'_>,
398        bufs: &[IoSlice<'_>],
399    ) -> Poll<Result<usize, io::Error>> {
400        let this = Pin::new(&mut self.send);
401        this.poll_write_vectored(cx, bufs)
402    }
403}
404
405impl AsyncWrite for PipeTx {
406    fn poll_write(
407        self: Pin<&mut Self>,
408        _cx: &mut Context<'_>,
409        buf: &[u8],
410    ) -> Poll<io::Result<usize>> {
411        let Some(ref tx) = self.tx else {
412            return Poll::Ready(Err(std::io::Error::new(
413                std::io::ErrorKind::BrokenPipe,
414                "PipeTx is closed",
415            )));
416        };
417
418        match tx.send(buf.to_vec()) {
419            Ok(()) => {
420                self.mark_other_end_readable();
421                Poll::Ready(Ok(buf.len()))
422            }
423            Err(_) => Poll::Ready(Err(Into::<std::io::Error>::into(
424                std::io::ErrorKind::BrokenPipe,
425            ))),
426        }
427    }
428
429    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
430        Poll::Ready(Ok(()))
431    }
432
433    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
434        self.close();
435        Poll::Ready(Ok(()))
436    }
437}
438
439impl AsyncRead for Pipe {
440    fn poll_read(
441        mut self: Pin<&mut Self>,
442        cx: &mut Context<'_>,
443        buf: &mut tokio::io::ReadBuf<'_>,
444    ) -> Poll<io::Result<()>> {
445        let this = Pin::new(&mut self.recv);
446        this.poll_read(cx, buf)
447    }
448}
449
450impl AsyncRead for PipeRx {
451    fn poll_read(
452        mut self: Pin<&mut Self>,
453        cx: &mut Context<'_>,
454        buf: &mut tokio::io::ReadBuf<'_>,
455    ) -> Poll<io::Result<()>> {
456        let Some(ref mut rx) = self.rx else {
457            return Poll::Ready(Err(std::io::Error::new(
458                std::io::ErrorKind::BrokenPipe,
459                "PipeRx is closed",
460            )));
461        };
462
463        let mut rx = rx.lock().unwrap();
464        loop {
465            if Self::try_read_from_buffer(&mut rx, buf.remaining(), |read_buf| {
466                buf.put_slice(read_buf);
467                Some(read_buf.len())
468            })
469            .is_some()
470            {
471                return Poll::Ready(Ok(()));
472            }
473
474            let data = match rx.chan.poll_recv(cx) {
475                Poll::Ready(Some(a)) => a,
476                Poll::Ready(None) => return Poll::Ready(Ok(())),
477                Poll::Pending => return Poll::Pending,
478            };
479
480            rx.buffer.replace(Bytes::from(data));
481        }
482    }
483}
484
485impl VirtualFile for Pipe {
486    /// the last time the file was accessed in nanoseconds as a UNIX timestamp
487    fn last_accessed(&self) -> u64 {
488        0
489    }
490
491    /// the last time the file was modified in nanoseconds as a UNIX timestamp
492    fn last_modified(&self) -> u64 {
493        0
494    }
495
496    /// the time at which the file was created in nanoseconds as a UNIX timestamp
497    fn created_time(&self) -> u64 {
498        0
499    }
500
501    /// the size of the file in bytes
502    fn size(&self) -> u64 {
503        0
504    }
505
506    /// Change the size of the file, if the `new_size` is greater than the current size
507    /// the extra bytes will be allocated and zeroed
508    fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
509        Ok(())
510    }
511
512    /// Request deletion of the file
513    fn unlink(&mut self) -> Result<(), FsError> {
514        Ok(())
515    }
516
517    /// Indicates if the file is opened or closed. This function must not block
518    /// Defaults to a status of being constantly open
519    fn is_open(&self) -> bool {
520        self.send
521            .tx
522            .as_ref()
523            .map(|a| !a.is_closed())
524            .unwrap_or_else(|| false)
525    }
526
527    /// Polls the file for when there is data to be read
528    fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
529        Pin::new(&mut self.recv).poll_read_ready(cx)
530    }
531
532    /// Polls the file for when it is available for writing
533    fn poll_write_ready(
534        mut self: Pin<&mut Self>,
535        _cx: &mut Context<'_>,
536    ) -> Poll<io::Result<usize>> {
537        Pin::new(&mut self.send).poll_write_ready()
538    }
539}
540
541/// A pair of pipes that are connected together.
542#[derive(Clone, Debug)]
543pub struct DuplexPipe {
544    front: Pipe,
545    back: Pipe,
546}
547
548impl DuplexPipe {
549    /// Get the sender pipe.
550    pub fn front(&self) -> &Pipe {
551        &self.front
552    }
553
554    /// Get the receiver pipe.
555    pub fn back(&self) -> &Pipe {
556        &self.back
557    }
558
559    /// Get the mutable sender pipe.
560    pub fn front_mut(&mut self) -> &mut Pipe {
561        &mut self.front
562    }
563
564    /// Get the receiver pipe.
565    pub fn back_mut(&mut self) -> &mut Pipe {
566        &mut self.back
567    }
568
569    /// Split into two pipes that are connected to each other
570    pub fn split(self) -> (Pipe, Pipe) {
571        (self.front, self.back)
572    }
573
574    /// Combines two ends of a duplex pipe back together again
575    pub fn combine(front: Pipe, back: Pipe) -> Self {
576        Self { front, back }
577    }
578
579    pub fn reverse(self) -> Self {
580        let (front, back) = self.split();
581        Self::combine(back, front)
582    }
583}
584
585impl Default for DuplexPipe {
586    fn default() -> Self {
587        Self::new()
588    }
589}
590
591impl DuplexPipe {
592    pub fn new() -> DuplexPipe {
593        let (end1, end2) = Pipe::channel();
594        Self {
595            front: end1,
596            back: end2,
597        }
598    }
599}
600
601/// Shared version of BidiPipe for situations where you need
602/// to emulate the old behaviour of `Pipe` (both send and recv on one channel).
603pub type WasiBidirectionalSharedPipePair = ArcFile<DuplexPipe>;