Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

550 bob test mode #588

Merged
merged 28 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Bob versions changelog

## [Unreleased]
#### Added
- Bobd test mode (#550)
- Added optional get & exist optimization that skips old partitions by its timestamp (#702)
- Added mimalloc allocator for musl target (#688)
- Added jemalloc-profile for memory profiling (#797)
Expand Down
1 change: 1 addition & 0 deletions bob-apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lazy_static = "1.4"
log = "0.4"
log4rs = "1.2"
metrics = { version = "0.17", features = ["std"] }
network-interface = "0.1.2"
mockall = "0.11"
prost = "0.11"
regex = "1.6.0"
Expand Down
187 changes: 155 additions & 32 deletions bob-apps/bin/bobd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use bob::{
VirtualMapper, BackendType, FactoryTlsConfig,
};
use bob_access::{Authenticator, BasicAuthenticator, DeclaredCredentials, StubAuthenticator, UsersMap, AuthenticationType};
use clap::{crate_version, App, Arg, ArgMatches};
use clap::{crate_version, App, Arg, ArgMatches, SubCommand};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
error::Error as ErrorTrait,
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, str::FromStr,
};
use tokio::runtime::Handle;
use tonic::transport::Server;
Expand All @@ -25,35 +25,65 @@ use std::fs::create_dir;
#[macro_use]
extern crate log;

use log4rs::append::console::ConsoleAppender;
use log4rs::encode::pattern::PatternEncoder;
use log4rs::config::{Appender, Config, Root};

use network_interface::{NetworkInterface, NetworkInterfaceConfig, Addr};

use anyhow::{anyhow, Context, Result as AnyResult};

#[tokio::main]
async fn main() {
let matches = get_matches();

if matches.value_of("cluster").is_none() {
eprintln!("Expect cluster config");
eprintln!("use --help");
return;
}
let cluster;
let node;
if let (sc, Some(sub_matches)) = matches.subcommand() {
match sc {
"testmode" => match configure_testmode(sub_matches) {
Ok((c, n)) => {
cluster = c;
node = n;
}
Err(e) => {
eprintln!("Initialization error: {}", e);
eprintln!("use --help");
return;
}
},
_ => unreachable!("unknown command"),
}
} else {
if matches.value_of("cluster").is_none() {
eprintln!("Expect cluster config");
eprintln!("use --help");
return;
}

if matches.value_of("node").is_none() {
eprintln!("Expect node config");
eprintln!("use --help");
return;
}
if matches.value_of("node").is_none() {
eprintln!("Expect node config");
eprintln!("use --help");
return;
}

let cluster_config = matches.value_of("cluster").unwrap();
println!("Cluster config: {:?}", cluster_config);
cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| {
eprintln!("Cluster config parsing error: {}", err);
err
}).expect("Cluster config parsing error");

let cluster_config = matches.value_of("cluster").expect("'cluster' argument is required");
println!("Cluster config: {:?}", cluster_config);
let cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| {
eprintln!("Cluster config parsing error: {}", err);
err
}).expect("Cluster config parsing error");

let node_config_file = matches.value_of("node").expect("'node' argument is required");
println!("Node config: {:?}", node_config_file);
let node = cluster.get(node_config_file).await.map_err(|err| {
eprintln!("Node config parsing error: {}", err);
err
}).expect("Node config parsing error");
let node_config_file = matches.value_of("node").unwrap();
println!("Node config: {:?}", node_config_file);
node = cluster.get(node_config_file).await.map_err(|err| {
eprintln!("Node config parsing error: {}", err);
err
}).expect("Node config parsing error");

check_folders(&node, matches.is_present("init_folders"));
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

let mut extra_logstash_fields = HashMap::new();
extra_logstash_fields.insert("node_name".to_string(), serde_json::Value::String(node.name().to_string()));
Expand All @@ -63,8 +93,6 @@ async fn main() {
log4rs::init_file(node.log_config(), log4rs::config::Deserializers::default().with_logstash_extra(extra_logstash_fields))
.expect("can't find log config");

check_folders(&node, matches.is_present("init_folders"));

let mut mapper = VirtualMapper::new(&node, &cluster);

let bind = node.bind();
Expand Down Expand Up @@ -139,6 +167,73 @@ async fn main() {
}
}

fn configure_testmode(sub_matches: &ArgMatches) -> AnyResult<(ClusterConfig, NodeConfig)> {
let mut addresses = Vec::with_capacity(1);
let port = match sub_matches.value_of("grpc-port") {
Some(v) => v.parse().context("could not parse --grpc-port")?,
None => 20000
};
let mut this_node = None;
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
if let Some(node_list) = sub_matches.value_of("nodes") {
let available_ips: HashSet<_> = NetworkInterface::show()?.into_iter().filter_map(|itf|
match itf.addr? {
Addr::V4(addr) => {
Some(addr.ip)
},
_ => None
}).collect();

for (index, addr) in node_list.split(",").enumerate() {
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
let addr = addr.trim();
let v4addr = SocketAddrV4::from_str(addr)?;
if this_node.is_none() {
if port == v4addr.port() && available_ips.contains(v4addr.ip()) {
this_node = Some(index)
}
}
addresses.push(String::from(addr));
}
} else {
this_node = Some(0);
addresses.push(format!("127.0.0.1:{port}"))
}
let this_node_index = this_node.ok_or(anyhow!("current node address not found"))?;
let cluster = ClusterConfig::get_testmode(
sub_matches.value_of("data").unwrap_or(format!("data_{this_node_index}").as_str()).to_string(),
addresses)?;
let http_api_port = match sub_matches.value_of("restapi-port") {
Some(v) => Some(v.parse().context("could not parse --restapi-port")?),
None => None
};
let node = cluster.get_testmode_node_config(this_node_index, http_api_port)?;

init_testmode_logger(log::LevelFilter::Error);

check_folders(&node, true);

println!("Bob is starting");
let n = &cluster.nodes()[this_node_index];
println!("Data directory: {}", n.disks()[0].path());
println!("gRPC API available at: {}", n.address());
let rest_api_address = node.http_api_address();
let rest_api_port = node.http_api_port();
println!("REST API available at: http://{rest_api_address}:{rest_api_port}");
println!("REST API Put and Get available at: http://{rest_api_address}:{rest_api_port}/data");

Ok((cluster, node))
}

fn init_testmode_logger(loglevel: log::LevelFilter) {
let stdout = ConsoleAppender::builder()
.encoder(Box::new(PatternEncoder::new( "{d(%Y-%m-%d %H:%M:%S):<20} {M:>20.30}:{L:>3} {h({l})} {m}\n")))
.build();
let config = Config::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
.build(Root::builder().appender("stdout").build(loglevel))
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
.unwrap();
log4rs::init_config(config).unwrap();
}

async fn run_server<A: Authenticator>(node: NodeConfig, authenticator: A, mapper: VirtualMapper, address: IpAddr, port: u16, addr: SocketAddr) {
let (metrics, shared_metrics) = init_counters(&node, &addr.to_string()).await;
let handle = Handle::current();
Expand Down Expand Up @@ -288,39 +383,66 @@ fn check_folders(node: &NodeConfig, init_flag: bool) {

fn get_matches<'a>() -> ArgMatches<'a> {
let ver = format!("{}\n{}", crate_version!(), BuildInfo::default());
let testmode_sc = SubCommand::with_name("testmode")
.about("Bob's test mode")
.arg(
Arg::with_name("data")
.help("Path to bob data directory")
.takes_value(true)
.long("data")
)
.arg(
Arg::with_name("grpc-port")
.help("gRPC API port")
.takes_value(true)
.long("grpc-port")
)
.arg(
Arg::with_name("restapi-port")
.help("REST API port")
.takes_value(true)
.long("restapi-port")
)
.arg(
Arg::with_name("nodes")
.help("Comma separated node addresses. Example: 127.0.0.1:20000,127.0.0.1:20001")
.takes_value(true)
.long("nodes")
);

App::new("bobd")
.version(ver.as_str())
.arg(
Arg::with_name("cluster")
.help("cluster config file")
.help("Cluster config file")
.takes_value(true)
.short("c")
.long("cluster"),
)
.arg(
Arg::with_name("node")
.help("node config file")
.help("Node config file")
.takes_value(true)
.short("n")
.long("node"),
)
.arg(
Arg::with_name("name")
.help("node name")
.help("Node name")
.takes_value(true)
.short("a")
.long("name"),
)
.arg(
Arg::with_name("http_api_address")
.help("http api address")
.help("Http api address")
.short("h")
.long("host")
.takes_value(true),
)
.arg(
Arg::with_name("http_api_port")
.help("http api port")
.help("Http api port")
.short("p")
.long("port")
.takes_value(true),
Expand All @@ -331,5 +453,6 @@ fn get_matches<'a>() -> ArgMatches<'a> {
.long("init_folders")
.takes_value(false),
)
.subcommand(testmode_sc)
.get_matches()
}
46 changes: 45 additions & 1 deletion bob-common/src/configs/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
node::NodeName,
core_types::{DiskPath, VDiskId, NodeDisk, DiskName},
};
use anyhow::Result as AnyResult;
use anyhow::{Result as AnyResult, anyhow};
use http::Uri;
use std::collections::{ HashMap, HashSet };

Expand Down Expand Up @@ -394,6 +394,50 @@ impl Cluster {
Ok(config)
}
}

pub fn get_testmode(path: String, addresses: Vec<String>) -> AnyResult<Self> {
let disks = vec![DiskPath::new("disk_0".into(), &path)];
let mut nodes = Vec::with_capacity(addresses.len());
let mut vdisks = Vec::with_capacity(addresses.len());
for (i, address) in addresses.into_iter().enumerate() {
let node = Node {
name: format!("node_{i}"),
address,
disks: disks.clone()
};
let replica = Replica::new(node.name().to_string(), node.disks()[0].name().to_string());
let mut vdisk = VDisk::new(i as u32);
vdisk.push_replica(replica);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
nodes.push(node);
vdisks.push(vdisk);
}
let dist_func = DistributionFunc::default();
let config = Cluster {
nodes,
vdisks,
racks: vec![],
distribution_func: dist_func
};

if let Err(e) = config.validate() {
let msg = format!("config is not valid: {e}");
Err(anyhow!(msg))
} else {
Ok(config)
}
}

pub fn get_testmode_node_config(&self, n_node: usize, rest_port: Option<u16>) -> AnyResult<NodeConfig> {
let node = &self.nodes().get(n_node).ok_or(anyhow!("node with index {} not found", n_node))?;
let config = NodeConfig::get_testmode(node.name(), node.disks()[0].name(), rest_port);
if let Err(e) = config.validate() {
Err(anyhow!("config is not valid: {e}"))
} else {
self.check(&config)
.map_err(|e| anyhow!("node config check failed: {e}"))?;
Ok(config)
}
}
}

impl Validatable for Cluster {
Expand Down
Loading
Loading