From d459a9345334911be4af1bba58b6c796d92fde66 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Fri, 17 Mar 2023 02:22:42 -0700 Subject: [PATCH] net: add `UdpSocket::peek_sender()` (#5520) --- tokio/Cargo.toml | 4 +- tokio/src/net/udp.rs | 178 ++++++++++++++++++++++++++++++++++++++++++- tokio/tests/udp.rs | 125 ++++++++++++++++++++++++++++++ 3 files changed, 304 insertions(+), 3 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ae64fd5cff7..0b4e011eff0 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -109,7 +109,7 @@ num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))'.dependencies] -socket2 = { version = "0.4.4", optional = true, features = [ "all" ] } +socket2 = { version = "0.4.9", optional = true, features = [ "all" ] } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. @@ -146,7 +146,7 @@ mockall = "0.11.1" async-stream = "0.3" [target.'cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))'.dev-dependencies] -socket2 = "0.4" +socket2 = "0.4.9" tempfile = "3.1.0" [target.'cfg(not(all(any(target_arch = "wasm32", target_arch = "wasm64"), target_os = "unknown")))'.dev-dependencies] diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index e7a65f1b29d..40732a0a294 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -954,6 +954,15 @@ impl UdpSocket { /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + /// /// # Examples /// /// ```no_run @@ -1177,6 +1186,15 @@ impl UdpSocket { /// Ok(()) /// } /// ``` + /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1201,6 +1219,15 @@ impl UdpSocket { /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// # Notes + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn poll_recv_from( &self, cx: &mut Context<'_>, @@ -1233,6 +1260,16 @@ impl UdpSocket { /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// + /// # Notes + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + /// /// # Examples /// /// ```no_run @@ -1367,6 +1404,17 @@ impl UdpSocket { /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`peek_sender`]. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// # Examples /// /// ```no_run @@ -1385,6 +1433,9 @@ impl UdpSocket { /// Ok(()) /// } /// ``` + /// + /// [`peek_sender`]: method@Self::peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() @@ -1393,7 +1444,7 @@ impl UdpSocket { } /// Receives data from the socket, without removing it from the input queue. - /// On success, returns the number of bytes read. + /// On success, returns the sending address of the datagram. /// /// # Notes /// @@ -1407,6 +1458,17 @@ impl UdpSocket { /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`poll_peek_sender`]. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// /// # Return value /// /// The function returns: @@ -1418,6 +1480,9 @@ impl UdpSocket { /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`poll_peek_sender`]: method@Self::poll_peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection pub fn poll_peek_from( &self, cx: &mut Context<'_>, @@ -1440,6 +1505,117 @@ impl UdpSocket { Poll::Ready(Ok(addr)) } + /// Tries to receive data on the socket without removing it from the input queue. + /// On success, returns the number of bytes read and the sending address of the + /// datagram. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Notes + /// + /// On Windows, if the data is larger than the buffer specified, the buffer + /// is filled with the first part of the data, and peek returns the error + /// WSAEMSGSIZE(10040). The excess data is lost. + /// Make sure to always use a sufficiently large buffer to hold the + /// maximum UDP packet size, which can be up to 65536 bytes in size. + /// + /// MacOS will return an error if you pass a zero-sized buffer. + /// + /// If you're merely interested in learning the sender of the data at the head of the queue, + /// try [`try_peek_sender`]. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [`try_peek_sender`]: method@Self::try_peek_sender + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + pub fn try_peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .registration() + .try_io(Interest::READABLE, || self.io.peek_from(buf)) + } + + /// Retrieve the sender of the data at the head of the input queue, waiting if empty. + /// + /// This is equivalent to calling [`peek_from`] with a zero-sized buffer, + /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [`peek_from`]: method@Self::peek_from + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + pub async fn peek_sender(&self) -> io::Result { + self.io + .registration() + .async_io(Interest::READABLE, || self.peek_sender_inner()) + .await + } + + /// Retrieve the sender of the data at the head of the input queue, + /// scheduling a wakeup if empty. + /// + /// This is equivalent to calling [`poll_peek_from`] with a zero-sized buffer, + /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS. + /// + /// # Notes + /// + /// Note that on multiple calls to a `poll_*` method in the recv direction, only the + /// `Waker` from the `Context` passed to the most recent call will be scheduled to + /// receive a wakeup. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [`poll_peek_from`]: method@Self::poll_peek_from + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + pub fn poll_peek_sender(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .registration() + .poll_read_io(cx, || self.peek_sender_inner()) + } + + /// Try to retrieve the sender of the data at the head of the input queue. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// Note that the socket address **cannot** be implicitly trusted, because it is relatively + /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack]. + /// Because UDP is stateless and does not validate the origin of a packet, + /// the attacker does not need to be able to intercept traffic in order to interfere. + /// It is important to be aware of this when designing your application-level protocol. + /// + /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection + pub fn try_peek_sender(&self) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || self.peek_sender_inner()) + } + + #[inline] + fn peek_sender_inner(&self) -> io::Result { + self.io.try_io(|| { + self.as_socket() + .peek_sender()? + // May be `None` if the platform doesn't populate the sender for some reason. + // In testing, that only occurred on macOS if you pass a zero-sized buffer, + // but the implementation of `Socket::peek_sender()` covers that. + .as_socket() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "sender not available")) + }) + } + /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see [`set_broadcast`]. diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 2b6ab4d2ad2..6825ab8810d 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -106,6 +106,45 @@ async fn send_to_peek_from() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn send_to_try_peek_from() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let receiver_addr = receiver.local_addr()?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; + + // peek + let mut recv_buf = [0u8; 32]; + + loop { + match receiver.try_peek_from(&mut recv_buf) { + Ok((n, addr)) => { + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + break; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + receiver.readable().await?; + } + Err(e) => return Err(e), + } + } + + // peek + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.peek_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + let mut recv_buf = [0u8; 32]; + let (n, addr) = receiver.recv_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..n], MSG); + assert_eq!(addr, sender.local_addr()?); + + Ok(()) +} + #[tokio::test] async fn send_to_peek_from_poll() -> std::io::Result<()> { let sender = UdpSocket::bind("127.0.0.1:0").await?; @@ -134,6 +173,92 @@ async fn send_to_peek_from_poll() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + sender.send_to(msg, receiver_addr).await?; + + let peeked_sender = receiver.peek_sender().await?; + assert_eq!(peeked_sender, sender_addr); + + // Assert that `peek_sender()` returns the right sender but + // doesn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + let (read, received_sender) = receiver.recv_from(&mut recv_buf).await?; + + assert_eq!(&recv_buf[..read], msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + +#[tokio::test] +async fn poll_peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + poll_fn(|cx| sender.poll_send_to(cx, msg, receiver_addr)).await?; + + let peeked_sender = poll_fn(|cx| receiver.poll_peek_sender(cx)).await?; + assert_eq!(peeked_sender, sender_addr); + + // Assert that `poll_peek_sender()` returns the right sender but + // doesn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let received_sender = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + +#[tokio::test] +async fn try_peek_sender() -> std::io::Result<()> { + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let sender_addr = sender.local_addr()?; + let receiver_addr = receiver.local_addr()?; + + let msg = b"Hello, world!"; + sender.send_to(msg, receiver_addr).await?; + + let peeked_sender = loop { + match receiver.try_peek_sender() { + Ok(peeked_sender) => break peeked_sender, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + receiver.readable().await?; + } + Err(e) => return Err(e), + } + }; + + assert_eq!(peeked_sender, sender_addr); + + // Assert that `try_peek_sender()` returns the right sender but + // didn't remove from the receive queue. + let mut recv_buf = [0u8; 32]; + // We already peeked the sender so there must be data in the receive queue. + let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap(); + + assert_eq!(&recv_buf[..read], msg); + assert_eq!(received_sender, peeked_sender); + + Ok(()) +} + #[tokio::test] async fn split() -> std::io::Result<()> { let socket = UdpSocket::bind("127.0.0.1:0").await?;