From 4a5cac6419ca91ab758fa87a8a8c62319944acc6 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Mon, 27 Feb 2023 15:50:09 -0800 Subject: [PATCH 1/3] net: add `UdpSocket::peek_sender()` and variants closes #5491 --- tokio/Cargo.toml | 4 +-- tokio/src/net/udp.rs | 69 +++++++++++++++++++++++++++++++++++ tokio/tests/udp.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 2 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ae64fd5cff7..0b4e011eff0 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -109,7 +109,7 @@ num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))'.dependencies] -socket2 = { version = "0.4.4", optional = true, features = [ "all" ] } +socket2 = { version = "0.4.9", optional = true, features = [ "all" ] } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. @@ -146,7 +146,7 @@ mockall = "0.11.1" async-stream = "0.3" [target.'cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))'.dev-dependencies] -socket2 = "0.4" +socket2 = "0.4.9" tempfile = "3.1.0" [target.'cfg(not(all(any(target_arch = "wasm32", target_arch = "wasm64"), target_os = "unknown")))'.dev-dependencies] diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 213d9149dad..110406252fd 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -1331,6 +1331,11 @@ impl UdpSocket { /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`peek_sender`]. + /// /// # Examples /// /// ```no_run @@ -1349,6 +1354,8 @@ impl UdpSocket { /// Ok(()) /// } /// ``` + /// + /// [`peek_sender`]: method@Self::peek_sender pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1371,6 +1378,11 @@ impl UdpSocket { /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`poll_peek_sender`]. + /// /// # Return value /// /// The function returns: @@ -1382,6 +1394,8 @@ impl UdpSocket { /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`poll_peek_sender`]: method@Self::poll_peek_sender pub fn poll_peek_from( &self, cx: &mut Context<'_>, @@ -1404,6 +1418,61 @@ impl UdpSocket { Poll::Ready(Ok(addr)) } + /// Retrieve the sender of the data at the head of the input queue, waiting if empty. + /// + /// This is equivalent to calling [`peek_from`] with a zero-sized buffer, + /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS. + /// + /// [`peek_from`]: method@Self::peek_from + pub async fn peek_sender(&self) -> io::Result { + self.io + .registration() + .async_io(Interest::READABLE, || self.peek_sender_inner()) + .await + } + + /// Retrieve the sender of the data at the head of the input queue, + /// scheduling a wakeup if empty. + /// + /// This is equivalent to calling [`poll_peek_from`] with a zero-sized buffer, + /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS. + /// + /// # Notes + /// + /// Note that on multiple calls to a `poll_*` method in the recv direction, only the + /// `Waker` from the `Context` passed to the most recent call will be scheduled to + /// receive a wakeup. + /// + /// [`poll_peek_from`]: method@Self::poll_peek_from + pub fn poll_peek_sender(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .registration() + .poll_read_io(cx, || self.peek_sender_inner()) + } + + /// Try to retrieve the sender of the data at the head of the input queue. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + pub fn try_peek_sender(&self) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || self.peek_sender_inner()) + } + + #[inline] + fn peek_sender_inner(&self) -> io::Result { + self.io.try_io(|| { + self.as_socket() + .peek_sender()? + // May be `None` if the platform doesn't populate the sender for some reason. + // In testing, that only occurred on macOS if you pass a zero-sized buffer, + // but the implementation of `Socket::peek_sender()` covers that. + .as_socket() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "sender not available")) + }) + } + /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see [`set_broadcast`]. diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 2b6ab4d2ad2..bd98e4840ce 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -134,6 +134,92 @@ async fn send_to_peek_from_poll() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + sender.send_to(msg, receiver_addr).await?; + + let peeked_sender = receiver.peek_sender().await?; + assert_eq!(peeked_sender, sender_addr); + + // Assert that `peek_sender()` returns the right sender but + // doesn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + let (read, received_sender) = receiver.recv_from(&mut recv_buf).await?; + + assert_eq!(&recv_buf[..read], msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + +#[tokio::test] +async fn poll_peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + poll_fn(|cx| sender.poll_send_to(cx, msg, receiver_addr)).await?; + + let peeked_sender = poll_fn(|cx| receiver.poll_peek_sender(cx)).await?; + assert_eq!(peeked_sender, sender_addr); + + // Assert that `poll_peek_sender()` returns the right sender but + // doesn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let received_sender = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + +#[tokio::test] +async fn try_peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + sender.send_to(msg, receiver_addr).await?; + + let peeked_sender = loop { + match receiver.try_peek_sender() { + Ok(peeked_sender) => break peeked_sender, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + receiver.readable().await?; + } + Err(e) => return Err(e), + } + }; + + assert_eq!(peeked_sender, sender_addr); + + // Assert that `try_peek_sender()` returns the right sender but + // didn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + // We already peeked the sender so there must be data in the receive queue. + let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap(); + + assert_eq!(&recv_buf[..read], msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + #[tokio::test] async fn split() -> std::io::Result<()> { let socket = UdpSocket::bind("127.0.0.1:0").await?; From 1609eddf09124dcd61c794a11db12ef3b856f4e1 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Tue, 7 Mar 2023 15:14:31 -0800 Subject: [PATCH 2/3] net: add `UdpSocket::try_peek_from()`, fix docs on `poll_peek_from` --- tokio/src/net/udp.rs | 29 ++++++++++++++++++++++++++++- tokio/tests/udp.rs | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 110406252fd..a12d0479dc4 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -1364,7 +1364,7 @@ impl UdpSocket { } /// Receives data from the socket, without removing it from the input queue. - /// On success, returns the number of bytes read. + /// On success, returns the sending address of the datagram. /// /// # Notes /// @@ -1418,6 +1418,33 @@ impl UdpSocket { Poll::Ready(Ok(addr)) } + /// Tries to receive data on the socket without removing it from the input queue. + /// On success, returns the number of bytes read and the sending address of the + /// datagram. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Notes + /// + /// On Windows, if the data is larger than the buffer specified, the buffer + /// is filled with the first part of the data, and peek returns the error + /// WSAEMSGSIZE(10040). The excess data is lost. + /// Make sure to always use a sufficiently large buffer to hold the + /// maximum UDP packet size, which can be up to 65536 bytes in size. + /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`try_peek_sender`]. + /// + /// [`try_peek_sender`]: method@Self::try_peek_sender + pub fn try_peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .registration() + .try_io(Interest::READABLE, || self.io.peek_from(buf)) + } + /// Retrieve the sender of the data at the head of the input queue, waiting if empty. /// /// This is equivalent to calling [`peek_from`] with a zero-sized buffer, diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index bd98e4840ce..6825ab8810d 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -106,6 +106,45 @@ async fn send_to_peek_from() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn send_to_try_peek_from() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let receiver_addr = receiver.local_addr()?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; + + // peek + let mut recv_buf = [0u8; 32]; + + loop { + match receiver.try_peek_from(&mut recv_buf) { + Ok((n, addr)) => { + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + break; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + receiver.readable().await?; + } + Err(e) => return Err(e), + } + } + + // peek + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.peek_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.recv_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + Ok(()) +} + #[tokio::test] async fn send_to_peek_from_poll() -> std::io::Result<()> { let sender = UdpSocket::bind("127.0.0.1:0").await?; From 3af836074891e9d86af41813e8a0d653602ed00b Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Tue, 7 Mar 2023 15:32:49 -0800 Subject: [PATCH 3/3] net: `UdpSocket`: add notes about trusting sender addresses --- tokio/src/net/udp.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index a12d0479dc4..c2da3cac082 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -954,6 +954,15 @@ impl UdpSocket { /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + /// /// # Examples /// /// ```no_run @@ -1177,6 +1186,15 @@ impl UdpSocket { /// Ok(()) /// } /// ``` + /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1201,6 +1219,15 @@ impl UdpSocket { /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn poll_recv_from( &self, cx: &mut Context<'_>, @@ -1233,6 +1260,16 @@ impl UdpSocket { /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// + /// # Notes + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + /// /// # Examples /// /// ```no_run @@ -1336,6 +1373,12 @@ impl UdpSocket { /// If you're merely interested in learning the sender of the data at the head of the queue, /// try [`peek_sender`]. /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// # Examples /// /// ```no_run @@ -1356,6 +1399,7 @@ impl UdpSocket { /// ``` /// /// [`peek_sender`]: method@Self::peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1383,6 +1427,12 @@ impl UdpSocket { /// If you're merely interested in learning the sender of the data at the head of the queue, /// try [`poll_peek_sender`]. /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// # Return value /// /// The function returns: @@ -1396,6 +1446,7 @@ impl UdpSocket { /// This function may encounter any standard I/O error except `WouldBlock`. /// /// [`poll_peek_sender`]: method@Self::poll_peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn poll_peek_from( &self, cx: &mut Context<'_>, @@ -1438,7 +1489,14 @@ impl UdpSocket { /// If you're merely interested in learning the sender of the data at the head of the queue, /// try [`try_peek_sender`]. /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// [`try_peek_sender`]: method@Self::try_peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn try_peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1450,7 +1508,14 @@ impl UdpSocket { /// This is equivalent to calling [`peek_from`] with a zero-sized buffer, /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS. /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// [`peek_from`]: method@Self::peek_from + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub async fn peek_sender(&self) -> io::Result { self.io .registration() @@ -1470,7 +1535,14 @@ impl UdpSocket { /// `Waker` from the `Context` passed to the most recent call will be scheduled to /// receive a wakeup. /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// [`poll_peek_from`]: method@Self::poll_peek_from + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn poll_peek_sender(&self, cx: &mut Context<'_>) -> Poll> { self.io .registration() @@ -1481,6 +1553,14 @@ impl UdpSocket { /// /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn try_peek_sender(&self) -> io::Result { self.io .registration()