wasmer_wasix/syscalls/wasix/
sock_send_file.rs

1use virtual_fs::AsyncReadExt;
2
3use super::*;
4use crate::{WasiInodes, net::socket::TimeType, syscalls::*};
5
6/// ### `sock_send_file()`
7/// Sends the entire contents of a file down a socket
8///
9/// ## Parameters
10///
11/// * `in_fd` - Open file that has the data to be transmitted
12/// * `offset` - Offset into the file to start reading at
13/// * `count` - Number of bytes to be sent
14///
15/// ## Return
16///
17/// Number of bytes transmitted.
18#[instrument(level = "trace", skip_all, fields(%sock, %in_fd, %offset, %count, nsent = field::Empty), ret)]
19pub fn sock_send_file<M: MemorySize>(
20    mut ctx: FunctionEnvMut<'_, WasiEnv>,
21    sock: WasiFd,
22    in_fd: WasiFd,
23    offset: Filesize,
24    count: Filesize,
25    ret_sent: WasmPtr<Filesize, M>,
26) -> Result<Errno, WasiError> {
27    WasiEnv::do_pending_operations(&mut ctx)?;
28
29    let total_written = wasi_try_ok!(sock_send_file_internal(
30        &mut ctx, sock, in_fd, offset, count
31    )?);
32
33    #[cfg(feature = "journal")]
34    if ctx.data().enable_journal {
35        JournalEffector::save_sock_send_file::<M>(&mut ctx, sock, in_fd, offset, total_written)
36            .map_err(|err| {
37                tracing::error!("failed to save sock_send_file event - {}", err);
38                WasiError::Exit(ExitCode::from(Errno::Fault))
39            })?;
40    }
41
42    Span::current().record("nsent", total_written);
43
44    let env = ctx.data();
45    let memory = unsafe { env.memory_view(&ctx) };
46    wasi_try_mem_ok!(ret_sent.write(&memory, total_written as Filesize));
47
48    Ok(Errno::Success)
49}
50
51#[allow(clippy::await_holding_lock)]
52pub(crate) fn sock_send_file_internal(
53    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
54    sock: WasiFd,
55    in_fd: WasiFd,
56    offset: Filesize,
57    mut count: Filesize,
58) -> Result<Result<Filesize, Errno>, WasiError> {
59    let mut env = ctx.data();
60    let net = env.net();
61    let tasks = env.tasks().clone();
62    let state = env.state.clone();
63    let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(in_fd));
64
65    // Set the offset of the file
66    fd_entry.inner.offset.store(offset, Ordering::Release);
67
68    // Enter a loop that will process all the data
69    let mut total_written: Filesize = 0;
70    while (count > 0) {
71        let sub_count = count.min(4096);
72        count -= sub_count;
73
74        let fd_flags = fd_entry.inner.flags;
75
76        let data = {
77            match in_fd {
78                __WASI_STDIN_FILENO => {
79                    let mut stdin = wasi_try_ok_ok!(
80                        WasiInodes::stdin_mut(&state.fs.fd_map).map_err(fs_error_into_wasi_err)
81                    );
82                    let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
83                        // TODO: optimize with MaybeUninit
84                        let mut buf = vec![0u8; sub_count as usize];
85                        let amt = stdin.read(&mut buf[..]).await.map_err(map_io_err)?;
86                        buf.truncate(amt);
87                        Ok(buf)
88                    })?);
89                    env = ctx.data();
90                    data
91                }
92                __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => return Ok(Err(Errno::Inval)),
93                _ => {
94                    if !fd_entry.inner.rights.contains(Rights::FD_READ) {
95                        // TODO: figure out the error to return when lacking rights
96                        return Ok(Err(Errno::Access));
97                    }
98
99                    let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
100                    let inode = fd_entry.inode.clone();
101                    let data = {
102                        let mut guard = inode.write();
103                        match guard.deref_mut() {
104                            Kind::File { handle, .. } => {
105                                if let Some(handle) = handle {
106                                    let data =
107                                        wasi_try_ok_ok!(__asyncify(ctx, None, async move {
108                                            let mut buf = vec![0u8; sub_count as usize];
109
110                                            let mut handle = handle.write().unwrap();
111                                            handle
112                                                .seek(std::io::SeekFrom::Start(offset as u64))
113                                                .await
114                                                .map_err(map_io_err)?;
115                                            let amt = handle
116                                                .read(&mut buf[..])
117                                                .await
118                                                .map_err(map_io_err)?;
119                                            buf.truncate(amt);
120                                            Ok(buf)
121                                        })?);
122                                    env = ctx.data();
123                                    data
124                                } else {
125                                    return Ok(Err(Errno::Inval));
126                                }
127                            }
128                            Kind::Socket { socket, .. } => {
129                                let socket = socket.clone();
130                                let tasks = tasks.clone();
131                                drop(guard);
132
133                                let read_timeout = socket
134                                    .opt_time(TimeType::WriteTimeout)
135                                    .ok()
136                                    .flatten()
137                                    .unwrap_or(Duration::from_secs(30));
138
139                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async {
140                                    let mut buf = Vec::with_capacity(sub_count as usize);
141                                    unsafe {
142                                        buf.set_len(sub_count as usize);
143                                    }
144                                    socket
145                                        .recv(
146                                            tasks.deref(),
147                                            &mut buf,
148                                            Some(read_timeout),
149                                            false,
150                                            false,
151                                        )
152                                        .await
153                                        .map(|amt| {
154                                            unsafe {
155                                                buf.set_len(amt);
156                                            }
157                                            let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
158                                            buf
159                                        })
160                                })?);
161                                env = ctx.data();
162                                data
163                            }
164                            Kind::PipeRx { rx } => {
165                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
166                                    // TODO: optimize with MaybeUninit
167                                    let mut buf = vec![0u8; sub_count as usize];
168                                    let amt = virtual_fs::AsyncReadExt::read(rx, &mut buf[..])
169                                        .await
170                                        .map_err(map_io_err)?;
171                                    buf.truncate(amt);
172                                    Ok(buf)
173                                })?);
174                                env = ctx.data();
175                                data
176                            }
177                            Kind::DuplexPipe { pipe } => {
178                                let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
179                                    // TODO: optimize with MaybeUninit
180                                    let mut buf = vec![0u8; sub_count as usize];
181                                    let amt = virtual_fs::AsyncReadExt::read(pipe, &mut buf[..])
182                                        .await
183                                        .map_err(map_io_err)?;
184                                    buf.truncate(amt);
185                                    Ok(buf)
186                                })?);
187                                env = ctx.data();
188                                data
189                            }
190                            Kind::PipeTx { .. }
191                            | Kind::Epoll { .. }
192                            | Kind::EventNotifications { .. } => {
193                                return Ok(Err(Errno::Inval));
194                            }
195                            Kind::Dir { .. } | Kind::Root { .. } => {
196                                return Ok(Err(Errno::Isdir));
197                            }
198                            Kind::Symlink { .. } => unimplemented!("Symlinks in wasi::fd_read"),
199                            Kind::Buffer { buffer } => {
200                                // TODO: optimize with MaybeUninit
201                                let mut buf = vec![0u8; sub_count as usize];
202
203                                let mut buf_read = &buffer[offset..];
204                                let amt = wasi_try_ok_ok!(
205                                    std::io::Read::read(&mut buf_read, &mut buf[..])
206                                        .map_err(map_io_err)
207                                );
208                                buf.truncate(amt);
209                                buf
210                            }
211                        }
212                    };
213
214                    fd_entry
215                        .inner
216                        .offset
217                        .fetch_add(data.len() as u64, Ordering::AcqRel);
218
219                    data
220                }
221            }
222        };
223
224        // Write it down to the socket
225        let tasks = ctx.data().tasks().clone();
226        let bytes_written = wasi_try_ok_ok!(__sock_asyncify_mut(
227            ctx,
228            sock,
229            Rights::SOCK_SEND,
230            |socket, fd| async move {
231                let write_timeout = socket
232                    .opt_time(TimeType::ReadTimeout)
233                    .ok()
234                    .flatten()
235                    .unwrap_or(Duration::from_secs(30));
236                socket
237                    .send(tasks.deref(), &data, Some(write_timeout), true)
238                    .await
239            },
240        ));
241        env = ctx.data();
242
243        total_written += bytes_written as u64;
244    }
245
246    Ok(Ok(total_written))
247}