Skip to content

Commit

Permalink
feat: command to sync DBT to Superset
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed Jan 19, 2022
1 parent 9e2bc72 commit 4035ccb
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 11 deletions.
5 changes: 1 addition & 4 deletions superset/cli/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ def worker(workers: int) -> None:
"-a", "--address", default="localhost", help="Address on which to run the service",
)
def flower(port: int, address: str) -> None:
"""Runs a Celery Flower web server
Celery Flower is a UI to monitor the Celery operation on a given
broker"""
"""Runs a Celery Flower web server"""
broker_url = celery_app.conf.BROKER_URL
cmd = (
"celery flower "
Expand Down
28 changes: 21 additions & 7 deletions superset/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import importlib
import logging
import pkgutil
from typing import Any, Dict
from typing import Any, Dict, Set

import click
from colorama import Fore, Style
Expand All @@ -43,14 +43,28 @@ def make_shell_context() -> Dict[str, Any]:
return dict(app=app, db=db)


# add sub-commands
for load, module_name, is_pkg in pkgutil.walk_packages(
cli.__path__, cli.__name__ + "." # type: ignore
):
module = importlib.import_module(module_name)
# add sub-commands from ``superset.cli``
seen: Set[click.core.Command] = set()
modules = [
importlib.import_module(module_name)
for load, module_name, is_pkg in pkgutil.walk_packages(
cli.__path__, cli.__name__ + "." # type: ignore
)
]

# first find all groups
for module in modules:
for attribute in module.__dict__.values():
if isinstance(attribute, click.core.Group):
superset.add_command(attribute)
seen.update(attribute.commands.values())

# then add orphan commands
for module in modules:
for attribute in module.__dict__.values():
if isinstance(attribute, click.core.Command):
if isinstance(attribute, click.core.Command) and attribute not in seen:
superset.add_command(attribute)
seen.add(attribute)


@superset.command()
Expand Down
16 changes: 16 additions & 0 deletions superset/cli/sync/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions superset/cli/sync/dbt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
52 changes: 52 additions & 0 deletions superset/cli/sync/dbt/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
A command to sync DBT models and metrics to Superset.
"""

import os.path
from pathlib import Path

import click
from flask.cli import with_appcontext

from superset.cli.sync.dbt.databases import sync_database
from superset.cli.sync.dbt.datasets import sync_datasets


@click.command()
@with_appcontext
@click.argument("manifest", type=click.Path(exists=True, resolve_path=True))
@click.option("--project", help="Name of the DBT project", default="default")
@click.option("--target", help="Target name", default="dev")
@click.option(
    "--profile",
    help="Location of profiles.yml file",
    type=click.Path(exists=True, resolve_path=True),
)
def dbt(manifest: str, project: str, target: str, profile: str) -> None:
    """
    Sync models and metrics from DBT to Superset.
    """
    # ``--profile`` declares no default, so it arrives as ``None`` when the
    # option is omitted; in that case use the profiles file in the user's
    # home directory.
    profiles_file = Path(
        os.path.expanduser("~/.dbt/profiles.yml") if profile is None else profile
    )
    manifest_file = Path(manifest)

    # The database has to be synced first: the datasets are attached to the
    # object it returns.
    database = sync_database(profiles_file, project, target)
    sync_datasets(manifest_file, database)
108 changes: 108 additions & 0 deletions superset/cli/sync/dbt/databases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING

import yaml

from superset import db

if TYPE_CHECKING:
from superset.models.core import Database

_logger = logging.getLogger(__name__)


def build_sqlalchemy_uri(target: Dict[str, Any]) -> str:
    """
    Return the SQLAlchemy URI for a DBT target, dispatching on its ``type``.

    Only ``postgres`` targets are handled; any other value (including a
    missing ``type`` key) raises an ``Exception`` pointing to the issue
    tracker.
    """
    target_type = target.get("type")

    # guard clause: reject everything that is not a known target type
    if target_type != "postgres":
        message = (
            f"Unable to build a SQLAlchemy URI for a target of type {target_type}. "
            "Please file an issue at "
            "https://github.com/apache/superset/issues/new?template=feature_request.md"
        )
        raise Exception(message)

    return build_postgres_sqlalchemy_uri(target)


def build_postgres_sqlalchemy_uri(target: Dict[str, Any]) -> str:
    """
    Build the SQLAlchemy URI for a Postgres target.

    The target must provide the ``user``, ``pass``, ``host``, ``port`` and
    ``dbname`` keys; a ``KeyError`` is raised if any of them is missing.

    The user name and password are percent-encoded before being placed in the
    URI, so credentials containing reserved characters such as ``@``, ``:`` or
    ``/`` do not corrupt the host/port/database portion of the URI.
    """
    from urllib.parse import quote  # pylint: disable=import-outside-toplevel

    # ``safe=""`` also escapes "/" (which ``quote`` leaves alone by default);
    # ``str()`` covers credentials that YAML parsed as non-strings (eg, a
    # numeric password).
    credentials = {key: quote(str(target[key]), safe="") for key in ("user", "pass")}

    return "postgresql+psycopg2://{user}:{pass}@{host}:{port}/{dbname}".format(
        **{**target, **credentials}
    )


def sync_database(
    profile_path: Path, project_name: str, target_name: str
) -> "Database":
    """
    Read target database from a DBT profiles.yml and sync to Superset.

    The target ``target_name`` is looked up under the ``outputs`` of project
    ``project_name`` in the YAML file at ``profile_path``, and a SQLAlchemy URI
    is built from it. If a database with that URI already exists it is renamed
    to ``{project_name}_{target_name}`` and updated; otherwise a new one is
    created. Any key/value pairs found under ``meta.superset`` in the target
    are applied to the database as attributes. The session is committed and
    the database returned.

    Raises an ``Exception`` if the project or target cannot be found, if the
    target type is not supported, or if more than one existing database shares
    the same SQLAlchemy URI.
    """
    from superset.models.core import Database  # pylint: disable=import-outside-toplevel

    with open(profile_path, encoding="utf-8") as inp:
        # an empty YAML document parses to ``None``; treat it as "no projects"
        # so the descriptive error below is raised instead of a ``TypeError``
        profile = yaml.load(inp, Loader=yaml.SafeLoader) or {}

    if project_name not in profile:
        raise Exception(f"Project {project_name} not found in {profile_path}")

    # a project that is empty or has no ``outputs`` section has no targets
    project = profile[project_name] or {}
    outputs = project.get("outputs") or {}

    if target_name not in outputs:
        raise Exception(
            f"Target {target_name} not found in the outputs of {profile_path}"
        )

    target = outputs[target_name]
    sqlalchemy_uri = build_sqlalchemy_uri(target)

    databases = (
        db.session.query(Database).filter_by(sqlalchemy_uri=sqlalchemy_uri).all()
    )
    if len(databases) > 1:
        raise Exception(
            "More than one database with the same SQLAlchemy URI found, unable to update"
        )

    # read additional metadata
    meta = target.get("meta", {}).get("superset", {})
    database_name = f"{project_name}_{target_name}"

    if databases:
        _logger.info("Found an existing database, updating it")
        database = databases[0]
        database.database_name = database_name
        for key, value in meta.items():
            setattr(database, key, value)
    else:
        _logger.info("No database found, creating it")
        database = Database(
            database_name=database_name,
            sqlalchemy_uri=sqlalchemy_uri,
            **meta,
        )
        db.session.add(database)

    db.session.commit()

    return database
115 changes: 115 additions & 0 deletions superset/cli/sync/dbt/datasets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel

import logging
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, TYPE_CHECKING

import yaml

from superset import db

if TYPE_CHECKING:
from superset.models.core import Database

_logger = logging.getLogger(__name__)


def get_metric_expression(metric: Dict[str, Any]) -> str:
    """
    Return a SQL expression for a given DBT metric.

    The metric must have the ``type`` and ``sql`` keys (a ``KeyError`` is
    raised otherwise); other keys are ignored. Most types are emitted as a
    function call named after the type, eg, ``sum(amount)``. The DBT types
    ``count_distinct`` and ``average`` are not SQL function names, so they are
    translated to ``COUNT(DISTINCT ...)`` and ``AVG(...)`` respectively.
    """
    # templates for DBT metric types whose name is not itself a SQL aggregate
    templates = {
        "count_distinct": "COUNT(DISTINCT {sql})",
        "average": "AVG({sql})",
    }
    template = templates.get(metric["type"], "{type}({sql})")
    return template.format(**metric)


def sync_datasets( # pylint: disable=too-many-locals
manifest_path: Path, database: "Database",
) -> None:
"""
Read the DBT manifest and import models as datasets with metrics.

The file at ``manifest_path`` is parsed with the YAML safe loader. Every
entry under the manifest's ``sources`` and ``nodes`` keys is treated as a
dataset in ``database``, identified by its ``schema`` and ``name``: it is
created through ``CreateDatasetCommand`` when
``DatasetDAO.validate_uniqueness`` returns a truthy value, and looked up as
an existing ``SqlaTable`` otherwise.

For each dataset the description is overwritten with the one from the
manifest and the existing metrics are deleted. For entries whose
``resource_type`` is ``model``, one ``SqlMetric`` is then added for every
manifest metric that lists the entry's ``unique_id`` under
``depends_on.nodes``.
"""
# imported here rather than at module level (see the module-wide
# ``import-outside-toplevel`` pylint exemption)
from superset import security_manager
from superset.connectors.sqla.models import SqlaTable, SqlMetric
from superset.datasets.commands.create import CreateDatasetCommand
from superset.datasets.dao import DatasetDAO

with open(manifest_path, encoding="utf-8") as inp:
manifest = yaml.load(inp, Loader=yaml.SafeLoader)

# datasets are created on behalf of the user named "admin"
# NOTE(review): nothing here handles that user being absent — confirm what
# ``find_user`` returns in that case
user = security_manager.find_user("admin")

# extract metrics, grouping them by the unique ID of each node they depend on;
# a metric that depends on several nodes is listed under each of them
metrics: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for metric in manifest["metrics"].values():
for unique_id in metric["depends_on"]["nodes"]:
metrics[unique_id].append(metric)

# add datasets: sources first, then nodes
datasets = list(manifest["sources"].values()) + list(manifest["nodes"].values())
for config in datasets:
# truthy result => no dataset with this database/schema/name exists yet
new = DatasetDAO.validate_uniqueness(
database.id, config["schema"], config["name"]
)
if new:
_logger.info("Creating dataset %s", config["unique_id"])
command = CreateDatasetCommand(
user,
{
"database": database.id,
"schema": config["schema"],
"table_name": config["name"],
},
)
dataset = command.run()
else:
_logger.info("Updating dataset %s", config["unique_id"])
# ``.one()`` raises unless exactly one matching dataset is found
dataset = (
db.session.query(SqlaTable)
.filter_by(
database_id=database.id,
schema=config["schema"],
table_name=config["name"],
)
.one()
)
# NOTE(review): presumably only existing datasets are refreshed here (ie,
# this call belongs to the ``else`` branch) — confirm
dataset.fetch_metadata()

# add extra metadata
dataset.description = config["description"]

# delete existing metrics before adding the ones from the config
# NOTE(review): presumably flushed once after all deletions rather than once
# per metric — confirm
for existing_metric in dataset.metrics:
db.session.delete(existing_metric)
db.session.flush()

# add metrics; only entries with resource type "model" get them
if config["resource_type"] == "model":
for metric in metrics[config["unique_id"]]:
dataset.metrics.append(
SqlMetric(
expression=get_metric_expression(metric),
metric_name=metric["name"],
metric_type=metric["type"],
# the SQL expression is also used as the verbose name
verbose_name=get_metric_expression(metric),
description=metric["description"],
# every key under the metric's ``meta`` is passed through as a
# ``SqlMetric`` keyword argument
**metric["meta"],
)
)

# NOTE(review): presumably a single commit after all datasets were processed
# — confirm
db.session.commit()
Loading

0 comments on commit 4035ccb

Please sign in to comment.