Skip to content

Commit

Permalink
Use version from the client in SmartEngine to encode/decode input/out…
Browse files Browse the repository at this point in the history
…put (#1924)

Closes #1894

Backward compatibility with old wasm modules is keep due to we now check for both 
```rust
 TypedFunc<(i32, i32), i32>;
 TypedFunc<(i32, i32, u32), i32>;
```
  • Loading branch information
morenol committed Nov 20, 2021
1 parent eec706f commit cd7a0cf
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 151 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Make Fluvio cluster working on Apple Silicon ([#1896](https://github.com/infinyon/fluvio/pull/1896))
* Rename `fluvio table` to `fluvio tableformat` ([#1918](https://github.com/infinyon/fluvio/pull/1918))
* Restrict max version in fluvio client ([#1930](https://github.com/infinyon/fluvio/issues/1930))
* Use version from the client in SmartEngine to encode/decode input/output ([#1924](https://github.com/infinyon/fluvio/pull/1924))

## Platform Version 0.9.12 - 2021-10-27
* Add examples for ArrayMap. ([#1804](https://github.com/infinyon/fluvio/issues/1804))
Expand Down
10 changes: 7 additions & 3 deletions crates/fluvio-dataplane-protocol/src/smartmodule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ mod encoding {
pub base_offset: Offset,
/// The records for the SmartModule to process
pub record_data: Vec<u8>,
pub params: SmartModuleExtraParams,
#[fluvio(min_version = 16)]
pub join_record: Vec<u8>,
pub params: SmartModuleExtraParams,
}
impl std::convert::TryFrom<Vec<Record>> for SmartModuleInput {
type Error = std::io::Error;
Expand Down Expand Up @@ -85,7 +85,7 @@ mod encoding {
pub struct SmartModuleAggregateOutput {
/// The base output required by all SmartModules
pub base: SmartModuleOutput,

#[fluvio(min_version = 16)]
pub accumulator: Vec<u8>,
}

Expand Down Expand Up @@ -191,10 +191,14 @@ mod encoding {
pub enum SmartModuleKind {
Filter,
Map,
#[fluvio(min_version = 15)]
ArrayMap,
#[fluvio(min_version = 13)]
Aggregate,
#[fluvio(min_version = 16)]
FilterMap,
#[fluvio(min_version = 16)]
Join,
Aggregate,
}

impl Default for SmartModuleKind {
Expand Down
45 changes: 35 additions & 10 deletions crates/fluvio-smartengine/src/smartmodule/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,61 @@
use std::convert::TryFrom;

use fluvio_spu_schema::server::stream_fetch::AGGREGATOR_API;
use tracing::{debug, instrument};
use anyhow::Result;
use wasmtime::TypedFunc;
use wasmtime::{AsContextMut, Trap, TypedFunc};

use crate::smartmodule::{SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance};
use crate::{
WasmSlice,
smartmodule::{SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance},
};
use dataplane::smartmodule::{
SmartModuleAggregateInput, SmartModuleInput, SmartModuleOutput, SmartModuleInternalError,
SmartModuleExtraParams, SmartModuleAggregateOutput,
};

const AGGREGATE_FN_NAME: &str = "aggregate";
type AggregateFn = TypedFunc<(i32, i32), i32>;
type OldAggregateFn = TypedFunc<(i32, i32), i32>;
type AggregateFn = TypedFunc<(i32, i32, u32), i32>;

pub struct SmartModuleAggregate {
base: SmartModuleContext,
aggregate_fn: AggregateFn,
aggregate_fn: AggregateFnKind,
accumulator: Vec<u8>,
}
pub enum AggregateFnKind {
Old(OldAggregateFn),
New(AggregateFn),
}

impl AggregateFnKind {
fn call(&self, store: impl AsContextMut, slice: WasmSlice) -> Result<i32, Trap> {
match self {
Self::Old(aggregate_fn) => aggregate_fn.call(store, (slice.0, slice.1)),
Self::New(aggregate_fn) => aggregate_fn.call(store, slice),
}
}
}

impl SmartModuleAggregate {
pub fn new(
engine: &SmartEngine,
module: &SmartModuleWithEngine,
params: SmartModuleExtraParams,
accumulator: Vec<u8>,
version: i16,
) -> Result<Self> {
let mut base = SmartModuleContext::new(engine, module, params)?;
let aggregate_fn: AggregateFn = base
let mut base = SmartModuleContext::new(engine, module, params, version)?;
let aggregate_fn: AggregateFnKind = if let Ok(agg_fn) = base
.instance
.get_typed_func(&mut base.store, AGGREGATE_FN_NAME)?;
.get_typed_func(&mut base.store, AGGREGATE_FN_NAME)
{
AggregateFnKind::New(agg_fn)
} else {
let agg_fn: OldAggregateFn = base
.instance
.get_typed_func(&mut base.store, AGGREGATE_FN_NAME)?;
AggregateFnKind::Old(agg_fn)
};

Ok(Self {
base,
Expand All @@ -48,7 +73,7 @@ impl SmartModuleInstance for SmartModuleAggregate {
base,
accumulator: self.accumulator.clone(),
};
let slice = self.base.write_input(&input, AGGREGATOR_API)?;
let slice = self.base.write_input(&input)?;
let aggregate_output = self.aggregate_fn.call(&mut self.base.store, slice)?;

debug!(aggregate_output);
Expand All @@ -58,7 +83,7 @@ impl SmartModuleInstance for SmartModuleAggregate {
return Err(internal_error.into());
}

let output: SmartModuleAggregateOutput = self.base.read_output(AGGREGATOR_API)?;
let output: SmartModuleAggregateOutput = self.base.read_output()?;
self.accumulator = output.accumulator;
Ok(output.base)
}
Expand Down
45 changes: 35 additions & 10 deletions crates/fluvio-smartengine/src/smartmodule/array_map.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,56 @@
use std::convert::TryFrom;
use anyhow::Result;
use fluvio_spu_schema::server::stream_fetch::ARRAY_MAP_WASM_API;
use wasmtime::TypedFunc;
use wasmtime::{AsContextMut, Trap, TypedFunc};

use dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleInternalError, SmartModuleExtraParams,
};
use crate::smartmodule::{SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance};
use crate::{
WasmSlice,
smartmodule::{SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance},
};

const ARRAY_MAP_FN_NAME: &str = "array_map";
type ArrayMapFn = TypedFunc<(i32, i32), i32>;
type OldArrayMapFn = TypedFunc<(i32, i32), i32>;
type ArrayMapFn = TypedFunc<(i32, i32, u32), i32>;

pub struct SmartModuleArrayMap {
base: SmartModuleContext,
array_map_fn: ArrayMapFn,
array_map_fn: ArrayMapFnKind,
}

enum ArrayMapFnKind {
Old(OldArrayMapFn),
New(ArrayMapFn),
}
impl ArrayMapFnKind {
fn call(&self, store: impl AsContextMut, slice: WasmSlice) -> Result<i32, Trap> {
match self {
Self::Old(array_map_fn) => array_map_fn.call(store, (slice.0, slice.1)),
Self::New(array_map_fn) => array_map_fn.call(store, slice),
}
}
}

impl SmartModuleArrayMap {
pub fn new(
engine: &SmartEngine,
module: &SmartModuleWithEngine,
params: SmartModuleExtraParams,
version: i16,
) -> Result<Self> {
let mut base = SmartModuleContext::new(engine, module, params)?;
let map_fn: ArrayMapFn = base
let mut base = SmartModuleContext::new(engine, module, params, version)?;
let map_fn = if let Ok(array_map_fn) = base
.instance
.get_typed_func(&mut base.store, ARRAY_MAP_FN_NAME)?;
.get_typed_func(&mut base.store, ARRAY_MAP_FN_NAME)
{
ArrayMapFnKind::New(array_map_fn)
} else {
let array_map_fn = base
.instance
.get_typed_func(&mut base.store, ARRAY_MAP_FN_NAME)?;
ArrayMapFnKind::Old(array_map_fn)
};

Ok(Self {
base,
Expand All @@ -36,7 +61,7 @@ impl SmartModuleArrayMap {

impl SmartModuleInstance for SmartModuleArrayMap {
fn process(&mut self, input: SmartModuleInput) -> Result<SmartModuleOutput> {
let slice = self.base.write_input(&input, ARRAY_MAP_WASM_API)?;
let slice = self.base.write_input(&input)?;
let map_output = self.array_map_fn.call(&mut self.base.store, slice)?;

if map_output < 0 {
Expand All @@ -45,7 +70,7 @@ impl SmartModuleInstance for SmartModuleArrayMap {
return Err(internal_error.into());
}

let output: SmartModuleOutput = self.base.read_output(ARRAY_MAP_WASM_API)?;
let output: SmartModuleOutput = self.base.read_output()?;
Ok(output)
}

Expand Down
46 changes: 36 additions & 10 deletions crates/fluvio-smartengine/src/smartmodule/filter.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,65 @@
use std::convert::TryFrom;
use anyhow::Result;
use fluvio_spu_schema::server::stream_fetch::WASM_MODULE_V2_API;
use wasmtime::TypedFunc;
use wasmtime::{AsContextMut, Trap, TypedFunc};

use dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleInternalError, SmartModuleExtraParams,
};
use crate::smartmodule::{SmartModuleWithEngine, SmartEngine, SmartModuleContext, SmartModuleInstance};
use crate::{
WasmSlice,
smartmodule::{SmartModuleWithEngine, SmartEngine, SmartModuleContext, SmartModuleInstance},
};

const FILTER_FN_NAME: &str = "filter";
type FilterFn = TypedFunc<(i32, i32), i32>;
type OldFilterFn = TypedFunc<(i32, i32), i32>;
type FilterFn = TypedFunc<(i32, i32, u32), i32>;

pub struct SmartModuleFilter {
base: SmartModuleContext,
filter_fn: FilterFn,
filter_fn: FilterFnKind,
}

enum FilterFnKind {
Old(OldFilterFn),
New(FilterFn),
}

impl FilterFnKind {
fn call(&self, store: impl AsContextMut, slice: WasmSlice) -> Result<i32, Trap> {
match self {
Self::Old(filter_fn) => filter_fn.call(store, (slice.0, slice.1)),
Self::New(filter_fn) => filter_fn.call(store, slice),
}
}
}

impl SmartModuleFilter {
pub fn new(
engine: &SmartEngine,
module: &SmartModuleWithEngine,
params: SmartModuleExtraParams,
version: i16,
) -> Result<Self> {
let mut base = SmartModuleContext::new(engine, module, params)?;
let filter_fn: FilterFn = base
let mut base = SmartModuleContext::new(engine, module, params, version)?;
let filter_fn = if let Ok(filt_fn) = base
.instance
.get_typed_func(&mut base.store, FILTER_FN_NAME)?;
.get_typed_func(&mut base.store, FILTER_FN_NAME)
{
FilterFnKind::New(filt_fn)
} else {
let filt_fn: OldFilterFn = base
.instance
.get_typed_func(&mut base.store, FILTER_FN_NAME)?;
FilterFnKind::Old(filt_fn)
};

Ok(Self { base, filter_fn })
}
}

impl SmartModuleInstance for SmartModuleFilter {
fn process(&mut self, input: SmartModuleInput) -> Result<SmartModuleOutput> {
let slice = self.base.write_input(&input, WASM_MODULE_V2_API)?;
let slice = self.base.write_input(&input)?;
let filter_output = self.filter_fn.call(&mut self.base.store, slice)?;

if filter_output < 0 {
Expand All @@ -42,7 +68,7 @@ impl SmartModuleInstance for SmartModuleFilter {
return Err(internal_error.into());
}

let output: SmartModuleOutput = self.base.read_output(WASM_MODULE_V2_API)?;
let output: SmartModuleOutput = self.base.read_output()?;
Ok(output)
}

Expand Down
50 changes: 38 additions & 12 deletions crates/fluvio-smartengine/src/smartmodule/filter_map.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,58 @@
use std::convert::TryFrom;
use anyhow::Result;
use fluvio_spu_schema::server::stream_fetch::ARRAY_MAP_WASM_API;
use wasmtime::TypedFunc;
use wasmtime::{AsContextMut, Trap, TypedFunc};

use dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput, SmartModuleInternalError};
use crate::smartmodule::{
SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance,
SmartModuleExtraParams,
use crate::{
WasmSlice,
smartmodule::{
SmartEngine, SmartModuleWithEngine, SmartModuleContext, SmartModuleInstance,
SmartModuleExtraParams,
},
};

const FILTER_MAP_FN_NAME: &str = "filter_map";
type FilterMapFn = TypedFunc<(i32, i32), i32>;
type OldFilterMapFn = TypedFunc<(i32, i32), i32>;
type FilterMapFn = TypedFunc<(i32, i32, u32), i32>;

pub struct SmartModuleFilterMap {
base: SmartModuleContext,
filter_map_fn: FilterMapFn,
filter_map_fn: FilterMapFnKind,
}

enum FilterMapFnKind {
Old(OldFilterMapFn),
New(FilterMapFn),
}

impl FilterMapFnKind {
fn call(&self, store: impl AsContextMut, slice: WasmSlice) -> Result<i32, Trap> {
match self {
Self::Old(filter_fn) => filter_fn.call(store, (slice.0, slice.1)),
Self::New(filter_fn) => filter_fn.call(store, slice),
}
}
}

impl SmartModuleFilterMap {
pub fn new(
engine: &SmartEngine,
module: &SmartModuleWithEngine,
params: SmartModuleExtraParams,
version: i16,
) -> Result<Self> {
let mut base = SmartModuleContext::new(engine, module, params)?;
let filter_map_fn: FilterMapFn = base
let mut base = SmartModuleContext::new(engine, module, params, version)?;
let filter_map_fn = if let Ok(fmap_fn) = base
.instance
.get_typed_func(&mut base.store, FILTER_MAP_FN_NAME)?;
.get_typed_func(&mut base.store, FILTER_MAP_FN_NAME)
{
FilterMapFnKind::New(fmap_fn)
} else {
let fmap_fn: OldFilterMapFn = base
.instance
.get_typed_func(&mut base.store, FILTER_MAP_FN_NAME)?;
FilterMapFnKind::Old(fmap_fn)
};

Ok(Self {
base,
Expand All @@ -37,7 +63,7 @@ impl SmartModuleFilterMap {

impl SmartModuleInstance for SmartModuleFilterMap {
fn process(&mut self, input: SmartModuleInput) -> Result<SmartModuleOutput> {
let slice = self.base.write_input(&input, ARRAY_MAP_WASM_API)?;
let slice = self.base.write_input(&input)?;
let map_output = self.filter_map_fn.call(&mut self.base.store, slice)?;

if map_output < 0 {
Expand All @@ -46,7 +72,7 @@ impl SmartModuleInstance for SmartModuleFilterMap {
return Err(internal_error.into());
}

let output: SmartModuleOutput = self.base.read_output(ARRAY_MAP_WASM_API)?;
let output: SmartModuleOutput = self.base.read_output()?;
Ok(output)
}

Expand Down
Loading

0 comments on commit cd7a0cf

Please sign in to comment.