Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple batch produce #896

Merged
merged 8 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Release Notes
## Unreleased

## Platform Version 0.7.3 - 2021-04-02
* Added batching for producing records with `send_all` API ([#896](https://github.com/infinyon/fluvio/pull/896)).

## Platform Version 0.7.2 - 2021-03-23
* `fluvio update` updates plugins as well as CLI ([#865](https://github.com/infinyon/fluvio/issues/865)).
* SPU controller uses SVC ingress annotation ([#888](https://github.com/infinyon/fluvio/pull/888)).

## Platform Version 0.7.1 - 2021-03-15
* Client Key/Value support for producers and consumers
([#828](https://github.com/infinyon/fluvio/pull/828)).
Expand Down
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"examples/00-produce",
"examples/01-produce-key-value",
"examples/01-produce-batch",
"examples/02-consume",
"examples/03-echo",
"src/auth",
Expand Down
12 changes: 12 additions & 0 deletions examples/01-produce-batch/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "produce-batch"
version = "0.1.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
edition = "2018"
publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fluvio = { path = "../../src/client" }
async-std = { version = "1.8.0", default-features = false, features = ["attributes"] }
40 changes: 40 additions & 0 deletions examples/01-produce-batch/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! A minimal example showing how to produce batches of records with Fluvio
//!
//! Before running this example, make sure you have created a topic
//! named `batch` with the following command:
//!
//! ```text
//! $ fluvio topic create batch
//! ```
//!
//! Run this example using the following:
//!
//! ```text
//! $ cargo run --bin produce-batch
//! ```
//!
//! After running this example, you can see the messages that have
//! been sent to the topic using the following command:
//!
//! ```text
//! $ fluvio consume batch -B -d
//! This is record 0
//! This is record 1
//! ...
//! This is record 9
//! ```

/// Entry point of the example.
///
/// Runs `produce_batch` to completion and prints the error, if any,
/// instead of propagating it out of `main`.
#[async_std::main]
async fn main() {
    let outcome = produce_batch().await;
    if let Err(err) = outcome {
        println!("Produce error: {:?}", err);
    }
}

/// Builds ten keyed records and sends them to the `batch` topic with a
/// single `send_all` call.
///
/// Record `i` (for `i` in `0..10`) has the key `i` rendered as a string and
/// the value `"This is record {i}"`.
///
/// # Errors
///
/// Returns the `FluvioError` from creating the producer or from sending
/// the batch.
async fn produce_batch() -> Result<(), fluvio::FluvioError> {
    let producer = fluvio::producer("batch").await?;

    // Assemble the (optional key, value) pairs up front so they travel together.
    let mut records = Vec::with_capacity(10);
    for index in 0..10 {
        let key = index.to_string();
        let value = format!("This is record {}", index);
        records.push((Some(key), value));
    }

    producer.send_all(records).await?;
    Ok(())
}
2 changes: 1 addition & 1 deletion examples/01-produce-key-value/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn produce_key_value() -> Result<(), fluvio::FluvioError> {
let key = "Hello";
let value = "Fluvio";

producer.send(&key, &value).await?;
producer.send(key, value).await?;
println!("[{}] {}", key, value);
Ok(())
}
2 changes: 1 addition & 1 deletion src/cli/src/consumer/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl ProduceOpt {

let key = pieces[0];
let value: String = (&pieces[1..]).join(&*separator);
producer.send(key, &value).await?;
producer.send(key, &*value).await?;
if self.verbose {
println!("[{}] {}", key, value);
}
Expand Down
1 change: 1 addition & 0 deletions src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ admin = ["fluvio-sc-schema/use_serde"]
tracing = "0.1.19"
tracing-futures = "0.2.4"
futures-util = "0.3.6"
bytes = "1.0.1"
dirs = "1.0.2"
toml = "0.5.5"
async-rwlock = "1.1.0"
Expand Down
72 changes: 42 additions & 30 deletions src/client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dataplane::ReplicaKey;
use crate::FluvioError;
use crate::spu::SpuPool;
use crate::client::SerialFrame;
use bytes::Bytes;

/// An interface for producing events to a particular topic
///
Expand Down Expand Up @@ -44,24 +45,39 @@ impl TopicProducer {
)]
pub async fn send<K, V>(&self, key: K, value: V) -> Result<(), FluvioError>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
K: Into<Vec<u8>>,
V: Into<Vec<u8>>,
{
let key = key.as_ref();
let value = value.as_ref();
debug!(
key_size = key.len(),
value_size = value.len(),
"sending records:"
);
self.send_all(Some((Some(key), value))).await?;
Ok(())
}

#[instrument(
skip(self, records),
fields(topic = %self.topic),
)]
pub async fn send_all<K, V, I>(&self, records: I) -> Result<(), FluvioError>
nicholastmosher marked this conversation as resolved.
Show resolved Hide resolved
where
K: Into<Vec<u8>>,
V: Into<Vec<u8>>,
I: IntoIterator<Item = (Option<K>, V)>,
{
let replica = ReplicaKey::new(&self.topic, 0);
let spu_client = self.pool.create_serial_socket(&replica).await?;
debug!(
addr = spu_client.config().addr(),
"Connected to replica leader:"
);
send_record_raw(spu_client, &replica, Some(key), value).await
let records: Vec<_> = records
.into_iter()
.map(|(key, value): (Option<K>, V)| {
let key = key.map(|k| Bytes::from(k.into()));
let value = Bytes::from(value.into());
(key, value)
})
.collect();
send_records_raw(spu_client, &replica, records).await?;
Ok(())
}

nicholastmosher marked this conversation as resolved.
Show resolved Hide resolved
/// Sends an event to a specific partition within this producer's topic
Expand Down Expand Up @@ -93,16 +109,17 @@ impl TopicProducer {

debug!("connect to replica leader at: {}", spu_client);

send_record_raw(spu_client, &replica, None, record).await
let records = vec![(None, Bytes::from(Vec::from(buffer.as_ref())))];
send_records_raw(spu_client, &replica, records).await?;
Ok(())
}
}

/// Sends a batch of records to a target server (Kf, SPU, or SC)
async fn send_record_raw<F: SerialFrame>(
async fn send_records_raw<F: SerialFrame>(
mut leader: F,
replica: &ReplicaKey,
key: Option<&[u8]>,
value: &[u8],
records: Vec<(Option<Bytes>, Bytes)>,
) -> Result<(), FluvioError> {
use dataplane::produce::DefaultProduceRequest;
use dataplane::produce::DefaultPartitionRequest;
Expand All @@ -116,23 +133,18 @@ async fn send_record_raw<F: SerialFrame>(
let mut topic_request = DefaultTopicRequest::default();
let mut partition_request = DefaultPartitionRequest::default();

debug!(
"send record {} bytes to: replica: {}, {}",
value.len(),
replica,
leader
);

let mut record_msg = DefaultRecord {
value: DefaultAsyncBuffer::new(value.to_owned()),
..Default::default()
};
if let Some(key) = key {
record_msg.key = Some(key.into());
}
let mut batch = DefaultBatch::default();
batch.add_record(record_msg);
debug!("Putting together batch with {} records", records.len());

let records = records
.into_iter()
.map(|(key, value)| {
let key = key.map(DefaultAsyncBuffer::new);
let value = DefaultAsyncBuffer::new(value);
DefaultRecord::from((key, value))
})
.collect();

let batch = DefaultBatch::new(records);
partition_request.partition_index = replica.partition;
partition_request.records.batches.push(batch);
topic_request.name = replica.topic.to_owned();
Expand Down
59 changes: 58 additions & 1 deletion src/dataplane-protocol/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,26 @@ where
}
}

impl Batch<DefaultBatchRecords> {
impl DefaultBatch {
/// Builds a batch that owns the given records.
///
/// Each record's offset delta is set to its position in `records`
/// (0, 1, 2, ...). The header's `last_offset_delta` is set to the position
/// of the final record, or left at the record count (zero) when `records`
/// is empty.
pub fn new(records: Vec<DefaultRecord>) -> Self {
    let mut batch = DefaultBatch::default();

    // Stamp every record with its index before handing the Vec to the batch.
    let mut stamped = records;
    for (position, record) in stamped.iter_mut().enumerate() {
        record.preamble.set_offset_delta(position as Offset);
    }
    batch.records = stamped;

    let count = batch.records.len() as i32;
    batch.header.last_offset_delta = if count <= 0 { count } else { count - 1 };
    batch
}

/// add new record, this will update the offset to correct
pub fn add_record(&mut self, mut record: DefaultRecord) {
let last_offset_delta = if self.records.is_empty() {
Expand Down Expand Up @@ -294,10 +313,16 @@ mod test {
#[test]
fn test_records_offset() {
let mut batch = DefaultBatch::default();
assert_eq!(batch.get_last_offset_delta(), 0);

batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 0);

batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 1);

batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 2);

assert_eq!(
batch
Expand Down Expand Up @@ -325,4 +350,36 @@ mod test {
);
assert_eq!(batch.get_last_offset_delta(), 2);
}

/// `DefaultBatch::new` must assign the same per-record offset deltas as
/// adding the records one at a time with `add_record`.
#[test]
fn test_batch_records_offset() {
    // Reference batch, built incrementally.
    let mut comparison = DefaultBatch::default();
    for _ in 0..3 {
        comparison.add_record(DefaultRecord::default());
    }

    // Batch under test, built in one shot from a Vec.
    let batch_created =
        DefaultBatch::new((0..3).map(|_| DefaultRecord::default()).collect());

    for index in 0..3 {
        let actual = batch_created
            .records
            .get(index)
            .expect("get record")
            .get_offset_delta();
        let expected = comparison
            .records
            .get(index)
            .expect("get record")
            .get_offset_delta();
        assert_eq!(
            actual, expected,
            "Creating a DefaultBatch from a Vec gave wrong delta",
        );
    }

    assert_eq!(batch_created.get_last_offset_delta(), 2);
}
}
Loading