Skip to content

Commit

Permalink
integrate all new http api to nomos-node
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Oct 29, 2023
1 parent 75b20ad commit 52f77ef
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 487 deletions.
6 changes: 4 additions & 2 deletions nodes/nomos-node-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ edition = "2021"

[features]
default = ["axum"]
axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"]
axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"]
clap = ["dep:clap"]

[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tracing = "0.1"


clap = { version = "4", features = ["derive", "env"], optional = true }
consensus-engine = { path = "../../consensus-engine" }
nomos-core = { path = "../../nomos-core" }
nomos-consensus = { path = "../../nomos-services/consensus" }
Expand All @@ -30,6 +31,7 @@ tokio = { version = "1.33", default-features = false, features = ["sync"] }
# axum related dependencies
axum = { version = "0.6", optional = true }
hyper = { version = "0.14", features = ["full"], optional = true }
tower-http = { version = "0.4", optional = true, features = ["cors", "trace"] }

# openapi related dependencies
utoipa = "4.0"
Expand Down
124 changes: 81 additions & 43 deletions nodes/nomos-node-api/src/http/backend/axum.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use std::{fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};
use std::{fmt::Debug, hash::Hash, sync::Arc};

use axum::{extract::State, response::IntoResponse, routing, Json, Router, Server};
use axum::{
extract::State, http::HeaderValue, response::IntoResponse, routing, Json, Router, Server,
};
use consensus_engine::BlockId;
use full_replication::{Blob, Certificate};
use hyper::StatusCode;
use hyper::{
header::{CONTENT_TYPE, USER_AGENT},
StatusCode,
};
use nomos_core::{da::blob, tx::Transaction};
use nomos_mempool::{network::adapters::libp2p::Libp2pAdapter, openapi::Status, MempoolMetrics};
use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

Expand All @@ -18,17 +27,23 @@ use crate::{
Backend,
};

#[derive(Clone)]
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[cfg_attr(feature = "clap", derive(clap::Args))]
pub struct AxumBackendSettings {
pub addr: SocketAddr,
pub da_mempool: OverwatchHandle,
pub da: OverwatchHandle,
pub cl: OverwatchHandle,
pub carnot: OverwatchHandle,
pub network: OverwatchHandle,
pub storage: OverwatchHandle,
pub cert_mempool: OverwatchHandle,
pub tx_mempool: OverwatchHandle,
/// Socket where the server will be listening on for incoming requests.
#[cfg_attr(feature = "clap", arg(
short, long = "http-addr",
default_value_t = std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
8080,
),
env = "HTTP_BIND_ADDRESS"
))]
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
#[cfg_attr(feature = "clap", arg(long = "http-cors-origin"))]
pub cors_origins: Vec<String>,
}

pub struct AxumBackend<T, S, const SIZE: usize> {
Expand All @@ -52,7 +67,7 @@ pub struct AxumBackend<T, S, const SIZE: usize> {
)]
struct ApiDoc;

type Store = Arc<AxumBackendSettings>;
// type Store = Arc<AxumBackendSettings>;

#[async_trait::async_trait]
impl<T, S, const SIZE: usize> Backend for AxumBackend<T, S, SIZE>
Expand Down Expand Up @@ -85,9 +100,28 @@ where
})
}

async fn serve(self) -> Result<(), Self::Error> {
let store = self.settings.clone();
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}

for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}

let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/da/metrics", routing::get(da_metrics))
.route("/da/status", routing::post(da_status))
Expand All @@ -99,9 +133,9 @@ where
.route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>))
.route("/mempool/add/cert", routing::post(add_cert))
.with_state(store);
.with_state(handle);

Server::bind(&self.settings.addr)
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
Expand All @@ -115,8 +149,8 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_metrics(State(store): State<Store>) -> impl IntoResponse {
match da::da_mempool_metrics(&store.da_mempool).await {
async fn da_metrics(State(handle): State<OverwatchHandle>) -> impl IntoResponse {
match da::da_mempool_metrics(&handle).await {
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -131,10 +165,10 @@ async fn da_metrics(State(store): State<Store>) -> impl IntoResponse {
)
)]
async fn da_status(
State(store): State<Store>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> impl IntoResponse {
match da::da_mempool_status(&store.da_mempool, items).await {
match da::da_mempool_status(&handle, items).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -149,10 +183,10 @@ async fn da_status(
)
)]
async fn da_blob(
State(store): State<Store>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> impl IntoResponse {
match da::da_blob(&store.da, items).await {
match da::da_blob(&handle, items).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -166,7 +200,7 @@ async fn da_blob(
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_metrics<T>(State(store): State<Store>) -> impl IntoResponse
async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> impl IntoResponse
where
T: Transaction
+ Clone
Expand All @@ -179,7 +213,7 @@ where
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
match cl::cl_mempool_metrics::<T>(&store.cl).await {
match cl::cl_mempool_metrics::<T>(&handle).await {
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -194,15 +228,15 @@ where
)
)]
async fn cl_status<T>(
State(store): State<Store>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<T as Transaction>::Hash>>,
) -> impl IntoResponse
where
T: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<T as nomos_core::tx::Transaction>::Hash:
Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
{
match cl::cl_mempool_status::<T>(&store.cl, items).await {
match cl::cl_mempool_status::<T>(&handle, items).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -216,13 +250,15 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn carnot_info<Tx, SS, const SIZE: usize>(State(store): State<Store>) -> impl IntoResponse
async fn carnot_info<Tx, SS, const SIZE: usize>(
State(handle): State<OverwatchHandle>,
) -> impl IntoResponse
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
{
match info::carnot_info::<Tx, SS, SIZE>(&store.carnot).await {
match info::carnot_info::<Tx, SS, SIZE>(&handle).await {
Ok(info) => (StatusCode::OK, Json(info)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -236,8 +272,8 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn libp2p_info(State(store): State<Store>) -> impl IntoResponse {
match info::libp2p_info(&store.network).await {
async fn libp2p_info(State(handle): State<OverwatchHandle>) -> impl IntoResponse {
match info::libp2p_info(&handle).await {
Ok(info) => (StatusCode::OK, Json(info)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -251,12 +287,15 @@ async fn libp2p_info(State(store): State<Store>) -> impl IntoResponse {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn block<S, Tx>(State(store): State<Store>, Json(id): Json<BlockId>) -> impl IntoResponse
async fn block<S, Tx>(
State(handle): State<OverwatchHandle>,
Json(id): Json<BlockId>,
) -> impl IntoResponse
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,
{
match storage::block_req::<S, Tx>(&store.storage, id).await {
match storage::block_req::<S, Tx>(&handle, id).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
Expand All @@ -270,16 +309,13 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_tx<Tx>(State(store): State<Store>, Json(tx): Json<Tx>) -> impl IntoResponse
async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> impl IntoResponse
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
match mempool::add_tx::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, Tx>(
&store.tx_mempool,
tx,
)
.await
match mempool::add_tx::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, Tx>(&handle, tx)
.await
{
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
Expand All @@ -294,10 +330,12 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_cert(State(store): State<Store>, Json(cert): Json<Certificate>) -> impl IntoResponse {
async fn add_cert(
State(handle): State<OverwatchHandle>,
Json(cert): Json<Certificate>,
) -> impl IntoResponse {
match mempool::add_cert::<Libp2p, Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>>(
&store.tx_mempool,
cert,
&handle, cert,
)
.await
{
Expand Down
13 changes: 9 additions & 4 deletions nodes/nomos-node-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use overwatch_rs::{
overwatch::handle::OverwatchHandle,
services::{
handle::ServiceStateHandle,
relay::NoMessage,
Expand All @@ -21,16 +22,17 @@ pub trait Backend {
where
Self: Sized;

async fn serve(self) -> Result<(), Self::Error>;
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error>;
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApiServiceSettings<S> {
pub backend_settings: S,
}

pub struct ApiService<B: Backend> {
settings: ApiServiceSettings<B::Settings>,
handle: OverwatchHandle,
}

impl<B: Backend> ServiceData for ApiService<B> {
Expand All @@ -53,13 +55,16 @@ where
/// Initialize the service with the given state
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self { settings })
Ok(Self {
settings,
handle: service_state.overwatch_handle,
})
}

/// Service main loop
async fn run(mut self) -> Result<(), DynError> {
let endpoint = B::new(self.settings.backend_settings).await?;
endpoint.serve().await?;
endpoint.serve(self.handle).await?;
Ok(())
}
}
7 changes: 5 additions & 2 deletions nodes/nomos-node-api/tests/todo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use axum::{routing, Router, Server};
use hyper::Error;
use nomos_node_api::{ApiService, ApiServiceSettings, Backend};
use overwatch_derive::Services;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use overwatch_rs::{
overwatch::{handle::OverwatchHandle, OverwatchRunner},
services::handle::ServiceHandle,
};
use utoipa::{
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
Modify, OpenApi,
Expand Down Expand Up @@ -71,7 +74,7 @@ impl Backend for WebServer {
Ok(Self { addr: settings })
}

async fn serve(self) -> Result<(), Self::Error> {
async fn serve(self, _handle: OverwatchHandle) -> Result<(), Self::Error> {
let store = Arc::new(Store::default());
let app = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
Expand Down
1 change: 1 addition & 0 deletions nodes/nomos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-node-api = { path = "../nomos-node-api" }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
Expand Down
26 changes: 0 additions & 26 deletions nodes/nomos-node/src/bridges/libp2p.rs

This file was deleted.

Loading

0 comments on commit 52f77ef

Please sign in to comment.