Skip to content

Commit

Permalink
perf: improve performance of filter_period_intersect (#436)
Browse files Browse the repository at this point in the history
* perf: improve performance of filter_period_intersect

* forgot some files

* fix lint errors
  • Loading branch information
cjc7373 committed Nov 11, 2023
1 parent b6dac17 commit 55cad4a
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 24 deletions.
2 changes: 1 addition & 1 deletion aw-query/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ mod qfunctions {
let events: Vec<Event> = (&args[0]).try_into()?;
let filter_events: Vec<Event> = (&args[1]).try_into()?;

let mut filtered_events = aw_transform::filter_period_intersect(&events, &filter_events);
let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events);
let mut filtered_tagged_events = Vec::new();
for event in filtered_events.drain(..) {
filtered_tagged_events.push(DataType::Event(event));
Expand Down
2 changes: 1 addition & 1 deletion aw-transform/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn bench_filter_period_intersect(c: &mut Criterion) {
c.bench_function("1000 events", |b| {
b.iter(|| {
let events1 = create_events(1000);
filter_period_intersect(&events1, &events2);
filter_period_intersect(events1, events2.clone());
})
});
}
Expand Down
105 changes: 83 additions & 22 deletions aw-transform/src/filter_period.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use aw_models::Event;
use chrono::{DateTime, Utc};
use chrono::Duration;

use crate::sort_by_timestamp;

/// Removes events not intersecting with the provided filter_events
///
Expand All @@ -15,33 +17,55 @@ use chrono::{DateTime, Utc};
/// filter_events: [ ] [ ]
/// output: [a ] [a ][b ]
/// ```
pub fn filter_period_intersect(events: &[Event], filter_events: &[Event]) -> Vec<Event> {
pub fn filter_period_intersect(events: Vec<Event>, filter_events: Vec<Event>) -> Vec<Event> {
if events.len() == 0 || filter_events.len() == 0 {
return Vec::new();
}

let mut filtered_events = Vec::new();
let events = sort_by_timestamp(events);
let filter_events = sort_by_timestamp(filter_events);

// Start with pre-calculating endtimes of events
let mut events_with_endtimes: Vec<(&Event, DateTime<Utc>)> = Vec::new();
for event in events {
events_with_endtimes.push((event, event.calculate_endtime()));
}
let mut events_iter = events.into_iter();
let mut filter_events_iter = filter_events.into_iter();
let mut cur_event = events_iter.next().unwrap();
let mut cur_filter_event = filter_events_iter.next().unwrap();

// Do actual filtering
for filter in filter_events {
let filter_endtime = filter.calculate_endtime();
for (event, event_endtime) in &events_with_endtimes {
if event.timestamp > filter_endtime {
continue;
loop {
let event_endtime = cur_event.calculate_endtime();
let filter_endtime = cur_filter_event.calculate_endtime();
if cur_event.duration == Duration::seconds(0) || event_endtime <= cur_filter_event.timestamp
{
match events_iter.next() {
Some(e) => {
cur_event = e;
continue;
}
None => return filtered_events,
}
if *event_endtime < filter.timestamp {
continue;
}
if cur_event.timestamp >= cur_filter_event.calculate_endtime() {
match filter_events_iter.next() {
Some(e) => {
cur_filter_event = e;
continue;
}
None => return filtered_events,
}
let mut e = (*event).clone();
e.timestamp = std::cmp::max(e.timestamp, filter.timestamp);
let endtime = std::cmp::min(*event_endtime, filter_endtime);
e.duration = endtime - e.timestamp;
filtered_events.push(e);
}

let mut e = cur_event.clone();
e.timestamp = std::cmp::max(e.timestamp, cur_filter_event.timestamp);
let endtime = std::cmp::min(event_endtime, filter_endtime);
e.duration = endtime - e.timestamp;

// trim current event
let old_timestamp = cur_event.timestamp;
cur_event.timestamp = e.timestamp + e.duration;
cur_event.duration = old_timestamp + cur_event.duration - cur_event.timestamp;

filtered_events.push(e);
}
filtered_events
}

#[cfg(test)]
Expand Down Expand Up @@ -81,7 +105,8 @@ mod tests {
data: json_map! {"test": json!(1)},
};

let filtered_events = filter_period_intersect(&vec![e1, e2, e3, e4, e5], &[filter_event]);
let filtered_events =
filter_period_intersect(vec![e1, e2, e3, e4, e5], vec![filter_event.clone()]);
assert_eq!(filtered_events.len(), 3);
assert_eq!(filtered_events[0].duration, Duration::milliseconds(500));
assert_eq!(filtered_events[1].duration, Duration::milliseconds(1000));
Expand All @@ -93,5 +118,41 @@ mod tests {
assert_eq!(filtered_events[1].timestamp, dt);
let dt: DateTime<Utc> = DateTime::from_str("2000-01-01T00:00:04.000Z").unwrap();
assert_eq!(filtered_events[2].timestamp, dt);

let timestamp_01s = DateTime::from_str("2000-01-01T00:00:01Z").unwrap();
let e = Event {
id: None,
timestamp: timestamp_01s,
duration: Duration::seconds(1),
data: json_map! {"test": json!(1)},
};
let mut f2 = filter_event.clone();
f2.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f2.duration = Duration::milliseconds(1500);
let res = filter_period_intersect(vec![e.clone()], vec![f2]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(500));

let timestamp_01_5s = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
let mut f3 = filter_event.clone();
f3.timestamp = timestamp_01_5s;
f3.duration = Duration::milliseconds(1000);
let res = filter_period_intersect(vec![e.clone()], vec![f3]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(500));

let mut f4 = filter_event.clone();
f4.timestamp = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
f4.duration = Duration::milliseconds(100);
let res = filter_period_intersect(vec![e.clone()], vec![f4]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(100));

let mut f5 = filter_event.clone();
f5.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f5.duration = Duration::seconds(10);
let res = filter_period_intersect(vec![e.clone()], vec![f5]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(1000));
}
}

0 comments on commit 55cad4a

Please sign in to comment.