Skip to content

Commit

Permalink
Don't canonicalize ListingTableUrl (apache#7994)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 31, 2023
1 parent 0d4dc36 commit 7edff96
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 104 deletions.
10 changes: 1 addition & 9 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,7 @@ mod tests {
// Ensure that local files are also registered
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();

if let DataFusionError::IoError(e) = err {
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
} else {
return Err(err);
}
create_external_table_test(location, &sql).await.unwrap();

Ok(())
}
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,14 +855,17 @@ impl TableProvider for ListingTable {
let input_partitions = input.output_partitioning().partition_count();
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
if file_groups.is_empty() && self.options.single_file {
// This is a hack, longer term append should be split out (#7994)
crate::datasource::file_format::write::FileWriterMode::PutMultipart
} else if input_partitions > file_groups.len() {
return plan_err!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
);
} else {
crate::datasource::file_format::write::FileWriterMode::Append
}

crate::datasource::file_format::write::FileWriterMode::Append
}
ListingTableInsertMode::AppendNewFiles => {
crate::datasource::file_format::write::FileWriterMode::PutMultipart
Expand Down
137 changes: 91 additions & 46 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fs;
use std::path::{Component, PathBuf};

use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
Expand Down Expand Up @@ -46,37 +46,49 @@ pub struct ListingTableUrl {
impl ListingTableUrl {
/// Parse a provided string as a `ListingTableUrl`
///
/// A URL can either refer to a single object, or a collection of objects with a
/// common prefix, with the presence of a trailing `/` indicating a collection.
///
/// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
/// `file:///foo/` refers to all the files under the directory `/foo` and its
/// subdirectories.
///
/// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
/// wherease `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
/// S3 bucket `BUCKET`
///
/// # Paths without a Scheme
///
/// If no scheme is provided, or the string is an absolute filesystem path
/// as determined [`std::path::Path::is_absolute`], the string will be
/// as determined by [`std::path::Path::is_absolute`], the string will be
/// interpreted as a path on the local filesystem using the operating
/// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
///
/// If the path contains any of `'?', '*', '['`, it will be considered
/// a glob expression and resolved as described in the section below.
///
/// Otherwise, the path will be resolved to an absolute path, returning
/// an error if it does not exist, and converted to a [file URI]
/// Otherwise, the path will be resolved to an absolute path based on the current
/// working directory, and converted to a [file URI].
///
/// If you wish to specify a path that does not exist on the local
/// machine you must provide it as a fully-qualified [file URI]
/// e.g. `file:///myfile.txt`
/// If the path already exists in the local filesystem this will be used to determine if this
/// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
/// of a trailing path delimiter will be used to indicate a directory. For the avoidance
/// of ambiguity it is recommended users always include trailing `/` when intending to
/// refer to a directory.
///
/// ## Glob File Paths
///
/// If no scheme is provided, and the path contains a glob expression, it will
/// be resolved as follows.
///
/// The string up to the first path segment containing a glob expression will be extracted,
/// and resolved in the same manner as a normal scheme-less path. That is, resolved to
/// an absolute path on the local filesystem, returning an error if it does not exist,
/// and converted to a [file URI]
/// and resolved in the same manner as a normal scheme-less path above.
///
/// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
/// filter when listing files from object storage
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [URL]: https://url.spec.whatwg.org/
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
let s = s.as_ref();

Expand All @@ -92,32 +104,6 @@ impl ListingTableUrl {
}
}

/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
) -> Result<Self> {
let s = s.as_ref();
let is_valid_url = Url::parse(s).is_ok();

match is_valid_url {
true => ListingTableUrl::parse(s),
false => {
let path = std::path::PathBuf::from(s);
if !path.exists() {
if is_directory {
fs::create_dir_all(path)?;
} else {
fs::File::create(path)?;
}
}
ListingTableUrl::parse(s)
}
}
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand All @@ -129,15 +115,9 @@ impl ListingTableUrl {
None => (s, None),
};

let path = std::path::Path::new(prefix).canonicalize()?;
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;
// TODO: Currently we do not have an IO-related error variant that accepts ()
// or a string. Once we have such a variant, change the error type above.
let url = url_from_path(prefix).ok_or_else(|| {
DataFusionError::Internal(format!("Can not open path: {s}"))
})?;
Ok(Self::new(url, glob))
}

Expand Down Expand Up @@ -214,7 +194,12 @@ impl ListingTableUrl {
}
}
},
false => futures::stream::once(store.head(&self.prefix)).boxed(),
false => futures::stream::once(store.head(&self.prefix))
.filter(|r| {
let p = !matches!(r, Err(object_store::Error::NotFound { .. }));
futures::future::ready(p)
})
.boxed(),
};
Ok(list
.try_filter(move |meta| {
Expand Down Expand Up @@ -257,6 +242,45 @@ impl std::fmt::Display for ListingTableUrl {
}
}

fn url_from_path(s: &str) -> Option<Url> {
let path = std::path::Path::new(s);
let is_dir = match path.exists() {
true => path.is_dir(),
// Fallback to inferring from trailing separator
false => std::path::is_separator(s.chars().last()?),
};

let p = match path.is_absolute() {
true => resolve_path(path)?,
false => {
let absolute = std::env::current_dir().ok()?.join(path);
resolve_path(&absolute)?
}
};

match is_dir {
true => Url::from_directory_path(p).ok(),
false => Url::from_file_path(p).ok(),
}
}

fn resolve_path(path: &std::path::Path) -> Option<PathBuf> {
let mut base = PathBuf::with_capacity(path.as_os_str().len());
for component in path.components() {
match component {
Component::Prefix(_) | Component::RootDir => base.push(component.as_os_str()),
Component::Normal(p) => base.push(p),
Component::CurDir => {} // Do nothing
Component::ParentDir => {
if !base.pop() {
return None;
}
}
}
}
Some(base)
}

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
Expand Down Expand Up @@ -368,4 +392,25 @@ mod tests {
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}

#[test]
fn test_resolve_path() {
let r = resolve_path("/foo/bar/../baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/baz.txt");

let r = resolve_path("/foo/bar/./baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/bar/baz.txt");

let r = resolve_path("/foo/bar/../../../baz.txt".as_ref());
assert_eq!(r, None);
}

#[test]
fn test_url_from_path() {
let url = url_from_path("foo/bar").unwrap();
assert!(url.path().ends_with("foo/bar"));

let url = url_from_path("foo/bar/").unwrap();
assert!(url.path().ends_with("foo/bar/"));
}
}
11 changes: 1 addition & 10 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);
Expand Down Expand Up @@ -205,13 +202,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
Expand Down
Loading

0 comments on commit 7edff96

Please sign in to comment.