Skip to content

Commit

Permalink
Rename to SmartStreamFilter, add back SmartStreamEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholastmosher committed Jul 9, 2021
1 parent d110111 commit 9928e65
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
16 changes: 9 additions & 7 deletions src/spu/src/services/public/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::core::DefaultSharedGlobalContext;
use crate::replication::leader::SharedFileLeaderState;
use publishers::INIT_OFFSET;
use wasmtime::{Engine, Module};
use crate::smart_stream::filter::SmartFilter;
use crate::smart_stream::filter::SmartStreamFilter;
use crate::smart_stream::SmartStreamEngine;

/// Fetch records as stream
pub struct StreamFetchHandler {
Expand All @@ -41,7 +42,7 @@ pub struct StreamFetchHandler {
consumer_offset_listener: OffsetChangeListener,
leader_state: SharedFileLeaderState,
stream_id: u32,
sm_engine: Engine,
sm_engine: SmartStreamEngine,
sm_bytes: Vec<u8>,
}

Expand Down Expand Up @@ -97,7 +98,7 @@ impl StreamFetchHandler {
consumer_offset_listener: offset_listener,
stream_id,
leader_state: leader_state.clone(),
sm_engine: Engine::default(),
sm_engine: SmartStreamEngine::default(),
sm_bytes,
max_fetch_bytes,
};
Expand Down Expand Up @@ -150,7 +151,7 @@ impl StreamFetchHandler {
// and can't be send across Send
let module = if !self.sm_bytes.is_empty() {
Some(
Module::from_binary(&self.sm_engine, &self.sm_bytes).map_err(
Module::from_binary(&self.sm_engine.0, &self.sm_bytes).map_err(
|err| -> FlvSocketError {
FlvSocketError::IoError(IoError::new(
ErrorKind::Other,
Expand Down Expand Up @@ -311,9 +312,10 @@ impl StreamFetchHandler {

debug!("creating smart filter");
let filter_batch = {
let mut filter = SmartFilter::new(&self.sm_engine, module).map_err(|err| {
IoError::new(ErrorKind::Other, format!("creating filter {}", err))
})?;
let mut filter =
SmartStreamFilter::new(&self.sm_engine.0, module).map_err(|err| {
IoError::new(ErrorKind::Other, format!("creating filter {}", err))
})?;

let records = &file_partition_response.records;

Expand Down
4 changes: 2 additions & 2 deletions src/spu/src/smart_stream/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use crate::smart_stream::file_batch::FileBatchIterator;
const FILTER_FN_NAME: &str = "filter";
type FilterFn = TypedFunc<(i32, i32), i32>;

pub struct SmartFilter {
pub struct SmartStreamFilter {
store: Store<()>,
instance: Instance,
filter_fn: FilterFn,
records_cb: Arc<RecordsCallBack>,
}

impl SmartFilter {
impl SmartStreamFilter {
pub fn new(engine: &Engine, module: &Module) -> Result<Self> {
let mut store = Store::new(engine, ());
let cb = Arc::new(RecordsCallBack::new());
Expand Down
5 changes: 4 additions & 1 deletion src/spu/src/smart_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::sync::Mutex;
use anyhow::Result;
use wasmtime::{Memory, Store};
use wasmtime::{Memory, Store, Engine};

mod memory;
pub mod filter;
pub mod file_batch;

#[derive(Debug, Default)]
pub struct SmartStreamEngine(pub(crate) Engine);

#[derive(Clone)]
pub struct RecordsMemory {
ptr: i32,
Expand Down

0 comments on commit 9928e65

Please sign in to comment.