Skip to content

Commit

Permalink
re-worked seq count provider a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Jun 27, 2024
1 parent 8a48a62 commit 35b24ba
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 52 deletions.
7 changes: 5 additions & 2 deletions satrs-example/src/tmtc/tm_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ use std::{
};

use log::info;
use satrs::tmtc::{PacketAsVec, PacketInPool, SharedPacketPool};
use satrs::{
pool::PoolProvider,
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
seq_count::CcsdsSimpleSeqCountProvider,
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
};
use satrs::{
seq_count::SequenceCountProvider,
tmtc::{PacketAsVec, PacketInPool, SharedPacketPool},
};

use crate::interface::tcp::SyncTcpTmSource;

Expand Down
8 changes: 4 additions & 4 deletions satrs/src/cfdp/dest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum TransactionStep {
SendingFinishedPdu = 6,
}

// This contains transfer state parameters for destination transaction.
#[derive(Debug)]
struct TransferState<CheckTimer: CountdownProvider> {
transaction_id: Option<TransactionId>,
Expand Down Expand Up @@ -87,6 +88,7 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
}
}

// This contains parameters for destination transaction.
#[derive(Debug)]
struct TransactionParams<CheckTimer: CountdownProvider> {
tstate: TransferState<CheckTimer>,
Expand Down Expand Up @@ -380,9 +382,7 @@ impl<
let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
self.tparams.reset();
self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
let remote_cfg = self
.remote_cfg_table
.get_remote_config(metadata_pdu.source_id().value());
let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value());
if remote_cfg.is_none() {
return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id()));
}
Expand Down Expand Up @@ -847,7 +847,7 @@ mod tests {
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler},
user::OwnedMetadataRecvdParams,
CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig,
RemoteEntityConfig, StdRemoteEntityConfigProvider, CRC_32,
StdRemoteEntityConfigProvider, CRC_32,
};

use super::*;
Expand Down
8 changes: 4 additions & 4 deletions satrs/src/cfdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ impl RemoteEntityConfig {

pub trait RemoteEntityConfigProvider {
/// Retrieve the remote entity configuration for the given remote ID.
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
/// Add a new remote configuration. Return [true] if the configuration was
/// inserted successfully, and [false] if a configuration already exists.
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool;
Expand All @@ -286,10 +286,10 @@ pub struct StdRemoteEntityConfigProvider {

#[cfg(feature = "std")]
impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.remote_cfg_table.get(&remote_id)
}
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.remote_cfg_table.get_mut(&remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {
Expand Down
11 changes: 2 additions & 9 deletions satrs/src/cfdp/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,18 +476,11 @@ pub mod alloc_mod {
pub fs_requests_len: usize,
}

pub struct PutRequestCacheConfig {
pub max_msgs_to_user_storage: usize,
pub max_fault_handler_overrides_storage: usize,
pub max_flow_label_storage: usize,
pub max_fs_requests_storage: usize,
}

impl StaticPutRequestCacher {
pub fn new(cfg: PutRequestCacheConfig) -> Self {
pub fn new(max_fs_requests_storage: usize) -> Self {
Self {
static_fields: StaticPutRequestFields::default(),
fs_requests: alloc::vec![0; cfg.max_fs_requests_storage],
fs_requests: alloc::vec![0; max_fs_requests_storage],
fs_requests_len: 0,
}
}
Expand Down
126 changes: 113 additions & 13 deletions satrs/src/cfdp/source.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
use spacepackets::cfdp::{pdu::FileDirectiveType, PduType};
use spacepackets::{
cfdp::{pdu::FileDirectiveType, PduType},
util::UnsignedByteField,
ByteConversionError,
};

use crate::seq_count::SequenceCountProvider;

use super::{
filestore::VirtualFilestore, request::ReadablePutRequest, user::CfdpUser, LocalEntityConfig,
PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider,
filestore::VirtualFilestore,
request::{ReadablePutRequest, StaticPutRequestCacher},
user::CfdpUser,
LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig,
RemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
};

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand All @@ -12,7 +21,7 @@ pub enum TransactionStep {
TransactionStart = 1,
SendingMetadata = 3,
SendingFileData = 4,
/// Re-transmitting missing packets in acknowledged mode6
/// Re-transmitting missing packets in acknowledged mode
Retransmitting = 5,
SendingEof = 6,
WaitingForEofAck = 7,
Expand All @@ -36,6 +45,14 @@ pub struct StateHelper {
num_packets_ready: u32,
}

#[derive(Debug, Copy, Clone, derive_new::new)]
pub struct TransferState {
transaction_id: TransactionId,
remote_cfg: RemoteEntityConfig,
transmission_mode: super::TransmissionMode,
closure_requested: bool,
}

impl Default for StateHelper {
fn default() -> Self {
Self {
Expand All @@ -55,40 +72,62 @@ pub enum SourceError {
},
#[error("unexpected file data PDU")]
UnexpectedFileDataPdu,
#[error("source handler is already busy with put request")]
PutRequestAlreadyActive,
#[error("error caching put request")]
PutRequestCaching(ByteConversionError),
}

#[derive(Debug, thiserror::Error)]
pub enum PutRequestError {
#[error("error caching put request: {0}")]
Storage(#[from] ByteConversionError),
#[error("already busy with put request")]
AlreadyBusy,
}

pub struct SourceHandler<
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
SeqCountProvider: SequenceCountProvider,
> {
local_cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable,
vfs: Vfs,
state_helper: StateHelper,
tstate: Option<TransferState>,
seq_count_provider: SeqCountProvider,
}

impl<
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable>
SeqCountProvider: SequenceCountProvider,
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable, SeqCountProvider>
{
pub fn new(
cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
vfs: Vfs,
put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable,
seq_count_provider: SeqCountProvider,
) -> Self {
Self {
local_cfg: cfg,
remote_cfg_table,
vfs,
pdu_sender,
vfs,
put_request_cacher,
state_helper: Default::default(),
tstate: Default::default(),
seq_count_provider,
}
}

Expand Down Expand Up @@ -155,10 +194,66 @@ impl<
Ok(())
}

fn put_request(&mut self, put_request: &impl ReadablePutRequest) -> Result<(), SourceError> {
pub fn put_request(
&mut self,
put_request: &impl ReadablePutRequest,
) -> Result<(), PutRequestError> {
if self.state_helper.state != super::State::Idle {
return Err(PutRequestError::AlreadyBusy);
}
self.put_request_cacher.set(put_request)?;
self.state_helper.state = super::State::Busy;
let source_file = self.put_request_cacher.source_file().unwrap();
if !self.vfs.exists(source_file) {
// TODO: Specific error.
}
let remote_cfg = self.remote_cfg_table.get(
self.put_request_cacher
.static_fields
.destination_id
.value_const(),
);
if remote_cfg.is_none() {
// TODO: Specific error.
}
let remote_cfg = remote_cfg.unwrap();
self.state_helper.num_packets_ready = 0;
//self.tstate.remote_cfg = Some(*remote_cfg);
let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
self.put_request_cacher.static_fields.trans_mode.unwrap()
} else {
remote_cfg.default_transmission_mode
};
let closure_requested = if self
.put_request_cacher
.static_fields
.closure_requested
.is_some()
{
self.put_request_cacher
.static_fields
.closure_requested
.unwrap()
} else {
remote_cfg.closure_requested_by_default
};
self.tstate = Some(TransferState::new(
TransactionId::new(
self.put_request_cacher.static_fields.destination_id,
UnsignedByteField::new(
SeqCountProvider::MAX_BIT_WIDTH / 8,
self.seq_count_provider.get_and_increment().into(),
),
),
*remote_cfg,
transmission_mode,
closure_requested,
));
Ok(())
}

pub fn transmission_mode(&self) {}

fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
Ok(0)
}
Expand All @@ -172,14 +267,16 @@ impl<

#[cfg(test)]
mod tests {
use alloc::sync::Arc;
use spacepackets::util::UnsignedByteFieldU16;

use super::*;
use crate::cfdp::{
filestore::NativeFilestore,
tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler},
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider,
use crate::{
cfdp::{
filestore::NativeFilestore,
tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler},
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider,
},
seq_count::SeqCountProviderSimple,
};

const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
Expand All @@ -190,6 +287,7 @@ mod tests {
TestFaultHandler,
NativeFilestore,
StdRemoteEntityConfigProvider,
SeqCountProviderSimple<u16>,
>;

fn default_source_handler(
Expand All @@ -201,12 +299,14 @@ mod tests {
indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(test_fault_handler),
};
let static_put_request_cacher = StaticPutRequestCacher::new(1024);
SourceHandler::new(
local_entity_cfg,
test_packet_sender,
NativeFilestore::default(),
static_put_request_cacher,
basic_remote_cfg_table(),
// TestCheckTimerCreator::new(check_timer_expired),
SeqCountProviderSimple::default(),
)
}

Expand Down
Loading

0 comments on commit 35b24ba

Please sign in to comment.