Skip to content

Commit

Permalink
Use encode_varint for vector size
Browse files Browse the repository at this point in the history
  • Loading branch information
simlay committed Feb 8, 2021
1 parent 7be96e9 commit c2ede31
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/dataplane-protocol/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ mod test {

let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
assert_eq!(batch.header.crc, 2908671645);
assert_eq!(batch.header.crc, 843514105);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");

Expand Down
137 changes: 61 additions & 76 deletions src/dataplane-protocol/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::io::ErrorKind;
use std::sync::Arc;

use content_inspector::{inspect, ContentType};
use log::{trace, warn};
use log::trace;
use once_cell::sync::Lazy;

use crate::core::bytes::Buf;
use crate::core::bytes::BufExt;
use crate::core::bytes::BufMut;

use crate::core::Decoder;
Expand Down Expand Up @@ -138,14 +137,22 @@ impl<'a> From<&'a [u8]> for DefaultAsyncBuffer {

impl Encoder for DefaultAsyncBuffer {
fn write_size(&self, version: Version) -> usize {
self.0.write_size(version)
let len = self.0.len() as i64;
self.0.iter().fold(len.var_write_size(), |sum, val| {
sum + val.write_size(version)
})
}

fn encode<T>(&self, src: &mut T, version: Version) -> Result<(), Error>
fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
where
T: BufMut,
{
self.0.encode(src, version)
let len: i64 = self.0.len() as i64;
len.encode_varint(dest)?;
for v in self.0.iter() {
v.encode(dest, version)?;
}
Ok(())
}
}

Expand All @@ -156,7 +163,14 @@ impl Decoder for DefaultAsyncBuffer {
{
trace!("decoding default asyncbuffer");
if let Some(ref mut buffer) = Arc::get_mut(&mut self.0) {
buffer.decode(src, version)
let mut len: i64 = 0;
len.decode_varint(src)?;
for _ in 0..len {
let mut value = <u8>::default();
value.decode(src, version)?;
buffer.push(value);
}
Ok(())
} else {
Err(Error::new(
ErrorKind::Other,
Expand Down Expand Up @@ -189,46 +203,13 @@ impl Decoder for RecordSet {
where
T: Buf,
{
trace!("raw buffer len: {}", src.remaining());
let mut len: i32 = 0;
len.decode(src, version)?;
trace!("Record sets decoded content len: {}", len);

if src.remaining() < len as usize {
return Err(Error::new(
ErrorKind::UnexpectedEof,
format!(
"expected message len: {} but founded {}",
len,
src.remaining()
),
));
}

let mut buf = src.take(len as usize);
let mut len: i64 = 0;
len.decode_varint(src)?;

let mut count = 0;
while buf.remaining() > 0 {
trace!(
"decoding batches: {}, remaining bytes: {}",
count,
buf.remaining()
);
for _ in 0..len {
let mut batch = DefaultBatch::default();
match batch.decode(&mut buf, version) {
Ok(_) => self.batches.push(batch),
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof => {
warn!("not enough bytes for batch: {}", buf.remaining());
return Ok(());
}
_ => {
warn!("problem decoding batch: {}", err);
return Ok(());
}
},
}
count += 1;
batch.decode(src, version)?;
self.batches.push(batch)
}

Ok(())
Expand All @@ -237,9 +218,10 @@ impl Decoder for RecordSet {

impl Encoder for RecordSet {
fn write_size(&self, version: Version) -> usize {
self.batches
.iter()
.fold(4, |sum, val| sum + val.write_size(version))
let len = self.batches.len() as i64;
self.batches.iter().fold(len.var_write_size(), |sum, val| {
sum + val.write_size(version)
})
}

fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
Expand All @@ -248,18 +230,15 @@ impl Encoder for RecordSet {
{
trace!("Record set encoding");

let mut out: Vec<u8> = Vec::new();
let len: i64 = self.batches.len() as i64;
len.encode_varint(dest)?;

for batch in &self.batches {
trace!("encoding batch..");
batch.encode(&mut out, version)?;
batch.encode(dest, version)?;
}

let length: i32 = out.len() as i32;
trace!("Record Set encode len: {}", length);
length.encode(dest, version)?;

dest.put_slice(&out);
//dest.put_slice(&out);
Ok(())
}
}
Expand Down Expand Up @@ -428,16 +407,15 @@ mod test {
#[test]
fn test_decode_encode_record() -> Result<(), IoError> {
/* Below is how you generate the vec<u8> for the `data` variable below.
*
let mut record = DefaultRecord::from(String::from("dog"));
record.preamble.set_offset_delta(1);
let mut out = vec![];
record.encode(&mut out, 0)?;
println!("ENCODED: {:#x?}", out);
*/

let data = [
0x1e, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x64, 0x6f, 0x67, 0x0,
];
let data = [0x12, 0x0, 0x0, 0x2, 0x0, 0x6, 0x64, 0x6f, 0x67, 0x0];

let record = DefaultRecord::decode_from(&mut Cursor::new(&data), 0)?;
assert_eq!(record.as_bytes(0)?.len(), data.len());
Expand All @@ -453,9 +431,28 @@ mod test {
Ok(())
}

/// test decoding of records when one of the batches was truncated
#[test]
fn test_decode_batch_truncation() {
fn test_encode_and_decode_record() -> Result<(), IoError> {
// Round-trip check: encode a record, decode the resulting bytes, and
// confirm the decoded value matches the value that was encoded.

// Build the input record from the string "dog" and give it an offset delta of 1.
let mut input_record = DefaultRecord::from(String::from("dog"));
input_record.preamble.set_offset_delta(1);
// Encode into an in-memory byte vector using version 0.
let mut encoded: Vec<u8> = vec![];
input_record.encode(&mut encoded, 0)?;

// Decode from the encoded bytes with the same version; decoding must succeed.
let decoded_record = DefaultRecord::decode_from(&mut Cursor::new(&encoded), 0);
assert!(decoded_record.is_ok());
let decoded_record = decoded_record.unwrap();

// Only the record value is compared here, not the preamble or other fields.
assert_eq!(
input_record.get_value().as_ref(),
decoded_record.get_value().as_ref()
);

Ok(())
}

/// test encoding and decoding of recordsets
#[test]
fn test_encode_and_decode_batch() {
use super::RecordSet;
use crate::batch::DefaultBatch;
use crate::record::DefaultRecord;
Expand All @@ -467,30 +464,18 @@ mod test {
batch
}

// add 3 batches
let batches = RecordSet::default()
.add(create_batch())
.add(create_batch())
.add(create_batch());

const TRUNCATED: usize = 10;

let mut bytes = batches.as_bytes(0).expect("bytes");

let original_len = bytes.len();
let _ = bytes.split_off(original_len - TRUNCATED); // truncate record sets
let body = bytes.split_off(4); // split off body so we can manipulate len

let new_len = (original_len - TRUNCATED - 4) as i32;
let mut out = vec![];
new_len.encode(&mut out, 0).expect("encoding");
out.extend_from_slice(&body);

assert_eq!(out.len(), original_len - TRUNCATED);
assert_eq!(batches.batches.len(), 3);
let bytes = batches.as_bytes(0).expect("bytes");
println!("bytes: {:#x?}", bytes);

println!("decoding...");
let decoded_batches = RecordSet::decode_from(&mut Cursor::new(out), 0).expect("decoding");
assert_eq!(decoded_batches.batches.len(), 2);
let decoded_batches = RecordSet::decode_from(&mut Cursor::new(bytes), 0).expect("decoding");
assert_eq!(decoded_batches.batches.len(), 3);
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/storage/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
(active_segment.find_offset_position(20).await?).expect("offset exists");
assert_eq!(offset_position.get_base_offset(), 20);
assert_eq!(offset_position.get_pos(), 0); //
assert_eq!(offset_position.len(), 64);
assert_eq!(offset_position.len(), 58);
assert!((active_segment.find_offset_position(30).await?).is_none());
Ok(())
}
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
(active_segment.find_offset_position(20).await?).expect("offset exists");
assert_eq!(offset_position.get_base_offset(), 20);
assert_eq!(offset_position.get_pos(), 0); //
assert_eq!(offset_position.len(), 109);
assert_eq!(offset_position.len(), 85);
assert!((active_segment.find_offset_position(30).await?).is_none());

Ok(())
Expand All @@ -516,10 +516,10 @@ mod tests {

assert_eq!(seg_sink.get_end_offset(), 46);

assert_eq!(seg_sink.get_log_pos(), 273); // each takes 91 bytes
assert_eq!(seg_sink.get_log_pos(), 237); // each takes 79 bytes

let index = seg_sink.get_index();
assert_eq!(index[0].to_be(), (2, 91));
assert_eq!(index[0].to_be(), (2, 79));

let bytes = read_bytes_from_file(&test_dir.join(TEST2_FILE_NAME))?;
debug!("read {} bytes", bytes.len());
Expand All @@ -536,16 +536,16 @@ mod tests {
let offset_pos1 = seg_sink.find_offset_position(40).await?.expect("pos");
assert_eq!(offset_pos1.get_base_offset(), 40);
assert_eq!(offset_pos1.get_pos(), 0);
assert_eq!(offset_pos1.len(), 79);
assert_eq!(offset_pos1.len(), 67);
let offset_pos2 = seg_sink.find_offset_position(42).await?.expect("pos");
assert_eq!(offset_pos2.get_base_offset(), 42);
assert_eq!(offset_pos2.get_pos(), 91);
assert_eq!(offset_pos2.len(), 79);
assert_eq!(offset_pos2.get_pos(), 79);
assert_eq!(offset_pos2.len(), 67);

let offset_pos3 = seg_sink.find_offset_position(44).await?.expect("pos");
assert_eq!(offset_pos3.get_base_offset(), 44);
assert_eq!(offset_pos3.get_pos(), 182);
assert_eq!(offset_pos3.len(), 79);
assert_eq!(offset_pos3.get_pos(), 158);
assert_eq!(offset_pos3.len(), 67);

// test whether you can send batch with non zero base offset
let mut next_batch = create_batch();
Expand Down

0 comments on commit c2ede31

Please sign in to comment.