diff --git a/src/protocol/fluvio-protocol-codec/src/codec.rs b/src/protocol/fluvio-protocol-codec/src/codec.rs index 10ff55b327d..3017282d6fe 100644 --- a/src/protocol/fluvio-protocol-codec/src/codec.rs +++ b/src/protocol/fluvio-protocol-codec/src/codec.rs @@ -7,7 +7,7 @@ use tokio_util::codec::Encoder; use crate::core::Decoder as FluvioDecoder; use crate::core::Encoder as FluvioEncoder; -use crate::core::bytes::{BytesMut, BufMut}; +use crate::core::bytes::BytesMut; /// Implement Kafka codec as in https://kafka.apache.org/protocol#The_Messages_ListOffsets /// First 4 bytes are size of the message. Then total buffer = 4 + message content @@ -67,13 +67,24 @@ impl Decoder for FluvioCodec { } /// Implement encoder for Kafka Codec -/// This is straight pass thru, actual encoding is done in [`FluvioEncoder::as_kafka_frame_bytes`] impl Encoder for FluvioCodec { type Error = IoError; fn encode(&mut self, src: T, buf: &mut BytesMut) -> Result<(), IoError> { - let data = src.as_kafka_frame_bytes(0)?; - buf.put(data); + let size = src.write_size(0) as i32; + trace!("encoding data with {} bytes.", size); + buf.reserve(4 + size as usize); + + if !T::RAW_DATA { + // Appending 4 bytes at the beginning to indicate + // message size. Types with RAW_DATA = true + // are assumed to be already encoded. + let mut len_slice = Vec::new(); + size.encode(&mut len_slice, 0)?; + buf.extend_from_slice(&len_slice); + } + buf.extend_from_slice(&src.as_bytes(0)?); + Ok(()) } } @@ -85,7 +96,6 @@ mod test { use std::net::SocketAddr; use std::time; - use bytes::Bytes; use futures::future::join; use futures::sink::SinkExt; use futures::stream::StreamExt; @@ -96,35 +106,56 @@ mod test { use fluvio_future::net::TcpStream; use fluvio_future::timer::sleep; use fluvio_future::test_async; + use futures::AsyncWriteExt; use fluvio_protocol::Decoder as FluvioDecoder; use fluvio_protocol::Encoder as FluvioEncoder; use log::debug; use super::FluvioCodec; - async fn run_server_object(data: Vec, addr: &SocketAddr) -> Result<(), Error> { + async fn run_server_raw_data( + data: T, + addr: &SocketAddr, + ) -> Result<(), Error> { debug!("server: binding"); let listener = TcpListener::bind(&addr).await.expect("bind"); debug!("server: successfully binding. waiting for incoming"); let mut incoming = listener.incoming(); if let Some(stream) = incoming.next().await { debug!("server: got connection from client"); - let tcp_stream = stream.expect("stream"); - - let framed = Framed::new(tcp_stream.compat(), FluvioCodec {}); - let (mut sink, _) = framed.split(); - - // send 3 times in order - for _ in 0..3u16 { - sink.send(data.clone()).await.expect("sending"); - } + let mut tcp_stream = stream.expect("stream"); + + // write message_size since we are not using the encoder + let mut len_buf = vec![]; + let message_size = data.write_size(0) as i32; + message_size.encode(&mut len_buf, 0).expect("encoding len"); + tcp_stream.write(&len_buf).await?; + + let encoded_data = data.as_bytes(0).expect("encoding data"); + tcp_stream.write(&encoded_data).await?; + + // Now trying partial send: + // write message_size since we are not using the encoder + let mut len_buf = vec![]; + let message_size = data.write_size(0) as i32; + message_size.encode(&mut len_buf, 0).expect("encoding len"); + tcp_stream.write(&len_buf).await?; + + let mut encoded_data = data.as_bytes(0).expect("encoding data"); + let buf2 = encoded_data.split_off(3); + tcp_stream.write(&encoded_data).await?; + fluvio_future::timer::sleep(time::Duration::from_millis(10)).await; + tcp_stream.write(&buf2).await?; } fluvio_future::timer::sleep(time::Duration::from_millis(50)).await; debug!("finishing. terminating server"); Ok(()) } - async fn run_server_raw_data(data: Vec, addr: &SocketAddr) -> Result<(), Error> { + async fn run_server_object( + data: T, + addr: &SocketAddr, + ) -> Result<(), Error> { debug!("server: binding"); let listener = TcpListener::bind(&addr).await.expect("bind"); debug!("server: successfully binding. waiting for incoming"); @@ -137,48 +168,8 @@ mod test { let (mut sink, _) = framed.split(); // send 2 times in order - for _ in 0..2u16 { - // debug!("server encoding original vector with len: {}", data.len()); - let mut buf = vec![]; - data.encode(&mut buf, 0)?; - debug!( - "server: writing to client vector encoded len: {}", - buf.len() - ); - assert_eq!(buf.len(), 9); // 4(array len)+ 5 bytes - - // write buffer length since when the encoder is working - // over raw data, the length has to be send. - let mut len_buf = vec![]; - let len = buf.len() as i32; - len.encode(&mut len_buf, 0).expect("encoding"); - sink.send(Bytes::from(len_buf)).await.expect("sending"); - - sink.send(Bytes::from(buf)).await.expect("sending"); - } - - // last one, we send split send, to test partial send - { - let mut buf = vec![]; - data.encode(&mut buf, 0)?; - debug!( - "server: writing to client vector encoded len: {}", - buf.len() - ); - assert_eq!(buf.len(), 9); // 4(array len)+ 5 bytes - - // write buffer length since when the encoder is working - // over raw data, the length has to be send. - let mut len_buf = vec![]; - let len = buf.len() as i32; - len.encode(&mut len_buf, 0).expect("encoding"); - sink.send(Bytes::from(len_buf)).await.expect("sending"); - - // split buf into two segments, decode should reassembly them - let buf2 = buf.split_off(5); - sink.send(Bytes::from(buf)).await.expect("sending"); - fluvio_future::timer::sleep(time::Duration::from_millis(10)).await; - sink.send(Bytes::from(buf2)).await.expect("sending"); + for _ in 0..2_u8 { + sink.send(data.clone()).await.expect("sending"); } } fluvio_future::timer::sleep(time::Duration::from_millis(50)).await; @@ -186,26 +177,31 @@ mod test { Ok(()) } - async fn run_client(data: Vec, addr: &SocketAddr) -> Result<(), Error> { + async fn run_client< + T: PartialEq + std::fmt::Debug + Default + FluvioDecoder + FluvioEncoder, + >( + data: T, + addr: &SocketAddr, + ) -> Result<(), Error> { debug!("client: sleep to give server chance to come up"); sleep(time::Duration::from_millis(100)).await; debug!("client: trying to connect"); let tcp_stream = TcpStream::connect(&addr).await.expect("connect"); debug!("client: got connection. waiting"); let framed = Framed::new(tcp_stream.compat(), FluvioCodec {}); - let (_, mut stream) = framed.split::(); - for _ in 0..3u16 { + let (_, mut stream) = framed.split::(); + for _ in 0..2u16 { if let Some(value) = stream.next().await { debug!("client :received first value from server"); let mut bytes = value.expect("bytes"); - debug!("client: received bytes len: {}", bytes.len()); - assert_eq!(bytes.len(), 9, "total bytes is 9"); - let mut decoded_values: Vec = vec![]; - decoded_values + let bytes_len = bytes.len(); + debug!("client: received bytes len: {}", bytes_len); + let mut decoded_value = T::default(); + decoded_value .decode(&mut bytes, 0) - .expect("vector decoding failed"); - assert_eq!(decoded_values.len(), 5); - assert_eq!(decoded_values, data); + .expect("decoding failed"); + assert_eq!(bytes_len, decoded_value.write_size(0)); + assert_eq!(decoded_value, data); debug!("all test pass"); } else { panic!("no first value received"); @@ -217,14 +213,28 @@ mod test { } #[test_async] - async fn test_async_tcp_raw_data() -> Result<(), Error> { + async fn test_async_tcp_vec() -> Result<(), Error> { debug!("start running test"); - let addr = "127.0.0.1:11222".parse::().expect("parse"); + let addr = "127.0.0.1:11223".parse::().expect("parse"); let data: Vec = vec![0x1, 0x02, 0x03, 0x04, 0x5]; - let server_ft = run_server_raw_data(data.clone(), &addr); + let server_ft = run_server_object(data.clone(), &addr); + let client_ft = run_client(data, &addr); + + let _rt = join(client_ft, server_ft).await; + Ok(()) + } + + #[test_async] + async fn test_async_tcp_string() -> Result<(), Error> { + debug!("start running test"); + + let addr = "127.0.0.1:11224".parse::().expect("parse"); + let data: String = String::from("hello"); + + let server_ft = run_server_object(data.clone(), &addr); let client_ft = run_client(data, &addr); let _rt = join(client_ft, server_ft).await; @@ -233,14 +243,28 @@ mod test { } #[test_async] - async fn test_async_tcp_object() -> Result<(), Error> { + async fn test_async_tcp_i32() -> Result<(), Error> { debug!("start running test"); - let addr = "127.0.0.1:11223".parse::().expect("parse"); - let data: Vec = vec![0x1, 0x02, 0x03, 0x04, 0x5]; + let addr = "127.0.0.1:11225".parse::().expect("parse"); + let data: i32 = 1000; let server_ft = run_server_object(data.clone(), &addr); + let client_ft = run_client(data, &addr); + let _rt = join(client_ft, server_ft).await; + + Ok(()) + } + + #[test_async] + async fn test_async_tcp_raw_data() -> Result<(), Error> { + debug!("start running test"); + + let addr = "127.0.0.1:11226".parse::().expect("parse"); + let data: String = String::from("Raw text"); + + let server_ft = run_server_raw_data(data.clone(), &addr); let client_ft = run_client(data, &addr); let _rt = join(client_ft, server_ft).await; diff --git a/src/protocol/fluvio-protocol-core/src/encoder.rs b/src/protocol/fluvio-protocol-core/src/encoder.rs index d98fcbc9bcc..50af568a122 100644 --- a/src/protocol/fluvio-protocol-core/src/encoder.rs +++ b/src/protocol/fluvio-protocol-core/src/encoder.rs @@ -17,6 +17,7 @@ use super::varint::variant_size; // trait for encoding and decoding using Kafka Protocol pub trait Encoder { + const RAW_DATA: bool = false; /// size of this object in bytes fn write_size(&self, version: Version) -> usize; @@ -33,20 +34,6 @@ pub trait Encoder { buf.put_slice(&out); Ok(buf.freeze()) } - - /// Return a [`bytes::Bytes`] representation of the object that follows the - /// [Kafka Protocol](https://kafka.apache.org/protocol#The_Messages_ListOffsets). - /// First 4 bytes are size of the message. Then total buffer = 4 + message content - fn as_kafka_frame_bytes(&self, version: Version) -> Result { - trace!("encoding as kafka frame"); - let mut out = vec![]; - let size = self.write_size(version) as i32; - let mut buf = BytesMut::with_capacity(4 + size as usize); - size.encode(&mut out, version)?; - self.encode(&mut out, version)?; - buf.put_slice(&out); - Ok(buf.freeze()) - } } pub trait EncoderVarInt { @@ -161,32 +148,6 @@ where } } -/// Ignore encoding for raw bytes. -impl Encoder for Bytes { - fn write_size(&self, _version: Version) -> usize { - self.len() - } - - fn encode(&self, dest: &mut T, _version: Version) -> Result<(), Error> - where - T: BufMut, - { - if dest.remaining_mut() < self.len() { - return Err(Error::new( - ErrorKind::UnexpectedEof, - "not enough capacity for raw bytes", - )); - } - dest.put_slice(&self); - Ok(()) - } - - // Do not append length information to raw data. - fn as_kafka_frame_bytes(&self, version: Version) -> Result { - self.as_bytes(version) - } -} - impl Encoder for bool { fn write_size(&self, _version: Version) -> usize { 1 @@ -404,6 +365,44 @@ impl Encoder for String { } } +// Asume Bytes are raw data +impl Encoder for Bytes { + const RAW_DATA: bool = true; + fn write_size(&self, _version: Version) -> usize { + self.len() + } + + fn encode(&self, dest: &mut T, _version: Version) -> Result<(), Error> + where + T: BufMut, + { + if dest.remaining_mut() < self.len() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough capacity for raw bytes", + )); + } + dest.put_slice(&self); + Ok(()) + } +} + +impl Encoder for &M +where + M: Encoder, +{ + fn write_size(&self, version: Version) -> usize { + (*self).write_size(version) + } + + fn encode(&self, dest: &mut T, version: Version) -> Result<(), Error> + where + T: BufMut, + { + (*self).encode(dest, version) + } +} + #[cfg(test)] mod test { diff --git a/src/socket/src/sink.rs b/src/socket/src/sink.rs index cce9d2c43f5..c5e551ec383 100644 --- a/src/socket/src/sink.rs +++ b/src/socket/src/sink.rs @@ -71,13 +71,8 @@ impl FluvioSink { where RequestMessage: FlvEncoder + Default + Debug, { - let bytes = req_msg.as_kafka_frame_bytes(0)?; - trace!( - "sending one way request: {:#?}, bytes: {}", - &req_msg, - bytes.len() - ); - (&mut self.inner).send(bytes).await?; + trace!("sending one way request: {:#?}", &req_msg,); + (&mut self.inner).send(req_msg).await?; Ok(()) } @@ -85,14 +80,13 @@ impl FluvioSink { pub async fn send_response

( &mut self, resp_msg: &ResponseMessage

, - version: Version, + _version: Version, ) -> Result<(), FlvSocketError> where ResponseMessage

: FlvEncoder + Default + Debug, { - let bytes = resp_msg.as_kafka_frame_bytes(version)?; - trace!("sending response {:#?}, bytes: {}", &resp_msg, bytes.len()); - (&mut self.inner).send(bytes).await?; + trace!("sending response {:#?}", &resp_msg); + (&mut self.inner).send(resp_msg).await?; Ok(()) } }