Skip to content

Commit

Permalink
Better ETL accessors (#897)
Browse files Browse the repository at this point in the history
* Better ETL accessors (soon to action, analysis group)

* Apply pre-commit after rebase

* Docstring

* Add analysis group to etl accessors

* No red lines usually means it's fine

---------

Co-authored-by: Michael Franklin <illusional@users.noreply.github.com>
  • Loading branch information
illusional and illusional authored Aug 28, 2024
1 parent e7c6d49 commit 0b681b5
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 38 deletions.
38 changes: 13 additions & 25 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,10 @@ def get_parser_instance(
# check that submitting_user has access to parser

accessor_config: dict[
str,
Literal['by_type'],
dict[
Literal['parsers'],
list[
dict[
Literal['name']
| Literal['parser_name']
| Literal['default_parameters'],
Any,
]
],
str,
dict[Literal['parser_name', 'default_parameters', 'users'], Any],
],
] = get_accessor_config()

Expand All @@ -371,26 +364,21 @@ def get_parser_instance(
f'Submitting user {submitting_user} is not allowed to access any parsers'
)

# find the config
etl_accessor_config = next(
(
accessor_config
for accessor_config in accessor_config[submitting_user].get('parsers', [])
if accessor_config['name'].strip(STRIP_CHARS)
== request_type.strip(STRIP_CHARS)
),
None,
)
if not etl_accessor_config:
parser_config = accessor_config['by_type'].get(request_type)
if not parser_config:
return None, (
f'Submitting user {submitting_user} requested type {request_type}, '
'but was not available'
)

if submitting_user not in parser_config.get('users', []):
return None, (
f'Submitting user {submitting_user} is not allowed to access {request_type}'
)

parser_name = (etl_accessor_config.get('parser_name') or request_type).strip(
STRIP_CHARS
)
parser_name = (parser_config.get('parser_name') or request_type).strip(STRIP_CHARS)

init_params.update(etl_accessor_config.get('default_parameters', {}))
init_params.update(parser_config.get('default_parameters', {}))

parser_map = prepare_parser_map()

Expand Down
88 changes: 75 additions & 13 deletions metamist_infrastructure/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import pulumi
import pulumi_gcp as gcp

from cpg_infra.abstraction.gcp import get_member_key
from cpg_infra.plugin import CpgInfrastructurePlugin
from cpg_infra.utils import archive_folder

from metamist_infrastructure.etl_config import EtlConfig
from metamist_infrastructure.slack_notification import (
SlackNotification,
SlackNotificationConfig,
Expand Down Expand Up @@ -56,6 +58,11 @@ def main(self):
# todo, eventually configure metamist cloud run server
# to be deployed here, but for now it's manually deployed

def on_group_finalisation(self):
    """
    This method is called once all groups have been finalised.

    It triggers the ETL set-up by calling `_setup_etl`.
    """
    # we set up the etl here (rather than earlier), as we need to know
    # the etl accessors
    self._setup_etl()

@cached_property
Expand Down Expand Up @@ -204,7 +211,6 @@ def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]:
depends_on=[self._svc_iam],
),
)
# keys only
for name in self.config.metamist.etl.accessors
}

Expand Down Expand Up @@ -240,20 +246,58 @@ def etl_configuration_secret_version(self):
assert self.config.metamist.etl
assert self.config.metamist.etl.accessors

etl_accessor_config = {
k: v.to_dict() for k, v in self.config.metamist.etl.accessors.items()
}
def map_accessors_to_new_body(accessors_by_type: dict[str, list[str]]) -> str:
assert self.config.metamist
assert self.config.metamist.etl

def map_accessors_to_new_body(arg):
accessors: dict[str, str] = dict(arg)
# dict[gcp.serviceaccount.Account: dict[str, ]]
remapped = {accessors[k]: v for k, v in etl_accessor_config.items()}
return json.dumps(remapped)
config = EtlConfig(
by_type={
t: EtlConfig.EtlConfigType(
parser_name=config.parser_name,
users=accessors_by_type.get(t, []),
default_parameters=config.default_parameters,
)
for t, config in self.config.metamist.etl.by_type.items()
}
)
retval = json.dumps(config.to_dict())
pulumi.warn(retval)
return retval

# do this in a few hits as pretty sure pulumi.Output collects don't chain
etl_accessors_emails: dict[str, pulumi.Output[str]] = {
k: v.email for k, v in self.etl_accessors.items()
}
remapped_with_id = pulumi.Output.all(**etl_accessors_emails).apply(
datasets_to_lookup_analysis_members_for = set(
t.analysis_group_dataset
for t in self.config.metamist.etl.by_type.values()
if t.analysis_group_dataset
)
members_for_datasets: dict[str, pulumi.Output[str]] = {
dataset: self.infrastructure.group_provider.resolve_group_members(
self.infrastructure.dataset_infrastructures[dataset]
.clouds['gcp']
.analysis_group
)
for dataset in datasets_to_lookup_analysis_members_for
}

pulumi_outputs_by_etl_type: dict[str, pulumi.Output[list[str]]] = {}

for by_type, by_type_config in self.config.metamist.etl.by_type.items():
members: list[pulumi.Output[str]] = []

for etl_member in by_type_config.accessors:
members.append(etl_accessors_emails[etl_member])

if by_type_config.analysis_group_dataset:
members.extend(
members_for_datasets[by_type_config.analysis_group_dataset]
)
# turns list[Pulumi.Output[str]] into Pulumi.Output[list[str]]
pulumi_outputs_by_etl_type[by_type] = pulumi.Output.all(*members)

remapped_with_id = pulumi.Output.all(**pulumi_outputs_by_etl_type).apply(
map_accessors_to_new_body
)
return gcp.secretmanager.SecretVersion(
Expand Down Expand Up @@ -811,14 +855,32 @@ def _etl_function(
return fxn

def _setup_metamist_etl_accessors(self):
for name, sa in self.etl_accessors.items():
assert self.config.metamist
assert self.config.metamist.etl

accessors = dict(self.etl_accessors)
# grab datasets
datasets_to_lookup_analysis_members_for = set(
t.analysis_group_dataset
for t in self.config.metamist.etl.by_type.values()
if t.analysis_group_dataset
)

for dataset in datasets_to_lookup_analysis_members_for:
accessors[f'{dataset}-analysis-group'] = (
self.infrastructure.dataset_infrastructures[dataset]
.clouds['gcp']
.analysis_group
)

for name, account in accessors.items():
gcp.cloudfunctionsv2.FunctionIamMember(
f'metamist-etl-accessor-{name}',
location=self.etl_extract_function.location,
project=self.etl_extract_function.project,
cloud_function=self.etl_extract_function.name,
role='roles/cloudfunctions.invoker',
member=pulumi.Output.concat('serviceAccount:', sa.email),
member=get_member_key(account),
)

gcp.cloudrun.IamMember(
Expand All @@ -827,7 +889,7 @@ def _setup_metamist_etl_accessors(self):
project=self.etl_extract_function.project,
service=self.etl_extract_function.name, # it shared the name
role='roles/run.invoker',
member=pulumi.Output.concat('serviceAccount:', sa.email),
member=get_member_key(account),
)

@cached_property
Expand Down
33 changes: 33 additions & 0 deletions metamist_infrastructure/etl_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import dataclasses
from typing import Any

from cpg_infra.config.deserializabledataclass import DeserializableDataclass


@dataclasses.dataclass(frozen=True)
class EtlConfig(DeserializableDataclass):
    """
    ETL config output type: one `EtlConfigType` entry per ETL type name.
    """

    @dataclasses.dataclass(frozen=True)
    class EtlConfigType(DeserializableDataclass):
        """Configuration for a single ETL type"""

        # name of the parser associated with this type
        parser_name: str
        # parameters for the parser; may be None, in which case it is
        # serialised as null by `EtlConfig.to_dict`
        default_parameters: dict[str, Any] | None
        # users listed for this type
        users: list[str]

    # keyed by ETL type name
    by_type: dict[str, 'EtlConfigType']

    def to_dict(self) -> dict[str, dict[str, Any]]:
        """
        Convert the config to a plain dictionary.

        The result maps each type name directly to the dict form of its
        `EtlConfigType`; it is NOT wrapped in an outer 'by_type' key.
        """
        serialised: dict[str, dict[str, Any]] = {}
        for type_name, type_config in self.by_type.items():
            serialised[type_name] = dataclasses.asdict(type_config)
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, dict[str, Any]]) -> 'EtlConfig':
        """
        Create an EtlConfig from a dictionary shaped like the output of
        `to_dict` (type name -> field values of an `EtlConfigType`).
        """
        entries = {}
        for type_name, raw_fields in data.items():
            entries[type_name] = cls.EtlConfigType(**raw_fields)
        return cls(by_type=entries)

0 comments on commit 0b681b5

Please sign in to comment.