-
Notifications
You must be signed in to change notification settings - Fork 507
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs: example of aggregation sm with initial value (#3126)
Fixes #3121
- Loading branch information
Alexander Galibey
committed
Apr 10, 2023
1 parent
2476e13
commit 8fbd0a5
Showing
7 changed files
with
134 additions
and
2 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[package] | ||
name = "sm-integer-sum-aggegrate" | ||
version = "0.0.0" | ||
authors = ["Fluvio Contributors <team@fluvio.io>"] | ||
edition = "2021" | ||
publish = false | ||
|
||
[lib] | ||
crate-type = ['cdylib'] | ||
|
||
[dependencies] | ||
fluvio-smartmodule = { workspace = true } | ||
once_cell = { version = "1.17" } | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
In order to build and use this SmartModule in your cluster, follow these steps: | ||
|
||
1. Install `smdk` tool: | ||
```bash | ||
fluvio install smdk | ||
``` | ||
2. Build: | ||
```bash | ||
smdk build | ||
``` | ||
|
||
3. Load built SmartModule into the cluster: | ||
```bash | ||
smdk load | ||
``` | ||
|
||
After that, you can consume from your topic and apply the aggregation as trasnformation: | ||
```bash | ||
fluvio consume test-aggr --transforms-file transforms.yaml | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[package] | ||
name = "integer-sum-aggegrate" | ||
group = "infinyon" | ||
version = "0.1.0" | ||
apiVersion = "0.1.0" | ||
description = "" | ||
license = "Apache-2.0" | ||
visibility = "private" | ||
|
||
[[params]] | ||
name = "initial_value" | ||
description = "initial value for aggregation" | ||
optional = false | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
|
||
use fluvio_smartmodule::{ | ||
dataplane::smartmodule::SmartModuleExtraParams, smartmodule, Record, RecordData, Result, | ||
}; | ||
|
||
use once_cell::sync::OnceCell; | ||
|
||
static INITIAL_VALUE: OnceCell<UseOnce<RecordData>> = OnceCell::new(); | ||
|
||
const PARAM_NAME: &str = "initial_value"; | ||
|
||
#[smartmodule(aggregate)] | ||
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> { | ||
let accumulator = if let Some(initial_value) = INITIAL_VALUE.get() { | ||
initial_value.get_or(&accumulator) | ||
} else { | ||
&accumulator | ||
}; | ||
// Parse the accumulator and current record as strings | ||
let accumulator_string = std::str::from_utf8(accumulator.as_ref())?; | ||
let current_string = std::str::from_utf8(current.value.as_ref())?; | ||
|
||
// Parse the strings into integers | ||
let accumulator_int = accumulator_string.trim().parse::<i32>().unwrap_or(0); | ||
let current_int = current_string.trim().parse::<i32>()?; | ||
|
||
// Take the sum of the two integers and return it as a string | ||
let sum = accumulator_int + current_int; | ||
Ok(sum.to_string().into()) | ||
} | ||
|
||
#[smartmodule(init)] | ||
fn init(params: SmartModuleExtraParams) -> Result<()> { | ||
if let Some(raw_spec) = params.get(PARAM_NAME) { | ||
INITIAL_VALUE | ||
.set(UseOnce::new(raw_spec.as_str().into())) | ||
.expect("initial value is already initialized"); | ||
}; | ||
Ok(()) | ||
} | ||
|
||
#[derive(Debug)] | ||
struct UseOnce<T> { | ||
value: T, | ||
used: AtomicBool, | ||
} | ||
|
||
impl<T> UseOnce<T> { | ||
fn new(value: T) -> Self { | ||
Self { | ||
value, | ||
used: AtomicBool::new(false), | ||
} | ||
} | ||
|
||
fn get_or<'a: 'b, 'b>(&'a self, default: &'b T) -> &'b T { | ||
if self.used.load(Ordering::SeqCst) { | ||
default | ||
} else { | ||
self.used.store(true, Ordering::SeqCst); | ||
&self.value | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
transforms: | ||
- uses: infinyon/integer-sum-aggegrate@0.1.0 | ||
with: | ||
initial_value: "10" |