Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat(smartmodule): added smartmodule chain support for consumer #2759

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,19 @@ mod cmd {
};

let smart_module = if let Some(smart_module_name) = &self.smartmodule {
Some(create_smartmodule(
vec![create_smartmodule(
smart_module_name,
self.smart_module_ctx(),
initial_param,
))
)]
} else if let Some(path) = &self.smartmodule_path {
Some(create_smartmodule_from_path(
vec![create_smartmodule_from_path(
path,
self.smart_module_ctx(),
initial_param,
)?)
)?]
} else {
None
Vec::new()
};

builder.smartmodule(smart_module);
Expand Down
6 changes: 2 additions & 4 deletions crates/fluvio-protocol-derive/src/ast/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ impl NamedProp {

if let Some(max) = self.attrs.max_version {
quote! {
sehz marked this conversation as resolved.
Show resolved Hide resolved
#[allow(clippy::double_comparisons)]
if version >= #min && version <= #max {
if (#min..=#max).contains(&version) {
#field_stream
} else {
tracing::trace!("Field: <{}> is skipped because version: {} is outside min: {}, max: {}",stringify!(#field_name),version,#min,#max);
Expand Down Expand Up @@ -93,8 +92,7 @@ impl UnnamedProp {

if let Some(max) = self.attrs.max_version {
quote! {
#[allow(clippy::double_comparisons)]
if version >= #min && version <= #max {
if (#min..=#max).contains(&version) {
#field_stream
} else {
tracing::trace!("Field from tuple struct:is skipped because version: {} is outside min: {}, max: {}",version,#min,#max);
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-protocol/src/link/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub enum ErrorCode {
DerivedStreamInvalid(String),
#[error("can't do recursive derivedstream yet: {0}->{1}")]
DerivedStreamRecursion(String, String),
#[error("the derivedstream already exists")]
DerivedStreamAlreadyExists,

// Compression errors
#[fluvio(tag = 9000)]
Expand Down
128 changes: 30 additions & 98 deletions crates/fluvio-spu-schema/src/server/smartmodule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(deprecated)]

use std::io::Read;
use std::{io, borrow::Cow};
use std::io;
use std::fmt::{Debug, self};

use flate2::{
Expand All @@ -14,8 +16,11 @@ use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
///
/// This includes the WASM content as well as the type of SmartModule being used.
/// It also carries any data that is required for specific types of SmartModules.
/// TODO: remove in 0.10
#[derive(Debug, Default, Clone, Encoder, Decoder)]
#[deprecated(
since = "0.10.0",
note = "will be removed in the next version. Use SmartModuleInvocation instead "
)]
pub struct LegacySmartModulePayload {
pub wasm: SmartModuleWasmCompressed,
pub kind: SmartModuleKind,
Expand Down Expand Up @@ -45,6 +50,17 @@ impl SmartModuleInvocationWasm {
pub fn adhoc_from_bytes(bytes: &[u8]) -> io::Result<Self> {
Ok(Self::AdHoc(zip(bytes)?))
}

/// consume and get the raw bytes of the WASM module
pub fn into_raw(self) -> io::Result<Vec<u8>> {
match self {
Self::AdHoc(gzipped) => Ok(unzip(gzipped.as_ref())?),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"unable to represent as raw data",
)),
}
}
}

impl Default for SmartModuleInvocationWasm {
Expand Down Expand Up @@ -124,8 +140,11 @@ pub enum SmartModuleContextData {
/// In a fetch request, a WASM module may be given directly in the request
/// as raw bytes.
///
// TODO: remove in 0.10
#[derive(Clone, Encoder, Decoder)]
#[deprecated(
since = "0.10.0",
note = "will be removed in the next version. Use SmartModuleInvocationWasm instead"
)]
#[derive(Clone, Encoder, Decoder, Debug)]
pub enum SmartModuleWasmCompressed {
Raw(Vec<u8>),
/// compressed WASM module payload using Gzip
Expand All @@ -135,6 +154,12 @@ pub enum SmartModuleWasmCompressed {
// Url(String),
}

impl Default for SmartModuleWasmCompressed {
fn default() -> Self {
Self::Raw(Default::default())
}
}

fn zip(raw: &[u8]) -> io::Result<Vec<u8>> {
let mut encoder = GzEncoder::new(raw, Compression::default());
let mut buffer = Vec::with_capacity(raw.len());
Expand All @@ -149,61 +174,6 @@ fn unzip(compressed: &[u8]) -> io::Result<Vec<u8>> {
Ok(buffer)
}

impl SmartModuleWasmCompressed {
/// returns the gzip-compressed WASM module bytes
pub fn to_gzip(&mut self) -> io::Result<()> {
if let Self::Raw(raw) = self {
*self = Self::Gzip(zip(raw.as_ref())?);
}
Ok(())
}

/// returns the raw WASM module bytes
pub fn to_raw(&mut self) -> io::Result<()> {
if let Self::Gzip(gzipped) = self {
*self = Self::Raw(unzip(gzipped)?);
}
Ok(())
}

/// get the raw bytes of the WASM module
pub fn get_raw(&self) -> io::Result<Cow<[u8]>> {
Ok(match self {
Self::Raw(raw) => Cow::Borrowed(raw),
Self::Gzip(gzipped) => Cow::Owned(unzip(gzipped.as_ref())?),
})
}

/// consume and get the raw bytes of the WASM module
pub fn into_raw(self) -> io::Result<Vec<u8>> {
Ok(match self {
Self::Raw(raw) => raw,
Self::Gzip(gzipped) => unzip(gzipped.as_ref())?,
})
}
}

impl Default for SmartModuleWasmCompressed {
fn default() -> Self {
Self::Raw(Vec::new())
}
}

impl Debug for SmartModuleWasmCompressed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Raw(bytes) => f
.debug_tuple("Raw")
.field(&format!("{} bytes", bytes.len()))
.finish(),
Self::Gzip(bytes) => f
.debug_tuple("Gzip")
.field(&format!("{} bytes", bytes.len()))
.finish(),
}
}
}

#[cfg(test)]
mod tests {

Expand All @@ -223,49 +193,11 @@ mod tests {
let bytes = vec![0x01];
let mut value: SmartModuleKind = Default::default();
value
.decode(&mut std::io::Cursor::new(bytes), 0)
.decode(&mut io::Cursor::new(bytes), 0)
.expect("should decode");
assert!(matches!(value, SmartModuleKind::Map));
}

#[test]
fn test_encode_smartmodulewasm() {
let mut dest = Vec::new();
let value: SmartModuleWasmCompressed =
SmartModuleWasmCompressed::Raw(vec![0xde, 0xad, 0xbe, 0xef]);
value.encode(&mut dest, 0).expect("should encode");
println!("{:02x?}", &dest);
assert_eq!(dest.len(), 9);
assert_eq!(dest[0], 0x00);
assert_eq!(dest[1], 0x00);
assert_eq!(dest[2], 0x00);
assert_eq!(dest[3], 0x00);
assert_eq!(dest[4], 0x04);
assert_eq!(dest[5], 0xde);
assert_eq!(dest[6], 0xad);
assert_eq!(dest[7], 0xbe);
assert_eq!(dest[8], 0xef);
}

#[test]
fn test_decode_smartmodulewasm() {
let bytes = vec![0x00, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef];
let mut value: SmartModuleWasmCompressed = Default::default();
value
.decode(&mut std::io::Cursor::new(bytes), 0)
.expect("should decode");
let inner = match value {
SmartModuleWasmCompressed::Raw(inner) => inner,
#[allow(unreachable_patterns)]
_ => panic!("should decode to SmartModuleWasm::Raw"),
};
assert_eq!(inner.len(), 4);
assert_eq!(inner[0], 0xde);
assert_eq!(inner[1], 0xad);
assert_eq!(inner[2], 0xbe);
assert_eq!(inner[3], 0xef);
}

#[test]
fn test_gzip_smartmoduleinvocationwasm() {
let bytes = vec![0xde, 0xad, 0xbe, 0xef];
Expand Down
Loading