diff --git a/satrs/src/cfdp/dest.rs b/satrs/src/cfdp/dest.rs index 75cf5e2..b1e12da 100644 --- a/satrs/src/cfdp/dest.rs +++ b/satrs/src/cfdp/dest.rs @@ -6,9 +6,9 @@ use super::{ filestore::{FilestoreError, NativeFilestore, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, CheckTimerProviderCreator, CountdownProvider, EntityType, LocalEntityConfig, PacketInfo, - PacketTarget, RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCheckTimer, - StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, TransactionId, - TransactionStep, + PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, + StdCheckTimer, StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, + TransactionId, UserFaultHookProvider, }; use smallvec::SmallVec; use spacepackets::{ @@ -42,6 +42,18 @@ enum CompletionDisposition { Cancelled = 1, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum TransactionStep { + Idle = 0, + TransactionStart = 1, + ReceivingFileDataPdus = 2, + ReceivingFileDataPdusWithCheckLimitHandling = 3, + SendingAckPdu = 4, + TransferCompletion = 5, + SendingFinishedPdu = 6, +} + #[derive(Debug)] struct TransferState { transaction_id: Option, @@ -140,9 +152,12 @@ impl TransactionParams { pub enum DestError { /// File directive expected, but none specified #[error("expected file directive")] - DirectiveExpected, - #[error("can not process packet type {0:?}")] - CantProcessPacketType(FileDirectiveType), + DirectiveFieldEmpty, + #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")] + CantProcessPacketType { + pdu_type: PduType, + directive_type: Option, + }, #[error("can not process file data PDUs in current state")] WrongStateForFileDataAndEof, // Received new metadata PDU while being already being busy with a file transfer. @@ -168,15 +183,6 @@ pub enum DestError { NoRemoteCfgFound(UnsignedByteField), } -pub trait CfdpPacketSender: Send { - fn send_pdu( - &self, - pdu_type: PduType, - file_directive_type: Option, - raw_pdu: &[u8], - ) -> Result<(), PduError>; -} - /// This is the primary CFDP destination handler. It models the CFDP destination entity, which is /// primarily responsible for receiving files sent from another CFDP entity. It performs the /// reception side of File Copy Operations. @@ -192,13 +198,14 @@ pub trait CfdpPacketSender: Send { /// user and passed as a constructor parameter. The number of generated packets is returned /// by the state machine call. pub struct DestinationHandler< - PduSender: CfdpPacketSender, + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, CheckTimerCreator: CheckTimerProviderCreator, CheckTimerProvider: CountdownProvider, > { - local_cfg: LocalEntityConfig, + local_cfg: LocalEntityConfig, step: TransactionStep, state: State, tparams: TransactionParams, @@ -210,8 +217,9 @@ pub struct DestinationHandler< } #[cfg(feature = "std")] -pub type StdDestinationHandler = DestinationHandler< +pub type StdDestinationHandler = DestinationHandler< PduSender, + UserFaultHook, NativeFilestore, StdRemoteEntityConfigProvider, StdCheckTimerCreator, @@ -219,12 +227,21 @@ pub type StdDestinationHandler = DestinationHandler< >; impl< - PduSender: CfdpPacketSender, + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, CheckTimerCreator: CheckTimerProviderCreator, CheckTimerProvider: CountdownProvider, - > DestinationHandler + > + DestinationHandler< + PduSender, + UserFaultHook, + Vfs, + RemoteCfgTable, + CheckTimerCreator, + CheckTimerProvider, + > { /// Constructs a new destination handler. /// @@ -246,9 +263,9 @@ impl< /// * `check_timer_creator` - This is used by the CFDP handler to generate timers required /// by various tasks. pub fn new( - local_cfg: LocalEntityConfig, + local_cfg: LocalEntityConfig, max_packet_len: usize, - packet_sender: PduSender, + pdu_sender: PduSender, vfs: Vfs, remote_cfg_table: RemoteCfgTable, check_timer_creator: CheckTimerCreator, @@ -259,7 +276,7 @@ impl< state: State::Idle, tparams: Default::default(), packet_buf: alloc::vec![0; max_packet_len], - pdu_sender: packet_sender, + pdu_sender, vfs, remote_cfg_table, check_timer_creator, @@ -272,6 +289,8 @@ impl< /// The state machine should either be called if a packet with the appropriate destination ID /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example /// checking for timeouts or missed file segments. + /// + /// The function returns the number of sent PDU packets on success. pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, @@ -308,14 +327,15 @@ impl< if packet_info.target() != PacketTarget::DestEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. - return Err(DestError::CantProcessPacketType( - packet_info.pdu_directive().unwrap(), - )); + return Err(DestError::CantProcessPacketType { + pdu_type: packet_info.pdu_type(), + directive_type: packet_info.pdu_directive(), + }); } match packet_info.pdu_type { PduType::FileDirective => { if packet_info.pdu_directive.is_none() { - return Err(DestError::DirectiveExpected); + return Err(DestError::DirectiveFieldEmpty); } self.handle_file_directive( cfdp_user, @@ -338,12 +358,13 @@ impl< FileDirectiveType::FinishedPdu | FileDirectiveType::NakPdu | FileDirectiveType::KeepAlivePdu => { - return Err(DestError::CantProcessPacketType(pdu_directive)); + return Err(DestError::CantProcessPacketType { + pdu_type: PduType::FileDirective, + directive_type: Some(pdu_directive), + }); } FileDirectiveType::AckPdu => { - todo!( - "check whether ACK pdu handling is applicable by checking the acked directive field" - ) + todo!("acknowledged mode not implemented yet") } FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?, FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?, @@ -730,7 +751,7 @@ impl< let progress = self.tstate().progress; let fh_code = self .local_cfg - .default_fault_handler + .fault_handler .get_fault_handler(condition_code); match fh_code { FaultHandlerCode::NoticeOfCancellation => { @@ -741,7 +762,7 @@ impl< FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), } self.local_cfg - .default_fault_handler + .fault_handler .report_fault(transaction_id, condition_code, progress) } @@ -804,15 +825,12 @@ impl< #[cfg(test)] mod tests { - use core::{ - cell::{Cell, RefCell}, - sync::atomic::AtomicBool, - }; + use core::{cell::Cell, sync::atomic::AtomicBool}; + use std::fs; #[allow(unused_imports)] use std::println; - use std::{fs, sync::Mutex}; - use alloc::{boxed::Box, collections::VecDeque, string::String, sync::Arc, vec::Vec}; + use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec}; use rand::Rng; use spacepackets::{ cfdp::{ @@ -824,9 +842,11 @@ mod tests { }; use crate::cfdp::{ - filestore::NativeFilestore, user::OwnedMetadataRecvdParams, CheckTimerProviderCreator, - CountdownProvider, DefaultFaultHandler, IndicationConfig, RemoteEntityConfig, - StdRemoteEntityConfigProvider, UserFaultHandler, CRC_32, + filestore::NativeFilestore, + tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler}, + user::OwnedMetadataRecvdParams, + CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, + RemoteEntityConfig, StdRemoteEntityConfigProvider, CRC_32, }; use super::*; @@ -840,41 +860,6 @@ mod tests { pub length: usize, } - struct SentPdu { - pdu_type: PduType, - file_directive_type: Option, - raw_pdu: Vec, - } - #[derive(Default)] - struct TestCfdpSender { - packet_queue: RefCell>, - } - - impl CfdpPacketSender for TestCfdpSender { - fn send_pdu( - &self, - pdu_type: PduType, - file_directive_type: Option, - raw_pdu: &[u8], - ) -> Result<(), PduError> { - self.packet_queue.borrow_mut().push_back(SentPdu { - pdu_type, - file_directive_type, - raw_pdu: raw_pdu.to_vec(), - }); - Ok(()) - } - } - - impl TestCfdpSender { - pub fn retrieve_next_pdu(&self) -> Option { - self.packet_queue.borrow_mut().pop_front() - } - pub fn queue_empty(&self) -> bool { - self.packet_queue.borrow_mut().is_empty() - } - } - #[derive(Default)] struct TestCfdpUser { next_expected_seq_num: u64, @@ -1000,81 +985,6 @@ mod tests { } } - #[derive(Default, Clone)] - struct TestFaultHandler { - notice_of_suspension_queue: Arc>>, - notice_of_cancellation_queue: Arc>>, - abandoned_queue: Arc>>, - ignored_queue: Arc>>, - } - - impl UserFaultHandler for TestFaultHandler { - fn notice_of_suspension_cb( - &mut self, - transaction_id: TransactionId, - cond: ConditionCode, - progress: u64, - ) { - self.notice_of_suspension_queue.lock().unwrap().push_back(( - transaction_id, - cond, - progress, - )) - } - - fn notice_of_cancellation_cb( - &mut self, - transaction_id: TransactionId, - cond: ConditionCode, - progress: u64, - ) { - self.notice_of_cancellation_queue - .lock() - .unwrap() - .push_back((transaction_id, cond, progress)) - } - - fn abandoned_cb( - &mut self, - transaction_id: TransactionId, - cond: ConditionCode, - progress: u64, - ) { - self.abandoned_queue - .lock() - .unwrap() - .push_back((transaction_id, cond, progress)) - } - - fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { - self.ignored_queue - .lock() - .unwrap() - .push_back((transaction_id, cond, progress)) - } - } - - impl TestFaultHandler { - fn suspension_queue_empty(&self) -> bool { - self.notice_of_suspension_queue.lock().unwrap().is_empty() - } - fn cancellation_queue_empty(&self) -> bool { - self.notice_of_cancellation_queue.lock().unwrap().is_empty() - } - fn ignored_queue_empty(&self) -> bool { - self.ignored_queue.lock().unwrap().is_empty() - } - fn abandoned_queue_empty(&self) -> bool { - self.abandoned_queue.lock().unwrap().is_empty() - } - fn all_queues_empty(&self) -> bool { - self.suspension_queue_empty() - && self.cancellation_queue_empty() - && self.ignored_queue_empty() - && self.abandoned_queue_empty() - } - } - #[derive(Debug)] struct TestCheckTimer { counter: Cell, @@ -1128,6 +1038,7 @@ mod tests { type TestDestHandler = DestinationHandler< TestCfdpSender, + TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, TestCheckTimerCreator, @@ -1177,6 +1088,14 @@ mod tests { &self.dest_path } + fn all_fault_queues_empty(&self) -> bool { + self.handler + .local_cfg + .user_fault_hook() + .borrow() + .all_queues_empty() + } + #[allow(dead_code)] fn indication_cfg_mut(&mut self) -> &mut IndicationConfig { &mut self.handler.local_cfg.indication_cfg @@ -1295,36 +1214,15 @@ mod tests { ) } - fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider { - let mut table = StdRemoteEntityConfigProvider::default(); - let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( - UnsignedByteFieldU16::new(1).into(), - 1024, - 1024, - true, - true, - TransmissionMode::Unacknowledged, - ChecksumType::Crc32, - ); - table.add_config(&remote_entity_cfg); - table - } - fn default_dest_handler( test_fault_handler: TestFaultHandler, test_packet_sender: TestCfdpSender, check_timer_expired: Arc, - ) -> DestinationHandler< - TestCfdpSender, - NativeFilestore, - StdRemoteEntityConfigProvider, - TestCheckTimerCreator, - TestCheckTimer, - > { + ) -> TestDestHandler { let local_entity_cfg = LocalEntityConfig { id: REMOTE_ID.into(), indication_cfg: IndicationConfig::default(), - default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)), + fault_handler: FaultHandler::new(test_fault_handler), }; DestinationHandler::new( local_entity_cfg, @@ -1390,15 +1288,20 @@ mod tests { fn test_basic() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); - let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default()); + let dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); assert!(dest_handler.transmission_mode().is_none()); - assert!(fault_handler.all_queues_empty()); + assert!(dest_handler + .local_cfg + .fault_handler + .user_hook + .borrow() + .all_queues_empty()); } #[test] fn test_empty_file_transfer_not_acked_no_closure() { let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), false); + let mut testbench = DestHandlerTester::new(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(0); testbench .generic_transfer_init(&mut test_user, 0) @@ -1407,7 +1310,7 @@ mod tests { testbench .generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); - assert!(fault_handler.all_queues_empty()); + assert!(testbench.all_fault_queues_empty()); assert!(testbench.handler.pdu_sender.queue_empty()); testbench.state_check(State::Idle, TransactionStep::Idle); } @@ -1419,7 +1322,7 @@ mod tests { let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), false); + let mut testbench = DestHandlerTester::new(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(file_size); testbench .generic_transfer_init(&mut test_user, file_size) @@ -1431,7 +1334,7 @@ mod tests { testbench .generic_eof_no_error(&mut test_user, file_data.to_vec()) .expect("EOF no error insertion failed"); - assert!(fault_handler.all_queues_empty()); + assert!(testbench.all_fault_queues_empty()); assert!(testbench.handler.pdu_sender.queue_empty()); testbench.state_check(State::Idle, TransactionStep::Idle); } @@ -1445,7 +1348,7 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), false); + let mut testbench = DestHandlerTester::new(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(file_size); testbench .generic_transfer_init(&mut test_user, file_size) @@ -1464,7 +1367,7 @@ mod tests { testbench .generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); - assert!(fault_handler.all_queues_empty()); + assert!(testbench.all_fault_queues_empty()); assert!(testbench.handler.pdu_sender.queue_empty()); testbench.state_check(State::Idle, TransactionStep::Idle); } @@ -1478,7 +1381,7 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), false); + let mut testbench = DestHandlerTester::new(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(file_size); let transaction_id = testbench .generic_transfer_init(&mut test_user, file_size) @@ -1507,10 +1410,10 @@ mod tests { .handler .state_machine(&mut test_user, None) .expect("fsm failure"); + let fault_handler = testbench.handler.local_cfg.fault_handler.user_hook.borrow(); - let ignored_queue = fault_handler.ignored_queue.lock().unwrap(); - assert_eq!(ignored_queue.len(), 1); - let cancelled = *ignored_queue.front().unwrap(); + assert_eq!(fault_handler.ignored_queue.len(), 1); + let cancelled = fault_handler.ignored_queue.front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.2, segment_len as u64); @@ -1527,7 +1430,7 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), false); + let mut testbench = DestHandlerTester::new(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(file_size); let transaction_id = testbench .generic_transfer_init(&mut test_user, file_size) @@ -1560,27 +1463,25 @@ mod tests { .expect("fsm error"); testbench.state_check(State::Idle, TransactionStep::Idle); - assert!(fault_handler - .notice_of_suspension_queue - .lock() - .unwrap() - .is_empty()); + let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow(); - let ignored_queue = fault_handler.ignored_queue.lock().unwrap(); + assert!(fault_hook.notice_of_suspension_queue.is_empty()); + let ignored_queue = &fault_hook.ignored_queue; assert_eq!(ignored_queue.len(), 1); - let cancelled = *ignored_queue.front().unwrap(); + let cancelled = ignored_queue.front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.2, segment_len as u64); - let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap(); + let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow(); + let cancelled_queue = &fault_hook.notice_of_cancellation_queue; assert_eq!(cancelled_queue.len(), 1); let cancelled = *cancelled_queue.front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); assert_eq!(cancelled.2, segment_len as u64); - drop(cancelled_queue); + drop(fault_hook); assert!(testbench.handler.pdu_sender.queue_empty()); @@ -1610,7 +1511,7 @@ mod tests { #[test] fn test_file_transfer_with_closure() { let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTester::new(fault_handler.clone(), true); + let mut testbench = DestHandlerTester::new(fault_handler, true); let mut test_user = testbench.test_user_from_cached_paths(0); testbench .generic_transfer_init(&mut test_user, 0) @@ -1620,7 +1521,7 @@ mod tests { .generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); assert_eq!(sent_packets, 1); - assert!(fault_handler.all_queues_empty()); + assert!(testbench.all_fault_queues_empty()); // The Finished PDU was sent, so the state machine is done. testbench.state_check(State::Idle, TransactionStep::Idle); assert!(!testbench.handler.pdu_sender.queue_empty()); diff --git a/satrs/src/cfdp/mod.rs b/satrs/src/cfdp/mod.rs index 172bbed..51f72e6 100644 --- a/satrs/src/cfdp/mod.rs +++ b/satrs/src/cfdp/mod.rs @@ -12,8 +12,6 @@ use spacepackets::{ util::{UnsignedByteField, UnsignedEnum}, }; -#[cfg(feature = "alloc")] -use alloc::boxed::Box; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -311,9 +309,9 @@ impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider { /// implement some CFDP features like fault handler logging, which would not be possible /// generically otherwise. /// -/// For each error reported by the [DefaultFaultHandler], the appropriate fault handler callback +/// For each error reported by the [FaultHandler], the appropriate fault handler callback /// will be called depending on the [FaultHandlerCode]. -pub trait UserFaultHandler { +pub trait UserFaultHookProvider { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, @@ -333,6 +331,37 @@ pub trait UserFaultHandler { fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64); } +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +struct DummyFaultHook {} + +impl UserFaultHookProvider for DummyFaultHook { + fn notice_of_suspension_cb( + &mut self, + _transaction_id: TransactionId, + _cond: ConditionCode, + _progress: u64, + ) { + } + + fn notice_of_cancellation_cb( + &mut self, + _transaction_id: TransactionId, + _cond: ConditionCode, + _progress: u64, + ) { + } + + fn abandoned_cb( + &mut self, + _transaction_id: TransactionId, + _cond: ConditionCode, + _progress: u64, + ) { + } + + fn ignore_cb(&mut self, _transaction_id: TransactionId, _cond: ConditionCode, _progress: u64) {} +} + /// This structure is used to implement the fault handling as specified in chapter 4.8 of the CFDP /// standard. /// @@ -353,14 +382,14 @@ pub trait UserFaultHandler { /// These defaults can be overriden by using the [Self::set_fault_handler] method. /// Please note that in any case, fault handler overrides can be specified by the sending CFDP /// entity. -pub struct DefaultFaultHandler { +pub struct FaultHandler { handler_array: [FaultHandlerCode; 10], // Could also change the user fault handler trait to have non mutable methods, but that limits // flexbility on the user side.. - user_fault_handler: RefCell>, + pub user_hook: RefCell, } -impl DefaultFaultHandler { +impl FaultHandler { fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option { Some(match conditon_code { ConditionCode::PositiveAckLimitReached => 0, @@ -389,7 +418,7 @@ impl DefaultFaultHandler { self.handler_array[array_idx.unwrap()] = fault_handler; } - pub fn new(user_fault_handler: Box) -> Self { + pub fn new(user_fault_handler: UserHandler) -> Self { let mut init_array = [FaultHandlerCode::NoticeOfCancellation; 10]; init_array [Self::condition_code_to_array_index(ConditionCode::FileChecksumFailure).unwrap()] = @@ -398,7 +427,7 @@ impl DefaultFaultHandler { .unwrap()] = FaultHandlerCode::IgnoreError; Self { handler_array: init_array, - user_fault_handler: RefCell::new(user_fault_handler), + user_hook: RefCell::new(user_fault_handler), } } @@ -421,7 +450,7 @@ impl DefaultFaultHandler { return FaultHandlerCode::IgnoreError; } let fh_code = self.handler_array[array_idx.unwrap()]; - let mut handler_mut = self.user_fault_handler.borrow_mut(); + let mut handler_mut = self.user_hook.borrow_mut(); match fh_code { FaultHandlerCode::NoticeOfCancellation => { handler_mut.notice_of_cancellation_cb(transaction_id, condition, progress); @@ -462,10 +491,29 @@ impl Default for IndicationConfig { } } -pub struct LocalEntityConfig { +pub struct LocalEntityConfig { pub id: UnsignedByteField, pub indication_cfg: IndicationConfig, - pub default_fault_handler: DefaultFaultHandler, + pub fault_handler: FaultHandler, +} + +impl LocalEntityConfig { + pub fn user_fault_hook_mut(&mut self) -> &mut RefCell { + &mut self.fault_handler.user_hook + } + + pub fn user_fault_hook(&self) -> &RefCell { + &self.fault_handler.user_hook + } +} + +pub trait PduSendProvider { + fn send_pdu( + &self, + pdu_type: PduType, + file_directive_type: Option, + raw_pdu: &[u8], + ) -> Result<(), PduError>; } /// The CFDP transaction ID of a CFDP transaction consists of the source entity ID and the sequence @@ -505,18 +553,6 @@ impl PartialEq for TransactionId { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum TransactionStep { - Idle = 0, - TransactionStart = 1, - ReceivingFileDataPdus = 2, - ReceivingFileDataPdusWithCheckLimitHandling = 3, - SendingAckPdu = 4, - TransferCompletion = 5, - SendingFinishedPdu = 6, -} - #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum State { @@ -627,21 +663,147 @@ impl<'raw> PacketInfo<'raw> { } #[cfg(test)] -mod tests { - use spacepackets::cfdp::{ - lv::Lv, - pdu::{ - eof::EofPdu, - file_data::FileDataPdu, - metadata::{MetadataGenericParams, MetadataPduCreator}, - CommonPduConfig, FileDirectiveType, PduHeader, WritablePduPacket, +pub(crate) mod tests { + use core::cell::RefCell; + + use alloc::{collections::VecDeque, vec::Vec}; + use spacepackets::{ + cfdp::{ + lv::Lv, + pdu::{ + eof::EofPdu, + file_data::FileDataPdu, + metadata::{MetadataGenericParams, MetadataPduCreator}, + CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, + }, + ChecksumType, ConditionCode, PduType, TransmissionMode, }, - PduType, + util::UnsignedByteFieldU16, }; use crate::cfdp::PacketTarget; - use super::PacketInfo; + use super::{ + PacketInfo, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, + StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, + }; + + #[derive(Default)] + pub(crate) struct TestFaultHandler { + pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, + pub notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>, + pub abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>, + pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>, + } + + impl UserFaultHookProvider for TestFaultHandler { + fn notice_of_suspension_cb( + &mut self, + transaction_id: TransactionId, + cond: ConditionCode, + progress: u64, + ) { + self.notice_of_suspension_queue + .push_back((transaction_id, cond, progress)) + } + + fn notice_of_cancellation_cb( + &mut self, + transaction_id: TransactionId, + cond: ConditionCode, + progress: u64, + ) { + self.notice_of_cancellation_queue + .push_back((transaction_id, cond, progress)) + } + + fn abandoned_cb( + &mut self, + transaction_id: TransactionId, + cond: ConditionCode, + progress: u64, + ) { + self.abandoned_queue + .push_back((transaction_id, cond, progress)) + } + + fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { + self.ignored_queue + .push_back((transaction_id, cond, progress)) + } + } + + impl TestFaultHandler { + pub(crate) fn suspension_queue_empty(&self) -> bool { + self.notice_of_suspension_queue.is_empty() + } + pub(crate) fn cancellation_queue_empty(&self) -> bool { + self.notice_of_cancellation_queue.is_empty() + } + pub(crate) fn ignored_queue_empty(&self) -> bool { + self.ignored_queue.is_empty() + } + pub(crate) fn abandoned_queue_empty(&self) -> bool { + self.abandoned_queue.is_empty() + } + pub(crate) fn all_queues_empty(&self) -> bool { + self.suspension_queue_empty() + && self.cancellation_queue_empty() + && self.ignored_queue_empty() + && self.abandoned_queue_empty() + } + } + + pub struct SentPdu { + pub pdu_type: PduType, + pub file_directive_type: Option, + pub raw_pdu: Vec, + } + + #[derive(Default)] + pub struct TestCfdpSender { + pub packet_queue: RefCell>, + } + + impl PduSendProvider for TestCfdpSender { + fn send_pdu( + &self, + pdu_type: PduType, + file_directive_type: Option, + raw_pdu: &[u8], + ) -> Result<(), PduError> { + self.packet_queue.borrow_mut().push_back(SentPdu { + pdu_type, + file_directive_type, + raw_pdu: raw_pdu.to_vec(), + }); + Ok(()) + } + } + + impl TestCfdpSender { + pub fn retrieve_next_pdu(&self) -> Option { + self.packet_queue.borrow_mut().pop_front() + } + pub fn queue_empty(&self) -> bool { + self.packet_queue.borrow_mut().is_empty() + } + } + + pub fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider { + let mut table = StdRemoteEntityConfigProvider::default(); + let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( + UnsignedByteFieldU16::new(1).into(), + 1024, + 1024, + true, + true, + TransmissionMode::Unacknowledged, + ChecksumType::Crc32, + ); + table.add_config(&remote_entity_cfg); + table + } fn generic_pdu_header() -> PduHeader { let pdu_conf = CommonPduConfig::default(); diff --git a/satrs/src/cfdp/source.rs b/satrs/src/cfdp/source.rs index 433f4d2..250bc59 100644 --- a/satrs/src/cfdp/source.rs +++ b/satrs/src/cfdp/source.rs @@ -1,15 +1,217 @@ -#![allow(dead_code)] -use spacepackets::util::UnsignedByteField; +use spacepackets::cfdp::{pdu::FileDirectiveType, PduType}; -pub struct SourceHandler { - id: UnsignedByteField, +use super::{ + filestore::VirtualFilestore, user::CfdpUser, LocalEntityConfig, PacketInfo, PacketTarget, + PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider, +}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum TransactionStep { + Idle = 0, + TransactionStart = 1, + SendingMetadata = 3, + SendingFileData = 4, + /// Re-transmitting missing packets in acknowledged mode6 + Retransmitting = 5, + SendingEof = 6, + WaitingForEofAck = 7, + WaitingForFinished = 8, + SendingAckOfFinished = 9, + NoticeOfCompletion = 10, +} + +pub struct FileParams { + pub progress: usize, + pub segment_len: usize, + pub crc32: Option<[u8; 4]>, + pub metadata_only: bool, + pub file_size: usize, + pub no_eof: bool, +} + +pub struct StateHelper { + state: super::State, + step: TransactionStep, + num_packets_ready: u32, } -impl SourceHandler { - pub fn new(id: impl Into) -> Self { - Self { id: id.into() } +impl Default for StateHelper { + fn default() -> Self { + Self { + state: super::State::Idle, + step: TransactionStep::Idle, + num_packets_ready: 0, + } } } +#[derive(Debug, thiserror::Error)] +pub enum SourceError { + #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")] + CantProcessPacketType { + pdu_type: PduType, + directive_type: Option, + }, + #[error("unexpected file data PDU")] + UnexpectedFileDataPdu, +} + +pub struct SourceHandler< + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, + Vfs: VirtualFilestore, + RemoteCfgTable: RemoteEntityConfigProvider, +> { + local_cfg: LocalEntityConfig, + pdu_sender: PduSender, + remote_cfg_table: RemoteCfgTable, + vfs: Vfs, + state_helper: StateHelper, +} + +impl< + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, + Vfs: VirtualFilestore, + RemoteCfgTable: RemoteEntityConfigProvider, + > SourceHandler +{ + pub fn new( + cfg: LocalEntityConfig, + pdu_sender: PduSender, + vfs: Vfs, + remote_cfg_table: RemoteCfgTable, + ) -> Self { + Self { + local_cfg: cfg, + remote_cfg_table, + vfs, + pdu_sender, + state_helper: Default::default(), + } + } + + /// This is the core function to drive the source handler. It is also used to insert + /// packets into the source handler. + /// + /// The state machine should either be called if a packet with the appropriate destination ID + /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example + /// checking for timeouts or missed file segments. + /// + /// The function returns the number of sent PDU packets on success. + pub fn state_machine( + &mut self, + cfdp_user: &mut impl CfdpUser, + packet_to_insert: Option<&PacketInfo>, + ) -> Result { + if let Some(packet) = packet_to_insert { + self.insert_packet(cfdp_user, packet)?; + } + match self.state_helper.state { + super::State::Idle => todo!(), + super::State::Busy => self.fsm_busy(cfdp_user), + super::State::Suspended => todo!(), + } + } + + fn insert_packet( + &mut self, + cfdp_user: &mut impl CfdpUser, + packet_info: &PacketInfo, + ) -> Result<(), SourceError> { + if packet_info.target() != PacketTarget::SourceEntity { + // Unwrap is okay here, a PacketInfo for a file data PDU should always have the + // destination as the target. + return Err(SourceError::CantProcessPacketType { + pdu_type: packet_info.pdu_type(), + directive_type: packet_info.pdu_directive(), + }); + } + if packet_info.pdu_type() == PduType::FileData { + // The [PacketInfo] API should ensure that file data PDUs can not be passed + // into a source entity, so this should never happen. + return Err(SourceError::UnexpectedFileDataPdu); + } + // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is + // always a valid value. + match packet_info + .pdu_directive() + .expect("PDU directive type unexpectedly not set") + { + FileDirectiveType::FinishedPdu => self.handle_finished_pdu(), + FileDirectiveType::NakPdu => self.handle_nak_pdu(), + FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), + FileDirectiveType::AckPdu => todo!("acknowledged mode not implemented yet"), + FileDirectiveType::EofPdu + | FileDirectiveType::PromptPdu + | FileDirectiveType::MetadataPdu => { + return Err(SourceError::CantProcessPacketType { + pdu_type: packet_info.pdu_type(), + directive_type: packet_info.pdu_directive(), + }); + } + } + Ok(()) + } + + fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { + Ok(0) + } + + fn handle_finished_pdu(&mut self) {} + + fn handle_nak_pdu(&mut self) {} + + fn handle_keep_alive_pdu(&mut self) {} +} + #[cfg(test)] -mod tests {} +mod tests { + use alloc::sync::Arc; + use spacepackets::util::UnsignedByteFieldU16; + + use super::*; + use crate::cfdp::{ + filestore::NativeFilestore, + tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler}, + FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, + }; + + const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); + const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); + + type TestSourceHandler = SourceHandler< + TestCfdpSender, + TestFaultHandler, + NativeFilestore, + StdRemoteEntityConfigProvider, + >; + + fn default_source_handler( + test_fault_handler: TestFaultHandler, + test_packet_sender: TestCfdpSender, + ) -> TestSourceHandler { + let local_entity_cfg = LocalEntityConfig { + id: REMOTE_ID.into(), + indication_cfg: IndicationConfig::default(), + fault_handler: FaultHandler::new(test_fault_handler), + }; + SourceHandler::new( + local_entity_cfg, + test_packet_sender, + NativeFilestore::default(), + basic_remote_cfg_table(), + // TestCheckTimerCreator::new(check_timer_expired), + ) + } + + #[test] + fn test_basic() { + let fault_handler = TestFaultHandler::default(); + let test_sender = TestCfdpSender::default(); + let source_handler = default_source_handler(fault_handler, test_sender); + // assert!(dest_handler.transmission_mode().is_none()); + // assert!(fault_handler.all_queues_empty()); + } +}