Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #113 from FlorentinDUBOIS/fix/router
Browse files Browse the repository at this point in the history
fix(router): implement recovery on process error
  • Loading branch information
Florentin DUBOIS committed Sep 26, 2019
2 parents 57b8ce1 + 3dc3319 commit 6183a79
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::time::Duration;

use config::{Config, File};
use failure::{Error, format_err, ResultExt};
use failure::{format_err, Error, ResultExt};
use humanize_rs::bytes::{Bytes, Unit};
use humanize_rs::duration::parse;
use hyper::Uri;
Expand Down Expand Up @@ -264,7 +264,7 @@ impl TryFrom<(String, RawScraper)> for Scraper {
Err(format_err!(
"protocol is missing or incorrect, it should be one of 'http' or 'https'"
))
.with_context(|err| format!("could not parse 'url' setting, {}", err))?;
.with_context(|err| format!("could not parse 'url' setting, {}", err))?;
}

Ok(Self {
Expand Down
6 changes: 3 additions & 3 deletions src/lib/asynch/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::fs::Metadata;
use std::path::PathBuf;
use std::time::{Duration, Instant};

use failure::{Error, format_err};
use futures::{Async, Future, Poll, Stream, try_ready};
use failure::{format_err, Error};
use futures::future::{err, join_all, ok};
use futures::{try_ready, Async, Future, Poll, Stream};
use prometheus::GaugeVec;
use tokio::fs::{metadata, read_dir, remove_file};
use tokio::timer::Interval;
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Stream for Scanner {
}

impl Scanner {
fn scan(path: PathBuf) -> impl Future<Item=HashMap<PathBuf, Metadata>, Error=Error> {
fn scan(path: PathBuf) -> impl Future<Item = HashMap<PathBuf, Metadata>, Error = Error> {
read_dir(path)
.map_err(|err| format_err!("{}", err))
.and_then(move |entries| {
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::sleep;

use ctrlc;
use failure::{Error, format_err};
use prometheus::{Encoder, gather, TextEncoder};
use failure::{format_err, Error};
use prometheus::{gather, Encoder, TextEncoder};
use structopt::StructOpt;
use tokio::prelude::*;
use tokio::runtime::Builder;
use warp::{Filter, path, serve};
use warp::{path, serve, Filter};

use crate::conf::Conf;
use crate::constants::{KEEP_ALIVE_TOKIO_RUNTIME, MAX_HANDLERS_PER_REACTOR, THREAD_SLEEP};
Expand Down
64 changes: 48 additions & 16 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

use failure::{Error, format_err};
use failure::{format_err, Error};
use futures::future::ok;
use tokio::fs::{File, rename};
use tokio::fs::remove_file;
use tokio::fs::{rename, File};
use tokio::prelude::*;
use tokio::runtime::Runtime;

use crate::conf;
use crate::lib::{add_labels, Runner};
use crate::lib::asynch::fs::Scanner;
use crate::lib::{add_labels, Runner};

#[derive(Clone, Debug)]
pub struct Router {
Expand Down Expand Up @@ -51,19 +51,35 @@ impl Runner for Router {
let executor = rt.executor();

let scanner = Scanner::from((dir, self.params.scan_period.to_owned()))
.fold(HashSet::new(), move |acc, entries| {
.fold(mutex!(HashSet::new()), move |acc, entries| {
let paths: HashSet<PathBuf> =
entries.iter().fold(HashSet::new(), |mut acc, (path, _)| {
acc.insert(path.to_owned());
acc
});

let new: Vec<PathBuf> = paths.difference(&acc).cloned().collect();
let new = {
let mut state = try_future!(acc.lock().map_err(|err| format_err!("could not get lock in router, {}", err)));
let new: Vec<PathBuf> = paths.difference(&state).cloned().collect();
let delete: Vec<PathBuf> = state.difference(&paths).cloned().collect();

for path in &new {
state.insert(path.to_owned());
}

for path in delete {
state.remove(&path);
}

new
};

for path in new {
let labels = labels.to_owned();
let sinks = sinks.to_owned();
let params = params.to_owned();
let epath = path.to_owned();
let state = acc.to_owned();

executor.spawn(
Self::load(path.to_owned())
Expand All @@ -72,11 +88,21 @@ impl Runner for Router {
.and_then(move |_| Self::remove(path))
.map_err(move |err| {
error!("could not process file in router"; "path" => epath.to_str(), "error" => err.to_string());
let mut state = match state.lock() {
Ok(state) => state,
Err(err) => {
crit!("could not lock state in router for recovery"; "path" => epath.to_str(), "error" => err.to_string());
sleep(Duration::from_millis(100)); // Sleep the time to display the message
abort();
}
};

state.remove(&epath);
}),
);
}

ok::<_, Error>(paths)
ok::<_, Error>(acc)
})
.and_then(|_| ok(()))
.map_err(|err| {
Expand All @@ -93,21 +119,23 @@ impl Runner for Router {
}

impl Router {
fn load(path: PathBuf) -> impl Future<Item=Vec<String>, Error=Error> {
fn load(path: PathBuf) -> impl Future<Item = Vec<String>, Error = Error> {
trace!("open file"; "path" => path.to_str());
File::open(path)
.map_err(|err| format_err!("{}", err))
.map_err(|err| format_err!("could not open file, {}", err))
.and_then(|mut file| {
let mut buf = String::new();
try_future!(file.read_to_string(&mut buf));
try_future!(file
.read_to_string(&mut buf)
.map_err(|err| format_err!("could not read file, {}", err)));
ok(buf.split('\n').map(String::from).collect())
})
}

fn process(
lines: &[String],
labels: &Arc<HashMap<String, String>>,
) -> impl Future<Item=Vec<String>, Error=Error> {
) -> impl Future<Item = Vec<String>, Error = Error> {
let labels: Vec<String> = labels
.to_owned()
.iter()
Expand All @@ -118,7 +146,9 @@ impl Router {
let mut body = vec![];
for line in lines {
if !line.is_empty() {
body.push(try_future!(add_labels(&line, &labels)))
body.push(try_future!(add_labels(&line, &labels).map_err(
|err| format_err!("could not add labels to time series, {}", err)
)))
}
}

Expand All @@ -129,7 +159,7 @@ impl Router {
lines: &[String],
params: &conf::Parameters,
sinks: &[conf::Sink],
) -> impl Future<Item=(), Error=Error> {
) -> impl Future<Item = (), Error = Error> {
let mut bulk = vec![];

let start = time::now_utc().to_timespec();
Expand Down Expand Up @@ -168,28 +198,30 @@ impl Router {
trace!("create tmp sink file"; "path" => temp_file.to_str());
bulk.push(
File::create(temp_file.to_owned())
.map_err(|err| format_err!("could not create file, {}", err))
.and_then(move |mut file| {
file.poll_write(&body.join("\n").into_bytes())
.and_then(|_| file.poll_flush())
.map_err(|err| format_err!("could not write into file, {}", err))
})
.and_then(move |_| {
let new = dir.join(format!("{}-{}-{}.metrics", name, idx, run_id));

debug!("rotate file"; "old" => temp_file.to_str(), "new" => new.to_str());
rename(temp_file, new)
.map_err(|err| format_err!("could not rename file, {}", err))
})
.and_then(|_| Ok(()))
.map_err(|err| format_err!("{}", err)),
.and_then(|_| Ok(())),
)
}

future::join_all(bulk).and_then(|_| ok(()))
}

fn remove(path: PathBuf) -> impl Future<Item=(), Error=Error> {
fn remove(path: PathBuf) -> impl Future<Item = (), Error = Error> {
trace!("remove file"; "path" => path.to_str());
remove_file(path.to_owned())
.map_err(|err| format_err!("{}", err))
.map_err(|err| format_err!("could not remove file, {}", err))
.and_then(|_| ok(()))
}
}

0 comments on commit 6183a79

Please sign in to comment.