Feature/redshift iam auth #818

Merged (8 commits) on Jul 4, 2018
8 changes: 7 additions & 1 deletion dbt/adapters/postgres/impl.py
@@ -53,6 +53,10 @@ def date_function(cls):
     def get_status(cls, cursor):
         return cursor.statusmessage
 
+    @classmethod
+    def get_credentials(cls, credentials):
+        return credentials
+
     @classmethod
     def open_connection(cls, connection):
         if connection.get('state') == 'open':
@@ -61,8 +65,10 @@ def open_connection(cls, connection):
 
         result = connection.copy()
 
+        base_credentials = connection.get('credentials', {})
+        credentials = cls.get_credentials(base_credentials.copy())
+
         try:
-            credentials = connection.get('credentials', {})
             handle = psycopg2.connect(
                 dbname=credentials.get('dbname'),
                 user=credentials.get('user'),
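The new get_credentials classmethod is a pass-through hook: PostgresAdapter returns the profile credentials unchanged, and a subclass can override it to substitute derived values before psycopg2.connect runs. A minimal standalone sketch of that pattern (class names and values are illustrative, not dbt's own code):

    # Sketch of the hook pattern introduced above: the base adapter returns
    # credentials unchanged; a subclass overrides get_credentials() to swap
    # in derived values before the connection is opened.
    # (Standalone illustration; not the actual dbt classes.)

    class BaseAdapter:
        @classmethod
        def get_credentials(cls, credentials):
            # default: pass profile credentials through untouched
            return credentials

        @classmethod
        def open_connection(cls, credentials):
            # every connection funnels through the hook before connecting
            resolved = cls.get_credentials(credentials.copy())
            return "connect(user={user}, pass={pass_})".format(
                user=resolved['user'], pass_=resolved['pass'])


    class TempTokenAdapter(BaseAdapter):
        @classmethod
        def get_credentials(cls, credentials):
            # subclass swaps the static password for a short-lived token
            credentials['pass'] = 'temporary-token'
            return credentials


    print(TempTokenAdapter.open_connection({'user': 'alice', 'pass': 'static'}))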
60 changes: 57 additions & 3 deletions dbt/adapters/redshift/impl.py
@@ -2,13 +2,13 @@
 
 from dbt.adapters.postgres import PostgresAdapter
 from dbt.logger import GLOBAL_LOGGER as logger  # noqa
 
 import dbt.exceptions
+import boto3
 
 drop_lock = multiprocessing.Lock()
 
 
 class RedshiftAdapter(PostgresAdapter):
 
     @classmethod
     def type(cls):
         return 'redshift'
@@ -17,6 +17,60 @@ def type(cls):
     def date_function(cls):
         return 'getdate()'
 
+    @classmethod
+    def get_tmp_iam_cluster_credentials(cls, credentials):
+        cluster_id = credentials.get('cluster_id')
+
+        # default via:
+        # boto3.readthedocs.io/en/latest/reference/services/redshift.html
+        iam_duration_s = credentials.get('iam_duration_seconds', 900)
+
+        if not cluster_id:
+            raise dbt.exceptions.FailedToConnectException(
+                "'cluster_id' must be provided in profile if IAM "
+                "authentication method selected")
+
+        boto_client = boto3.client('redshift')
+
+        # replace username and password with temporary redshift credentials
+        to_update = {}
+        try:
+            cluster_creds = boto_client.get_cluster_credentials(
+                DbUser=credentials.get('user'),
+                DbName=credentials.get('dbname'),
+                ClusterIdentifier=credentials.get('cluster_id'),
+                DurationSeconds=iam_duration_s,
+                AutoCreate=False)
+
+            to_update = {
+                'user': cluster_creds.get('DbUser'),
+                'pass': cluster_creds.get('DbPassword')
+            }
+
+        except boto_client.exceptions.ClientError as e:
+            raise dbt.exceptions.FailedToConnectException(
+                "Unable to get temporary Redshift cluster credentials: "
+                "{}".format(e))
+
+        return dbt.utils.merge(credentials, to_update)
+
+    @classmethod
+    def get_credentials(cls, credentials):
+        method = credentials.get('method')
+
+        # Support missing 'method' for backwards compatibility
+        if method == 'database' or method is None:
+            logger.debug("Connecting to Redshift using 'database' credentials")
+            return credentials
+
+        elif method == 'iam':
+            logger.debug("Connecting to Redshift using 'IAM' credentials")
+            return cls.get_tmp_iam_cluster_credentials(credentials)
+
+        else:
+            raise dbt.exceptions.FailedToConnectException(
+                'Invalid `method` in profile: "{}"'.format(method))
+
     @classmethod
     def _get_columns_in_table_sql(cls, schema_name, table_name, database):
         # Redshift doesn't support cross-database queries,
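get_tmp_iam_cluster_credentials above wraps Redshift's GetClusterCredentials API, which exchanges the caller's IAM identity for a temporary DbUser/DbPassword pair. A standalone sketch of that call with hypothetical profile values (the caller needs AWS credentials with redshift:GetClusterCredentials permission in the environment):

    # Standalone sketch of the boto3 call the adapter wraps.
    # Cluster/user/database names here are hypothetical.
    import boto3

    client = boto3.client('redshift')
    creds = client.get_cluster_credentials(
        DbUser='analytics_user',          # 'user' from the dbt profile
        DbName='warehouse',               # 'dbname' from the dbt profile
        ClusterIdentifier='my-cluster',   # 'cluster_id' from the dbt profile
        DurationSeconds=900,              # 'iam_duration_seconds' (900-3600)
        AutoCreate=False)                 # fail rather than create the user

    # The response carries a temporary login; note that Redshift prefixes
    # the returned DbUser with 'IAM:'.
    print(creds['DbUser'], creds['Expiration'])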
@@ -27,7 +81,7 @@ def _get_columns_in_table_sql(cls, schema_name, table_name, database):
             table_schema_filter = '1=1'
         else:
             table_schema_filter = "table_schema = '{schema_name}'".format(
-                schema_name=schema_name)
+                    schema_name=schema_name)
 
         sql = """
         with bound_views as (
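For reference, a target using the new authentication method might look roughly like this in profiles.yml (profile name, host, and cluster identifier are hypothetical):

    company-name:
      target: dev
      outputs:
        dev:
          type: redshift
          method: iam                    # new: 'database' (default) or 'iam'
          cluster_id: analytics-cluster  # required when method is 'iam'
          iam_duration_seconds: 900      # optional, 900-3600
          host: analytics-cluster.abc123.us-east-1.redshift.amazonaws.com
          user: analytics_user
          port: 5439
          dbname: warehouse
          schema: analytics
          # no 'pass' needed: it is fetched from GetClusterCredentials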
56 changes: 53 additions & 3 deletions dbt/contracts/connection.py
@@ -36,6 +36,52 @@
     'required': ['dbname', 'host', 'user', 'pass', 'port', 'schema'],
 }
 
+REDSHIFT_CREDENTIALS_CONTRACT = {
+    'type': 'object',
+    'additionalProperties': False,
+    'properties': {
+        'method': {
+            'enum': ['database', 'iam'],
+        },
+        'dbname': {
+            'type': 'string',
+        },
+        'host': {
+            'type': 'string',
+        },
+        'user': {
+            'type': 'string',
+        },
+        'pass': {
+            'type': 'string',
+        },
+        'port': {
+            'oneOf': [
+                {
+                    'type': 'integer',
+                    'minimum': 0,
+                    'maximum': 65535,
+                },
+                {
+                    'type': 'string',
+                },
+            ],
+        },
+        'schema': {
+            'type': 'string',
+        },
+        'cluster_id': {
+            'type': 'string',
+        },
+        'iam_duration_seconds': {
+            'type': ['null', 'integer'],
+            'minimum': 900,
+            'maximum': 3600,
+        },
+    },
+    'required': ['dbname', 'host', 'user', 'port', 'schema'],
+}
+
 SNOWFLAKE_CREDENTIALS_CONTRACT = {
     'type': 'object',
     'additionalProperties': False,

Review thread on 'iam_duration_seconds' (maximum 3600):

Member: Does this mean that if a model takes more than an hour to run, the connection will close? (That's fine, I just want to understand.)

Contributor (author): I think that's right. I couldn't find any specifics in the docs, but I'm keen to set the timeout to 900s and then experiment with a time.sleep() after acquiring the connection. I bet that once a query is issued, it will complete even if the creds have expired, but subsequent queries on the same connection will likely fail. I wish we could make the timeout longer, but 3600s is the max per the docs.

Member: Yeah, this is pretty typical of AWS federated auth. >1hr would be great.
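A sketch of the experiment the author proposes, assuming a reachable cluster and hypothetical connection values: open a connection with 900-second credentials, sleep past their expiry, then test whether the already-open session still accepts queries.

    # Sketch of the expiry experiment from the thread above.
    # Host, user, and password values are hypothetical placeholders.
    import time
    import psycopg2

    handle = psycopg2.connect(
        dbname='warehouse',
        user='IAM:analytics_user',   # temporary DbUser from GetClusterCredentials
        password='<temporary-password>',
        host='my-cluster.abc123.us-east-1.redshift.amazonaws.com',
        port=5439)

    time.sleep(960)  # wait out the 900s credential lifetime

    # Expectation from the thread: the open connection keeps working; only
    # *new* connections made with the expired password should fail.
    with handle.cursor() as cur:
        cur.execute('select 1')
        print(cur.fetchone())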
@@ -113,11 +159,11 @@
     },
     'credentials': {
         'description': (
-            'The credentials object here should match the connection '
-            'type. Redshift uses the Postgres connection model.'
+            'The credentials object here should match the connection type.'
         ),
         'oneOf': [
             POSTGRES_CREDENTIALS_CONTRACT,
+            REDSHIFT_CREDENTIALS_CONTRACT,
             SNOWFLAKE_CREDENTIALS_CONTRACT,
             BIGQUERY_CREDENTIALS_CONTRACT,
         ],
@@ -133,6 +179,10 @@ class PostgresCredentials(APIObject):
     SCHEMA = POSTGRES_CREDENTIALS_CONTRACT
 
 
+class RedshiftCredentials(APIObject):
+    SCHEMA = REDSHIFT_CREDENTIALS_CONTRACT
+
+
 class SnowflakeCredentials(APIObject):
     SCHEMA = SNOWFLAKE_CREDENTIALS_CONTRACT
 
@@ -143,7 +193,7 @@ class BigQueryCredentials(APIObject):
 
 CREDENTIALS_MAPPING = {
     'postgres': PostgresCredentials,
-    'redshift': PostgresCredentials,
+    'redshift': RedshiftCredentials,
     'snowflake': SnowflakeCredentials,
     'bigquery': BigQueryCredentials,
 }
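Since 'redshift' now maps to its own credentials class, Redshift profiles validate against the stricter contract above rather than the Postgres one. A sketch of that validation using the jsonschema library pinned in requirements.txt (profile values are hypothetical; the import path is assumed from this diff):

    # Sketch: validate two hypothetical Redshift profiles, one per auth method.
    import jsonschema
    from dbt.contracts.connection import REDSHIFT_CREDENTIALS_CONTRACT

    database_profile = {
        'dbname': 'warehouse', 'host': 'example.redshift.amazonaws.com',
        'user': 'alice', 'pass': 'secret', 'port': 5439, 'schema': 'analytics',
    }

    iam_profile = {
        'method': 'iam', 'cluster_id': 'my-cluster', 'iam_duration_seconds': 3600,
        'dbname': 'warehouse', 'host': 'example.redshift.amazonaws.com',
        'user': 'alice', 'port': 5439, 'schema': 'analytics',
        # note: no 'pass' -- it is absent from the contract's required list
    }

    for profile in (database_profile, iam_profile):
        jsonschema.validate(profile, REDSHIFT_CREDENTIALS_CONTRACT)
        print('valid:', profile.get('method', 'database'))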
1 change: 1 addition & 0 deletions requirements.txt
@@ -13,3 +13,4 @@ google-cloud-bigquery==0.29.0
 requests>=2.18.0
 agate>=1.6,<2
 jsonschema==2.6.0
+boto3>=1.6.23
1 change: 1 addition & 0 deletions setup.py
@@ -52,5 +52,6 @@ def read(fname):
         'google-cloud-bigquery==0.29.0',
         'agate>=1.6,<2',
         'jsonschema==2.6.0',
+        'boto3>=1.6.23'
     ]
 )