Skip to content

Commit

Permalink
[task #8917] Implement information_schema.schemata
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <tang.ruilin@foxmail.com>
  • Loading branch information
Tangruilin committed Jan 25, 2024
1 parent d6ab343 commit 0d3a573
Showing 1 changed file with 134 additions and 2 deletions.
136 changes: 134 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ pub(crate) const TABLES: &str = "tables";
/// Name of the `views` table inside `information_schema`.
pub(crate) const VIEWS: &str = "views";
/// Name of the `columns` table inside `information_schema`.
pub(crate) const COLUMNS: &str = "columns";
/// Name of the `df_settings` table inside `information_schema`.
pub(crate) const DF_SETTINGS: &str = "df_settings";
/// Name of the `schemata` table inside `information_schema`.
pub(crate) const SCHEMATA: &str = "schemata";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -115,6 +117,33 @@ impl InformationSchemaConfig {
DF_SETTINGS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
SCHEMATA,
TableType::View,
);
}
}

/// Appends one `schemata` row to `builder` for every schema of every
/// registered catalog, skipping the `information_schema` schema itself.
///
/// The `schema_owner` column is filled from the `USER` environment variable,
/// falling back to `USERNAME`, and finally to an empty string when neither
/// variable can be read.
async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
    // The owner does not depend on the catalog or schema being visited, so
    // resolve it once instead of re-reading the environment for every row.
    let schema_owner = std::env::var("USER")
        .or_else(|_| std::env::var("USERNAME"))
        .unwrap_or_default();

    for catalog_name in self.catalog_list.catalog_names() {
        // `catalog` returns an `Option`; skip a name that does not resolve
        // rather than panicking in the middle of a query.
        if let Some(catalog) = self.catalog_list.catalog(&catalog_name) {
            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    builder.add_schemata(&catalog_name, &schema_name, &schema_owner);
                }
            }
        }
    }
}

Expand Down Expand Up @@ -196,6 +225,7 @@ impl SchemaProvider for InformationSchemaProvider {
VIEWS.to_string(),
COLUMNS.to_string(),
DF_SETTINGS.to_string(),
SCHEMATA.to_string(),
]
}

Expand All @@ -209,6 +239,8 @@ impl SchemaProvider for InformationSchemaProvider {
Arc::new(InformationSchemaViews::new(config))
} else if name.eq_ignore_ascii_case("df_settings") {
Arc::new(InformationSchemaDfSettings::new(config))
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
return None;
};
Expand All @@ -219,7 +251,10 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_exist(&self, name: &str) -> bool {
matches!(name.to_ascii_lowercase().as_str(), TABLES | VIEWS | COLUMNS)
matches!(
name.to_ascii_lowercase().as_str(),
TABLES | VIEWS | COLUMNS | SCHEMATA
)
}
}

Expand Down Expand Up @@ -617,6 +652,103 @@ impl InformationSchemaColumnsBuilder {
}
}

/// Backing type for the `schemata` table of `information_schema`.
struct InformationSchemata {
/// Arrow schema of the record batches produced for this table.
schema: SchemaRef,
/// Configuration used to enumerate catalogs and schemas when the table is executed.
config: InformationSchemaConfig,
}

impl InformationSchemata {
    /// Creates the `schemata` table for `config`, fixing its seven-column
    /// Utf8 schema: three required identification columns followed by four
    /// nullable columns.
    fn new(config: InformationSchemaConfig) -> Self {
        let required = |name: &str| Field::new(name, DataType::Utf8, false);
        let nullable = |name: &str| Field::new(name, DataType::Utf8, true);

        let fields = vec![
            required("catalog_name"),
            required("schema_name"),
            required("schema_owner"),
            nullable("default_character_set_catalog"),
            nullable("default_character_set_schema"),
            nullable("default_character_set_name"),
            nullable("sql_path"),
        ];

        Self {
            schema: Arc::new(Schema::new(fields)),
            config,
        }
    }

    /// Returns an empty row builder that shares this table's schema, with one
    /// string builder per column.
    fn builder(&self) -> InformationSchemataBuilder {
        let schema = self.schema.clone();
        InformationSchemataBuilder {
            schema,
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

/// Accumulates rows for the `schemata` table, one string builder per column.
struct InformationSchemataBuilder {
/// Schema the finished record batch is created with.
schema: SchemaRef,
/// Values for the `catalog_name` column.
catalog_name: StringBuilder,
/// Values for the `schema_name` column.
schema_name: StringBuilder,
/// Values for the `schema_owner` column.
schema_owner: StringBuilder,
/// Values for the `default_character_set_catalog` column.
default_character_set_catalog: StringBuilder,
/// Values for the `default_character_set_schema` column.
default_character_set_schema: StringBuilder,
/// Values for the `default_character_set_name` column.
default_character_set_name: StringBuilder,
/// Values for the `sql_path` column.
sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
    /// Appends a single row describing one schema.
    ///
    /// Only `catalog_name`, `schema_name` and `schema_owner` carry values.
    /// The character-set columns and `sql_path` are always NULL: according to
    /// https://www.postgresql.org/docs/current/infoschema-schemata.html they
    /// apply to a feature that is not available.
    fn add_schemata(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        schema_owner: &str,
    ) {
        let unsupported_columns = [
            &mut self.default_character_set_catalog,
            &mut self.default_character_set_schema,
            &mut self.default_character_set_name,
            &mut self.sql_path,
        ];
        for column in unsupported_columns {
            column.append_null();
        }

        self.catalog_name.append_value(catalog_name);
        self.schema_name.append_value(schema_name);
        self.schema_owner.append_value(schema_owner);
    }

    /// Finishes every column builder and assembles the results into a
    /// `RecordBatch` using the stored schema.
    ///
    /// # Panics
    ///
    /// Panics if `RecordBatch::try_new` returns an error.
    fn finish(&mut self) -> RecordBatch {
        let catalog_name = self.catalog_name.finish();
        let schema_name = self.schema_name.finish();
        let schema_owner = self.schema_owner.finish();
        let character_set_catalog = self.default_character_set_catalog.finish();
        let character_set_schema = self.default_character_set_schema.finish();
        let character_set_name = self.default_character_set_name.finish();
        let sql_path = self.sql_path.finish();

        RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(catalog_name),
                Arc::new(schema_name),
                Arc::new(schema_owner),
                Arc::new(character_set_catalog),
                Arc::new(character_set_schema),
                Arc::new(character_set_name),
                Arc::new(sql_path),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemata {
    /// Schema of the batches yielded by [`Self::execute`].
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Produces a stream that yields a single record batch holding every
    /// `schemata` row. The task context is not used.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut rows = self.builder();
        let schema = self.schema.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            schema,
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_schemata(&mut rows).await;
                Ok(rows.finish())
            }),
        ))
    }
}

struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down

0 comments on commit 0d3a573

Please sign in to comment.