Skip to content

Commit

Permalink
feat(s2n-quic-platform): add socket tasks for sync rings
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Jun 8, 2023
1 parent 901069f commit 132f7af
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 0 deletions.
1 change: 1 addition & 0 deletions quic/s2n-quic-platform/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod mmsg;
pub mod msg;
pub mod ring;
pub mod std;
pub mod task;

cfg_if! {
if #[cfg(s2n_quic_platform_socket_mmsg)] {
Expand Down
9 changes: 9 additions & 0 deletions quic/s2n-quic-platform/src/socket/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod events;
pub mod rx;
pub mod tx;

pub use rx::Receiver;
pub use tx::Sender;
135 changes: 135 additions & 0 deletions quic/s2n-quic-platform/src/socket/task/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Some of the functions in these impls are not used on non-unix systems
#![cfg_attr(not(unix), allow(dead_code))]

use crate::features::Gso;
use core::ops::ControlFlow;

#[derive(Debug)]
pub struct TxEvents {
count: usize,
is_blocked: bool,
#[cfg_attr(not(s2n_quic_platform_gso), allow(dead_code))]
gso: Gso,
}

impl TxEvents {
#[inline]
pub fn new(gso: Gso) -> Self {
Self {
count: 0,
is_blocked: false,
gso,
}
}

#[inline]
pub fn is_blocked(&self) -> bool {
self.is_blocked
}

#[inline]
pub fn take_blocked(&mut self) -> bool {
core::mem::take(&mut self.is_blocked)
}

#[inline]
pub fn blocked(&mut self) {
self.is_blocked = true;
}

#[inline]
pub fn take_count(&mut self) -> usize {
core::mem::take(&mut self.count)
}
}

impl crate::syscall::SocketEvents for TxEvents {
#[inline]
fn on_complete(&mut self, count: usize) -> ControlFlow<(), ()> {
self.count += count;
self.is_blocked = false;
ControlFlow::Continue(())
}

#[inline]
fn on_error(&mut self, error: ::std::io::Error) -> ControlFlow<(), ()> {
use std::io::ErrorKind::*;

match error.kind() {
WouldBlock => {
self.is_blocked = true;
ControlFlow::Break(())
}
Interrupted => ControlFlow::Break(()),
#[cfg(s2n_quic_platform_gso)]
_ if errno::errno().0 == libc::EIO => {
self.count += 1;

self.gso.disable();

ControlFlow::Continue(())
}
_ => {
self.count += 1;
ControlFlow::Continue(())
}
}
}
}

#[derive(Debug, Default)]
pub struct RxEvents {
count: usize,
is_blocked: bool,
}

impl RxEvents {
#[inline]
pub fn is_blocked(&self) -> bool {
self.is_blocked
}

#[inline]
pub fn take_blocked(&mut self) -> bool {
core::mem::take(&mut self.is_blocked)
}

#[inline]
pub fn blocked(&mut self) {
self.is_blocked = true;
}

#[inline]
pub fn take_count(&mut self) -> usize {
core::mem::take(&mut self.count)
}
}

impl crate::syscall::SocketEvents for RxEvents {
#[inline]
fn on_complete(&mut self, count: usize) -> ControlFlow<(), ()> {
self.count += count;
self.is_blocked = false;
ControlFlow::Continue(())
}

#[inline]
fn on_error(&mut self, error: ::std::io::Error) -> ControlFlow<(), ()> {
use std::io::ErrorKind::*;

match error.kind() {
WouldBlock => {
self.is_blocked = true;
ControlFlow::Break(())
}
Interrupted => ControlFlow::Break(()),
_ => {
self.count += 1;
ControlFlow::Break(())
}
}
}
}
97 changes: 97 additions & 0 deletions quic/s2n-quic-platform/src/socket/task/rx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
message::Message,
socket::{ring::Producer, task::events},
};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::ready;

pub trait Socket<T: Message> {
type Error;

fn recv(
&mut self,
cx: &mut Context,
entries: &mut [T],
events: &mut events::RxEvents,
) -> Result<(), Self::Error>;
}

pub struct Receiver<T: Message, S: Socket<T>> {
ring: Producer<T>,
rx: S,
pending: u32,
}

impl<T, S> Receiver<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
#[inline]
pub fn new(ring: Producer<T>, rx: S) -> Self {
Self {
ring,
rx,
pending: 0,
}
}

#[inline]
fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll<Option<usize>> {
loop {
let count = ready!(self.ring.poll_acquire(watermark, cx));

if count > self.pending {
return Some(self.pending as usize).into();
}

self.release();
}
}

#[inline]
fn release(&mut self) {
self.ring.release(self.pending);
self.pending = 0;
}
}

impl<T, S> Future for Receiver<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
type Output = Option<S::Error>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();

let mut events = events::RxEvents::default();

while !events.take_blocked() {
let pending = match ready!(this.poll_ring(u32::MAX, cx)) {
Some(entries) => entries,
None => return None.into(),
};

let entries = &mut this.ring.data()[pending..];

match this.rx.recv(cx, entries, &mut events) {
Ok(()) => this.pending += events.take_count() as u32,
Err(err) => return Some(err).into(),
}
}

this.release();

Poll::Pending
}
}
98 changes: 98 additions & 0 deletions quic/s2n-quic-platform/src/socket/task/tx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
features::Gso,
message::Message,
socket::{ring::Consumer, task::events},
};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::ready;

pub trait Socket<T: Message> {
type Error;

fn send(
&mut self,
cx: &mut Context,
entries: &mut [T],
events: &mut events::TxEvents,
) -> Result<(), Self::Error>;
}

pub struct Sender<T: Message, S: Socket<T>> {
ring: Consumer<T>,
tx: S,
pending: u32,
events: events::TxEvents,
}

impl<T, S> Sender<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
#[inline]
pub fn new(ring: Consumer<T>, tx: S, gso: Gso) -> Self {
Self {
ring,
tx,
pending: 0,
events: events::TxEvents::new(gso),
}
}

#[inline]
fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll<Option<usize>> {
loop {
let count = ready!(self.ring.poll_acquire(watermark, cx));

if count > self.pending {
return Some(self.pending as usize).into();
}

self.release();
}
}

#[inline]
fn release(&mut self) {
self.ring.release(self.pending);
self.pending = 0;
}
}

impl<T, S> Future for Sender<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
type Output = Option<S::Error>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();

while !this.events.take_blocked() {
let pending = match ready!(this.poll_ring(u32::MAX, cx)) {
Some(entries) => entries,
None => return None.into(),
};

let entries = &mut this.ring.data()[pending..];

match this.tx.send(cx, entries, &mut this.events) {
Ok(()) => this.pending += this.events.take_count() as u32,
Err(err) => return Some(err).into(),
}
}

this.release();

Poll::Pending
}
}

0 comments on commit 132f7af

Please sign in to comment.