Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DA: Sampling service backend fixes #710

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 35 additions & 52 deletions nomos-services/data-availability/sampling/src/backend/kzgrs.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::borrow::BorrowMut;
// std
use std::collections::{BTreeSet, HashMap};
use std::fmt::Debug;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

// crates
use chrono::{naive::NaiveDateTime, Utc};
use rand::distributions::Standard;
use rand::prelude::*;
use rand_chacha::ChaCha20Rng;
Expand All @@ -20,9 +17,8 @@ use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;

pub struct SamplingContext {
blob_id: BlobId,
subnets: Vec<SubnetworkId>,
started: NaiveDateTime,
started: Instant,
}

#[derive(Debug, Clone)]
Expand All @@ -35,47 +31,34 @@ pub struct KzgrsDaSamplerSettings {
/// KZG-RS backed implementation of the DA sampling backend.
pub struct KzgrsDaSampler {
    settings: KzgrsDaSamplerSettings,
    // Blobs whose sampling completed successfully and are ready to be reported.
    validated_blobs: BTreeSet<BlobId>,
    // Blobs with sampling still in progress; pruned by `prune_by_time`.
    pending_sampling_blobs: HashMap<BlobId, SamplingContext>,
    // TODO: is there a better place for this? Do we need to have this even globally?
    // Do we already have some source of randomness already?
    rng: ChaCha20Rng,
}

impl KzgrsDaSampler {
// TODO: this might not be the right signature, as the lifetime of self needs to be evaluated
async fn start_pending_blob_monitor(&'static mut self) {
//let mut sself = self;
let monitor = thread::spawn(move || {
loop {
thread::sleep(self.settings.old_blobs_check_duration);
// everything older than cut_timestamp should be removed;
let cut_timestamp = Utc::now().naive_utc() - self.settings.blobs_validity_duration;
// retain all elements which come after the cut_timestamp
self.pending_sampling_blobs
.retain(|_, ctx| ctx.started.gt(&cut_timestamp));
}
fn prune_by_time(&mut self) {
self.pending_sampling_blobs.retain(|_blob_id, context| {
context.started.elapsed() < self.settings.old_blobs_check_duration
});
monitor.join().unwrap();
}
}

#[async_trait::async_trait]
impl DaSamplingServiceBackend for KzgrsDaSampler {
    type Settings = KzgrsDaSamplerSettings;
    type BlobId = BlobId;
    type Blob = DaBlob;

    /// Build a fresh sampler with empty validated/pending sets and an
    /// entropy-seeded RNG for subnet selection.
    fn new(settings: Self::Settings) -> Self {
        let bt: BTreeSet<BlobId> = BTreeSet::new();
        Self {
            settings,
            validated_blobs: bt,
            pending_sampling_blobs: HashMap::new(),
            rng: ChaCha20Rng::from_entropy(),
        }
    }

async fn get_validated_blobs(&self) -> BTreeSet<Self::BlobId> {
Expand All @@ -84,48 +67,48 @@ impl<'a> DaSamplingServiceBackend for KzgrsDaSampler {

async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]) {
    // A blob included in a block no longer needs tracking: drop it from both
    // the in-flight and the validated sets. `remove` is a no-op for absent
    // keys, so no `contains` pre-check is needed.
    for id in blobs_ids {
        self.pending_sampling_blobs.remove(id);
        self.validated_blobs.remove(id);
    }
}

async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob) {
// this should not even happen
if !self.pending_sampling_blobs.contains_key(&blob_id) {}

let ctx = self.pending_sampling_blobs.get_mut(&blob_id).unwrap();
ctx.subnets.push(blob.column_idx as SubnetworkId);

// sampling of this blob_id terminated successfully
if ctx.subnets.len() == self.settings.num_samples as usize {
self.validated_blobs.insert(blob_id);
if let Some(ctx) = self.pending_sampling_blobs.get_mut(&blob_id) {
ctx.subnets.push(blob.column_idx as SubnetworkId);

// sampling of this blob_id terminated successfully
if ctx.subnets.len() == self.settings.num_samples as usize {
self.validated_blobs.insert(blob_id);
// cleanup from pending samplings
self.pending_sampling_blobs.remove(&blob_id);
}
} else {
unreachable!("We should not receive a sampling success from a non triggered blobId");
}
}

async fn handle_sampling_error(&mut self, _blob_id: Self::BlobId) {
// TODO: Unimplmented yet because the error handling in the service
// does not yet receive a blob_id
unimplemented!("no use case yet")
async fn handle_sampling_error(&mut self, blob_id: Self::BlobId) {
// If it fails a single time we consider it failed.
// We may want to abstract the sampling policies somewhere else at some point if we
// need to get fancier than this
self.pending_sampling_blobs.remove(&blob_id);
self.validated_blobs.remove(&blob_id);
}

async fn init_sampling(&mut self, blob_id: Self::BlobId) -> Vec<SubnetworkId> {
    // Pick `num_samples` random subnetworks to query for this blob.
    let subnets: Vec<SubnetworkId> = Standard
        .sample_iter(&mut self.rng)
        .take(self.settings.num_samples as usize)
        .collect();

    // Track the round from now on so prune_by_time can expire it.
    let ctx: SamplingContext = SamplingContext {
        subnets: subnets.clone(),
        started: Instant::now(),
    };
    self.pending_sampling_blobs.insert(blob_id, ctx);
    subnets
}

// Pruning hook invoked by the sampling service loop; currently the only
// policy is time-based expiry, delegated to `prune_by_time`.
fn prune(&mut self) {
self.prune_by_time()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ pub trait DaSamplingServiceBackend {
async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob);
async fn handle_sampling_error(&mut self, blob_id: Self::BlobId);
async fn init_sampling(&mut self, blob_id: Self::BlobId) -> Vec<SubnetworkId>;
fn prune(&mut self);
}
2 changes: 2 additions & 0 deletions nomos-services/data-availability/sampling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ where
}
}
}
// cleanup not on time samples
sampler.prune();
}
}
.instrument(span!(Level::TRACE, DA_SAMPLING_TAG))
Expand Down
Loading