wasmer_wasix/runners/dproxy/hyper_proxy/
stream.rs

1use std::io;
2
3use futures::Stream;
4use hyper_util::{client::legacy::connect::Connected, rt::TokioIo};
5use tokio_stream::wrappers::BroadcastStream;
6use virtual_net::tcp_pair::{TcpSocketHalfRx, TcpSocketHalfTx};
7
8use super::*;
9
/// Bidirectional byte stream handed to hyper by the proxy connector.
///
/// Reads are served from `rx` and writes from `tx`. Whenever the relevant
/// half has nothing ready, the stream additionally consults `terminated` and
/// `terminate` so that a pending operation can be failed with
/// `ConnectionReset` instead of waiting.
#[derive(Debug)]
pub struct HyperProxyStream {
    /// Write half of the socket pair; backs `poll_write`, `poll_flush` and
    /// `poll_shutdown`.
    pub(super) tx: TcpSocketHalfTx,
    /// Read half of the socket pair, wrapped in `TokioIo` so it can be polled
    /// through hyper's `Read` trait.
    pub(super) rx: TokioIo<TcpSocketHalfRx>,
    /// Broadcast subscription polled when an I/O operation is pending; any
    /// item it yields causes that operation to fail with `ConnectionReset`.
    pub(super) terminate: BroadcastStream<()>,
    /// When `true`, any I/O operation that would otherwise be pending fails
    /// with `ConnectionReset`.
    pub(super) terminated: bool,
}
17
18impl hyper_util::client::legacy::connect::Connection for HyperProxyStream {
19    fn connected(&self) -> Connected {
20        Connected::new().proxy(true)
21    }
22}
23
24impl hyper::rt::Read for HyperProxyStream {
25    fn poll_read(
26        mut self: Pin<&mut Self>,
27        cx: &mut Context<'_>,
28        buf: hyper::rt::ReadBufCursor<'_>,
29    ) -> Poll<Result<(), std::io::Error>> {
30        if let Poll::Ready(ret) = Pin::new(&mut self.rx).poll_read(cx, buf) {
31            return Poll::Ready(ret);
32        }
33        if self.terminated {
34            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
35        }
36        if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
37            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
38        }
39        Poll::Pending
40    }
41}
42
43impl hyper::rt::Write for HyperProxyStream {
44    fn poll_write(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context<'_>,
47        buf: &[u8],
48    ) -> Poll<Result<usize, std::io::Error>> {
49        if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_write(cx, buf) {
50            return Poll::Ready(ret);
51        }
52        if self.terminated {
53            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
54        }
55        if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
56            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
57        }
58        Poll::Pending
59    }
60
61    fn poll_flush(
62        mut self: Pin<&mut Self>,
63        cx: &mut Context<'_>,
64    ) -> Poll<Result<(), std::io::Error>> {
65        if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_flush(cx) {
66            return Poll::Ready(ret);
67        }
68        if self.terminated {
69            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
70        }
71        if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
72            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
73        }
74        Poll::Pending
75    }
76
77    fn poll_shutdown(
78        mut self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80    ) -> Poll<Result<(), std::io::Error>> {
81        if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_shutdown(cx) {
82            return Poll::Ready(ret);
83        }
84        if self.terminated {
85            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
86        }
87        if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
88            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
89        }
90        Poll::Pending
91    }
92}