wasmer_wasix/runners/dproxy/hyper_proxy/
stream.rs1use std::io;
2
3use futures::Stream;
4use hyper_util::{client::legacy::connect::Connected, rt::TokioIo};
5use tokio_stream::wrappers::BroadcastStream;
6use virtual_net::tcp_pair::{TcpSocketHalfRx, TcpSocketHalfTx};
7
8use super::*;
9
10#[derive(Debug)]
11pub struct HyperProxyStream {
12 pub(super) tx: TcpSocketHalfTx,
13 pub(super) rx: TokioIo<TcpSocketHalfRx>,
14 pub(super) terminate: BroadcastStream<()>,
15 pub(super) terminated: bool,
16}
17
18impl hyper_util::client::legacy::connect::Connection for HyperProxyStream {
19 fn connected(&self) -> Connected {
20 Connected::new().proxy(true)
21 }
22}
23
24impl hyper::rt::Read for HyperProxyStream {
25 fn poll_read(
26 mut self: Pin<&mut Self>,
27 cx: &mut Context<'_>,
28 buf: hyper::rt::ReadBufCursor<'_>,
29 ) -> Poll<Result<(), std::io::Error>> {
30 if let Poll::Ready(ret) = Pin::new(&mut self.rx).poll_read(cx, buf) {
31 return Poll::Ready(ret);
32 }
33 if self.terminated {
34 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
35 }
36 if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
37 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
38 }
39 Poll::Pending
40 }
41}
42
43impl hyper::rt::Write for HyperProxyStream {
44 fn poll_write(
45 mut self: Pin<&mut Self>,
46 cx: &mut Context<'_>,
47 buf: &[u8],
48 ) -> Poll<Result<usize, std::io::Error>> {
49 if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_write(cx, buf) {
50 return Poll::Ready(ret);
51 }
52 if self.terminated {
53 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
54 }
55 if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
56 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
57 }
58 Poll::Pending
59 }
60
61 fn poll_flush(
62 mut self: Pin<&mut Self>,
63 cx: &mut Context<'_>,
64 ) -> Poll<Result<(), std::io::Error>> {
65 if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_flush(cx) {
66 return Poll::Ready(ret);
67 }
68 if self.terminated {
69 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
70 }
71 if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
72 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
73 }
74 Poll::Pending
75 }
76
77 fn poll_shutdown(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 ) -> Poll<Result<(), std::io::Error>> {
81 if let Poll::Ready(ret) = Pin::new(&mut self.tx).poll_shutdown(cx) {
82 return Poll::Ready(ret);
83 }
84 if self.terminated {
85 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
86 }
87 if let Poll::Ready(Some(_)) = Pin::new(&mut self.terminate).poll_next(cx) {
88 return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
89 }
90 Poll::Pending
91 }
92}