From 49ac81b950d08b2d19a2de7787d76b34869458a5 Mon Sep 17 00:00:00 2001 From: holisticode <88287+holisticode@users.noreply.github.com> Date: Fri, 6 Sep 2024 13:31:51 -0500 Subject: [PATCH] Implemented get_blob on sampling rocksdb adapter (#732) * Implemented get_blob on sampling rocksdb adapter * addressed PR comments --- .../sampling/src/storage/adapters/rocksdb.rs | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs index 8e0100e3a..8322fc809 100644 --- a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs +++ b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs @@ -1,4 +1,8 @@ use kzgrs_backend::common::ColumnIndex; +use nomos_da_storage::{ + fs::load_blob, + rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX}, +}; // std use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{marker::PhantomData, path::PathBuf}; @@ -19,8 +23,8 @@ pub struct RocksAdapter where S: StorageSerde + Send + Sync + 'static, { - _settings: RocksAdapterSettings, - _storage_relay: OutboundRelay>>, + settings: RocksAdapterSettings, + storage_relay: OutboundRelay>>, blob: PhantomData, } @@ -36,22 +40,43 @@ where type Settings = RocksAdapterSettings; async fn new( - _settings: Self::Settings, - _storage_relay: OutboundRelay< as ServiceData>::Message>, + settings: Self::Settings, + storage_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { Self { - _settings, - _storage_relay, + settings, + storage_relay, blob: PhantomData, } } async fn get_blob( &self, - _blob_id: ::BlobId, - _column_idx: ColumnIndex, + blob_id: ::BlobId, + column_idx: ColumnIndex, ) -> Result, DynError> { - todo!() + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + self.storage_relay + .send(StorageMsg::Load { + key: key_bytes(DA_VERIFIED_KEY_PREFIX, &blob_id), + reply_channel: reply_tx, + }) + .await + .expect("failed to send Load message to storage relay"); + + if reply_rx.await?.is_some() { + let blob_bytes = load_blob( + self.settings.blob_storage_directory.clone(), + blob_id.as_ref(), + &column_idx.to_be_bytes(), + ) + .await?; + Ok(S::deserialize(blob_bytes) + .map(|blob| Some(blob)) + .unwrap_or_default()) + } else { + Ok(None) + } } }