Skip to content

Commit

Permalink
New http api to nomos-node integration (#490)
Browse files Browse the repository at this point in the history
* Integrate new http api to nomos-node
  • Loading branch information
al8n authored Nov 8, 2023
1 parent ccc8590 commit c3422c1
Show file tree
Hide file tree
Showing 20 changed files with 116 additions and 548 deletions.
2 changes: 1 addition & 1 deletion nodes/mixnode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ serde = "1"
serde_yaml = "0.9"
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1.29.1"
tokio = { version = "1.33", features = ["macros"] }
4 changes: 2 additions & 2 deletions nodes/nomos-node-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ edition = "2021"

[features]
default = ["axum"]
axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"]
axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"]

[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tracing = "0.1"


consensus-engine = { path = "../../consensus-engine" }
nomos-core = { path = "../../nomos-core" }
nomos-consensus = { path = "../../nomos-services/consensus" }
Expand All @@ -30,6 +29,7 @@ tokio = { version = "1.33", default-features = false, features = ["sync"] }
# axum related dependencies
axum = { version = "0.6", optional = true }
hyper = { version = "0.14", features = ["full"], optional = true }
tower-http = { version = "0.4", optional = true, features = ["cors", "trace"] }


# openapi related dependencies
Expand Down
88 changes: 60 additions & 28 deletions nodes/nomos-node-api/src/http/backend/axum.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{fmt::Debug, hash::Hash, net::SocketAddr};
use std::{fmt::Debug, hash::Hash};

use axum::{
extract::{Query, State},
http::HeaderValue,
response::Response,
routing, Json, Router, Server,
};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

Expand All @@ -22,10 +28,13 @@ use crate::{
Backend,
};

#[derive(Clone)]
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
pub addr: SocketAddr,
pub handle: OverwatchHandle,
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}

pub struct AxumBackend<T, S, const SIZE: usize> {
Expand Down Expand Up @@ -80,8 +89,28 @@ where
})
}

async fn serve(self) -> Result<(), Self::Error> {
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}

for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}

let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/da/metrics", routing::get(da_metrics))
.route("/da/status", routing::post(da_status))
Expand All @@ -94,9 +123,9 @@ where
.route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>))
.route("/mempool/add/cert", routing::post(add_cert))
.with_state(self.settings.handle);
.with_state(handle);

Server::bind(&self.settings.addr)
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
Expand Down Expand Up @@ -125,8 +154,8 @@ macro_rules! make_request_and_return_response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_metrics(State(store): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(da::da_mempool_metrics(&store))
async fn da_metrics(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(da::da_mempool_metrics(&handle))
}

#[utoipa::path(
Expand All @@ -138,10 +167,10 @@ async fn da_metrics(State(store): State<OverwatchHandle>) -> Response {
)
)]
async fn da_status(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_mempool_status(&store, items))
make_request_and_return_response!(da::da_mempool_status(&handle, items))
}

#[utoipa::path(
Expand All @@ -153,10 +182,10 @@ async fn da_status(
)
)]
async fn da_blobs(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_blobs(&store, items))
make_request_and_return_response!(da::da_blobs(&handle, items))
}

#[utoipa::path(
Expand All @@ -167,7 +196,7 @@ async fn da_blobs(
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_metrics<T>(State(store): State<OverwatchHandle>) -> Response
async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
where
T: Transaction
+ Clone
Expand All @@ -180,7 +209,7 @@ where
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
make_request_and_return_response!(cl::cl_mempool_metrics::<T>(&store))
make_request_and_return_response!(cl::cl_mempool_metrics::<T>(&handle))
}

#[utoipa::path(
Expand All @@ -192,15 +221,15 @@ where
)
)]
async fn cl_status<T>(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<T as Transaction>::Hash>>,
) -> Response
where
T: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<T as nomos_core::tx::Transaction>::Hash:
Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
{
make_request_and_return_response!(cl::cl_mempool_status::<T>(&store, items))
make_request_and_return_response!(cl::cl_mempool_status::<T>(&handle, items))
}

#[utoipa::path(
Expand All @@ -211,13 +240,13 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn carnot_info<Tx, SS, const SIZE: usize>(State(store): State<OverwatchHandle>) -> Response
async fn carnot_info<Tx, SS, const SIZE: usize>(State(handle): State<OverwatchHandle>) -> Response
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
{
make_request_and_return_response!(consensus::carnot_info::<Tx, SS, SIZE>(&store))
make_request_and_return_response!(consensus::carnot_info::<Tx, SS, SIZE>(&handle))
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -255,8 +284,8 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn libp2p_info(State(store): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(libp2p::libp2p_info(&store))
async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(libp2p::libp2p_info(&handle))
}

#[utoipa::path(
Expand All @@ -267,12 +296,12 @@ async fn libp2p_info(State(store): State<OverwatchHandle>) -> Response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn block<S, Tx>(State(store): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,
{
make_request_and_return_response!(storage::block_req::<S, Tx>(&store, id))
make_request_and_return_response!(storage::block_req::<S, Tx>(&handle, id))
}

#[utoipa::path(
Expand All @@ -283,7 +312,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_tx<Tx>(State(store): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
Expand All @@ -294,26 +323,29 @@ where
nomos_mempool::Transaction,
Tx,
<Tx as Transaction>::Hash,
>(&store, tx, Transaction::hash))
>(&handle, tx, Transaction::hash))
}

#[utoipa::path(
post,
path = "/mempool/add/tx",
path = "/mempool/add/cert",
responses(
(status = 200, description = "Add certificate to the mempool"),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_cert(State(store): State<OverwatchHandle>, Json(cert): Json<Certificate>) -> Response {
async fn add_cert(
State(handle): State<OverwatchHandle>,
Json(cert): Json<Certificate>,
) -> Response {
make_request_and_return_response!(mempool::add::<
Libp2p,
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
nomos_mempool::Certificate,
Certificate,
<Blob as blob::Blob>::Hash,
>(
&store,
&handle,
cert,
nomos_core::da::certificate::Certificate::hash
))
Expand Down
9 changes: 6 additions & 3 deletions nodes/nomos-node-api/src/http/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{fmt::Debug, hash::Hash};

use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;

use consensus_engine::{
overlay::{RandomBeaconState, RoundRobin, TreeOverlay},
Block, BlockId,
Expand All @@ -20,9 +24,6 @@ use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter,
};
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;

pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
Expand Down Expand Up @@ -53,6 +54,7 @@ where
.send(ConsensusMsg::Info { tx: sender })
.await
.map_err(|(e, _)| e)?;

Ok(receiver.await?)
}

Expand All @@ -76,5 +78,6 @@ where
})
.await
.map_err(|(e, _)| e)?;

Ok(receiver.await?)
}
2 changes: 1 addition & 1 deletion nodes/nomos-node-api/src/http/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ where
A::Settings: Send + Sync,
D: Discriminant,
Item: Clone + Debug + Send + Sync + 'static + Hash,
Key: Clone + Debug + Ord + Hash,
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<MempoolService<A, MockPool<Item, Key>, D>>()
Expand Down
13 changes: 9 additions & 4 deletions nodes/nomos-node-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use overwatch_rs::{
overwatch::handle::OverwatchHandle,
services::{
handle::ServiceStateHandle,
relay::NoMessage,
Expand All @@ -21,16 +22,17 @@ pub trait Backend {
where
Self: Sized;

async fn serve(self) -> Result<(), Self::Error>;
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error>;
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApiServiceSettings<S> {
pub backend_settings: S,
}

pub struct ApiService<B: Backend> {
settings: ApiServiceSettings<B::Settings>,
handle: OverwatchHandle,
}

impl<B: Backend> ServiceData for ApiService<B> {
Expand All @@ -53,13 +55,16 @@ where
/// Initialize the service with the given state
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self { settings })
Ok(Self {
settings,
handle: service_state.overwatch_handle,
})
}

/// Service main loop
async fn run(mut self) -> Result<(), DynError> {
let endpoint = B::new(self.settings.backend_settings).await?;
endpoint.serve().await?;
endpoint.serve(self.handle).await?;
Ok(())
}
}
7 changes: 5 additions & 2 deletions nodes/nomos-node-api/tests/todo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use axum::{routing, Router, Server};
use hyper::Error;
use nomos_node_api::{ApiService, ApiServiceSettings, Backend};
use overwatch_derive::Services;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use overwatch_rs::{
overwatch::{handle::OverwatchHandle, OverwatchRunner},
services::handle::ServiceHandle,
};
use utoipa::{
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
Modify, OpenApi,
Expand Down Expand Up @@ -71,7 +74,7 @@ impl Backend for WebServer {
Ok(Self { addr: settings })
}

async fn serve(self) -> Result<(), Self::Error> {
async fn serve(self, _handle: OverwatchHandle) -> Result<(), Self::Error> {
let store = Arc::new(Store::default());
let app = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
Expand Down
3 changes: 2 additions & 1 deletion nodes/nomos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
blake2 = "0.10"
bincode = "2.0.0-rc.2"
bytes = "1.3"
clap = { version = "4", features = ["derive"] }
clap = { version = "4", features = ["derive", "env"] }
chrono = "0.4"
futures = "0.3"
http = "0.2.9"
Expand All @@ -20,6 +20,7 @@ tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-node-api = { path = "../nomos-node-api" }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
Expand Down
Loading

0 comments on commit c3422c1

Please sign in to comment.