Skip to content

Commit

Permalink
refactor: address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
morenol committed May 13, 2021
1 parent c1e515f commit 3579342
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 125 deletions.
172 changes: 98 additions & 74 deletions src/protocol/fluvio-protocol-codec/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio_util::codec::Encoder;

use crate::core::Decoder as FluvioDecoder;
use crate::core::Encoder as FluvioEncoder;
use crate::core::bytes::{BytesMut, BufMut};
use crate::core::bytes::BytesMut;

/// Implement Kafka codec as in https://kafka.apache.org/protocol#The_Messages_ListOffsets
/// First 4 bytes are size of the message. Then total buffer = 4 + message content
Expand Down Expand Up @@ -67,13 +67,24 @@ impl Decoder for FluvioCodec {
}

/// Implement encoder for Kafka Codec
/// This is straight pass thru, actual encoding is done in [`FluvioEncoder::as_kafka_frame_bytes`]
impl<T: FluvioEncoder> Encoder<T> for FluvioCodec {
type Error = IoError;

fn encode(&mut self, src: T, buf: &mut BytesMut) -> Result<(), IoError> {
let data = src.as_kafka_frame_bytes(0)?;
buf.put(data);
let size = src.write_size(0) as i32;
trace!("encoding data with {} bytes.", size);
buf.reserve(4 + size as usize);

if !T::RAW_DATA {
// Appending 4 bytes at the beginning to indicate
// message size. Types with RAW_DATA = true
// are assumed to be already encoded.
let mut len_slice = Vec::new();
size.encode(&mut len_slice, 0)?;
buf.extend_from_slice(&len_slice);
}
buf.extend_from_slice(&src.as_bytes(0)?);

Ok(())
}
}
Expand All @@ -85,7 +96,6 @@ mod test {
use std::net::SocketAddr;
use std::time;

use bytes::Bytes;
use futures::future::join;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
Expand All @@ -96,35 +106,56 @@ mod test {
use fluvio_future::net::TcpStream;
use fluvio_future::timer::sleep;
use fluvio_future::test_async;
use futures::AsyncWriteExt;
use fluvio_protocol::Decoder as FluvioDecoder;
use fluvio_protocol::Encoder as FluvioEncoder;
use log::debug;

use super::FluvioCodec;

async fn run_server_object(data: Vec<u8>, addr: &SocketAddr) -> Result<(), Error> {
async fn run_server_raw_data<T: FluvioEncoder>(
data: T,
addr: &SocketAddr,
) -> Result<(), Error> {
debug!("server: binding");
let listener = TcpListener::bind(&addr).await.expect("bind");
debug!("server: successfully binding. waiting for incoming");
let mut incoming = listener.incoming();
if let Some(stream) = incoming.next().await {
debug!("server: got connection from client");
let tcp_stream = stream.expect("stream");

let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
let (mut sink, _) = framed.split();

// send 3 times in order
for _ in 0..3u16 {
sink.send(data.clone()).await.expect("sending");
}
let mut tcp_stream = stream.expect("stream");

// write message_size since we are not using the encoder
let mut len_buf = vec![];
let message_size = data.write_size(0) as i32;
message_size.encode(&mut len_buf, 0).expect("encoding len");
tcp_stream.write(&len_buf).await?;

let encoded_data = data.as_bytes(0).expect("encoding data");
tcp_stream.write(&encoded_data).await?;

// Now trying partial send:
// write message_size since we are not using the encoder
let mut len_buf = vec![];
let message_size = data.write_size(0) as i32;
message_size.encode(&mut len_buf, 0).expect("encoding len");
tcp_stream.write(&len_buf).await?;

let mut encoded_data = data.as_bytes(0).expect("encoding data");
let buf2 = encoded_data.split_off(3);
tcp_stream.write(&encoded_data).await?;
fluvio_future::timer::sleep(time::Duration::from_millis(10)).await;
tcp_stream.write(&buf2).await?;
}
fluvio_future::timer::sleep(time::Duration::from_millis(50)).await;
debug!("finishing. terminating server");
Ok(())
}

async fn run_server_raw_data(data: Vec<u8>, addr: &SocketAddr) -> Result<(), Error> {
async fn run_server_object<T: FluvioEncoder + Clone>(
data: T,
addr: &SocketAddr,
) -> Result<(), Error> {
debug!("server: binding");
let listener = TcpListener::bind(&addr).await.expect("bind");
debug!("server: successfully binding. waiting for incoming");
Expand All @@ -137,75 +168,40 @@ mod test {
let (mut sink, _) = framed.split();

// send 2 times in order
for _ in 0..2u16 {
// debug!("server encoding original vector with len: {}", data.len());
let mut buf = vec![];
data.encode(&mut buf, 0)?;
debug!(
"server: writing to client vector encoded len: {}",
buf.len()
);
assert_eq!(buf.len(), 9); // 4(array len)+ 5 bytes

// write buffer length since when the encoder is working
// over raw data, the length has to be send.
let mut len_buf = vec![];
let len = buf.len() as i32;
len.encode(&mut len_buf, 0).expect("encoding");
sink.send(Bytes::from(len_buf)).await.expect("sending");

sink.send(Bytes::from(buf)).await.expect("sending");
}

// last one, we send split send, to test partial send
{
let mut buf = vec![];
data.encode(&mut buf, 0)?;
debug!(
"server: writing to client vector encoded len: {}",
buf.len()
);
assert_eq!(buf.len(), 9); // 4(array len)+ 5 bytes

// write buffer length since when the encoder is working
// over raw data, the length has to be send.
let mut len_buf = vec![];
let len = buf.len() as i32;
len.encode(&mut len_buf, 0).expect("encoding");
sink.send(Bytes::from(len_buf)).await.expect("sending");

// split buf into two segments, decode should reassembly them
let buf2 = buf.split_off(5);
sink.send(Bytes::from(buf)).await.expect("sending");
fluvio_future::timer::sleep(time::Duration::from_millis(10)).await;
sink.send(Bytes::from(buf2)).await.expect("sending");
for _ in 0..2_u8 {
sink.send(data.clone()).await.expect("sending");
}
}
fluvio_future::timer::sleep(time::Duration::from_millis(50)).await;
debug!("finishing. terminating server");
Ok(())
}

async fn run_client(data: Vec<u8>, addr: &SocketAddr) -> Result<(), Error> {
async fn run_client<
T: PartialEq + std::fmt::Debug + Default + FluvioDecoder + FluvioEncoder,
>(
data: T,
addr: &SocketAddr,
) -> Result<(), Error> {
debug!("client: sleep to give server chance to come up");
sleep(time::Duration::from_millis(100)).await;
debug!("client: trying to connect");
let tcp_stream = TcpStream::connect(&addr).await.expect("connect");
debug!("client: got connection. waiting");
let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
let (_, mut stream) = framed.split::<Bytes>();
for _ in 0..3u16 {
let (_, mut stream) = framed.split::<T>();
for _ in 0..2u16 {
if let Some(value) = stream.next().await {
debug!("client :received first value from server");
let mut bytes = value.expect("bytes");
debug!("client: received bytes len: {}", bytes.len());
assert_eq!(bytes.len(), 9, "total bytes is 9");
let mut decoded_values: Vec<u8> = vec![];
decoded_values
let bytes_len = bytes.len();
debug!("client: received bytes len: {}", bytes_len);
let mut decoded_value = T::default();
decoded_value
.decode(&mut bytes, 0)
.expect("vector decoding failed");
assert_eq!(decoded_values.len(), 5);
assert_eq!(decoded_values, data);
.expect("decoding failed");
assert_eq!(bytes_len, decoded_value.write_size(0));
assert_eq!(decoded_value, data);
debug!("all test pass");
} else {
panic!("no first value received");
Expand All @@ -217,14 +213,28 @@ mod test {
}

#[test_async]
async fn test_async_tcp_raw_data() -> Result<(), Error> {
async fn test_async_tcp_vec() -> Result<(), Error> {
debug!("start running test");

let addr = "127.0.0.1:11222".parse::<SocketAddr>().expect("parse");
let addr = "127.0.0.1:11223".parse::<SocketAddr>().expect("parse");
let data: Vec<u8> = vec![0x1, 0x02, 0x03, 0x04, 0x5];

let server_ft = run_server_raw_data(data.clone(), &addr);
let server_ft = run_server_object(data.clone(), &addr);
let client_ft = run_client(data, &addr);

let _rt = join(client_ft, server_ft).await;

Ok(())
}

#[test_async]
async fn test_async_tcp_string() -> Result<(), Error> {
debug!("start running test");

let addr = "127.0.0.1:11224".parse::<SocketAddr>().expect("parse");
let data: String = String::from("hello");

let server_ft = run_server_object(data.clone(), &addr);
let client_ft = run_client(data, &addr);

let _rt = join(client_ft, server_ft).await;
Expand All @@ -233,14 +243,28 @@ mod test {
}

#[test_async]
async fn test_async_tcp_object() -> Result<(), Error> {
async fn test_async_tcp_i32() -> Result<(), Error> {
debug!("start running test");

let addr = "127.0.0.1:11223".parse::<SocketAddr>().expect("parse");
let data: Vec<u8> = vec![0x1, 0x02, 0x03, 0x04, 0x5];
let addr = "127.0.0.1:11225".parse::<SocketAddr>().expect("parse");
let data: i32 = 1000;

let server_ft = run_server_object(data.clone(), &addr);
let client_ft = run_client(data, &addr);

let _rt = join(client_ft, server_ft).await;

Ok(())
}

#[test_async]
async fn test_async_tcp_raw_data() -> Result<(), Error> {
debug!("start running test");

let addr = "127.0.0.1:11226".parse::<SocketAddr>().expect("parse");
let data: String = String::from("Raw text");

let server_ft = run_server_raw_data(data.clone(), &addr);
let client_ft = run_client(data, &addr);

let _rt = join(client_ft, server_ft).await;
Expand Down
79 changes: 39 additions & 40 deletions src/protocol/fluvio-protocol-core/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::varint::variant_size;

// trait for encoding and decoding using Kafka Protocol
pub trait Encoder {
const RAW_DATA: bool = false;
/// size of this object in bytes
fn write_size(&self, version: Version) -> usize;

Expand All @@ -33,20 +34,6 @@ pub trait Encoder {
buf.put_slice(&out);
Ok(buf.freeze())
}

/// Return a [`bytes::Bytes`] representation of the object that follows the
/// [Kafka Protocol](https://kafka.apache.org/protocol#The_Messages_ListOffsets).
/// First 4 bytes are size of the message. Then total buffer = 4 + message content
fn as_kafka_frame_bytes(&self, version: Version) -> Result<Bytes, Error> {
trace!("encoding as kafka frame");
let mut out = vec![];
let size = self.write_size(version) as i32;
let mut buf = BytesMut::with_capacity(4 + size as usize);
size.encode(&mut out, version)?;
self.encode(&mut out, version)?;
buf.put_slice(&out);
Ok(buf.freeze())
}
}

pub trait EncoderVarInt {
Expand Down Expand Up @@ -161,32 +148,6 @@ where
}
}

/// Ignore encoding for raw bytes.
impl Encoder for Bytes {
fn write_size(&self, _version: Version) -> usize {
self.len()
}

fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
where
T: BufMut,
{
if dest.remaining_mut() < self.len() {
return Err(Error::new(
ErrorKind::UnexpectedEof,
"not enough capacity for raw bytes",
));
}
dest.put_slice(&self);
Ok(())
}

// Do not append length information to raw data.
fn as_kafka_frame_bytes(&self, version: Version) -> Result<Bytes, Error> {
self.as_bytes(version)
}
}

impl Encoder for bool {
fn write_size(&self, _version: Version) -> usize {
1
Expand Down Expand Up @@ -404,6 +365,44 @@ impl Encoder for String {
}
}

// Asume Bytes are raw data
impl Encoder for Bytes {
const RAW_DATA: bool = true;
fn write_size(&self, _version: Version) -> usize {
self.len()
}

fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
where
T: BufMut,
{
if dest.remaining_mut() < self.len() {
return Err(Error::new(
ErrorKind::UnexpectedEof,
"not enough capacity for raw bytes",
));
}
dest.put_slice(&self);
Ok(())
}
}

impl<M> Encoder for &M
where
M: Encoder,
{
fn write_size(&self, version: Version) -> usize {
(*self).write_size(version)
}

fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
where
T: BufMut,
{
(*self).encode(dest, version)
}
}

#[cfg(test)]
mod test {

Expand Down
Loading

0 comments on commit 3579342

Please sign in to comment.