diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2e931917..31283ab1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,6 @@ name: CI -on: - push: +on: [push, pull_request] jobs: CI: diff --git a/object_store_factory/src/local.rs b/object_store_factory/src/local.rs index acfa6db1..3d70109f 100644 --- a/object_store_factory/src/local.rs +++ b/object_store_factory/src/local.rs @@ -6,6 +6,12 @@ use std::sync::Arc; #[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct LocalConfig { pub data_dir: String, + #[serde(default = "default_false")] + pub disable_hardlinks: bool, +} + +fn default_false() -> bool { + false } impl LocalConfig { @@ -13,13 +19,26 @@ impl LocalConfig { map: &HashMap, ) -> Result { Ok(Self { - data_dir: map.get("data_dir").unwrap().clone(), + data_dir: map + .get("data_dir") + .ok_or_else(|| object_store::Error::Generic { + store: "local", + source: "Missing data_dir".into(), + })? + .clone(), + disable_hardlinks: map + .get("disable_hardlinks") + .map(|s| s == "true") + .unwrap_or(false), }) } pub fn to_hashmap(&self) -> HashMap { let mut map = HashMap::new(); map.insert("data_dir".to_string(), self.data_dir.clone()); + map.insert( + "disable_hardlinks".to_string(), + self.disable_hardlinks.to_string()); map } @@ -44,6 +63,31 @@ mod tests { let config = LocalConfig::from_hashmap(&map) .expect("Failed to create config from hashmap"); assert_eq!(config.data_dir, "/tmp/data".to_string()); + assert!(!config.disable_hardlinks); // Default value + } + + #[test] + fn test_config_from_hashmap_with_disable_hardlinks() { + let mut map = HashMap::new(); + map.insert("data_dir".to_string(), "/tmp/data".to_string()); + map.insert("disable_hardlinks".to_string(), "true".to_string()); + + let config = LocalConfig::from_hashmap(&map) + .expect("Failed to create config from hashmap"); + assert_eq!(config.data_dir, "/tmp/data".to_string()); + assert!(config.disable_hardlinks); + } + + #[test] + fn test_config_from_hashmap_with_disable_hardlinks_false() { + let mut map = HashMap::new(); + map.insert("data_dir".to_string(), "/tmp/data".to_string()); + map.insert("disable_hardlinks".to_string(), "false".to_string()); + + let config = LocalConfig::from_hashmap(&map) + .expect("Failed to create config from hashmap"); + assert_eq!(config.data_dir, "/tmp/data".to_string()); + assert!(!config.disable_hardlinks); } #[test] @@ -64,6 +108,7 @@ mod tests { let result = LocalConfig { data_dir: data_dir.to_string(), + disable_hardlinks: false, } .build_local_storage(); assert!(result.is_ok(), "Expected Ok, got Err: {:?}", result); @@ -73,6 +118,7 @@ mod tests { fn test_build_local_storage_with_invalid_path() { let result = LocalConfig { data_dir: "".to_string(), + disable_hardlinks: false, } .build_local_storage(); assert!(result.is_err(), "Expected Err due to invalid path, got Ok"); @@ -82,10 +128,44 @@ mod tests { fn test_to_hashmap() { let local_config = LocalConfig { data_dir: "path/to/data".to_string(), + disable_hardlinks: true, }; let hashmap = local_config.to_hashmap(); assert_eq!(hashmap.get("data_dir"), Some(&"path/to/data".to_string())); + assert_eq!(hashmap.get("disable_hardlinks"), Some(&"true".to_string())); } -} + + #[test] + fn test_default_false() { + assert!(!default_false()); + } + + #[test] + fn test_deserialize_with_default() { + let json = r#" + { + "data_dir": "/tmp/data" + } + "#; + + let config: LocalConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.data_dir, "/tmp/data"); + assert!(!config.disable_hardlinks); + } + + #[test] + fn test_deserialize_with_disable_hardlinks() { + let json = r#" + { + "data_dir": "/tmp/data", + "disable_hardlinks": true + } + "#; + + let config: LocalConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.data_dir, "/tmp/data"); + assert!(config.disable_hardlinks); + } +} \ No newline at end of file diff --git a/src/config/schema.rs b/src/config/schema.rs index 14483b86..fd271939 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -637,6 +637,7 @@ cache_control = "private, max-age=86400" SeafowlConfig { object_store: Some(ObjectStoreConfig::Local(LocalConfig { data_dir: "./seafowl-data".to_string(), + disable_hardlinks: false, })), catalog: Some(Catalog::Postgres(Postgres { dsn: "postgresql://user:pass@localhost:5432/somedb".to_string(), @@ -732,6 +733,7 @@ cache_control = "private, max-age=86400" SeafowlConfig { object_store: Some(ObjectStoreConfig::Local(LocalConfig { data_dir: "some_other_path".to_string(), + disable_hardlinks: false, })), catalog: Some(Catalog::Sqlite(Sqlite { dsn: "sqlite://file.sqlite".to_string(), diff --git a/src/context/delta.rs b/src/context/delta.rs index 02c88e81..375480c2 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -528,6 +528,7 @@ mod tests { Arc::new(LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap()), ObjectStoreConfig::Local(LocalConfig { data_dir: tmp_dir.path().to_string_lossy().to_string(), + disable_hardlinks: false, }), ), Some(tmp_dir), diff --git a/src/main.rs b/src/main.rs index 90c96fda..1caf26e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,7 +29,7 @@ use seafowl::{ use tokio::time::{interval, Duration}; use tracing::level_filters::LevelFilter; -use tracing::{error, info, subscriber, warn}; +use tracing::{debug, error, info, subscriber, warn}; use tracing_log::LogTracer; use tracing_subscriber::filter::EnvFilter; @@ -157,6 +157,9 @@ async fn main() { config }; + debug!("Input configuration: {:?}", args); + debug!("Starting configuration: {:?}", config); + if !args.cli && let Some(ref metrics) = config.misc.metrics { diff --git a/src/object_store/wrapped.rs b/src/object_store/wrapped.rs index 84dcbe1a..66dd06a8 100644 --- a/src/object_store/wrapped.rs +++ b/src/object_store/wrapped.rs @@ -16,6 +16,7 @@ use url::Url; use object_store_factory::aws::S3Config; use object_store_factory::google::GCSConfig; +use object_store_factory::local::LocalConfig; use object_store_factory::ObjectStoreConfig; // Wrapper around the object_store crate that holds on to the original config @@ -151,6 +152,22 @@ impl ObjectStore for InternalObjectStore { payload: PutPayload, opts: PutOptions, ) -> Result { + if let ObjectStoreConfig::Local(LocalConfig { + disable_hardlinks: true, + .. + }) = self.config + { + return self + .inner + .put_opts( + location, + payload, + PutOptions{ + mode: object_store::PutMode::Overwrite, + ..opts + }, + ).await; + }; self.inner.put_opts(location, payload, opts).await } @@ -239,6 +256,13 @@ impl ObjectStore for InternalObjectStore { /// /// Will return an error if the destination already has an object. async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + if let ObjectStoreConfig::Local(LocalConfig { + disable_hardlinks: true, + .. + }) = self.config + { + return self.inner.copy(from, to).await; + } self.inner.copy_if_not_exists(from, to).await } @@ -254,6 +278,13 @@ impl ObjectStore for InternalObjectStore { // this with a lock too, so look into using that down the line instead. return self.inner.rename(from, to).await; } + if let ObjectStoreConfig::Local(LocalConfig { + disable_hardlinks: true, + .. + }) = self.config + { + return self.inner.rename(from, to).await; + } self.inner.rename_if_not_exists(from, to).await } } @@ -264,7 +295,7 @@ mod tests { use crate::object_store::wrapped::InternalObjectStore; use datafusion::common::Result; use rstest::rstest; - + use object_store_factory::aws::S3Config; use object_store_factory::ObjectStoreConfig;