Skip to content

Commit

Permalink
DA: Executor http api (#801)
Browse files Browse the repository at this point in the history
* Node api handlers and backend modules

* Executor axum api backend

* Expose config functions from node

* Descriptive generics names in http api

* Nomos node metrics feature
  • Loading branch information
bacv authored Oct 2, 2024
1 parent 9ecd738 commit b01c4dd
Show file tree
Hide file tree
Showing 13 changed files with 928 additions and 427 deletions.
31 changes: 28 additions & 3 deletions nodes/nomos-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,42 @@ version = "0.1.0"
edition = "2021"

[dependencies]
async-trait = "0.1"
axum = { version = "0.6" }
clap = { version = "4.5.13", features = ["derive"] }
color-eyre = "0.6.0"
hyper = { version = "0.14", features = ["full"] }
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
nomos-api = { path = "../../nomos-services/api" }
nomos-core = { path = "../../nomos-core" }
nomos-da-network-core = { path = "../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-node = { path = "../nomos-node" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
clap = { version = "4.5.13", features = ["derive"] }
rand = "0.8"
rand_chacha = "0.3"
serde = "1"
serde_yaml = "0.9"
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
tower-http = { version = "0.4", features = ["cors", "trace"] }
tracing = "0.1.40"
utoipa = "4.0"
utoipa-swagger-ui = { version = "4.0" }
uuid = { version = "1.10.0", features = ["v4"] }
serde_yaml = "0.9.34+deprecated"

[features]
default = ["tracing"]
mixnet = ["nomos-node/mixnet"]
metrics = ["nomos-node/metrics"]
tracing = ["nomos-node/tracing"]
tracing = ["nomos-node/tracing"]
312 changes: 312 additions & 0 deletions nodes/nomos-executor/src/api/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
// std
use std::error::Error;
use std::{fmt::Debug, hash::Hash};
// crates
use axum::{http::HeaderValue, routing, Router, Server};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use nomos_api::Backend;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_network_core::SubnetworkId;
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_libp2p::PeerId;
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
use nomos_node::api::handlers::{
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
cryptarchia_info, get_metrics, get_range, libp2p_info,
};
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal

/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}

pub struct AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Memebership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<DaAttestation>,
_blob: core::marker::PhantomData<DaBlob>,
_certificate: core::marker::PhantomData<DaBlobInfo>,
_membership: core::marker::PhantomData<Memebership>,
_vid: core::marker::PhantomData<DaVerifiedBlobInfo>,
_verifier_backend: core::marker::PhantomData<DaVerifierBackend>,
_tx: core::marker::PhantomData<Tx>,
_storage_serde: core::marker::PhantomData<DaStorageSerializer>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
}

#[derive(OpenApi)]
#[openapi(
paths(
),
components(
schemas(Status<HeaderId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
)
)]
struct ApiDoc;

#[async_trait::async_trait]
impl<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> Backend
for AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>
where
DaAttestation: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<DaBlob as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaBlobInfo as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
DaVerifiedBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<DaBlobInfo>
+ Eq
+ Debug
+ Metadata
+ Hash
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<DaVerifiedBlobInfo as Metadata>::AppId:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<DaVerifiedBlobInfo as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
DaVerifierBackend: VerifierBackend + CoreDaVerifier<DaBlob = DaBlob> + Send + Sync + 'static,
<DaVerifierBackend as VerifierBackend>::Settings: Clone,
<DaVerifierBackend as CoreDaVerifier>::Error: Error,
Tx: Transaction
+ Clone
+ Debug
+ Eq
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<Tx as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
DaStorageSerializer: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<
SamplingRng,
BlobId = <DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId,
> + Send
+ 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;

async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings,
_attestation: core::marker::PhantomData,
_blob: core::marker::PhantomData,
_certificate: core::marker::PhantomData,
_membership: core::marker::PhantomData,
_vid: core::marker::PhantomData,
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
_sampling_storage: core::marker::PhantomData,
})
}

async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}

for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}

let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
.route("/cl/status", routing::post(cl_status::<Tx>))
.route(
"/cryptarchia/info",
routing::get(
cryptarchia_info::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(
cryptarchia_headers::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/da/add_blob",
routing::post(
add_blob::<
DaAttestation,
DaBlob,
Membership,
DaVerifierBackend,
DaStorageSerializer,
>,
),
)
.route(
"/da/get_range",
routing::post(
get_range::<
Tx,
DaBlobInfo,
DaVerifiedBlobInfo,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route(
"/storage/block",
routing::post(block::<DaStorageSerializer, Tx>),
)
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
.route(
"/mempool/add/blobinfo",
routing::post(
add_blob_info::<
DaVerifiedBlobInfo,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
),
)
.route("/metrics", routing::get(get_metrics))
.with_state(handle);

Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
}
1 change: 1 addition & 0 deletions nodes/nomos-executor/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod backend;
Loading

0 comments on commit b01c4dd

Please sign in to comment.