From 132f7afd1d8972a3c968a86e27262ae268f9ffae Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Mon, 5 Jun 2023 13:17:27 -0600 Subject: [PATCH] feat(s2n-quic-platform): add socket tasks for sync rings --- quic/s2n-quic-platform/src/socket.rs | 1 + quic/s2n-quic-platform/src/socket/task.rs | 9 ++ .../src/socket/task/events.rs | 135 ++++++++++++++++++ quic/s2n-quic-platform/src/socket/task/rx.rs | 97 +++++++++++++ quic/s2n-quic-platform/src/socket/task/tx.rs | 98 +++++++++++++ 5 files changed, 340 insertions(+) create mode 100644 quic/s2n-quic-platform/src/socket/task.rs create mode 100644 quic/s2n-quic-platform/src/socket/task/events.rs create mode 100644 quic/s2n-quic-platform/src/socket/task/rx.rs create mode 100644 quic/s2n-quic-platform/src/socket/task/tx.rs diff --git a/quic/s2n-quic-platform/src/socket.rs b/quic/s2n-quic-platform/src/socket.rs index 61e9060716..c2a02c3a57 100644 --- a/quic/s2n-quic-platform/src/socket.rs +++ b/quic/s2n-quic-platform/src/socket.rs @@ -9,6 +9,7 @@ pub mod mmsg; pub mod msg; pub mod ring; pub mod std; +pub mod task; cfg_if! { if #[cfg(s2n_quic_platform_socket_mmsg)] { diff --git a/quic/s2n-quic-platform/src/socket/task.rs b/quic/s2n-quic-platform/src/socket/task.rs new file mode 100644 index 0000000000..336c294ea7 --- /dev/null +++ b/quic/s2n-quic-platform/src/socket/task.rs @@ -0,0 +1,9 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod events; +pub mod rx; +pub mod tx; + +pub use rx::Receiver; +pub use tx::Sender; diff --git a/quic/s2n-quic-platform/src/socket/task/events.rs b/quic/s2n-quic-platform/src/socket/task/events.rs new file mode 100644 index 0000000000..c2bcfbe7aa --- /dev/null +++ b/quic/s2n-quic-platform/src/socket/task/events.rs @@ -0,0 +1,135 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Some of the functions in these impls are not used on non-unix systems +#![cfg_attr(not(unix), allow(dead_code))] + +use crate::features::Gso; +use core::ops::ControlFlow; + +#[derive(Debug)] +pub struct TxEvents { + count: usize, + is_blocked: bool, + #[cfg_attr(not(s2n_quic_platform_gso), allow(dead_code))] + gso: Gso, +} + +impl TxEvents { + #[inline] + pub fn new(gso: Gso) -> Self { + Self { + count: 0, + is_blocked: false, + gso, + } + } + + #[inline] + pub fn is_blocked(&self) -> bool { + self.is_blocked + } + + #[inline] + pub fn take_blocked(&mut self) -> bool { + core::mem::take(&mut self.is_blocked) + } + + #[inline] + pub fn blocked(&mut self) { + self.is_blocked = true; + } + + #[inline] + pub fn take_count(&mut self) -> usize { + core::mem::take(&mut self.count) + } +} + +impl crate::syscall::SocketEvents for TxEvents { + #[inline] + fn on_complete(&mut self, count: usize) -> ControlFlow<(), ()> { + self.count += count; + self.is_blocked = false; + ControlFlow::Continue(()) + } + + #[inline] + fn on_error(&mut self, error: ::std::io::Error) -> ControlFlow<(), ()> { + use std::io::ErrorKind::*; + + match error.kind() { + WouldBlock => { + self.is_blocked = true; + ControlFlow::Break(()) + } + Interrupted => ControlFlow::Break(()), + #[cfg(s2n_quic_platform_gso)] + _ if errno::errno().0 == libc::EIO => { + self.count += 1; + + self.gso.disable(); + + ControlFlow::Continue(()) + } + _ => { + self.count += 1; + ControlFlow::Continue(()) + } + } + } +} + +#[derive(Debug, Default)] +pub struct RxEvents { + count: usize, + is_blocked: bool, +} + +impl RxEvents { + #[inline] + pub fn is_blocked(&self) -> bool { + self.is_blocked + } + + #[inline] + pub fn take_blocked(&mut self) -> bool { + core::mem::take(&mut self.is_blocked) + } + + #[inline] + pub fn blocked(&mut self) { + self.is_blocked = true; + } + + #[inline] + pub fn take_count(&mut self) -> usize { + core::mem::take(&mut self.count) + } +} + +impl crate::syscall::SocketEvents for RxEvents { + #[inline] + fn on_complete(&mut self, count: usize) -> ControlFlow<(), ()> { + self.count += count; + self.is_blocked = false; + ControlFlow::Continue(()) + } + + #[inline] + fn on_error(&mut self, error: ::std::io::Error) -> ControlFlow<(), ()> { + use std::io::ErrorKind::*; + + match error.kind() { + WouldBlock => { + self.is_blocked = true; + ControlFlow::Break(()) + } + Interrupted => ControlFlow::Break(()), + _ => { + self.count += 1; + ControlFlow::Break(()) + } + } + } +} diff --git a/quic/s2n-quic-platform/src/socket/task/rx.rs b/quic/s2n-quic-platform/src/socket/task/rx.rs new file mode 100644 index 0000000000..fc5b6e249e --- /dev/null +++ b/quic/s2n-quic-platform/src/socket/task/rx.rs @@ -0,0 +1,97 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + message::Message, + socket::{ring::Producer, task::events}, +}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use futures::ready; + +pub trait Socket { + type Error; + + fn recv( + &mut self, + cx: &mut Context, + entries: &mut [T], + events: &mut events::RxEvents, + ) -> Result<(), Self::Error>; +} + +pub struct Receiver> { + ring: Producer, + rx: S, + pending: u32, +} + +impl Receiver +where + T: Message + Unpin, + S: Socket + Unpin, +{ + #[inline] + pub fn new(ring: Producer, rx: S) -> Self { + Self { + ring, + rx, + pending: 0, + } + } + + #[inline] + fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll> { + loop { + let count = ready!(self.ring.poll_acquire(watermark, cx)); + + if count > self.pending { + return Some(self.pending as usize).into(); + } + + self.release(); + } + } + + #[inline] + fn release(&mut self) { + self.ring.release(self.pending); + self.pending = 0; + } +} + +impl Future for Receiver +where + T: Message + Unpin, + S: Socket + Unpin, +{ + type Output = Option; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + let mut events = events::RxEvents::default(); + + while !events.take_blocked() { + let pending = match ready!(this.poll_ring(u32::MAX, cx)) { + Some(entries) => entries, + None => return None.into(), + }; + + let entries = &mut this.ring.data()[pending..]; + + match this.rx.recv(cx, entries, &mut events) { + Ok(()) => this.pending += events.take_count() as u32, + Err(err) => return Some(err).into(), + } + } + + this.release(); + + Poll::Pending + } +} diff --git a/quic/s2n-quic-platform/src/socket/task/tx.rs b/quic/s2n-quic-platform/src/socket/task/tx.rs new file mode 100644 index 0000000000..bd97c277d5 --- /dev/null +++ b/quic/s2n-quic-platform/src/socket/task/tx.rs @@ -0,0 +1,98 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + features::Gso, + message::Message, + socket::{ring::Consumer, task::events}, +}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use futures::ready; + +pub trait Socket { + type Error; + + fn send( + &mut self, + cx: &mut Context, + entries: &mut [T], + events: &mut events::TxEvents, + ) -> Result<(), Self::Error>; +} + +pub struct Sender> { + ring: Consumer, + tx: S, + pending: u32, + events: events::TxEvents, +} + +impl Sender +where + T: Message + Unpin, + S: Socket + Unpin, +{ + #[inline] + pub fn new(ring: Consumer, tx: S, gso: Gso) -> Self { + Self { + ring, + tx, + pending: 0, + events: events::TxEvents::new(gso), + } + } + + #[inline] + fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll> { + loop { + let count = ready!(self.ring.poll_acquire(watermark, cx)); + + if count > self.pending { + return Some(self.pending as usize).into(); + } + + self.release(); + } + } + + #[inline] + fn release(&mut self) { + self.ring.release(self.pending); + self.pending = 0; + } +} + +impl Future for Sender +where + T: Message + Unpin, + S: Socket + Unpin, +{ + type Output = Option; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + while !this.events.take_blocked() { + let pending = match ready!(this.poll_ring(u32::MAX, cx)) { + Some(entries) => entries, + None => return None.into(), + }; + + let entries = &mut this.ring.data()[pending..]; + + match this.tx.send(cx, entries, &mut this.events) { + Ok(()) => this.pending += this.events.take_count() as u32, + Err(err) => return Some(err).into(), + } + } + + this.release(); + + Poll::Pending + } +}