wasmer_wasix/syscalls/wasix/
sock_send_file.rs

1use virtual_fs::AsyncReadExt;
2
3use super::*;
4use crate::{WasiInodes, net::socket::TimeType, syscalls::*};
5
6/// ### `sock_send_file()`
7/// Sends the entire contents of a file down a socket
8///
9/// ## Parameters
10///
11/// * `in_fd` - Open file that has the data to be transmitted
12/// * `offset` - Offset into the file to start reading at
13/// * `count` - Number of bytes to be sent
14///
15/// ## Return
16///
17/// Number of bytes transmitted.
18#[instrument(level = "trace", skip_all, fields(%sock, %in_fd, %offset, %count, nsent = field::Empty), ret)]
19pub fn sock_send_file<M: MemorySize>(
20    mut ctx: FunctionEnvMut<'_, WasiEnv>,
21    sock: WasiFd,
22    in_fd: WasiFd,
23    offset: Filesize,
24    count: Filesize,
25    ret_sent: WasmPtr<Filesize, M>,
26) -> Result<Errno, WasiError> {
27    WasiEnv::do_pending_operations(&mut ctx)?;
28
29    let total_written = wasi_try_ok!(sock_send_file_internal(
30        &mut ctx, sock, in_fd, offset, count
31    )?);
32
33    #[cfg(feature = "journal")]
34    if ctx.data().enable_journal {
35        JournalEffector::save_sock_send_file::<M>(&mut ctx, sock, in_fd, offset, total_written)
36            .map_err(|err| {
37                tracing::error!("failed to save sock_send_file event - {}", err);
38                WasiError::Exit(ExitCode::from(Errno::Fault))
39            })?;
40    }
41
42    Span::current().record("nsent", total_written);
43
44    let env = ctx.data();
45    let memory = unsafe { env.memory_view(&ctx) };
46    wasi_try_mem_ok!(ret_sent.write(&memory, total_written as Filesize));
47
48    Ok(Errno::Success)
49}
50
51#[allow(clippy::await_holding_lock)]
52pub(crate) fn sock_send_file_internal(
53    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
54    sock: WasiFd,
55    in_fd: WasiFd,
56    offset: Filesize,
57    mut count: Filesize,
58) -> Result<Result<Filesize, Errno>, WasiError> {
59    let mut env = ctx.data();
60    let net = env.net();
61    let tasks = env.tasks().clone();
62    let state = env.state.clone();
63
64    // Set the offset of the file
65    {
66        let mut fd_map = state.fs.fd_map.write().unwrap();
67        let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(in_fd).ok_or(Errno::Badf));
68        fd_entry.offset.store(offset, Ordering::Release);
69    }
70
71    // Enter a loop that will process all the data
72    let mut total_written: Filesize = 0;
73    while (count > 0) {
74        let sub_count = count.min(4096);
75        count -= sub_count;
76
77        let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(in_fd));
78        let fd_flags = fd_entry.inner.flags;
79
80        let data = {
81            match in_fd {
82                __WASI_STDIN_FILENO => {
83                    let mut stdin = wasi_try_ok_ok!(
84                        WasiInodes::stdin_mut(&state.fs.fd_map).map_err(fs_error_into_wasi_err)
85                    );
86                    let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
87                        // TODO: optimize with MaybeUninit
88                        let mut buf = vec![0u8; sub_count as usize];
89                        let amt = stdin.read(&mut buf[..]).await.map_err(map_io_err)?;
90                        buf.truncate(amt);
91                        Ok(buf)
92                    })?);
93                    env = ctx.data();
94                    data
95                }
96                __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => return Ok(Err(Errno::Inval)),
97                _ => {
98                    if !fd_entry.inner.rights.contains(Rights::FD_READ) {
99                        // TODO: figure out the error to return when lacking rights
100                        return Ok(Err(Errno::Access));
101                    }
102
103                    let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
104                    let inode = fd_entry.inode;
105                    let data = {
106                        let mut guard = inode.write();
107                        match guard.deref_mut() {
108                            Kind::File { handle, .. } => {
109                                if let Some(handle) = handle {
110                                    let data =
111                                        wasi_try_ok_ok!(__asyncify(ctx, None, async move {
112                                            let mut buf = vec![0u8; sub_count as usize];
113
114                                            let mut handle = handle.write().unwrap();
115                                            handle
116                                                .seek(std::io::SeekFrom::Start(offset as u64))
117                                                .await
118                                                .map_err(map_io_err)?;
119                                            let amt = handle
120                                                .read(&mut buf[..])
121                                                .await
122                                                .map_err(map_io_err)?;
123                                            buf.truncate(amt);
124                                            Ok(buf)
125                                        })?);
126                                    env = ctx.data();
127                                    data
128                                } else {
129                                    return Ok(Err(Errno::Inval));
130                                }
131                            }
132                            Kind::Socket { socket, .. } => {
133                                let socket = socket.clone();
134                                let tasks = tasks.clone();
135                                drop(guard);
136
137                                let read_timeout = socket
138                                    .opt_time(TimeType::WriteTimeout)
139                                    .ok()
140                                    .flatten()
141                                    .unwrap_or(Duration::from_secs(30));
142
143                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async {
144                                    let mut buf = Vec::with_capacity(sub_count as usize);
145                                    unsafe {
146                                        buf.set_len(sub_count as usize);
147                                    }
148                                    socket
149                                        .recv(
150                                            tasks.deref(),
151                                            &mut buf,
152                                            Some(read_timeout),
153                                            false,
154                                            false,
155                                        )
156                                        .await
157                                        .map(|amt| {
158                                            unsafe {
159                                                buf.set_len(amt);
160                                            }
161                                            let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
162                                            buf
163                                        })
164                                })?);
165                                env = ctx.data();
166                                data
167                            }
168                            Kind::PipeRx { rx } => {
169                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
170                                    // TODO: optimize with MaybeUninit
171                                    let mut buf = vec![0u8; sub_count as usize];
172                                    let amt = virtual_fs::AsyncReadExt::read(rx, &mut buf[..])
173                                        .await
174                                        .map_err(map_io_err)?;
175                                    buf.truncate(amt);
176                                    Ok(buf)
177                                })?);
178                                env = ctx.data();
179                                data
180                            }
181                            Kind::DuplexPipe { pipe } => {
182                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
183                                    // TODO: optimize with MaybeUninit
184                                    let mut buf = vec![0u8; sub_count as usize];
185                                    let amt = virtual_fs::AsyncReadExt::read(pipe, &mut buf[..])
186                                        .await
187                                        .map_err(map_io_err)?;
188                                    buf.truncate(amt);
189                                    Ok(buf)
190                                })?);
191                                env = ctx.data();
192                                data
193                            }
194                            Kind::PipeTx { .. }
195                            | Kind::Epoll { .. }
196                            | Kind::EventNotifications { .. } => {
197                                return Ok(Err(Errno::Inval));
198                            }
199                            Kind::Dir { .. } | Kind::Root { .. } => {
200                                return Ok(Err(Errno::Isdir));
201                            }
202                            Kind::Symlink { .. } => unimplemented!("Symlinks in wasi::fd_read"),
203                            Kind::Buffer { buffer } => {
204                                // TODO: optimize with MaybeUninit
205                                let mut buf = vec![0u8; sub_count as usize];
206
207                                let mut buf_read = &buffer[offset..];
208                                let amt = wasi_try_ok_ok!(
209                                    std::io::Read::read(&mut buf_read, &mut buf[..])
210                                        .map_err(map_io_err)
211                                );
212                                buf.truncate(amt);
213                                buf
214                            }
215                        }
216                    };
217
218                    // reborrow
219                    let mut fd_map = state.fs.fd_map.write().unwrap();
220                    let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(in_fd).ok_or(Errno::Badf));
221                    fd_entry
222                        .offset
223                        .fetch_add(data.len() as u64, Ordering::AcqRel);
224
225                    data
226                }
227            }
228        };
229
230        // Write it down to the socket
231        let tasks = ctx.data().tasks().clone();
232        let bytes_written = wasi_try_ok_ok!(__sock_asyncify_mut(
233            ctx,
234            sock,
235            Rights::SOCK_SEND,
236            |socket, fd| async move {
237                let write_timeout = socket
238                    .opt_time(TimeType::ReadTimeout)
239                    .ok()
240                    .flatten()
241                    .unwrap_or(Duration::from_secs(30));
242                socket
243                    .send(tasks.deref(), &data, Some(write_timeout), true)
244                    .await
245            },
246        ));
247        env = ctx.data();
248
249        total_written += bytes_written as u64;
250    }
251
252    Ok(Ok(total_written))
253}