From aff903aa70926018d9e6be48f302908767a2dbd9 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 1 Nov 2023 15:24:57 +0800 Subject: [PATCH] Da blob api (#487) * add da_blob for new http api --- nodes/nomos-node-api/src/http/backend/axum.rs | 16 ++++++++++ nodes/nomos-node-api/src/http/da.rs | 31 +++++++++++++++++-- nomos-da/full-replication/src/lib.rs | 1 + 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/nodes/nomos-node-api/src/http/backend/axum.rs b/nodes/nomos-node-api/src/http/backend/axum.rs index 1e17d83dc..ec6b28800 100644 --- a/nodes/nomos-node-api/src/http/backend/axum.rs +++ b/nodes/nomos-node-api/src/http/backend/axum.rs @@ -80,6 +80,7 @@ where .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) .route("/da/metrics", routing::get(da_metrics)) .route("/da/status", routing::post(da_status)) + .route("/da/blobs", routing::post(da_blobs)) .route("/cl/metrics", routing::get(cl_metrics::)) .route("/cl/status", routing::post(cl_status::)) .route("/carnot/info", routing::get(carnot_info::)) @@ -128,6 +129,21 @@ async fn da_status( make_request_and_return_response!(da::da_mempool_status(store, items)) } +#[utoipa::path( + post, + path = "/da/blobs", + responses( + (status = 200, description = "Get pending blobs", body = Vec), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn da_blobs( + State(store): State, + Json(items): Json::Hash>>, +) -> Response { + make_request_and_return_response!(da::da_blobs(store, items)) +} + #[utoipa::path( get, path = "/cl/metrics", diff --git a/nodes/nomos-node-api/src/http/da.rs b/nodes/nomos-node-api/src/http/da.rs index 22d45c0e5..918bb8542 100644 --- a/nodes/nomos-node-api/src/http/da.rs +++ b/nodes/nomos-node-api/src/http/da.rs @@ -1,5 +1,9 @@ -use full_replication::{Blob, Certificate}; +use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; use nomos_core::da::blob; +use nomos_da::{ + backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, + DaMsg, DataAvailabilityService, +}; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter, @@ -8,12 +12,18 @@ use nomos_mempool::{ }; use tokio::sync::oneshot; -type DaMempoolService = MempoolService< +pub type DaMempoolService = MempoolService< Libp2pAdapter::Hash>, MockPool::Hash>, CertDiscriminant, >; +pub type DataAvailability = DataAvailabilityService< + FullReplication>, + BlobCache<::Hash, Blob>, + DaLibp2pAdapter, +>; + pub async fn da_mempool_metrics( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, ) -> Result { @@ -45,3 +55,20 @@ pub async fn da_mempool_status( Ok(receiver.await.unwrap()) } + +pub async fn da_blobs( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + ids: Vec<::Hash>, +) -> Result, super::DynError> { + let relay = handle.relay::().connect().await?; + let (reply_channel, receiver) = oneshot::channel(); + relay + .send(DaMsg::Get { + ids: Box::new(ids.into_iter()), + reply_channel, + }) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) +} diff --git a/nomos-da/full-replication/src/lib.rs b/nomos-da/full-replication/src/lib.rs index 762c5357b..c8c122a66 100644 --- a/nomos-da/full-replication/src/lib.rs +++ b/nomos-da/full-replication/src/lib.rs @@ -93,6 +93,7 @@ impl CertificateStrategy for AbsoluteNumber { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Blob { data: Bytes,