Skip to content

Commit

Permalink
Implemented get_blob on sampling rocksdb adapter (#732)
Browse files Browse the repository at this point in the history
* Implemented get_blob on sampling rocksdb adapter

* addressed PR comments
  • Loading branch information
holisticode authored Sep 6, 2024
1 parent 9835d09 commit 49ac81b
Showing 1 changed file with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use kzgrs_backend::common::ColumnIndex;
use nomos_da_storage::{
fs::load_blob,
rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX},
};
// std
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{marker::PhantomData, path::PathBuf};
Expand All @@ -19,8 +23,8 @@ pub struct RocksAdapter<B, S>
where
S: StorageSerde + Send + Sync + 'static,
{
_settings: RocksAdapterSettings,
_storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
settings: RocksAdapterSettings,
storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
blob: PhantomData<B>,
}

Expand All @@ -36,22 +40,43 @@ where
type Settings = RocksAdapterSettings;

async fn new(
_settings: Self::Settings,
_storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
settings: Self::Settings,
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self {
_settings,
_storage_relay,
settings,
storage_relay,
blob: PhantomData,
}
}

async fn get_blob(
&self,
_blob_id: <Self::Blob as Blob>::BlobId,
_column_idx: ColumnIndex,
blob_id: <Self::Blob as Blob>::BlobId,
column_idx: ColumnIndex,
) -> Result<Option<Self::Blob>, DynError> {
todo!()
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
.send(StorageMsg::Load {
key: key_bytes(DA_VERIFIED_KEY_PREFIX, &blob_id),
reply_channel: reply_tx,
})
.await
.expect("failed to send Load message to storage relay");

if reply_rx.await?.is_some() {
let blob_bytes = load_blob(
self.settings.blob_storage_directory.clone(),
blob_id.as_ref(),
&column_idx.to_be_bytes(),
)
.await?;
Ok(S::deserialize(blob_bytes)
.map(|blob| Some(blob))
.unwrap_or_default())
} else {
Ok(None)
}
}
}

Expand Down

0 comments on commit 49ac81b

Please sign in to comment.