diff --git a/superset/cli/celery.py b/superset/cli/celery.py index a0373573e8825..de4952f0e28b4 100755 --- a/superset/cli/celery.py +++ b/superset/cli/celery.py @@ -58,10 +58,7 @@ def worker(workers: int) -> None: "-a", "--address", default="localhost", help="Address on which to run the service", ) def flower(port: int, address: str) -> None: - """Runs a Celery Flower web server - - Celery Flower is a UI to monitor the Celery operation on a given - broker""" + """Runs a Celery Flower web server""" broker_url = celery_app.conf.BROKER_URL cmd = ( "celery flower " diff --git a/superset/cli/main.py b/superset/cli/main.py index 45b4c9e46a101..4df5b34ad51e5 100755 --- a/superset/cli/main.py +++ b/superset/cli/main.py @@ -18,7 +18,7 @@ import importlib import logging import pkgutil -from typing import Any, Dict +from typing import Any, Dict, Set import click from colorama import Fore, Style @@ -43,14 +43,28 @@ def make_shell_context() -> Dict[str, Any]: return dict(app=app, db=db) -# add sub-commands -for load, module_name, is_pkg in pkgutil.walk_packages( - cli.__path__, cli.__name__ + "." # type: ignore -): - module = importlib.import_module(module_name) +# add sub-commands from ``superset.cli`` +seen: Set[click.core.Command] = set() +modules = [ + importlib.import_module(module_name) + for load, module_name, is_pkg in pkgutil.walk_packages( + cli.__path__, cli.__name__ + "." # type: ignore + ) +] + +# first find all groups +for module in modules: + for attribute in module.__dict__.values(): + if isinstance(attribute, click.core.Group): + superset.add_command(attribute) + seen.update(attribute.commands.values()) + +# then add orphan commands +for module in modules: for attribute in module.__dict__.values(): - if isinstance(attribute, click.core.Command): + if isinstance(attribute, click.core.Command) and attribute not in seen: superset.add_command(attribute) + seen.add(attribute) @superset.command() diff --git a/superset/cli/sync/__init__.py b/superset/cli/sync/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/superset/cli/sync/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/superset/cli/sync/dbt/__init__.py b/superset/cli/sync/dbt/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/superset/cli/sync/dbt/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/superset/cli/sync/dbt/command.py b/superset/cli/sync/dbt/command.py new file mode 100644 index 0000000000000..5be2112bbe2ec --- /dev/null +++ b/superset/cli/sync/dbt/command.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +A command to sync DBT models and metrics to Superset. +""" + +import os.path +from pathlib import Path + +import click +from flask.cli import with_appcontext + +from superset.cli.sync.dbt.databases import sync_database +from superset.cli.sync.dbt.datasets import sync_datasets + + +@click.command() +@with_appcontext +@click.argument("manifest", type=click.Path(exists=True, resolve_path=True)) +@click.option( + "--project", help="Name of the DBT project", default="default", +) +@click.option("--target", help="Target name", default="dev") +@click.option( + "--profile", + help="Location of profiles.yml file", + type=click.Path(exists=True, resolve_path=True), +) +def dbt(manifest: str, project: str, target: str, profile: str) -> None: + """ + Sync models and metrics from DBT to Superset. + """ + if profile is None: + profile = os.path.expanduser("~/.dbt/profiles.yml") + + database = sync_database(Path(profile), project, target) + sync_datasets(Path(manifest), database) diff --git a/superset/cli/sync/dbt/databases.py b/superset/cli/sync/dbt/databases.py new file mode 100644 index 0000000000000..2e23946c649fd --- /dev/null +++ b/superset/cli/sync/dbt/databases.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +from pathlib import Path +from typing import Any, Dict, TYPE_CHECKING + +import yaml + +from superset import db + +if TYPE_CHECKING: + from superset.models.core import Database + +_logger = logging.getLogger(__name__) + + +def build_sqlalchemy_uri(target: Dict[str, Any]) -> str: + """ + Build the SQLAlchemy URI for a given target. + """ + type_ = target.get("type") + + if type_ == "postgres": + return build_postgres_sqlalchemy_uri(target) + + raise Exception( + f"Unable to build a SQLAlchemy URI for a target of type {type_}. Please file an " + "issue at " + "https://github.com/apache/superset/issues/new?template=feature_request.md" + ) + + +def build_postgres_sqlalchemy_uri(target: Dict[str, Any]) -> str: + """ + Build the SQLAlchemy URI for a Postgres target. + """ + return "postgresql+psycopg2://{user}:{pass}@{host}:{port}/{dbname}".format(**target) + + +def sync_database( + profile_path: Path, project_name: str, target_name: str +) -> "Database": + """ + Read target database from a DBT profile.yml and sync to Superset. + """ + from superset.models.core import Database # pylint: disable=import-outside-toplevel + + with open(profile_path, encoding="utf-8") as inp: + profile = yaml.load(inp, Loader=yaml.SafeLoader) + + if project_name not in profile: + raise Exception(f"Project {project_name} not found in {profile_path}") + + project = profile[project_name] + outputs = project["outputs"] + + if target_name not in outputs: + raise Exception( + f"Target {target_name} not found in the outputs of {profile_path}" + ) + + target = outputs[target_name] + sqlalchemy_uri = build_sqlalchemy_uri(target) + + databases = ( + db.session.query(Database).filter_by(sqlalchemy_uri=sqlalchemy_uri).all() + ) + if len(databases) > 1: + raise Exception( + "More than one database with the same SQLAlchemy URI found, unable to update" + ) + + # read additional metadata + meta = target.get("meta", {}).get("superset", {}) + + if databases: + _logger.info("Found an existing database, updating it") + database = databases[0] + database.database_name = f"{project_name}_{target_name}" + for key, value in meta.items(): + setattr(database, key, value) + else: + _logger.info("No database found, creating it") + database = Database( + database_name=f"{project_name}_{target_name}", + sqlalchemy_uri=sqlalchemy_uri, + **meta, + ) + db.session.add(database) + + db.session.commit() + + return database diff --git a/superset/cli/sync/dbt/datasets.py b/superset/cli/sync/dbt/datasets.py new file mode 100644 index 0000000000000..ee85169b55d4b --- /dev/null +++ b/superset/cli/sync/dbt/datasets.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=import-outside-toplevel + +import logging +from collections import defaultdict +from pathlib import Path +from typing import Any, Dict, List, TYPE_CHECKING + +import yaml + +from superset import db + +if TYPE_CHECKING: + from superset.models.core import Database + +_logger = logging.getLogger(__name__) + + +def get_metric_expression(metric: Dict[str, Any]) -> str: + """ + Return a SQL expression for a given DBT metric. + """ + return "{type}({sql})".format(**metric) + + +def sync_datasets( # pylint: disable=too-many-locals + manifest_path: Path, database: "Database", +) -> None: + """ + Read the DBT manifest and import models as datasets with metrics. + """ + from superset import security_manager + from superset.connectors.sqla.models import SqlaTable, SqlMetric + from superset.datasets.commands.create import CreateDatasetCommand + from superset.datasets.dao import DatasetDAO + + with open(manifest_path, encoding="utf-8") as inp: + manifest = yaml.load(inp, Loader=yaml.SafeLoader) + + user = security_manager.find_user("admin") + + # extract metrics + metrics: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for metric in manifest["metrics"].values(): + for unique_id in metric["depends_on"]["nodes"]: + metrics[unique_id].append(metric) + + # add datasets + datasets = list(manifest["sources"].values()) + list(manifest["nodes"].values()) + for config in datasets: + new = DatasetDAO.validate_uniqueness( + database.id, config["schema"], config["name"] + ) + if new: + _logger.info("Creating dataset %s", config["unique_id"]) + command = CreateDatasetCommand( + user, + { + "database": database.id, + "schema": config["schema"], + "table_name": config["name"], + }, + ) + dataset = command.run() + else: + _logger.info("Updating dataset %s", config["unique_id"]) + dataset = ( + db.session.query(SqlaTable) + .filter_by( + database_id=database.id, + schema=config["schema"], + table_name=config["name"], + ) + .one() + ) + dataset.fetch_metadata() + + # add extra metadata + dataset.description = config["description"] + + # delete existing metrics before adding the ones from the config + for existing_metric in dataset.metrics: + db.session.delete(existing_metric) + db.session.flush() + + # add metrics + if config["resource_type"] == "model": + for metric in metrics[config["unique_id"]]: + dataset.metrics.append( + SqlMetric( + expression=get_metric_expression(metric), + metric_name=metric["name"], + metric_type=metric["type"], + verbose_name=get_metric_expression(metric), + description=metric["description"], + **metric["meta"], + ) + ) + + db.session.commit() diff --git a/superset/cli/sync/main.py b/superset/cli/sync/main.py new file mode 100644 index 0000000000000..784c11249d360 --- /dev/null +++ b/superset/cli/sync/main.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Commands for syncing metadata stores to and from Superset. +""" + +import click + +from superset.cli.sync.dbt.command import dbt + + +@click.group() +def sync() -> None: + """ + Sync metadata between Superset and an external repository (DBT). + """ + + +sync.add_command(dbt) diff --git a/superset/cli/update.py b/superset/cli/update.py index c3a7a2d5a8a24..ef47c8aad5ddc 100755 --- a/superset/cli/update.py +++ b/superset/cli/update.py @@ -163,6 +163,9 @@ def update_api_docs() -> None: "is not set on the config", ) def re_encrypt_secrets(previous_secret_key: Optional[str] = None) -> None: + """ + Rotate secret key, reencrypting the database. + """ previous_secret_key = previous_secret_key or current_app.config.get( "PREVIOUS_SECRET_KEY" )