1use bytes::{Buf, Bytes};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::task::Context;
6use std::task::Poll;
7use std::{io::IoSlice, sync::MutexGuard};
8use std::{
9 io::{self, Read, Seek, SeekFrom},
10 sync::Weak,
11};
12use tokio::sync::mpsc;
13use tokio::{
14 io::{AsyncRead, AsyncSeek, AsyncWrite},
15 sync::mpsc::error::TryRecvError,
16};
17use virtual_mio::{InterestHandler, InterestType};
18
19use crate::{ArcFile, FsError, VirtualFile};
20
21#[derive(Debug, Clone)]
27pub struct Pipe {
28 send: PipeTx,
30 recv: PipeRx,
32}
33
34#[derive(Debug, Clone)]
35pub struct PipeTx {
36 tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
38 rx_end: Weak<Mutex<PipeReceiver>>,
39}
40
41#[derive(Debug, Clone)]
42pub struct PipeRx {
43 rx: Option<Arc<Mutex<PipeReceiver>>>,
46}
47
48impl PipeRx {
49 fn try_read_from_buffer(
51 rx: &mut MutexGuard<'_, PipeReceiver>,
52 max_len: usize,
53 write: impl FnOnce(&[u8]) -> Option<usize>,
55 ) -> Option<usize> {
56 rx.buffer.as_mut().and_then(|read_buffer| {
57 let buf_len = read_buffer.len();
58 if buf_len > 0 {
59 let mut read = buf_len.min(max_len);
60 let inner_buf = &read_buffer[..read];
61 read = write(inner_buf)?;
63 read_buffer.advance(read);
64 Some(read)
65 } else {
66 None
67 }
68 })
69 }
70
71 pub fn close(&mut self) {
72 _ = self.rx.take();
73 }
74
75 pub fn try_read(&mut self, buf: &mut [u8]) -> Option<usize> {
76 let Some(ref mut rx) = self.rx else {
77 return Some(0);
78 };
79
80 let mut rx = rx.lock().unwrap();
81 loop {
82 if let Some(read) = Self::try_read_from_buffer(&mut rx, buf.len(), |mut read_buf| {
83 Read::read(&mut read_buf, buf).ok()
84 }) {
85 return Some(read);
86 };
87
88 let data = {
89 match rx.chan.try_recv() {
90 Ok(a) => a,
91 Err(TryRecvError::Empty) => {
92 return None;
93 }
94 Err(TryRecvError::Disconnected) => {
95 return Some(0);
96 }
97 }
98 };
99 rx.buffer.replace(Bytes::from(data));
100 }
101 }
102
103 pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
104 let Some(ref rx) = self.rx else {
105 return Poll::Ready(Err(std::io::Error::new(
106 std::io::ErrorKind::BrokenPipe,
107 "PipeRx is closed",
108 )));
109 };
110
111 let mut rx = rx.lock().unwrap();
112 loop {
113 {
114 if let Some(inner_buf) = rx.buffer.as_mut() {
115 let buf_len = inner_buf.len();
116 if buf_len > 0 {
117 return Poll::Ready(Ok(buf_len));
118 }
119 }
120 }
121
122 let mut pinned_rx = Pin::new(&mut rx.chan);
123 let data = match pinned_rx.poll_recv(cx) {
124 Poll::Ready(Some(a)) => a,
125 Poll::Ready(None) => return Poll::Ready(Ok(0)),
126 Poll::Pending => return Poll::Pending,
127 };
128
129 rx.buffer.replace(Bytes::from(data));
130 }
131 }
132
133 pub fn set_interest_handler(&self, interest_handler: Box<dyn InterestHandler>) {
134 let Some(ref rx) = self.rx else {
135 return;
136 };
137 let mut rx = rx.lock().unwrap();
138 rx.interest_handler.replace(interest_handler);
139 }
140
141 pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
142 let rx = self.rx.as_ref()?;
143 let mut rx = rx.lock().unwrap();
144 rx.interest_handler.take()
145 }
146}
147
148#[derive(Debug)]
149struct PipeReceiver {
150 chan: mpsc::UnboundedReceiver<Vec<u8>>,
153 buffer: Option<Bytes>,
154 interest_handler: Option<Box<dyn InterestHandler>>,
155}
156
157impl Pipe {
158 pub fn new() -> Self {
159 let (tx, rx) = mpsc::unbounded_channel();
160
161 let recv = Arc::new(Mutex::new(PipeReceiver {
162 chan: rx,
163 buffer: None,
164 interest_handler: None,
165 }));
166 Pipe {
167 send: PipeTx {
168 tx: Some(tx),
169 rx_end: Arc::downgrade(&recv),
170 },
171 recv: PipeRx { rx: Some(recv) },
172 }
173 }
174
175 pub fn channel() -> (Pipe, Pipe) {
176 let (tx1, rx1) = Pipe::new().split();
177 let (tx2, rx2) = Pipe::new().split();
178
179 let end1 = Pipe::combine(tx1, rx2);
180 let end2 = Pipe::combine(tx2, rx1);
181 (end1, end2)
182 }
183
184 pub fn split(self) -> (PipeTx, PipeRx) {
185 (self.send, self.recv)
186 }
187
188 pub fn combine(tx: PipeTx, rx: PipeRx) -> Self {
189 Self { send: tx, recv: rx }
190 }
191
192 pub fn try_read(&mut self, buf: &mut [u8]) -> Option<usize> {
193 self.recv.try_read(buf)
194 }
195
196 pub fn close(&mut self) {
197 self.send.close();
198 self.recv.close();
199 }
200
201 pub fn set_interest_handler(&self, interest_handler: Box<dyn InterestHandler>) {
202 self.recv.set_interest_handler(interest_handler);
203 }
204
205 pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
206 self.recv.remove_interest_handler()
207 }
208}
209
210impl Default for Pipe {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl PipeTx {
217 pub fn close(&mut self) {
218 _ = self.tx.take();
219 }
220
221 pub fn poll_write_ready(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
222 let Some(ref tx) = self.tx else {
223 return Poll::Ready(Err(std::io::Error::new(
224 std::io::ErrorKind::BrokenPipe,
225 "PipeTx is closed",
226 )));
227 };
228
229 if tx.is_closed() {
230 Poll::Ready(Ok(0))
231 } else {
232 Poll::Ready(Ok(8192))
233 }
234 }
235
236 fn mark_other_end_readable(&self) {
237 if let Some(rx_end) = self.rx_end.upgrade() {
238 let mut guard = rx_end.lock().unwrap();
239 if let Some(interest_handler) = guard.interest_handler.as_mut() {
240 interest_handler.push_interest(InterestType::Readable);
241 }
242 }
243 }
244}
245
246impl Seek for Pipe {
247 fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
248 self.recv.seek(from)
249 }
250}
251
252impl Seek for PipeRx {
253 fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
254 Ok(0)
255 }
256}
257
258impl Seek for PipeTx {
259 fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
260 Ok(0)
261 }
262}
263
264impl Read for Pipe {
265 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
266 self.recv.read(buf)
267 }
268}
269
270impl Read for PipeRx {
271 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
272 let Some(ref mut rx) = self.rx else {
273 return Err(std::io::Error::new(
274 std::io::ErrorKind::BrokenPipe,
275 "PipeRx is closed",
276 ));
277 };
278
279 let mut rx = rx.lock().unwrap();
280 loop {
281 if let Some(read) = Self::try_read_from_buffer(&mut rx, buf.len(), |mut read_buf| {
282 Read::read(&mut read_buf, buf).ok()
283 }) {
284 return Ok(read);
285 }
286
287 let data = {
288 match rx.chan.blocking_recv() {
289 Some(a) => a,
290 None => {
291 return Ok(0);
292 }
293 }
294 };
295 rx.buffer.replace(Bytes::from(data));
296 }
297 }
298}
299
300impl std::io::Write for Pipe {
301 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
302 self.send.write(buf)
303 }
304
305 fn flush(&mut self) -> std::io::Result<()> {
306 self.send.flush()
307 }
308
309 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
310 self.send.write_all(buf)
311 }
312
313 fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
314 self.send.write_fmt(fmt)
315 }
316
317 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
318 self.send.write_vectored(bufs)
319 }
320}
321
322impl std::io::Write for PipeTx {
323 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
324 let Some(ref tx) = self.tx else {
325 return Err(std::io::Error::new(
326 std::io::ErrorKind::BrokenPipe,
327 "PipeTx is closed",
328 ));
329 };
330
331 tx.send(buf.to_vec())
332 .map_err(|_| Into::<std::io::Error>::into(std::io::ErrorKind::BrokenPipe))?;
333 self.mark_other_end_readable();
334 Ok(buf.len())
335 }
336
337 fn flush(&mut self) -> std::io::Result<()> {
338 Ok(())
339 }
340}
341
342impl AsyncSeek for Pipe {
343 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
344 let this = Pin::new(&mut self.recv);
345 this.start_seek(position)
346 }
347
348 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
349 let this = Pin::new(&mut self.recv);
350 this.poll_complete(cx)
351 }
352}
353
354impl AsyncSeek for PipeRx {
355 fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
356 Ok(())
357 }
358 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
359 Poll::Ready(Ok(0))
360 }
361}
362
363impl AsyncSeek for PipeTx {
364 fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
365 Ok(())
366 }
367 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
368 Poll::Ready(Ok(0))
369 }
370}
371
372impl AsyncWrite for Pipe {
373 fn poll_write(
374 mut self: Pin<&mut Self>,
375 cx: &mut Context<'_>,
376 buf: &[u8],
377 ) -> Poll<io::Result<usize>> {
378 let this = Pin::new(&mut self.send);
379 this.poll_write(cx, buf)
380 }
381
382 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
383 let this = Pin::new(&mut self.send);
384 this.poll_flush(cx)
385 }
386
387 fn poll_shutdown(
388 mut self: Pin<&mut Self>,
389 cx: &mut Context<'_>,
390 ) -> Poll<Result<(), io::Error>> {
391 let this = Pin::new(&mut self.send);
392 this.poll_shutdown(cx)
393 }
394
395 fn poll_write_vectored(
396 mut self: Pin<&mut Self>,
397 cx: &mut Context<'_>,
398 bufs: &[IoSlice<'_>],
399 ) -> Poll<Result<usize, io::Error>> {
400 let this = Pin::new(&mut self.send);
401 this.poll_write_vectored(cx, bufs)
402 }
403}
404
405impl AsyncWrite for PipeTx {
406 fn poll_write(
407 self: Pin<&mut Self>,
408 _cx: &mut Context<'_>,
409 buf: &[u8],
410 ) -> Poll<io::Result<usize>> {
411 let Some(ref tx) = self.tx else {
412 return Poll::Ready(Err(std::io::Error::new(
413 std::io::ErrorKind::BrokenPipe,
414 "PipeTx is closed",
415 )));
416 };
417
418 match tx.send(buf.to_vec()) {
419 Ok(()) => {
420 self.mark_other_end_readable();
421 Poll::Ready(Ok(buf.len()))
422 }
423 Err(_) => Poll::Ready(Err(Into::<std::io::Error>::into(
424 std::io::ErrorKind::BrokenPipe,
425 ))),
426 }
427 }
428
429 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
430 Poll::Ready(Ok(()))
431 }
432
433 fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
434 self.close();
435 Poll::Ready(Ok(()))
436 }
437}
438
439impl AsyncRead for Pipe {
440 fn poll_read(
441 mut self: Pin<&mut Self>,
442 cx: &mut Context<'_>,
443 buf: &mut tokio::io::ReadBuf<'_>,
444 ) -> Poll<io::Result<()>> {
445 let this = Pin::new(&mut self.recv);
446 this.poll_read(cx, buf)
447 }
448}
449
450impl AsyncRead for PipeRx {
451 fn poll_read(
452 mut self: Pin<&mut Self>,
453 cx: &mut Context<'_>,
454 buf: &mut tokio::io::ReadBuf<'_>,
455 ) -> Poll<io::Result<()>> {
456 let Some(ref mut rx) = self.rx else {
457 return Poll::Ready(Err(std::io::Error::new(
458 std::io::ErrorKind::BrokenPipe,
459 "PipeRx is closed",
460 )));
461 };
462
463 let mut rx = rx.lock().unwrap();
464 loop {
465 if Self::try_read_from_buffer(&mut rx, buf.remaining(), |read_buf| {
466 buf.put_slice(read_buf);
467 Some(read_buf.len())
468 })
469 .is_some()
470 {
471 return Poll::Ready(Ok(()));
472 }
473
474 let data = match rx.chan.poll_recv(cx) {
475 Poll::Ready(Some(a)) => a,
476 Poll::Ready(None) => return Poll::Ready(Ok(())),
477 Poll::Pending => return Poll::Pending,
478 };
479
480 rx.buffer.replace(Bytes::from(data));
481 }
482 }
483}
484
485impl VirtualFile for Pipe {
486 fn last_accessed(&self) -> u64 {
488 0
489 }
490
491 fn last_modified(&self) -> u64 {
493 0
494 }
495
496 fn created_time(&self) -> u64 {
498 0
499 }
500
501 fn size(&self) -> u64 {
503 0
504 }
505
506 fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
509 Ok(())
510 }
511
512 fn unlink(&mut self) -> Result<(), FsError> {
514 Ok(())
515 }
516
517 fn is_open(&self) -> bool {
520 self.send
521 .tx
522 .as_ref()
523 .map(|a| !a.is_closed())
524 .unwrap_or_else(|| false)
525 }
526
527 fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
529 Pin::new(&mut self.recv).poll_read_ready(cx)
530 }
531
532 fn poll_write_ready(
534 mut self: Pin<&mut Self>,
535 _cx: &mut Context<'_>,
536 ) -> Poll<io::Result<usize>> {
537 Pin::new(&mut self.send).poll_write_ready()
538 }
539}
540
541#[derive(Clone, Debug)]
543pub struct DuplexPipe {
544 front: Pipe,
545 back: Pipe,
546}
547
548impl DuplexPipe {
549 pub fn front(&self) -> &Pipe {
551 &self.front
552 }
553
554 pub fn back(&self) -> &Pipe {
556 &self.back
557 }
558
559 pub fn front_mut(&mut self) -> &mut Pipe {
561 &mut self.front
562 }
563
564 pub fn back_mut(&mut self) -> &mut Pipe {
566 &mut self.back
567 }
568
569 pub fn split(self) -> (Pipe, Pipe) {
571 (self.front, self.back)
572 }
573
574 pub fn combine(front: Pipe, back: Pipe) -> Self {
576 Self { front, back }
577 }
578
579 pub fn reverse(self) -> Self {
580 let (front, back) = self.split();
581 Self::combine(back, front)
582 }
583}
584
585impl Default for DuplexPipe {
586 fn default() -> Self {
587 Self::new()
588 }
589}
590
591impl DuplexPipe {
592 pub fn new() -> DuplexPipe {
593 let (end1, end2) = Pipe::channel();
594 Self {
595 front: end1,
596 back: end2,
597 }
598 }
599}
600
601pub type WasiBidirectionalSharedPipePair = ArcFile<DuplexPipe>;