Skip to content

Commit

Permalink
perf(index): lightweight structure (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmdoret authored Aug 27, 2024
1 parent c294162 commit 01418d2
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 116 deletions.
18 changes: 15 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
rio_api = '0.8.4'
rio_turtle = '0.8.4'
rstest = '0.21.0'
serde_json = "1.0.127"
serde_yml = '0.0.10'
slog = '2.7.0'
slog-async = '2.8.0'
slog-term = '2.9.0'
smallvec = { version = "1.13.2", features = ["serde"] }
tempfile = '3.10.1'

[dependencies.clap]
Expand Down
10 changes: 9 additions & 1 deletion docs/development-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ just test

## Build the Package & Image

To build the package with Nix run:
To build the package with Nix run:

```shell
just nix-package
Expand Down Expand Up @@ -110,3 +110,11 @@ It will:
**Note: If the release pipeline fails, you can just run this same command again.
Also rerun it when you made a mistake, it will cancel the current release (works
also when `--amend`ing on the current commit)**

## Benchmarking performance

A benchmarking script is provided in `tools/bench/benchmark.sh`, along with a nix devshell. To run the benchmark in the isolated environment, run:

```shell
just nix-develop-bench bash ./tools/bench/benchmark.sh
```
7 changes: 7 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ nix-develop-ci *args:
{ [ -n "${cmd:-}" ] || cmd=("zsh"); } && \
nix develop ./tools/nix#ci --command "${cmd[@]}"

# Enter nix development shell for benchmarking.
nix-develop-bench *args:
cd "{{root_dir}}" && \
cmd=("$@") && \
{ [ -n "${cmd:-}" ] || cmd=("zsh"); } && \
nix develop ./tools/nix#bench --command "${cmd[@]}"

## Standard stuff =============================================================
# Format the code.
format *args:
Expand Down
113 changes: 104 additions & 9 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,133 @@
use rio_api::parser::TriplesParser;
use rio_turtle::TurtleError;
use std::{io::Write, path::Path};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::{
collections::HashMap,
hash::{DefaultHasher, Hash, Hasher},
path::Path,
};

use crate::{
io,
rdf_types::{Triple, TripleView},
};

fn index_triple(t: Triple, out: &mut impl Write) {
/// Stores a mapping from hashed instance uri to their types.
/// The type URIs are stored once as a vector of strings.
/// Each subject in map is stored as hash(subject_uri): u64
/// and refers to its types using their vector index.
#[derive(Serialize, Deserialize)]
pub struct TypeIndex {
pub types: Vec<String>,
map: HashMap<u64, SmallVec<[usize; 1]>>,
}

impl TypeIndex {
fn hash(&self, s: &impl Hash) -> u64 {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish().to_le()
}

pub fn from_iter<'a>(type_map: impl Iterator<Item = (&'a str, &'a str)>) -> Self {
let mut idx = TypeIndex::new();

type_map.for_each(|(subject_uri, type_uri)| idx.insert(subject_uri, type_uri).unwrap());

return idx;
}

pub fn new() -> Self {
TypeIndex {
types: Vec::new(),
map: HashMap::new(),
}
}

// Insert input subject-type mapping into the index.
// The index will store the hash of the subject.
pub fn insert(&mut self, subject_uri: &str, type_uri: &str) -> Result<(), std::io::Error> {
let key = self.hash(&subject_uri.to_string());
let type_idx: usize;

// Get type index or add a new one.
if self.types.contains(&type_uri.to_string()) {
type_idx = self.types.iter().position(|x| *x == type_uri).unwrap();
} else {
type_idx = self.types.len();
self.types.push(type_uri.to_string());
}
// Insert mapping into the index.
match self.map.get_mut(&key) {
Some(v) => {
v.push(type_idx);
}
None => {
self.map.insert(key, smallvec![type_idx]);
}
}

Ok(())
}

pub fn get(&self, subject_key: &str) -> Option<Vec<&str>> {
let key = self.hash(&subject_key.to_string());
self.map
.get(&key)
.map(|v| v.iter().map(|i| self.types[*i].as_ref()).collect())
}
}

fn index_triple(t: Triple, index: &mut TypeIndex) {
if t.predicate.iri.as_str() == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" {
let r = || -> std::io::Result<()> {
out.write_all(t.to_string().as_bytes())?;
out.write_all(b" .\n")
}();
let r = { index.insert(&t.subject.to_string(), &t.object.to_string()) };

if let Err(e) = r {
panic!("Error writting to out buffer: {e}");
}
}
}

pub fn create_type_map(input: &Path, output: &Path) {
pub fn create_type_index(input: &Path, output: &Path) {
let buf_in = io::get_reader(input);
let mut buf_out = io::get_writer(output);
let buf_out = io::get_writer(output);
let mut triples = io::parse_ntriples(buf_in);
let mut index = TypeIndex::new();

while !triples.is_end() {
let _ = triples
.parse_step(&mut |t: TripleView| {
index_triple(t.into(), &mut buf_out);
index_triple(t.into(), &mut index);
Result::<(), TurtleError>::Ok(())
})
.inspect_err(|e| {
panic!("Parsing error occured: {e}");
});
}
let _ = serde_json::to_writer(buf_out, &index);
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
// Test the parsing of a triple.
fn index_from_iter() {
let vals = vec![
("<urn:Alice>", "<urn:Person>"),
("<urn:Alice>", "<urn:Employee>"),
("<urn:ACME>", "<urn:Organization>"),
]
.into_iter()
.map(|(a, b)| (a, b));

let idx = TypeIndex::from_iter(vals);

assert_eq!(
idx.get("<urn:Alice>").unwrap(),
vec!["<urn:Person>", "<urn:Employee>"]
);
println!("{}", serde_json::to_string(&idx).unwrap());
}
}
12 changes: 10 additions & 2 deletions src/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::rules::Rules;
use crate::{index::TypeIndex, rules::Rules};
use rio_turtle::NTriplesParser;
use std::{
fs::File,
Expand Down Expand Up @@ -46,7 +46,15 @@ pub fn parse_ntriples(reader: impl BufRead) -> NTriplesParser<impl BufRead> {
pub fn parse_rules(path: &Path) -> Rules {
return match File::open(path) {
Ok(file) => serde_yml::from_reader(file).expect("Error parsing rules file."),
Err(e) => panic!("Cannot open file '{:?}': '{}'.", path, e),
Err(e) => panic!("Cannot open rules file '{:?}': '{}'.", path, e),
};
}

// Parse yaml type index
pub fn parse_index(path: &Path) -> TypeIndex {
return match File::open(path) {
Ok(file) => serde_json::from_reader(file).expect("Error parsing index file."),
Err(e) => panic!("Cannot open index file '{:?}': '{}'.", path, e),
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn create_logger(use_stdout: bool) -> Arc<Logger> {
.fuse();

let drain = slog_async::Async::new(drain)
.chan_size(5_000_000)
.chan_size(1_000)
.build()
.fuse();

Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod rules;

// Define the imports.
use crate::{
index::create_type_map,
index::create_type_index,
log::{create_logger, info},
pseudo::pseudonymize_graph,
};
Expand Down Expand Up @@ -87,7 +87,7 @@ fn main() {
match cli.command {
Subcommands::Index(args) => {
info!(log, "Args: {:?}", args);
create_type_map(&args.input, &args.output)
create_type_index(&args.input, &args.output)
}
Subcommands::Pseudo(args) => {
info!(log, "Args: {:?}", args);
Expand Down
33 changes: 7 additions & 26 deletions src/pseudo.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use rio_api::parser::TriplesParser;
use rio_turtle::TurtleError;
use std::{
collections::HashMap,
io::{BufRead, Write},
io::Write,
path::{Path, PathBuf},
};

use crate::{
crypto::{new_pseudonymizer, Pseudonymize},
index::TypeIndex,
io,
log::Logger,
rdf_types::*,
Expand All @@ -19,7 +19,7 @@ use crate::{
fn process_triple(
triple: Triple,
rules_config: &Rules,
node_to_type: &HashMap<String, String>,
node_to_type: &mut TypeIndex,
out: &mut impl Write,
hasher: &dyn Pseudonymize,
) {
Expand All @@ -35,24 +35,6 @@ fn process_triple(
}
}

// Create a index mapping node -> type from an input ntriples buffer
fn load_type_map(input: impl BufRead) -> HashMap<String, String> {
let mut node_to_type: HashMap<String, String> = HashMap::new();
let mut triples = io::parse_ntriples(input);

while !triples.is_end() {
let _: Result<(), TurtleError> = triples.parse_step(&mut |t| {
node_to_type.insert(
t.subject.to_string().replace(['<', '>'], ""),
t.object.to_string().replace(['<', '>'], ""),
);
Ok(())
});
}

return node_to_type;
}

pub fn pseudonymize_graph(
_: &Logger,
input: &Path,
Expand All @@ -62,11 +44,10 @@ pub fn pseudonymize_graph(
secret_path: &Option<PathBuf>,
) {
let buf_input = io::get_reader(input);
let buf_index = io::get_reader(index_path);
let mut buf_output = io::get_writer(output);

let rules = io::parse_rules(rules_path);
let node_to_type: HashMap<String, String> = load_type_map(buf_index);
let mut type_index = io::parse_index(index_path);

let secret = secret_path.as_ref().map(io::read_bytes);
let pseudonymizer = new_pseudonymizer(None, secret);
Expand All @@ -80,7 +61,7 @@ pub fn pseudonymize_graph(
process_triple(
t.into(),
&rules,
&node_to_type,
&mut type_index,
&mut buf_output,
&pseudonymizer,
);
Expand All @@ -102,14 +83,14 @@ mod tests {

#[test]
// Test the parsing of a triple.
fn encrypt_nt_file() {
fn pseudo_nt_file() {
let logger = log::create_logger(true);

let dir = tempdir().unwrap();
let input_path = Path::new("tests/data/test.nt");
let rules_path = Path::new("tests/data/rules.yaml");
let output_path = dir.path().join("output.nt");
let type_map_path = Path::new("tests/data/type_map.nt");
let type_map_path = Path::new("tests/data/type_index.json");
let key = None;
pseudonymize_graph(
&logger,
Expand Down
Loading

0 comments on commit 01418d2

Please sign in to comment.