diff --git a/nodes/mixnode/Cargo.toml b/nodes/mixnode/Cargo.toml index e6e5b1cbd..f3ae76d2d 100644 --- a/nodes/mixnode/Cargo.toml +++ b/nodes/mixnode/Cargo.toml @@ -15,4 +15,4 @@ serde = "1" serde_yaml = "0.9" tracing = "0.1" tracing-subscriber = "0.3" -tokio = "1.29.1" +tokio = { version = "1.33", features = ["macros"] } diff --git a/nodes/nomos-node-api/Cargo.toml b/nodes/nomos-node-api/Cargo.toml index b04db9825..d348b7a43 100644 --- a/nodes/nomos-node-api/Cargo.toml +++ b/nodes/nomos-node-api/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] default = ["axum"] -axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"] +axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"] [dependencies] async-trait = "0.1" @@ -13,7 +13,6 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" - consensus-engine = { path = "../../consensus-engine" } nomos-core = { path = "../../nomos-core" } nomos-consensus = { path = "../../nomos-services/consensus" } @@ -30,6 +29,7 @@ tokio = { version = "1.33", default-features = false, features = ["sync"] } # axum related dependencies axum = { version = "0.6", optional = true } hyper = { version = "0.14", features = ["full"], optional = true } +tower-http = { version = "0.4", optional = true, features = ["cors", "trace"] } # openapi related dependencies diff --git a/nodes/nomos-node-api/src/http/backend/axum.rs b/nodes/nomos-node-api/src/http/backend/axum.rs index 901077287..1dd95ae8f 100644 --- a/nodes/nomos-node-api/src/http/backend/axum.rs +++ b/nodes/nomos-node-api/src/http/backend/axum.rs @@ -1,12 +1,18 @@ -use std::{fmt::Debug, hash::Hash, net::SocketAddr}; +use std::{fmt::Debug, hash::Hash}; use axum::{ extract::{Query, State}, + http::HeaderValue, response::Response, routing, Json, Router, Server, }; +use hyper::header::{CONTENT_TYPE, USER_AGENT}; use overwatch_rs::overwatch::handle::OverwatchHandle; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tower_http::{ + cors::{Any, CorsLayer}, + trace::TraceLayer, +}; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; @@ -22,10 +28,13 @@ use crate::{ Backend, }; -#[derive(Clone)] +/// Configuration for the Http Server +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct AxumBackendSettings { - pub addr: SocketAddr, - pub handle: OverwatchHandle, + /// Socket where the server will be listening on for incoming requests. + pub address: std::net::SocketAddr, + /// Allowed origins for this server deployment requests. + pub cors_origins: Vec, } pub struct AxumBackend { @@ -80,8 +89,28 @@ where }) } - async fn serve(self) -> Result<(), Self::Error> { + async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> { + let mut builder = CorsLayer::new(); + if self.settings.cors_origins.is_empty() { + builder = builder.allow_origin(Any); + } + + for origin in &self.settings.cors_origins { + builder = builder.allow_origin( + origin + .as_str() + .parse::() + .expect("fail to parse origin"), + ); + } + let app = Router::new() + .layer( + builder + .allow_headers([CONTENT_TYPE, USER_AGENT]) + .allow_methods(Any), + ) + .layer(TraceLayer::new_for_http()) .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) .route("/da/metrics", routing::get(da_metrics)) .route("/da/status", routing::post(da_status)) @@ -94,9 +123,9 @@ where .route("/storage/block", routing::post(block::)) .route("/mempool/add/tx", routing::post(add_tx::)) .route("/mempool/add/cert", routing::post(add_cert)) - .with_state(self.settings.handle); + .with_state(handle); - Server::bind(&self.settings.addr) + Server::bind(&self.settings.address) .serve(app.into_make_service()) .await } @@ -125,8 +154,8 @@ macro_rules! make_request_and_return_response { (status = 500, description = "Internal server error", body = String), ) )] -async fn da_metrics(State(store): State) -> Response { - make_request_and_return_response!(da::da_mempool_metrics(&store)) +async fn da_metrics(State(handle): State) -> Response { + make_request_and_return_response!(da::da_mempool_metrics(&handle)) } #[utoipa::path( @@ -138,10 +167,10 @@ async fn da_metrics(State(store): State) -> Response { ) )] async fn da_status( - State(store): State, + State(handle): State, Json(items): Json::Hash>>, ) -> Response { - make_request_and_return_response!(da::da_mempool_status(&store, items)) + make_request_and_return_response!(da::da_mempool_status(&handle, items)) } #[utoipa::path( @@ -153,10 +182,10 @@ async fn da_status( ) )] async fn da_blobs( - State(store): State, + State(handle): State, Json(items): Json::Hash>>, ) -> Response { - make_request_and_return_response!(da::da_blobs(&store, items)) + make_request_and_return_response!(da::da_blobs(&handle, items)) } #[utoipa::path( @@ -167,7 +196,7 @@ async fn da_blobs( (status = 500, description = "Internal server error", body = String), ) )] -async fn cl_metrics(State(store): State) -> Response +async fn cl_metrics(State(handle): State) -> Response where T: Transaction + Clone @@ -180,7 +209,7 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, { - make_request_and_return_response!(cl::cl_mempool_metrics::(&store)) + make_request_and_return_response!(cl::cl_mempool_metrics::(&handle)) } #[utoipa::path( @@ -192,7 +221,7 @@ where ) )] async fn cl_status( - State(store): State, + State(handle): State, Json(items): Json::Hash>>, ) -> Response where @@ -200,7 +229,7 @@ where ::Hash: Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static, { - make_request_and_return_response!(cl::cl_mempool_status::(&store, items)) + make_request_and_return_response!(cl::cl_mempool_status::(&handle, items)) } #[utoipa::path( @@ -211,13 +240,13 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn carnot_info(State(store): State) -> Response +async fn carnot_info(State(handle): State) -> Response where Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, SS: StorageSerde + Send + Sync + 'static, { - make_request_and_return_response!(consensus::carnot_info::(&store)) + make_request_and_return_response!(consensus::carnot_info::(&handle)) } #[derive(Deserialize)] @@ -255,8 +284,8 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn libp2p_info(State(store): State) -> Response { - make_request_and_return_response!(libp2p::libp2p_info(&store)) +async fn libp2p_info(State(handle): State) -> Response { + make_request_and_return_response!(libp2p::libp2p_info(&handle)) } #[utoipa::path( @@ -267,12 +296,12 @@ async fn libp2p_info(State(store): State) -> Response { (status = 500, description = "Internal server error", body = String), ) )] -async fn block(State(store): State, Json(id): Json) -> Response +async fn block(State(handle): State, Json(id): Json) -> Response where Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash, S: StorageSerde + Send + Sync + 'static, { - make_request_and_return_response!(storage::block_req::(&store, id)) + make_request_and_return_response!(storage::block_req::(&handle, id)) } #[utoipa::path( @@ -283,7 +312,7 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn add_tx(State(store): State, Json(tx): Json) -> Response +async fn add_tx(State(handle): State, Json(tx): Json) -> Response where Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, @@ -294,18 +323,21 @@ where nomos_mempool::Transaction, Tx, ::Hash, - >(&store, tx, Transaction::hash)) + >(&handle, tx, Transaction::hash)) } #[utoipa::path( post, - path = "/mempool/add/tx", + path = "/mempool/add/cert", responses( (status = 200, description = "Add certificate to the mempool"), (status = 500, description = "Internal server error", body = String), ) )] -async fn add_cert(State(store): State, Json(cert): Json) -> Response { +async fn add_cert( + State(handle): State, + Json(cert): Json, +) -> Response { make_request_and_return_response!(mempool::add::< Libp2p, Libp2pAdapter::Hash>, @@ -313,7 +345,7 @@ async fn add_cert(State(store): State, Json(cert): Json::Hash, >( - &store, + &handle, cert, nomos_core::da::certificate::Certificate::hash )) diff --git a/nodes/nomos-node-api/src/http/consensus.rs b/nodes/nomos-node-api/src/http/consensus.rs index e7d0b48b4..5169a2e84 100644 --- a/nodes/nomos-node-api/src/http/consensus.rs +++ b/nodes/nomos-node-api/src/http/consensus.rs @@ -1,5 +1,9 @@ use std::{fmt::Debug, hash::Hash}; +use overwatch_rs::overwatch::handle::OverwatchHandle; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::sync::oneshot; + use consensus_engine::{ overlay::{RandomBeaconState, RoundRobin, TreeOverlay}, Block, BlockId, @@ -20,9 +24,6 @@ use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter, }; use nomos_storage::backends::{sled::SledBackend, StorageSerde}; -use overwatch_rs::overwatch::handle::OverwatchHandle; -use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::oneshot; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, @@ -53,6 +54,7 @@ where .send(ConsensusMsg::Info { tx: sender }) .await .map_err(|(e, _)| e)?; + Ok(receiver.await?) } @@ -76,5 +78,6 @@ where }) .await .map_err(|(e, _)| e)?; + Ok(receiver.await?) } diff --git a/nodes/nomos-node-api/src/http/mempool.rs b/nodes/nomos-node-api/src/http/mempool.rs index a5b7009f5..67e4bfb87 100644 --- a/nodes/nomos-node-api/src/http/mempool.rs +++ b/nodes/nomos-node-api/src/http/mempool.rs @@ -17,7 +17,7 @@ where A::Settings: Send + Sync, D: Discriminant, Item: Clone + Debug + Send + Sync + 'static + Hash, - Key: Clone + Debug + Ord + Hash, + Key: Clone + Debug + Ord + Hash + 'static, { let relay = handle .relay::, D>>() diff --git a/nodes/nomos-node-api/src/lib.rs b/nodes/nomos-node-api/src/lib.rs index 7f78baad1..a013f81fd 100644 --- a/nodes/nomos-node-api/src/lib.rs +++ b/nodes/nomos-node-api/src/lib.rs @@ -1,4 +1,5 @@ use overwatch_rs::{ + overwatch::handle::OverwatchHandle, services::{ handle::ServiceStateHandle, relay::NoMessage, @@ -21,16 +22,17 @@ pub trait Backend { where Self: Sized; - async fn serve(self) -> Result<(), Self::Error>; + async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error>; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ApiServiceSettings { pub backend_settings: S, } pub struct ApiService { settings: ApiServiceSettings, + handle: OverwatchHandle, } impl ServiceData for ApiService { @@ -53,13 +55,16 @@ where /// Initialize the service with the given state fn init(service_state: ServiceStateHandle) -> Result { let settings = service_state.settings_reader.get_updated_settings(); - Ok(Self { settings }) + Ok(Self { + settings, + handle: service_state.overwatch_handle, + }) } /// Service main loop async fn run(mut self) -> Result<(), DynError> { let endpoint = B::new(self.settings.backend_settings).await?; - endpoint.serve().await?; + endpoint.serve(self.handle).await?; Ok(()) } } diff --git a/nodes/nomos-node-api/tests/todo.rs b/nodes/nomos-node-api/tests/todo.rs index 484bb1114..ec5df850b 100644 --- a/nodes/nomos-node-api/tests/todo.rs +++ b/nodes/nomos-node-api/tests/todo.rs @@ -8,7 +8,10 @@ use axum::{routing, Router, Server}; use hyper::Error; use nomos_node_api::{ApiService, ApiServiceSettings, Backend}; use overwatch_derive::Services; -use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; +use overwatch_rs::{ + overwatch::{handle::OverwatchHandle, OverwatchRunner}, + services::handle::ServiceHandle, +}; use utoipa::{ openapi::security::{ApiKey, ApiKeyValue, SecurityScheme}, Modify, OpenApi, @@ -71,7 +74,7 @@ impl Backend for WebServer { Ok(Self { addr: settings }) } - async fn serve(self) -> Result<(), Self::Error> { + async fn serve(self, _handle: OverwatchHandle) -> Result<(), Self::Error> { let store = Arc::new(Store::default()); let app = Router::new() .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 293b6536e..a00ee9f6a 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" blake2 = "0.10" bincode = "2.0.0-rc.2" bytes = "1.3" -clap = { version = "4", features = ["derive"] } +clap = { version = "4", features = ["derive", "env"] } chrono = "0.4" futures = "0.3" http = "0.2.9" @@ -20,6 +20,7 @@ tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } +nomos-node-api = { path = "../nomos-node-api" } nomos-log = { path = "../../nomos-services/log" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 41c92f8b0..808dcf98a 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -51,7 +51,7 @@ network: end: "0ms" http: - backend: + backend_settings: address: 0.0.0.0:8080 cors_origins: [] diff --git a/nodes/nomos-node/src/bridges/libp2p.rs b/nodes/nomos-node/src/bridges/libp2p.rs deleted file mode 100644 index ee7cb7e06..000000000 --- a/nodes/nomos-node/src/bridges/libp2p.rs +++ /dev/null @@ -1,26 +0,0 @@ -// std -// crates -use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; -// internal -use nomos_http::http::HttpResponse; -use nomos_network::backends::libp2p::{Command, Libp2p}; -use nomos_network::NetworkMsg; -use overwatch_rs::services::relay::OutboundRelay; - -pub(super) async fn handle_libp2p_info_req( - channel: &OutboundRelay>, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - - channel - .send(NetworkMsg::Process(Command::Info { reply: sender })) - .await - .map_err(|(e, _)| e)?; - - let info = receiver.await.unwrap(); - res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?; - - Ok(()) -} diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs deleted file mode 100644 index b8bf41985..000000000 --- a/nodes/nomos-node/src/bridges/mod.rs +++ /dev/null @@ -1,426 +0,0 @@ -mod libp2p; -use consensus_engine::BlockId; -use libp2p::*; -use std::collections::HashMap; - -// std -// crates -use bytes::Bytes; -use http::StatusCode; -use nomos_consensus::{CarnotInfo, ConsensusMsg}; -use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; -use tracing::error; -// internal -use full_replication::{Blob, Certificate}; -use nomos_core::{ - block::Block, - da::{blob, certificate::Certificate as _}, - tx::Transaction, -}; -use nomos_da::DaMsg; -use nomos_http::backends::axum::AxumBackend; -use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; -use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; -use nomos_mempool::backend::mockpool::MockPool; -use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; -use nomos_mempool::network::NetworkAdapter; -use nomos_mempool::{Certificate as CertDiscriminant, Transaction as TxDiscriminant}; -use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; -use nomos_network::backends::libp2p::Libp2p; -use nomos_network::backends::NetworkBackend; -use nomos_network::NetworkService; -use nomos_node::{Carnot, Tx}; -use nomos_node::{DataAvailability as DataAvailabilityService, Wire}; -use nomos_storage::{backends::sled::SledBackend, StorageMsg, StorageService}; -use overwatch_rs::services::relay::OutboundRelay; - -type DaMempoolService = MempoolService< - Libp2pAdapter::Hash>, - MockPool::Hash>, - CertDiscriminant, ->; -type ClMempoolService = MempoolService< - Libp2pAdapter::Hash>, - MockPool::Hash>, - TxDiscriminant, ->; - -macro_rules! get_handler { - ($handle:expr, $service:ty, $path:expr => $handler:tt) => {{ - let (channel, mut http_request_channel) = - build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::GET, $path) - .await - .unwrap(); - while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = $handler(&channel, res_tx).await { - error!(e); - } - } - Ok(()) - }}; -} - -macro_rules! post_handler { - ($handle:expr, $service:ty, $path:expr => $handler:tt) => {{ - let (channel, mut http_request_channel) = - build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::POST, $path) - .await - .unwrap(); - while let Some(HttpRequest { - res_tx, payload, .. - }) = http_request_channel.recv().await - { - if let Err(e) = $handler(&channel, payload, res_tx).await { - error!(e); - } - } - Ok(()) - }}; -} - -pub fn carnot_info_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - get_handler!(handle, Carnot, "info" => handle_carnot_info_req) - })) -} - -pub fn block_info_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (channel, mut http_request_channel) = - build_http_bridge::(handle, HttpMethod::GET, "blocks") - .await - .unwrap(); - while let Some(HttpRequest { res_tx, query, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_block_info_req(&channel, query, res_tx).await { - error!(e); - } - } - Ok(()) - })) -} - -pub fn cl_mempool_metrics_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - get_handler!(handle, ClMempoolService, "metrics" => handle_mempool_metrics_req) - })) -} - -pub fn da_mempool_metrics_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - get_handler!(handle, DaMempoolService, "metrics" => handle_mempool_metrics_req) - })) -} - -pub fn da_mempool_status_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - post_handler!(handle, DaMempoolService, "status" => handle_mempool_status_req) - })) -} - -pub fn cl_mempool_status_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - post_handler!(handle, ClMempoolService, "status" => handle_mempool_status_req) - })) -} - -pub fn storage_get_blocks_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - post_handler!(handle, StorageService>, "block" => handle_block_get_req) - })) -} - -pub fn network_info_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - get_handler!(handle, NetworkService, "info" => handle_libp2p_info_req) - })) -} - -pub fn da_blob_get_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - post_handler!(handle, DataAvailabilityService, "blobs" => handle_da_blobs_req) - })) -} - -pub async fn handle_da_blobs_req( - da_channel: &OutboundRelay>, - payload: Option, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> -where - B: blob::Blob + Serialize, - B::Hash: DeserializeOwned + Send + 'static, -{ - let (reply_channel, receiver) = oneshot::channel(); - let ids: Vec = serde_json::from_slice(payload.unwrap_or_default().as_ref())?; - da_channel - .send(DaMsg::Get { - ids: Box::new(ids.into_iter()), - reply_channel, - }) - .await - .map_err(|(e, _)| e)?; - - let blobs = receiver.await.unwrap(); - res_tx - .send(Ok(serde_json::to_string(&blobs).unwrap().into())) - .await?; - - Ok(()) -} - -pub async fn handle_block_get_req( - storage_channel: &OutboundRelay>>, - payload: Option, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - let key: BlockId = serde_json::from_slice(payload.unwrap_or_default().as_ref())?; - let (msg, receiver) = StorageMsg::new_load_message(key); - storage_channel.send(msg).await.map_err(|(e, _)| e)?; - let block: Option> = receiver.recv().await?; - res_tx - .send(Ok(serde_json::to_string(&block).unwrap().into())) - .await?; - - Ok(()) -} - -pub async fn handle_block_info_req( - carnot_channel: &OutboundRelay, - query: HashMap, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - fn parse_block_id(field: Option<&String>) -> Result, overwatch_rs::DynError> { - field - .map(|id| { - hex::decode(id) - .map_err(|e| e.into()) - .and_then(|bytes| { - <[u8; 32]>::try_from(bytes) - .map_err(|e| format!("expected 32 bytes found {}", e.len()).into()) - }) - .map(BlockId::from) - }) - .transpose() - } - - const QUERY_FROM: &str = "from"; - const QUERY_TO: &str = "to"; - - let (sender, receiver) = oneshot::channel(); - carnot_channel - .send(ConsensusMsg::GetBlocks { - from: parse_block_id(query.get(QUERY_FROM))?, - to: parse_block_id(query.get(QUERY_TO))?, - tx: sender, - }) - .await - .map_err(|(e, _)| e)?; - let blocks = receiver.await.unwrap(); - res_tx.send(Ok(serde_json::to_vec(&blocks)?.into())).await?; - - Ok(()) -} - -pub async fn handle_mempool_status_req( - mempool_channel: &OutboundRelay>, - payload: Option, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> -where - K: DeserializeOwned, -{ - let (sender, receiver) = oneshot::channel(); - let items: Vec = serde_json::from_slice(payload.unwrap_or_default().as_ref())?; - mempool_channel - .send(MempoolMsg::Status { - items, - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - let status = receiver.await.unwrap(); - res_tx - .send(Ok(serde_json::to_string(&status).unwrap().into())) - .await?; - - Ok(()) -} - -pub fn mempool_add_tx_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner -where - N: NetworkBackend, - A: NetworkAdapter::Hash> - + Send - + Sync - + 'static, - A::Settings: Send + Sync, -{ - Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = - build_http_bridge::< - MempoolService::Hash>, TxDiscriminant>, - AxumBackend, - _, - >(handle.clone(), HttpMethod::POST, "add") - .await - .unwrap(); - - while let Some(HttpRequest { - res_tx, payload, .. - }) = http_request_channel.recv().await - { - if let Err(e) = handle_mempool_add_req( - &mempool_channel, - res_tx, - payload.unwrap_or_default(), - |tx| tx.hash(), - ) - .await - { - error!(e); - } - } - Ok(()) - })) -} - -pub fn mempool_add_cert_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner -where - N: NetworkBackend, - A: NetworkAdapter::Hash> - + Send - + Sync - + 'static, - A::Settings: Send + Sync, -{ - Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = build_http_bridge::< - MempoolService::Hash>, CertDiscriminant>, - AxumBackend, - _, - >( - handle.clone(), HttpMethod::POST, "add" - ) - .await - .unwrap(); - - while let Some(HttpRequest { - res_tx, payload, .. - }) = http_request_channel.recv().await - { - if let Err(e) = handle_mempool_add_req( - &mempool_channel, - res_tx, - payload.unwrap_or_default(), - |cert| cert.hash(), - ) - .await - { - error!(e); - } - } - Ok(()) - })) -} - -async fn handle_carnot_info_req( - carnot_channel: &OutboundRelay, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - carnot_channel - .send(ConsensusMsg::Info { tx: sender }) - .await - .map_err(|(e, _)| e)?; - let carnot_info: CarnotInfo = receiver.await.unwrap(); - res_tx - .send(Ok(serde_json::to_vec(&carnot_info)?.into())) - .await?; - - Ok(()) -} - -async fn handle_mempool_metrics_req( - mempool_channel: &OutboundRelay>, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::Metrics { - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - let metrics: MempoolMetrics = receiver.await.unwrap(); - res_tx - // TODO: use serde to serialize metrics - .send(Ok(format!( - "{{\"pending_items\": {}, \"last_item\": {}}}", - metrics.pending_items, metrics.last_item_timestamp - ) - .into())) - .await?; - - Ok(()) -} - -pub(super) async fn handle_mempool_add_req( - mempool_channel: &OutboundRelay>, - res_tx: Sender, - wire_item: Bytes, - key: impl Fn(&K) -> V, -) -> Result<(), overwatch_rs::DynError> -where - K: DeserializeOwned, -{ - let item: K = serde_json::from_slice(&wire_item)?; - let (sender, receiver) = oneshot::channel(); - let key = key(&item); - mempool_channel - .send(MempoolMsg::Add { - item, - key, - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - match receiver.await { - Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?), - Ok(Err(())) => Ok(res_tx - .send(Err(( - StatusCode::CONFLICT, - "error: unable to add tx".into(), - ))) - .await?), - Err(err) => Ok(res_tx - .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) - .await?), - } -} diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index f3f464d92..eb0f7bf2a 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -4,18 +4,18 @@ use std::{ time::Duration, }; -use crate::Carnot; use crate::DataAvailability; +use crate::{Carnot, Tx, Wire, MB16}; use clap::{Parser, ValueEnum}; use color_eyre::eyre::{self, eyre, Result}; use hex::FromHex; #[cfg(feature = "metrics")] use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; -use nomos_http::{backends::axum::AxumBackend, http::HttpService}; use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::NetworkService; +use nomos_node_api::{http::backend::axum::AxumBackend, ApiService}; use overwatch_rs::services::ServiceData; use serde::{Deserialize, Serialize}; use tracing::Level; @@ -118,7 +118,7 @@ pub struct DaArgs { pub struct Config { pub log: ::Settings, pub network: as ServiceData>::Settings, - pub http: as ServiceData>::Settings, + pub http: > as ServiceData>::Settings, pub consensus: ::Settings, #[cfg(feature = "metrics")] pub metrics: > as ServiceData>::Settings, @@ -206,11 +206,11 @@ impl Config { } = http_args; if let Some(addr) = http_addr { - self.http.backend.address = addr; + self.http.backend_settings.address = addr; } if let Some(cors) = cors_origins { - self.http.backend.cors_origins = cors; + self.http.backend_settings.cors_origins = cors; } Ok(self) diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 9e3c7b87f..3b3049109 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -20,9 +20,6 @@ use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, DataAvailabilityService, }; -use nomos_http::backends::axum::AxumBackend; -use nomos_http::bridge::HttpBridgeService; -use nomos_http::http::HttpService; use nomos_log::Logger; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter; use nomos_mempool::{ @@ -30,6 +27,8 @@ use nomos_mempool::{ Transaction as TxDiscriminant, }; use nomos_network::backends::libp2p::Libp2p; +use nomos_node_api::http::backend::axum::AxumBackend; +use nomos_node_api::ApiService; use nomos_storage::{ backends::{sled::SledBackend, StorageSerde}, StorageService, @@ -88,8 +87,7 @@ pub struct Nomos { >, >, consensus: ServiceHandle, - http: ServiceHandle>, - bridges: ServiceHandle, + http: ServiceHandle>>, #[cfg(feature = "metrics")] metrics: ServiceHandle>>, da: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index f956b2b01..a7441b526 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -4,19 +4,16 @@ use nomos_node::{ OverlayArgs, Tx, }; -mod bridges; - use clap::Parser; use color_eyre::eyre::{eyre, Result}; use nomos_core::{ da::{blob, certificate}, tx::Transaction, }; -use nomos_http::bridge::{HttpBridge, HttpBridgeSettings}; -use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings}; -use nomos_network::backends::libp2p::Libp2p; + +use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; + use overwatch_rs::overwatch::*; -use std::sync::Arc; const DEFAULT_DB_PATH: &str = "./db"; @@ -63,28 +60,6 @@ fn main() -> Result<()> { .update_overlay(overlay_args)? .update_network(network_args)?; - let bridges: Vec = vec![ - Arc::new(Box::new(bridges::carnot_info_bridge)), - Arc::new(Box::new(bridges::block_info_bridge)), - // Due to a limitation in the current api system, we can't connect a single endopint to multiple services - // which means we need two different paths for complete mempool metrics. - Arc::new(Box::new(bridges::cl_mempool_metrics_bridge)), - Arc::new(Box::new(bridges::da_mempool_metrics_bridge)), - Arc::new(Box::new(bridges::cl_mempool_status_bridge)), - Arc::new(Box::new(bridges::da_mempool_status_bridge)), - Arc::new(Box::new(bridges::da_blob_get_bridge)), - Arc::new(Box::new(bridges::storage_get_blocks_bridge)), - Arc::new(Box::new(bridges::network_info_bridge)), - Arc::new(Box::new( - bridges::mempool_add_tx_bridge::::Hash>>, - )), - Arc::new(Box::new( - bridges::mempool_add_cert_bridge::< - Libp2p, - Libp2pAdapter::Hash>, - >, - )), - ]; let app = OverwatchRunner::::run( NomosServiceSettings { network: config.network, @@ -105,7 +80,6 @@ fn main() -> Result<()> { }, }, consensus: config.consensus, - bridges: HttpBridgeSettings { bridges }, #[cfg(feature = "metrics")] metrics: config.metrics, da: config.da, diff --git a/nomos-cli/src/api/da.rs b/nomos-cli/src/api/da.rs index 1c2ae4406..35b4a9de1 100644 --- a/nomos-cli/src/api/da.rs +++ b/nomos-cli/src/api/da.rs @@ -10,6 +10,7 @@ pub async fn get_blobs( const BLOBS_PATH: &str = "da/blobs"; CLIENT .post(node.join(BLOBS_PATH).unwrap()) + .header("Content-Type", "application/json") .body(serde_json::to_string(&ids).unwrap()) .send() .await? diff --git a/nomos-cli/src/api/mempool.rs b/nomos-cli/src/api/mempool.rs index 9db2253c1..5ecc4e785 100644 --- a/nomos-cli/src/api/mempool.rs +++ b/nomos-cli/src/api/mempool.rs @@ -6,9 +6,10 @@ pub async fn send_certificate(node: &Url, cert: &C) -> Result { } } -#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub type Voter = [u8; 32]; #[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)] diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 6e1e5061d..eefcb578f 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -9,7 +9,7 @@ nomos-node = { path = "../nodes/nomos-node", default-features = false } nomos-consensus = { path = "../nomos-services/consensus" } nomos-network = { path = "../nomos-services/network", features = ["libp2p"]} nomos-log = { path = "../nomos-services/log" } -nomos-http = { path = "../nomos-services/http", features = ["http"] } +nomos-node-api = { path = "../nodes/nomos-node-api", features = ["axum"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-core = { path = "../nomos-core" } consensus-engine = { path = "../consensus-engine", features = ["serde"] } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 81a1d5447..04276af62 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -12,13 +12,13 @@ use mixnet_node::MixnetNodeConfig; use mixnet_topology::MixnetTopology; use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_core::block::Block; -use nomos_http::backends::axum::AxumBackendSettings; use nomos_libp2p::{multiaddr, Multiaddr}; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; use nomos_network::backends::libp2p::Libp2pConfig; use nomos_network::NetworkConfig; use nomos_node::{Config, Tx}; +use nomos_node_api::http::backend::axum::AxumBackendSettings; // crates use fraction::Fraction; use once_cell::sync::Lazy; @@ -30,7 +30,6 @@ static CLIENT: Lazy = Lazy::new(Client::new); const NOMOS_BIN: &str = "../target/debug/nomos-node"; const CARNOT_INFO_API: &str = "carnot/info"; const STORAGE_BLOCKS_API: &str = "storage/block"; -const MEMPOOL_API: &str = "mempool-"; const LOGS_PREFIX: &str = "__logs"; const GET_BLOCKS_INFO: &str = "carnot/blocks"; @@ -82,11 +81,11 @@ impl NomosNode { let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN)) .arg(&config_path) .current_dir(dir.path()) - .stdout(Stdio::null()) + .stdout(Stdio::inherit()) .spawn() .unwrap(); let node = Self { - addr: config.http.backend.address, + addr: config.http.backend_settings.address, child, _tempdir: dir, config, @@ -120,6 +119,7 @@ impl NomosNode { pub async fn get_block(&self, id: BlockId) -> Option> { CLIENT .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) + .header("Content-Type", "application/json") .body(serde_json::to_string(&id).unwrap()) .send() .await @@ -134,7 +134,7 @@ impl NomosNode { Pool::Cl => "cl", Pool::Da => "da", }; - let addr = format!("{}{}/metrics", MEMPOOL_API, discr); + let addr = format!("{}/metrics", discr); let res = self .get(&addr) .await @@ -144,7 +144,7 @@ impl NomosNode { .unwrap(); MempoolMetrics { pending_items: res["pending_items"].as_u64().unwrap() as usize, - last_item_timestamp: res["last_item"].as_u64().unwrap(), + last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(), } } @@ -334,8 +334,8 @@ fn create_node_config( blob_selector_settings: (), }, log: Default::default(), - http: nomos_http::http::HttpServiceSettings { - backend: AxumBackendSettings { + http: nomos_node_api::ApiServiceSettings { + backend_settings: AxumBackendSettings { address: format!("127.0.0.1:{}", get_available_port()) .parse() .unwrap(), diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 98fc52186..2fbf7ab1f 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -37,9 +37,12 @@ async fn disseminate_blob() { }, }, node_addr: Some( - format!("http://{}", nodes[0].config().http.backend.address.clone()) - .parse() - .unwrap(), + format!( + "http://{}", + nodes[0].config().http.backend_settings.address.clone() + ) + .parse() + .unwrap(), ), output: None, });