From 9928e65f7f81d2a634e7ae8190d8afc8afa03346 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Fri, 9 Jul 2021 16:45:58 -0400 Subject: [PATCH] Rename to SmartStreamFilter, add back SmartStreamEngine --- src/spu/src/services/public/stream_fetch.rs | 16 +++++++++------- src/spu/src/smart_stream/filter.rs | 4 ++-- src/spu/src/smart_stream/mod.rs | 5 ++++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/spu/src/services/public/stream_fetch.rs b/src/spu/src/services/public/stream_fetch.rs index 698acd40d64..b4e809924a2 100644 --- a/src/spu/src/services/public/stream_fetch.rs +++ b/src/spu/src/services/public/stream_fetch.rs @@ -26,7 +26,8 @@ use crate::core::DefaultSharedGlobalContext; use crate::replication::leader::SharedFileLeaderState; use publishers::INIT_OFFSET; use wasmtime::{Engine, Module}; -use crate::smart_stream::filter::SmartFilter; +use crate::smart_stream::filter::SmartStreamFilter; +use crate::smart_stream::SmartStreamEngine; /// Fetch records as stream pub struct StreamFetchHandler { @@ -41,7 +42,7 @@ pub struct StreamFetchHandler { consumer_offset_listener: OffsetChangeListener, leader_state: SharedFileLeaderState, stream_id: u32, - sm_engine: Engine, + sm_engine: SmartStreamEngine, sm_bytes: Vec, } @@ -97,7 +98,7 @@ impl StreamFetchHandler { consumer_offset_listener: offset_listener, stream_id, leader_state: leader_state.clone(), - sm_engine: Engine::default(), + sm_engine: SmartStreamEngine::default(), sm_bytes, max_fetch_bytes, }; @@ -150,7 +151,7 @@ impl StreamFetchHandler { // and can't be send across Send let module = if !self.sm_bytes.is_empty() { Some( - Module::from_binary(&self.sm_engine, &self.sm_bytes).map_err( + Module::from_binary(&self.sm_engine.0, &self.sm_bytes).map_err( |err| -> FlvSocketError { FlvSocketError::IoError(IoError::new( ErrorKind::Other, @@ -311,9 +312,10 @@ impl StreamFetchHandler { debug!("creating smart filter"); let filter_batch = { - let mut filter = SmartFilter::new(&self.sm_engine, module).map_err(|err| { - IoError::new(ErrorKind::Other, format!("creating filter {}", err)) - })?; + let mut filter = + SmartStreamFilter::new(&self.sm_engine.0, module).map_err(|err| { + IoError::new(ErrorKind::Other, format!("creating filter {}", err)) + })?; let records = &file_partition_response.records; diff --git a/src/spu/src/smart_stream/filter.rs b/src/spu/src/smart_stream/filter.rs index c2bc095bc18..6db9326a96f 100644 --- a/src/spu/src/smart_stream/filter.rs +++ b/src/spu/src/smart_stream/filter.rs @@ -16,14 +16,14 @@ use crate::smart_stream::file_batch::FileBatchIterator; const FILTER_FN_NAME: &str = "filter"; type FilterFn = TypedFunc<(i32, i32), i32>; -pub struct SmartFilter { +pub struct SmartStreamFilter { store: Store<()>, instance: Instance, filter_fn: FilterFn, records_cb: Arc, } -impl SmartFilter { +impl SmartStreamFilter { pub fn new(engine: &Engine, module: &Module) -> Result { let mut store = Store::new(engine, ()); let cb = Arc::new(RecordsCallBack::new()); diff --git a/src/spu/src/smart_stream/mod.rs b/src/spu/src/smart_stream/mod.rs index a8841640595..abc4c4b53ca 100644 --- a/src/spu/src/smart_stream/mod.rs +++ b/src/spu/src/smart_stream/mod.rs @@ -1,11 +1,14 @@ use std::sync::Mutex; use anyhow::Result; -use wasmtime::{Memory, Store}; +use wasmtime::{Memory, Store, Engine}; mod memory; pub mod filter; pub mod file_batch; +#[derive(Debug, Default)] +pub struct SmartStreamEngine(pub(crate) Engine); + #[derive(Clone)] pub struct RecordsMemory { ptr: i32,