Skip to content

add vrl #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug",
"program": "${workspaceFolder}/runrunrun/",
"args": [],
"cwd": "${workspaceFolder}",
}
]
}
151 changes: 151 additions & 0 deletions adding-machine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,155 @@ edition = "2021"
name = "addingmachine"
crate-type = ["cdylib"]


[dependencies]
libc = "0.2"
regex = "1.5"
lazy_static = "1.4.0"

serde_json = "1.0.87"

value = { git = "https://github.com/vectordotdev/vector", default-features = false }
vrl = { git = "https://github.com/vectordotdev/vector", default-features = false }
vrl-diagnostic = { git = "https://github.com/vectordotdev/vector", package = "vrl-diagnostic" }

# serde stuff here
serde = { version = "1.0", features = ["derive"] }
gloo-utils = { version = "0.1", features = ["serde"] }
serde-wasm-bindgen = "0.4"


[dependencies.stdlib]
package = "vrl-stdlib"
git = "https://github.com/vectordotdev/vector"
default-features = false
features = [
"append",
"array",
"assert",
"assert_eq",
"ceil",
"chunks",
"compact",
"contains",
"decode_base64",
"decode_percent",
"del",
"downcase",
"encode_base64",
"encode_json",
"encode_key_value",
"encode_logfmt",
"encode_percent",
"ends_with",
"exists",
"filter",
"find",
"flatten",
"float",
"floor",
"for_each",
"format_int",
"format_number",
"format_timestamp",
"get",
"get_env_var",
"includes",
"ip_aton",
"ip_cidr_contains",
"ip_ntoa",
"ip_ntop",
"ip_pton",
"ip_subnet",
"ip_to_ipv6",
"ipv6_to_ipv4",
"is_array",
"is_boolean",
"is_empty",
"is_float",
"is_integer",
"is_ipv4",
"is_ipv6",
"is_json",
"is_null",
"is_nullish",
"is_object",
"is_regex",
"is_string",
"is_timestamp",
"join",
"keys",
"length",
"map_keys",
"map_values",
"match",
"match_any",
"match_array",
"match_datadog_query",
"md5",
"merge",
"mod",
"now",
"object",
"parse_apache_log",
"parse_aws_alb_log",
"parse_aws_cloudwatch_log_subscription_message",
"parse_aws_vpc_flow_log",
"parse_common_log",
"parse_csv",
"parse_duration",
"parse_glog",
"parse_int",
"parse_json",
"parse_key_value",
"parse_klog",
"parse_linux_authorization",
"parse_logfmt",
"parse_nginx_log",
"parse_query_string",
"parse_regex",
"parse_regex_all",
"parse_ruby_hash",
"parse_syslog",
"parse_timestamp",
"parse_tokens",
"parse_url",
"parse_user_agent",
"parse_xml",
"push",
"redact",
"remove",
"replace",
"round",
"set",
"sha1",
"sha2",
"sha3",
"slice",
"split",
"starts_with",
"string",
"strip_ansi_escape_codes",
"strip_whitespace",
"strlen",
"tally",
"tally_value",
"tag_types_externally",
"timestamp",
"to_bool",
"to_float",
"to_int",
"to_regex",
"to_string",
"to_syslog_facility",
"to_syslog_level",
"to_syslog_severity",
"to_timestamp",
"to_unix_timestamp",
"truncate",
"type_def",
"unique",
"unnest",
"upcase",
"values"
]
214 changes: 214 additions & 0 deletions adding-machine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,223 @@
// this is going to be running in wasmtime
// since this will be compiled to wasm32


// this is the guest rust program
extern crate alloc;
use std::slice;
use std::mem::MaybeUninit;
use alloc::vec::Vec;

use ::value::Value;
use value::Secrets;
use vrl::diagnostic::DiagnosticList;
use vrl::state::TypeState;
use vrl::{diagnostic::Formatter, prelude::BTreeMap, CompileConfig, Runtime};
use vrl::{TargetValue, Terminate, TimeZone};


// maybe not needed, but will be the formatted output glue
use serde::{Deserialize, Serialize};

/// Request payload for one VRL run: the program source text together with
/// the event value the program is applied to.
///
/// `run_vrl` deserializes this from the JSON string handed over by the host.
#[derive(Serialize, Deserialize)]
pub struct Input {
/// VRL program source code to compile and execute.
pub program: String,
/// Event that becomes the program's target value.
pub event: Value,
}

impl Input {
    /// Builds an [`Input`] from a borrowed program source and an owned event.
    ///
    /// The program text is copied into a freshly allocated `String`; the event
    /// is moved in unchanged.
    pub fn new(program: &str, event: Value) -> Self {
        let program = String::from(program);
        Self { program, event }
    }
}

/// Successful outcome of a VRL run: the module returns the result of the last
/// expression and the event that results from the applied program.
#[derive(Deserialize, Serialize)]
pub struct VrlCompileResult {
/// Value produced by resolving the program (what `Runtime::resolve` returned in `compile`).
pub output: Value,
/// The target event value after the program has been applied to it.
pub result: Value,
}

impl VrlCompileResult {
fn new(output: Value, result: Value) -> Self {
Self { output, result }
}
}

/// Failure outcome of a VRL run, carrying the diagnostics in several text forms.
#[derive(Deserialize, Serialize, Default)]
pub struct VrlDiagnosticResult {
/// Message of each individual diagnostic (left empty by `new_runtime_error`).
pub list: Vec<String>,
/// Diagnostics rendered through `Formatter` as plain text.
pub msg: String,
/// Diagnostics rendered through `Formatter` with `.colored()` applied.
pub msg_colorized: String,
}

impl VrlDiagnosticResult {
    /// Builds the error result for diagnostics reported while compiling `program`.
    ///
    /// `list` receives one message per diagnostic; `msg` and `msg_colorized`
    /// hold the `Formatter` rendering of the whole list, plain and colored.
    fn new(program: &str, diagnostic_list: DiagnosticList) -> Self {
        let list: Vec<String> = diagnostic_list
            .clone()
            .into_iter()
            .map(|diagnostic| String::from(diagnostic.message()))
            .collect();
        let msg = Formatter::new(program, diagnostic_list.clone()).to_string();
        let msg_colorized = Formatter::new(program, diagnostic_list)
            .colored()
            .to_string();

        Self {
            list,
            msg,
            msg_colorized,
        }
    }

    /// Builds the error result for a runtime termination of `program`.
    ///
    /// Only the formatted messages are filled in; `list` stays empty.
    fn new_runtime_error(program: &str, terminate: Terminate) -> Self {
        let msg = Formatter::new(program, terminate.clone().get_expression_error()).to_string();
        let msg_colorized = Formatter::new(program, terminate.get_expression_error())
            .colored()
            .to_string();

        Self {
            // NOTE(review): capacity is reserved but nothing is ever pushed.
            list: Vec::with_capacity(1),
            msg,
            msg_colorized,
        }
    }
}

fn compile(mut input: Input) -> Result<VrlCompileResult, VrlDiagnosticResult> {
let event = &mut input.event;
let functions = stdlib::all();
let state = TypeState::default();
let mut runtime = Runtime::default();
let config = CompileConfig::default();
let timezone = TimeZone::default();

let mut target_value = TargetValue {
value: event.clone(),
metadata: Value::Object(BTreeMap::new()),
secrets: Secrets::new(),
};

let program = match vrl::compile_with_state(&input.program, &functions, &state, config) {
Ok(program) => program,
Err(diagnostics) => return Err(VrlDiagnosticResult::new(&input.program, diagnostics)),
};

match runtime.resolve(&mut target_value, &program.program, &timezone) {
Ok(result) => Ok(VrlCompileResult::new(result, target_value.value)),
Err(err) => Err(VrlDiagnosticResult::new_runtime_error(&input.program, err)),
}
}

#[cfg_attr(all(target_arch = "wasm32"), export_name = "run_vrl_wasm")]
#[no_mangle]
pub unsafe extern "C" fn run_vrl(ptr: u32, len: u32) -> u32 {
Copy link
Owner

@scottopell scottopell Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The step that is currently being slow is the module instantiation Module::from_file.

If you remove this run_vrl function and compile new wasm, then module instantiation becomes very quick.

I'm not quite sure why, but without this function all of the VRL bits become dead code and will be optimized away, and from a brief look at a profile it seems to spend most of its time actually compiling (e.g., I see lots of calls to regalloc2::compute_liveness).

I'm not sure why I don't see similar slowness when instantiating the module in the cgo-rust project, so maybe a starting point would be to use the wasm bytes from the cgo-rust project and try to load them with the Rust api?

Both wasm bytes contain VRL and are compiled using wasmtime, so I'd expect them to be very similar

let incoming = &ptr_to_string(ptr, len);

let input: Input = serde_json::from_str(incoming).unwrap();

match compile(input) {
Ok(res) => {
let res_json_str = serde_json::to_value(res).unwrap().to_string();

store_string_at_ptr(&res_json_str, ptr);
res_json_str.len() as u32
},
Err(err) => {
let err_json_str = serde_json::to_value(err).unwrap().to_string();
store_string_at_ptr(&err_json_str, ptr);
err_json_str.len() as u32
}
}
}

/// Returns `left + right`, wrapping around on overflow.
///
/// Wrapping is explicit so the result is the same in every build profile:
/// plain `+` panics on overflow in debug builds but silently wraps in
/// release builds.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "add_wasm")]
#[no_mangle]
pub fn add(left: usize, right: usize) -> usize {
    left.wrapping_add(right)
}

/// Returns `ptr + len`, wrapping around on overflow; no memory is read or written.
///
/// Wrapping is explicit because a debug-build overflow panic inside an
/// `extern "C"` function would abort instead of returning a value.
///
/// # Safety
/// Performs no unsafe operation itself; the `unsafe` marker is kept so the
/// signature stays unchanged for existing callers.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "echo_string")]
#[no_mangle]
pub unsafe extern "C" fn echo_string(ptr: u32, len: u32) -> u32 {
    ptr.wrapping_add(len)
}

/// Reads a string that was written by the runrunrun/src/main.rs file and
/// returns the byte length of the string that was read.
///
/// # Safety
/// `ptr` and `len` are handed to `ptr_to_string` unchanged, so they must
/// describe memory that function is allowed to read.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "read_string")]
#[no_mangle]
pub unsafe extern "C" fn read_string(ptr: u32, len: u32) -> u32 {
    let byte_count = ptr_to_string(ptr, len).len();
    byte_count as u32
}

/// Reads the string at `ptr`/`len`, builds a reply that embeds it, writes the
/// reply back starting at `ptr`, and returns the reply's byte length.
///
/// # Safety
/// `ptr`/`len` must be valid for `ptr_to_string`.
/// NOTE(review): the reply is longer than `len` bytes, so the buffer at `ptr`
/// must have room for it — confirm the host over-allocates.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "return_string")]
#[no_mangle]
pub unsafe extern "C" fn return_string(ptr: u32, len: u32) -> u32 {
    let incoming_string = ptr_to_string(ptr, len);
    let reply = format!("Incoming: {incoming_string}\nThis string was written from adding-machine/src/lib.rs");
    let reply_len = reply.len() as u32;

    // Overwrite the caller's buffer in place with the reply.
    store_string_at_ptr(&reply, ptr);
    reply_len
}

// WASM Memory-related helper functions
//
// TODO explore using lol_alloc instead of default rust allocator
/// WebAssembly export that allocates a pointer (linear memory offset) that can
/// be used for a string.
///
/// Thin wrapper: widens `size` to `usize` and forwards to [`allocate`].
///
/// This is an ownership transfer, which means the caller must call
/// [`deallocate`] when finished.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "allocate")]
#[no_mangle]
pub extern "C" fn _allocate(size: u32) -> *mut u8 {
allocate(size as usize)
}

/// Allocates `size` bytes and leaks the pointer where they start.
///
/// The buffer is created with *length* `size`, not merely capacity:
/// `into_boxed_slice` drops any capacity beyond the length, so a zero-length
/// `Vec` would yield an empty box and a dangling pointer with no memory
/// behind it. The bytes are uninitialized until the caller writes them.
///
/// Ownership passes to the caller, who must later release the memory with
/// the same `size`.
fn allocate(size: usize) -> *mut u8 {
    // Reserve exactly `size` (uninitialized) bytes.
    let buffer: Vec<MaybeUninit<u8>> = vec![MaybeUninit::uninit(); size];

    // into_raw leaks the memory to the caller.
    Box::into_raw(buffer.into_boxed_slice()) as *mut u8
}

/// WebAssembly export that deallocates a pointer of the given size (linear
/// memory offset, byteCount) allocated by [`allocate`].
///
/// Thin wrapper: converts `ptr` to `*mut u8` and `size` to `usize`, then
/// forwards to [`deallocate`].
///
/// # Safety
/// NOTE(review): presumably `ptr` and `size` must be exactly the pair obtained
/// from the `allocate` export and must not have been freed already — confirm
/// against the host code.
#[cfg_attr(all(target_arch = "wasm32"), export_name = "deallocate")]
#[no_mangle]
pub unsafe extern "C" fn _deallocate(ptr: u32, size: u32) {
deallocate(ptr as *mut u8, size as usize);
}

/// Takes ownership of the memory at `ptr` again so that it is freed.
///
/// The pointer is rebuilt into an empty `Vec<u8>` with capacity `size`, and
/// dropping that vector releases the allocation.
///
/// # Safety
/// `ptr` and `size` must satisfy the requirements of `Vec::from_raw_parts`
/// for a `Vec<u8>` with length 0 and capacity `size`.
unsafe fn deallocate(ptr: *mut u8, size: usize) {
    // TODO - should this be Box::from_raw? (see Box::into_raw docs)
    let reclaimed: Vec<u8> = Vec::from_raw_parts(ptr, 0, size);
    drop(reclaimed);
}



// WASM String-related helper functions
/// Returns a string from WebAssembly compatible numeric types representing
/// its pointer and length.
///
/// The bytes are copied into a new `String`. Invalid UTF-8 sequences are
/// replaced with U+FFFD rather than trusted blindly, because the bytes are
/// supplied by the host. A zero `len` yields an empty string without
/// touching `ptr`, so a null or dangling pointer is acceptable in that case.
///
/// # Safety
/// When `len` is non-zero, `ptr` must point to `len` readable bytes that are
/// not mutated for the duration of the call.
unsafe fn ptr_to_string(ptr: u32, len: u32) -> String {
    if len == 0 {
        // Never form a slice from a possibly null pointer.
        return String::new();
    }

    // Read-only view: the source bytes are only copied, never modified.
    let bytes = slice::from_raw_parts(ptr as *const u8, len as usize);
    String::from_utf8_lossy(bytes).into_owned()
}


/// Stores the given string `s` at the memory location pointed to by `ptr`.
///
/// Exactly `s.len()` bytes are written and no terminator is appended. An
/// empty string writes nothing and never touches `ptr`.
/// This assumes no buffer overflows - here be dragons.
///
/// # Safety
/// When `s` is non-empty, `ptr` must point to at least `s.len()` writable
/// bytes that do not overlap `s`.
unsafe fn store_string_at_ptr(s: &str, ptr: u32) {
    if s.is_empty() {
        // Nothing to copy; avoid forming a slice from a possibly null pointer.
        return;
    }

    // Create a mutable slice of u8 pointing at the buffer given as 'ptr'
    // with a length of the string we're about to copy into it
    let dest = slice::from_raw_parts_mut(ptr as *mut u8, s.len());
    dest.copy_from_slice(s.as_bytes());
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading