Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement full-duplex secret connection #938

Merged
merged 29 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
02a94f6
Implement thread-safe cloning of a secret connection
thanethomson Jul 23, 2021
31b8cad
Expand documentation for SecretConnection on threading considerations
thanethomson Jul 23, 2021
94c43c0
Extract peer construction into its own method
thanethomson Jul 23, 2021
8e7f694
Add test for cloned SecretConnection
thanethomson Jul 23, 2021
1409f91
Add more messages to test
thanethomson Jul 23, 2021
b161ba5
Expand comment for clarity
thanethomson Jul 23, 2021
b48fb23
Add .changelog entry
thanethomson Jul 23, 2021
0a90dc3
Restore half-duplex operations
thanethomson Jul 26, 2021
3c45316
Extract encrypt/decrypt fns as independent methods
thanethomson Jul 26, 2021
1d2f658
Remove unnecessary trait bounds
thanethomson Jul 26, 2021
4c60492
Extract send/receive state
thanethomson Jul 26, 2021
929eeb5
Extract read/write functionality as standalone methods
thanethomson Jul 26, 2021
465d627
Add logic to facilitate splitting SecretConnection into its sending a…
thanethomson Jul 26, 2021
34a7de5
Restore split SecretConnection test using new semantics
thanethomson Jul 26, 2021
b220c8f
Update changelog entry
thanethomson Jul 26, 2021
ba3b16c
Update docs for `SecretConnection`
thanethomson Jul 26, 2021
e65aeee
Condense error reporting
thanethomson Jul 26, 2021
1f6a706
Extract TryClone trait into its own crate
thanethomson Jul 27, 2021
8627925
Reorder imports
thanethomson Jul 27, 2021
6571b59
Assert validation regardless of debug build
thanethomson Jul 27, 2021
042b3f4
Remove remote_pubkey optionality from sender/receiver halves
thanethomson Jul 27, 2021
c5be1b4
Update SecretConnection docs with comment content
thanethomson Jul 27, 2021
9da6097
Fix doc link to TryClone trait
thanethomson Jul 27, 2021
2c9aafe
Fix doc link to TryClone trait
thanethomson Jul 27, 2021
4f32c17
Add docs on SecretConnection failures and connection integrity
thanethomson Jul 27, 2021
b8913c0
Synchronize sending/receiving failures to comply with crypto algorith…
thanethomson Jul 27, 2021
68ad278
Rename try_split method to split for SecretConnection
thanethomson Jul 28, 2021
a9c9b98
Remove redundant field name prefixes
thanethomson Jul 28, 2021
2d5ef8b
Fix broken link in docs
thanethomson Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[tendermint-p2p]` The `SecretConnection` can now be cloned such that it can
be accessed from multiple threads simultaneously. The main use case for this
is to separate reading into one thread, and writing into another.
([#938](https://github.com/informalsystems/tendermint-rs/pull/938))
103 changes: 82 additions & 21 deletions p2p/src/secret_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub use self::{
protocol::Version,
public_key::PublicKey,
};
use crate::transport::TryClone;
use std::sync::{Arc, Mutex};

#[cfg(feature = "amino")]
mod amino_types;
Expand Down Expand Up @@ -218,16 +220,49 @@ impl Handshake<AwaitingAuthSig> {
}
}

struct SecretConnectionSendState {
send_nonce: Nonce,
}

impl Default for SecretConnectionSendState {
fn default() -> Self {
Self {
send_nonce: Nonce::default(),
}
}
}

struct SecretConnectionRecvState {
recv_nonce: Nonce,
recv_buffer: Vec<u8>,
}

impl Default for SecretConnectionRecvState {
fn default() -> Self {
Self {
recv_nonce: Nonce::default(),
recv_buffer: vec![],
}
}
}

/// Encrypted connection between peers in a Tendermint network.
pub struct SecretConnection<IoHandler: Read + Write + Send + Sync> {
///
/// Can be safely shared between multiple threads, however only one thread can
/// read from and one other thread can write to the secret connection
/// simultaneously. This is intended to allow for use from two threads
/// simultaneously: one thread for reading from the connection, and one thread
/// for simultaneously writing to it.
pub struct SecretConnection<IoHandler> {
io_handler: IoHandler,
protocol_version: Version,
recv_nonce: Nonce,
send_nonce: Nonce,
recv_cipher: ChaCha20Poly1305,
send_cipher: ChaCha20Poly1305,
remote_pubkey: Option<PublicKey>,
recv_buffer: Vec<u8>,
// The portion of the connection state that is mutated when sending.
send_state: Arc<Mutex<SecretConnectionSendState>>,
// The portion of the connection state that is mutated when receiving.
recv_state: Arc<Mutex<SecretConnectionRecvState>>,
}
thanethomson marked this conversation as resolved.
Show resolved Hide resolved

impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {
Expand Down Expand Up @@ -262,12 +297,11 @@ impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {
let mut sc = Self {
io_handler,
protocol_version,
recv_buffer: vec![],
recv_nonce: Nonce::default(),
send_nonce: Nonce::default(),
recv_cipher: h.state.recv_cipher.clone(),
send_cipher: h.state.send_cipher.clone(),
remote_pubkey: None,
send_state: Arc::new(Mutex::new(SecretConnectionSendState::default())),
recv_state: Arc::new(Mutex::new(SecretConnectionRecvState::default())),
};

// Share each other's pubkey & challenge signature.
Expand All @@ -291,6 +325,7 @@ impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {
fn encrypt(
&self,
chunk: &[u8],
send_nonce: &Nonce,
sealed_frame: &mut [u8; TAG_SIZE + TOTAL_FRAME_SIZE],
) -> Result<()> {
debug_assert!(!chunk.is_empty(), "chunk is empty");
Expand All @@ -306,7 +341,7 @@ impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {
let tag = self
.send_cipher
.encrypt_in_place_detached(
GenericArray::from_slice(self.send_nonce.to_bytes()),
GenericArray::from_slice(send_nonce.to_bytes()),
b"",
&mut sealed_frame[..TOTAL_FRAME_SIZE],
)
Expand All @@ -318,7 +353,7 @@ impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {
}

/// Decrypt AEAD authenticated data
fn decrypt(&self, ciphertext: &[u8], out: &mut [u8]) -> Result<usize> {
fn decrypt(&self, ciphertext: &[u8], recv_nonce: &Nonce, out: &mut [u8]) -> Result<usize> {
if ciphertext.len() < TAG_SIZE {
return Err(Error::Crypto).wrap_err_with(|| {
format!(
Expand All @@ -340,7 +375,7 @@ impl<IoHandler: Read + Write + Send + Sync> SecretConnection<IoHandler> {

self.recv_cipher
.decrypt_in_place_detached(
GenericArray::from_slice(self.recv_nonce.to_bytes()),
GenericArray::from_slice(recv_nonce.to_bytes()),
b"",
in_out,
tag.into(),
Expand All @@ -357,18 +392,23 @@ where
{
// CONTRACT: data smaller than DATA_MAX_SIZE is read atomically.
fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
if !self.recv_buffer.is_empty() {
let n = cmp::min(data.len(), self.recv_buffer.len());
data.copy_from_slice(&self.recv_buffer[..n]);
let mut recv_state = self
.recv_state
.lock()
.expect("failed to obtain lock on secret connection recv state");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return error result here instead of panicking? Same for the other uses of .expect().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we (and should we be able to) recover from such an error?

My understanding is that, when Mutex::lock fails, it's due to a panic while the lock is held by another thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable. Perhaps we can add a comment here that we are panicking because a poisoned mutex is unrecoverable. This is mainly to distinguish it from the other uses of .expect in the same file, which IMO should be refactored to return Result::Err instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fortunately in the new iteration there's no need for mutexes any more 😁

if !recv_state.recv_buffer.is_empty() {
let n = cmp::min(data.len(), recv_state.recv_buffer.len());
data.copy_from_slice(&recv_state.recv_buffer[..n]);
let mut leftover_portion = vec![
0;
self.recv_buffer
recv_state
.recv_buffer
.len()
.checked_sub(n)
.expect("leftover calculation failed")
];
leftover_portion.clone_from_slice(&self.recv_buffer[n..]);
self.recv_buffer = leftover_portion;
leftover_portion.clone_from_slice(&recv_state.recv_buffer[n..]);
recv_state.recv_buffer = leftover_portion;

return Ok(n);
}
Expand All @@ -378,13 +418,13 @@ where

// decrypt the frame
let mut frame = [0_u8; TOTAL_FRAME_SIZE];
let res = self.decrypt(&sealed_frame, &mut frame);
let res = self.decrypt(&sealed_frame, &recv_state.recv_nonce, &mut frame);

if let Err(err) = res {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
}

self.recv_nonce.increment();
recv_state.recv_nonce.increment();
// end decryption

let chunk_length = u32::from_le_bytes(frame[..4].try_into().expect("chunk framing failed"));
Expand All @@ -406,7 +446,7 @@ where

let n = cmp::min(data.len(), chunk.len());
data[..n].copy_from_slice(&chunk[..n]);
self.recv_buffer.copy_from_slice(&chunk[n..]);
recv_state.recv_buffer.copy_from_slice(&chunk[n..]);

Ok(n)
}
Expand All @@ -419,6 +459,10 @@ where
// Writes encrypted frames of `TAG_SIZE` + `TOTAL_FRAME_SIZE`
// CONTRACT: data smaller than DATA_MAX_SIZE is read atomically.
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
let mut send_state = self
.send_state
.lock()
.expect("failed to obtain lock on secret connection write state");
let mut n = 0_usize;
let mut data_copy = data;
while !data_copy.is_empty() {
Expand All @@ -431,11 +475,11 @@ where
data_copy = &[0_u8; 0];
}
let sealed_frame = &mut [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE];
let res = self.encrypt(chunk, sealed_frame);
let res = self.encrypt(chunk, &send_state.send_nonce, sealed_frame);
thanethomson marked this conversation as resolved.
Show resolved Hide resolved
if let Err(err) = res {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
}
self.send_nonce.increment();
send_state.send_nonce.increment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have higher level abstraction to guarantee that the nonce is always incremented after being acquired? Or should it never be incremented at all if there is error? Would the state be inconsistent if the line above or below returns error prematurely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tony-iqlusion, what's your take on this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preventing nonce reuse is the most important thing as it fails catastrophically with ChaCha20Poly1305, but also if it's ever incremented without being used it will be out-of-sync with the peer and abort the connection that way.

So it's best to have a sort of simple state machine that aborts the whole connection on errors, and if that can wipe the nonce somehow that's best, but so long as you can prevent repeat nonce reuse that's probably the most important thing.

// end encryption

self.io_handler.write_all(&sealed_frame[..])?;
Expand All @@ -452,6 +496,23 @@ where
}
}

impl<IoHandler: TryClone> TryClone for SecretConnection<IoHandler> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of using this trait as compared to holding an Arc<IoHandler> and Clone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type Error = IoHandler::Error;

fn try_clone(&self) -> Result<Self, Self::Error> {
let io_handler_clone = self.io_handler.try_clone()?;
Ok(Self {
io_handler: io_handler_clone,
protocol_version: self.protocol_version,
recv_cipher: self.recv_cipher.clone(),
send_cipher: self.send_cipher.clone(),
remote_pubkey: self.remote_pubkey,
send_state: self.send_state.clone(),
recv_state: self.recv_state.clone(),
})
}
}

/// Returns `remote_eph_pubkey`
fn share_eph_pubkey<IoHandler: Read + Write + Send + Sync>(
handler: &mut IoHandler,
Expand Down
32 changes: 29 additions & 3 deletions p2p/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Abstractions that describe types which support the physical transport - i.e. connection
//! management - used in the p2p stack.

use std::net::{SocketAddr, ToSocketAddrs};
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};

use eyre::Result;

Expand All @@ -16,7 +16,7 @@ where
/// List of addresses to be communicated as publicly reachable to other nodes, which in turn
/// can use that to share with third parties.
///
/// TODO(xla): Dependning on where this information is going to be disseminated it might be
/// TODO(xla): Depending on where this information is going to be disseminated it might be
/// better placed in a higher-level protocol. What stands in opposition to that is the fact
/// that advertised addresses will be helpful for hole punching and other involved network
/// traversals.
Expand Down Expand Up @@ -140,6 +140,32 @@ where
///
/// # Errors
///
/// * If resource allocation fails for lack of priviliges or being not available.
/// * If resource allocation fails for lack of privileges or being not available.
fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
}

/// Types that can be cloned where success is not guaranteed can implement this
/// trait.
pub trait TryClone: Sized {
thanethomson marked this conversation as resolved.
Show resolved Hide resolved
/// The type of error that can be returned when an attempted clone
/// operation fails.
type Error;

/// Attempt to clone this instance.
///
/// # Errors
/// Can fail if the underlying instance cannot be cloned (e.g. the OS could
/// be out of file descriptors, or some low-level OS-specific error could
/// be produced).
fn try_clone(&self) -> Result<Self, Self::Error>;
}

impl TryClone for TcpStream {
type Error = std::io::Error;

fn try_clone(&self) -> Result<Self, Self::Error> {
// Uses the TcpStream struct's method. See
// https://doc.rust-lang.org/stable/book/ch19-03-advanced-traits.html#fully-qualified-syntax-for-disambiguation-calling-methods-with-the-same-name
self.try_clone()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler if we just use TcpStream::try_clone(self) here and not having to explain in the comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't obvious to me whether the TcpStream's try_clone method was going to be called here or the TryClone trait's try_clone, so I thought I'd leave a comment here for posterity to explain it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, also, I tried explicitly calling TcpStream::try_clone(self) here and clippy complains that the better approach is to use Self::try_clone(self), which, to me, isn't nearly as clear as TcpStream::try_clone(self).

}
}
Loading