Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(log-fetcher): binary fields that end with \n are not parsed correctly #213

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 141 additions & 5 deletions rs/log-fetcher/src/journald_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,81 @@ pub struct JournalEntry {
pub fields: Vec<(String, JournalField)>,
}

pub fn parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> {
/// Cursor state of the byte-by-byte journald export parser: where we are
/// within the current field line.
#[derive(Debug, Clone)]
enum LineStatus {
    // No byte of the current line has been consumed yet.
    NotStarted,
    // Inside a field name; no '=' seen on this line so far.
    Started,
    // A '=' was seen, so the remainder of the line is a UTF-8 value.
    Utf8,
    // NOTE(review): never constructed by the parser below — binary payloads
    // are consumed inline from the Started state. Candidate for removal.
    Binary,
}

/// Parses a journald export-format byte stream into journal entries.
///
/// The stream is a sequence of entries separated by an empty line. Each entry
/// is a list of fields, one per line, in one of two encodings:
///   * UTF-8:  `NAME=value\n`
///   * Binary: `NAME\n`, then a little-endian u64 payload size, `size` raw
///             payload bytes (which may themselves contain `\n`), and a
///             trailing `\n`.
///
/// Each parsed field is handed to `parse_journal_entry` as raw lines; a
/// binary field contributes two lines (its name, then the 8 size bytes
/// followed by the payload).
///
/// A truncated or malformed trailing field is dropped instead of panicking:
/// every entry (and every complete field of the final entry) received before
/// the bad data is still returned. This matters because `body` typically
/// comes straight off the network.
pub fn parse_journal_entries_new(body: &[u8]) -> Vec<JournalEntry> {
    let mut entries = Vec::new();
    let mut current_entry: Vec<Vec<u8>> = Vec::new();
    let mut current_line: Vec<u8> = Vec::new();
    let mut state = LineStatus::NotStarted;

    let mut iter = body.iter();
    'outer: while let Some(&byte) = iter.next() {
        match (byte, &state) {
            // First '=' on a line: this field is UTF-8 encoded.
            (b'=', LineStatus::Started) => {
                current_line.push(byte);
                state = LineStatus::Utf8;
            }
            // Newline before any '=': the line was a bare field name, so a
            // binary payload follows (LE u64 size, payload bytes, '\n').
            (b'\n', LineStatus::Started) => {
                let name_line = std::mem::take(&mut current_line);

                // Read the 8-byte little-endian payload size.
                let mut size_bytes = [0u8; 8];
                for slot in size_bytes.iter_mut() {
                    match iter.next() {
                        Some(&b) => *slot = b,
                        // Truncated stream: discard the partial field but
                        // keep everything parsed so far.
                        None => break 'outer,
                    }
                }
                // The export format's size is an unsigned 64-bit integer;
                // decoding it as i64 would turn a corrupt high bit into a
                // negative count and silently misparse the remainder.
                let payload_len = u64::from_le_bytes(size_bytes);

                let mut payload_line = size_bytes.to_vec();
                for _ in 0..payload_len {
                    match iter.next() {
                        Some(&b) => payload_line.push(b),
                        None => break 'outer,
                    }
                }
                // Consume the '\n' the serializer appends after the payload.
                iter.next();

                current_entry.push(name_line);
                current_entry.push(payload_line);
                state = LineStatus::NotStarted;
            }
            // End of a UTF-8 field line.
            (b'\n', LineStatus::Utf8) => {
                current_entry.push(std::mem::take(&mut current_line));
                state = LineStatus::NotStarted;
            }
            // Empty line: entry separator.
            (b'\n', LineStatus::NotStarted) => {
                if let Some(entry) = parse_journal_entry(current_entry.as_slice()) {
                    entries.push(entry);
                }
                current_entry.clear();
                current_line.clear();
            }
            // Any other byte extends the current line.
            (_, LineStatus::Started) | (_, LineStatus::Utf8) => current_line.push(byte),
            (_, LineStatus::NotStarted) => {
                current_line.push(byte);
                state = LineStatus::Started;
            }
            // `LineStatus::Binary` is never assigned, so no other
            // combination can occur.
            (b, s) => unreachable!("Shouldn't happen: {}, {:?}", b, s),
        }
    }
    // A body not terminated by an empty line may still hold a final entry.
    if !current_entry.is_empty() {
        if let Some(entry) = parse_journal_entry(&current_entry) {
            entries.push(entry);
        }
    }

    entries
}

pub fn _parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> {
let mut entries = Vec::new();
let mut current_entry = Vec::new();
let lines: Vec<_> = body.split(|&c| c == b'\n').collect();
Expand Down Expand Up @@ -140,7 +214,7 @@ mod tests {
fn test_parse_journal_entries() {
// Test case with two entries
let body = b"field1=value1\nfield2=value2\n\nfield3=value3\nfield4=\n";
let entries = parse_journal_entries(body);
let entries = parse_journal_entries_new(body);
assert_eq!(entries.len(), 2);

// Verify the first entry
Expand Down Expand Up @@ -178,7 +252,7 @@ mod tests {
serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap();
body.extend(serialized_data.clone());

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);
assert_eq!(entries.len(), 1);

// Verify the entry with binary data
Expand Down Expand Up @@ -211,7 +285,7 @@ mod tests {
serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap();
body.extend(serialized_data);

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);
assert_eq!(entries.len(), 1);

// Verify the entry with binary data
Expand Down Expand Up @@ -256,11 +330,73 @@ mod tests {
);
}

#[test]
fn test_parse_journal_entries_binary_field_with_newline_end() {
    // Regression test: a binary payload that itself ends with '\n' must not
    // confuse the parser's field-boundary detection.
    let mut body: Vec<u8> = Vec::new();
    body.extend_from_slice(b"__CURSOR=s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202\n");
    body.extend_from_slice(b"__REALTIME_TIMESTAMP=1423944916375353\n");
    body.extend_from_slice(b"_SYSTEMD_OWNER_UID=1001\n");

    let mut binary_buf = Vec::new();
    serialize_string_field("OTHER_BIN", "some random data\nbar\n", &mut binary_buf).unwrap();
    body.extend_from_slice(&binary_buf);

    body.extend_from_slice(b"_AUDIT_LOGINUID=1001\n");
    body.extend_from_slice(b"SYSLOG_IDENTIFIER=python3\n");

    binary_buf.clear();
    serialize_string_field("MESSAGE", "foo\nbar", &mut binary_buf).unwrap();
    body.extend_from_slice(&binary_buf);

    let entries = parse_journal_entries_new(&body);
    assert_eq!(entries.len(), 1);

    // Fields must come back in order, with the two serialized fields
    // recognized as binary and everything else as UTF-8.
    let expected = vec![
        (
            "__CURSOR",
            JournalField::Utf8("s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202".to_string()),
        ),
        (
            "__REALTIME_TIMESTAMP",
            JournalField::Utf8("1423944916375353".to_string()),
        ),
        ("_SYSTEMD_OWNER_UID", JournalField::Utf8("1001".to_string())),
        (
            "OTHER_BIN",
            JournalField::Binary("some random data\nbar\n".to_string()),
        ),
        ("_AUDIT_LOGINUID", JournalField::Utf8("1001".to_string())),
        ("SYSLOG_IDENTIFIER", JournalField::Utf8("python3".to_string())),
        ("MESSAGE", JournalField::Binary("foo\nbar".to_string())),
    ];

    let entry = &entries[0];
    assert_eq!(entry.fields.len(), expected.len());
    for (actual, (name, value)) in entry.fields.iter().zip(expected) {
        assert_eq!(actual, &(name.to_string(), value));
    }
}

#[test]
fn test_parse_journal_entries_empty() {
    // An empty body must produce no entries.
    let entries = parse_journal_entries_new(b"");
    assert!(entries.is_empty());
}
}
8 changes: 4 additions & 4 deletions rs/log-fetcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::BTreeMap,
fs::{self, File},
io::{Read, Write},
path::PathBuf,
Expand All @@ -15,7 +15,7 @@ use tokio::select;
use tokio::sync::mpsc;
use url::Url;

use crate::journald_parser::{parse_journal_entries, JournalField};
use crate::journald_parser::{parse_journal_entries_new, JournalField};

mod journald_parser;

Expand Down Expand Up @@ -95,10 +95,10 @@ async fn main() -> Result<(), anyhow::Error> {
}
};

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);

for entry in &entries {
let map: HashMap<String, String> = entry
let map: BTreeMap<String, String> = entry
.fields
.iter()
.map(|(name, val)| match val {
Expand Down
Loading