Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: Add Partitioner trait with SiphashRoundRobinPartitioner impl #965

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions examples/00-produce/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ async fn main() {
async fn produce() -> Result<(), fluvio::FluvioError> {
let producer = fluvio::producer("simple").await?;

let record = "Hello, Fluvio!";
producer.send_record(&record, 0).await?;
println!("{}", &record);
let value = "Hello, Fluvio!";
producer.send_keyless(value).await?;
nicholastmosher marked this conversation as resolved.
Show resolved Hide resolved
println!("{}", value);

Ok(())
}
1 change: 1 addition & 0 deletions examples/01-produce-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ publish = false
[dependencies]
fluvio = { path = "../../src/client" }
async-std = { version = "1.8.0", default-features = false, features = ["attributes"] }
tracing-subscriber = "0.2.17"
5 changes: 5 additions & 0 deletions examples/01-produce-key-value/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

#[async_std::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

if let Err(e) = produce_key_value().await {
println!("Produce error: {:?}", e);
}
Expand All @@ -34,6 +38,7 @@ async fn produce_key_value() -> Result<(), fluvio::FluvioError> {
let key = "Hello";
let value = "Fluvio";

println!("About to send");
producer.send(key, value).await?;
println!("[{}] {}", key, value);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/03-echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ async fn main() {
async fn produce() -> Result<(), FluvioError> {
let producer = fluvio::producer(TOPIC).await?;

for i in 0..10 {
for i in 0..10u32 {
println!("Sending record {}", i);
producer
.send(format!("Key {}", i), format!("Value {}", i))
.await?;
}
producer.send_record("Done!", 0).await?;
producer.send_keyless("Done!").await?;
nicholastmosher marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ProduceOpt {
if self.kv_mode() {
self.produce_key_value(producer, string).await?;
} else {
producer.send_record(string, 0).await?;
producer.send_keyless(string).await?;
nicholastmosher marked this conversation as resolved.
Show resolved Hide resolved
if self.verbose {
println!("[null] {}", string);
}
Expand Down
4 changes: 3 additions & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ thiserror = "1.0.20"
once_cell = "1.5.2"
semver = "0.11.0"
pin-project-lite = "0.2"
siphasher = "0.3.5"

# Fluvio dependencies
fluvio-future = { version = "0.2.0", features = ["task", "native2_tls"] }
Expand All @@ -47,4 +48,5 @@ fluvio-protocol = { path = "../protocol", version = "0.4.0" }
dataplane = { version = "0.4.1", path = "../dataplane-protocol", package = "fluvio-dataplane-protocol" }

[dev-dependencies]
async-std = { version = "1.6.4", default-features = false}
async-std = { version = "1.6.4", default-features = false }
fluvio-future = { version = "0.2.0", features = ["fixture"] }
Loading