Skip to content

Commit

Permalink
Conditional compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 3, 2023
1 parent 4b5acd1 commit c441a0d
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 109 deletions.
4 changes: 2 additions & 2 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ authors = { workspace = true }
rust-version = { workspace = true }

[features]
default = ["encoding_expressions"]
# enable the encoding expressions
default= []
# enable the encode/decode functions
encoding_expressions = ["base64", "hex"]


Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/encoding/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ enum Encoding {
Base64,
Hex,
}

fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
match value {
ColumnarValue::Array(a) => match a.data_type() {
Expand Down
108 changes: 108 additions & 0 deletions datafusion/functions/src/encoding/meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metadata information for "encode" and "decode" functions
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ColumnarValue, FunctionImplementation, Signature, Volatility};
use std::sync::OnceLock;
use DataType::*;

pub(super) struct EncodeFunc {}

static ENCODE_SIGNATURE: OnceLock<Signature> = OnceLock::new();

impl FunctionImplementation for EncodeFunc {
fn name(&self) -> &str {
"encode"
}

fn signature(&self) -> &Signature {
ENCODE_SIGNATURE.get_or_init(|| {
Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![Binary, Utf8]),
Exact(vec![LargeBinary, Utf8]),
],
Volatility::Immutable,
)
})
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(match arg_types[0] {
Utf8 => Utf8,
LargeUtf8 => LargeUtf8,
Binary => Utf8,
LargeBinary => LargeUtf8,
Null => Null,
_ => {
return plan_err!("The encode function can only accept utf8 or binary.");
}
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
// Put a feature flag here to make sure this is only compiled when the feature is activated
super::inner::encode(args)
}
}

pub(super) struct DecodeFunc {}

static DECODE_SIGNATURE: OnceLock<Signature> = OnceLock::new();

impl FunctionImplementation for DecodeFunc {
fn name(&self) -> &str {
"decode"
}

fn signature(&self) -> &Signature {
DECODE_SIGNATURE.get_or_init(|| {
Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![Binary, Utf8]),
Exact(vec![LargeBinary, Utf8]),
],
Volatility::Immutable,
)
})
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(match arg_types[0] {
Utf8 => Binary,
LargeUtf8 => LargeBinary,
Binary => Binary,
LargeBinary => LargeBinary,
Null => Null,
_ => {
return plan_err!("The decode function can only accept utf8 or binary.");
}
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
// Put a feature flag here to make sure this is only compiled when the feature is activated
super::inner::decode(args)
}
}
121 changes: 15 additions & 106 deletions datafusion/functions/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,120 +15,29 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "encoding_expressions")]
mod inner;
#[cfg(feature = "encoding_expressions")]
mod meta;

use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{
ColumnarValue, FunctionImplementation, ScalarUDF, Signature, Volatility,
};
use crate::utils::insert;
use datafusion_expr::ScalarUDF;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use DataType::*;

// TODO make stub implementations when feature is not activated
//#[cfg(feature = "encoding_expressions")]
//pub mod encoding_expressions;

pub fn encode_udf() -> ScalarUDF {
ScalarUDF::new_from_impl(Arc::new(EncodeFunc {}))
}

pub fn decode_udf() -> ScalarUDF {
ScalarUDF::new_from_impl(Arc::new(DecodeFunc {}))
}

fn insert(registry: &mut HashMap<String, Arc<ScalarUDF>>, udf: ScalarUDF) {
registry.insert(udf.name().to_string(), Arc::new(udf));
}
use std::sync::Arc;

/// Registers the `encode` and `decode` functions with the function registry
#[cfg(feature = "encoding_expressions")]
pub fn register(registry: &mut HashMap<String, Arc<ScalarUDF>>) {
insert(registry, encode_udf());
insert(registry, decode_udf());
}

struct EncodeFunc {}

static ENCODE_SIGNATURE: OnceLock<Signature> = OnceLock::new();

impl FunctionImplementation for EncodeFunc {
fn name(&self) -> &str {
"encode"
}

fn signature(&self) -> &Signature {
ENCODE_SIGNATURE.get_or_init(|| {
Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![Binary, Utf8]),
Exact(vec![LargeBinary, Utf8]),
],
Volatility::Immutable,
)
})
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(match arg_types[0] {
Utf8 => Utf8,
LargeUtf8 => LargeUtf8,
Binary => Utf8,
LargeBinary => LargeUtf8,
Null => Null,
_ => {
return plan_err!("The encode function can only accept utf8 or binary.");
}
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
// Put a feature flag here to make sure this is only compiled when the feature is activated
inner::encode(args)
}
insert(registry, meta::EncodeFunc {});
insert(registry, meta::DecodeFunc {});
}

struct DecodeFunc {}

static DECODE_SIGNATURE: OnceLock<Signature> = OnceLock::new();

impl FunctionImplementation for DecodeFunc {
fn name(&self) -> &str {
"decode"
}

fn signature(&self) -> &Signature {
DECODE_SIGNATURE.get_or_init(|| {
Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Exact(vec![LargeUtf8, Utf8]),
Exact(vec![Binary, Utf8]),
Exact(vec![LargeBinary, Utf8]),
],
Volatility::Immutable,
)
})
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(match arg_types[0] {
Utf8 => Binary,
LargeUtf8 => LargeBinary,
Binary => Binary,
LargeBinary => LargeBinary,
Null => Null,
_ => {
return plan_err!("The decode function can only accept utf8 or binary.");
}
})
}
/// Registers the `encode` and `decode` stubs with the function registry
#[cfg(not(feature = "encoding_expressions"))]
pub fn register(registry: &mut HashMap<String, Arc<ScalarUDF>>) {
let hint = "Requires compilation with feature flag: encoding_expressions.";

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
// Put a feature flag here to make sure this is only compiled when the feature is activated
inner::decode(args)
for function_name in ["encode", "decode"] {
insert(registry, crate::utils::StubFunc::new(function_name, hint));
}
}
1 change: 1 addition & 0 deletions datafusion/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::sync::Arc;

pub mod encoding;
pub mod utils;

/// Registers all "built in" functions from this crate with the provided registry
pub fn register_all(registry: &mut HashMap<String, Arc<ScalarUDF>>) {
Expand Down
66 changes: 66 additions & 0 deletions datafusion/functions/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::DataType;
use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
use datafusion_expr::{
ColumnarValue, FunctionImplementation, ScalarUDF, Signature, Volatility,
};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

/// Insert a function into the registry
pub(crate) fn insert(
registry: &mut HashMap<String, Arc<ScalarUDF>>,
udf: impl FunctionImplementation + Send + Sync + 'static,
) {
let udf = ScalarUDF::new_from_impl(Arc::new(udf));
registry.insert(udf.name().to_string(), Arc::new(udf));
}

/// A scalar function that always errors with a hint. This is used to stub out
/// functions that are not enabled with the current set of crate features.
pub struct StubFunc {
name: &'static str,
hint: &'static str,
}

impl StubFunc {
/// Create a new stub function
pub fn new(name: &'static str, hint: &'static str) -> Self {
Self { name, hint }
}
}

static STUB_SIGNATURE: OnceLock<Signature> = OnceLock::new();

impl FunctionImplementation for StubFunc {
fn name(&self) -> &str {
self.name
}

fn signature(&self) -> &Signature {
STUB_SIGNATURE.get_or_init(|| Signature::variadic_any(Volatility::Volatile))
}

fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
plan_err!("function {} not available. {}", self.name, self.hint)
}
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
exec_err!("function {} not available. {}", self.name, self.hint)
}
}

0 comments on commit c441a0d

Please sign in to comment.