Better ETL accessors #897

Merged
5 commits, merged Aug 28, 2024
38 changes: 13 additions & 25 deletions etl/load/main.py
@@ -352,17 +352,10 @@ def get_parser_instance(
     # check that submitting_user has access to parser

     accessor_config: dict[
-        str,
+        Literal['by_type'],
         dict[
-            Literal['parsers'],
-            list[
-                dict[
-                    Literal['name']
-                    | Literal['parser_name']
-                    | Literal['default_parameters'],
-                    Any,
-                ]
-            ],
+            str,
+            dict[Literal['parser_name', 'default_parameters', 'users'], Any],
         ],
     ] = get_accessor_config()

@@ -371,26 +364,21 @@ def get_parser_instance(
             f'Submitting user {submitting_user} is not allowed to access any parsers'
         )

-    # find the config
-    etl_accessor_config = next(
-        (
-            accessor_config
-            for accessor_config in accessor_config[submitting_user].get('parsers', [])
-            if accessor_config['name'].strip(STRIP_CHARS)
-            == request_type.strip(STRIP_CHARS)
-        ),
-        None,
-    )
-    if not etl_accessor_config:
+    parser_config = accessor_config['by_type'].get(request_type)
+    if not parser_config:
         return None, (
             f'Submitting user {submitting_user} requested type {request_type}, '
             'but was not available'
         )

+    if submitting_user not in parser_config.get('users', []):
+        return None, (
+            f'Submitting user {submitting_user} is not allowed to access {request_type}'
+        )
+
-    parser_name = (etl_accessor_config.get('parser_name') or request_type).strip(
-        STRIP_CHARS
-    )
+    parser_name = (parser_config.get('parser_name') or request_type).strip(STRIP_CHARS)

-    init_params.update(etl_accessor_config.get('default_parameters', {}))
+    init_params.update(parser_config.get('default_parameters', {}))

     parser_map = prepare_parser_map()

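For reference, a minimal sketch of the reshaped accessor config that get_parser_instance now reads: the secret is keyed by 'by_type', and each request type carries its parser_name, default_parameters and the users allowed to submit it. The type name, email and parameters below are illustrative, not values from the real secret:

# hypothetical accessor config after this change (illustrative values only)
EXAMPLE_ACCESSOR_CONFIG = {
    'by_type': {
        '/example/v1': {
            'parser_name': 'example',
            'default_parameters': {'project': 'example-project'},
            'users': ['etl-example-accessor@example-project.iam.gserviceaccount.com'],
        },
    },
}

# roughly the lookup get_parser_instance now performs
parser_config = EXAMPLE_ACCESSOR_CONFIG['by_type'].get('/example/v1')
assert parser_config is not None
assert 'etl-example-accessor@example-project.iam.gserviceaccount.com' in parser_config['users']
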
88 changes: 75 additions & 13 deletions metamist_infrastructure/driver.py
@@ -11,9 +11,11 @@
 import pulumi
 import pulumi_gcp as gcp

+from cpg_infra.abstraction.gcp import get_member_key
 from cpg_infra.plugin import CpgInfrastructurePlugin
 from cpg_infra.utils import archive_folder

+from metamist_infrastructure.etl_config import EtlConfig
 from metamist_infrastructure.slack_notification import (
     SlackNotification,
     SlackNotificationConfig,
@@ -56,6 +58,11 @@ def main(self):
         # todo, eventually configure metamist cloud run server
         # to be deployed here, but for now it's manually deployed

+    def on_group_finalisation(self):
+        """
+        This method is called once all groups have been finalised
+        """
+        # we set-up the etl here, as we need to know the etl accessors
         self._setup_etl()

     @cached_property
@@ -204,7 +211,6 @@ def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]:
                     depends_on=[self._svc_iam],
                 ),
             )
-            # keys only
             for name in self.config.metamist.etl.accessors
         }

@@ -240,20 +246,58 @@ def etl_configuration_secret_version(self):
         assert self.config.metamist.etl
         assert self.config.metamist.etl.accessors

-        etl_accessor_config = {
-            k: v.to_dict() for k, v in self.config.metamist.etl.accessors.items()
-        }
+        def map_accessors_to_new_body(accessors_by_type: dict[str, list[str]]) -> str:
+            assert self.config.metamist
+            assert self.config.metamist.etl

-        def map_accessors_to_new_body(arg):
-            accessors: dict[str, str] = dict(arg)
-            # dict[gcp.serviceaccount.Account: dict[str, ]]
-            remapped = {accessors[k]: v for k, v in etl_accessor_config.items()}
-            return json.dumps(remapped)
+            config = EtlConfig(
+                by_type={
+                    t: EtlConfig.EtlConfigType(
+                        parser_name=config.parser_name,
+                        users=accessors_by_type.get(t, []),
+                        default_parameters=config.default_parameters,
+                    )
+                    for t, config in self.config.metamist.etl.by_type.items()
+                }
+            )
+            retval = json.dumps(config.to_dict())
+            pulumi.warn(retval)
+            return retval

         # do this in a few hits as pretty sure pulumi.Output collects don't chain
         etl_accessors_emails: dict[str, pulumi.Output[str]] = {
             k: v.email for k, v in self.etl_accessors.items()
         }
-        remapped_with_id = pulumi.Output.all(**etl_accessors_emails).apply(
+        datasets_to_lookup_analysis_members_for = set(
+            t.analysis_group_dataset
+            for t in self.config.metamist.etl.by_type.values()
+            if t.analysis_group_dataset
+        )
+        members_for_datasets: dict[str, pulumi.Output[str]] = {
+            dataset: self.infrastructure.group_provider.resolve_group_members(
+                self.infrastructure.dataset_infrastructures[dataset]
+                .clouds['gcp']
+                .analysis_group
+            )
+            for dataset in datasets_to_lookup_analysis_members_for
+        }
+
+        pulumi_outputs_by_etl_type: dict[str, pulumi.Output[list[str]]] = {}
+
+        for by_type, by_type_config in self.config.metamist.etl.by_type.items():
+            members: list[pulumi.Output[str]] = []
+
+            for etl_member in by_type_config.accessors:
+                members.append(etl_accessors_emails[etl_member])
+
+            if by_type_config.analysis_group_dataset:
+                members.extend(
+                    members_for_datasets[by_type_config.analysis_group_dataset]
+                )
+            # turns list[Pulumi.Output[str]] into Pulumi.Output[list[str]]
+            pulumi_outputs_by_etl_type[by_type] = pulumi.Output.all(*members)
+
+        remapped_with_id = pulumi.Output.all(**pulumi_outputs_by_etl_type).apply(
             map_accessors_to_new_body
         )
         return gcp.secretmanager.SecretVersion(
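
Aside on the pulumi.Output handling above: pulumi.Output.all(*members) turns a list of Output[str] values into a single Output[list[str]], and pulumi.Output.all(**pulumi_outputs_by_etl_type).apply(map_accessors_to_new_body) hands the callback a plain dict of resolved lists. A small self-contained sketch of the same pattern, with made-up values:

import json

import pulumi

# two unresolved string outputs, e.g. service-account emails
email_a = pulumi.Output.from_input('a@example-project.iam.gserviceaccount.com')
email_b = pulumi.Output.from_input('b@example-project.iam.gserviceaccount.com')

# list[Output[str]] -> Output[list[str]]
members = pulumi.Output.all(email_a, email_b)

# dict[str, Output[list[str]]] -> the apply callback sees plain resolved values
by_type = {'/example/v1': members}
secret_body = pulumi.Output.all(**by_type).apply(
    # resolved == {'/example/v1': ['a@...', 'b@...']} once the outputs are known
    lambda resolved: json.dumps(resolved)
)
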
@@ -811,14 +855,32 @@ def _etl_function(
         return fxn

     def _setup_metamist_etl_accessors(self):
-        for name, sa in self.etl_accessors.items():
+        assert self.config.metamist
+        assert self.config.metamist.etl
+
+        accessors = dict(self.etl_accessors)
+        # grab datasets
+        datasets_to_lookup_analysis_members_for = set(
+            t.analysis_group_dataset
+            for t in self.config.metamist.etl.by_type.values()
+            if t.analysis_group_dataset
+        )
+
+        for dataset in datasets_to_lookup_analysis_members_for:
+            accessors[f'{dataset}-analysis-group'] = (
+                self.infrastructure.dataset_infrastructures[dataset]
+                .clouds['gcp']
+                .analysis_group
+            )
+
+        for name, account in accessors.items():
             gcp.cloudfunctionsv2.FunctionIamMember(
                 f'metamist-etl-accessor-{name}',
                 location=self.etl_extract_function.location,
                 project=self.etl_extract_function.project,
                 cloud_function=self.etl_extract_function.name,
                 role='roles/cloudfunctions.invoker',
-                member=pulumi.Output.concat('serviceAccount:', sa.email),
+                member=get_member_key(account),
             )

             gcp.cloudrun.IamMember(
@@ -827,7 +889,7 @@ def _setup_metamist_etl_accessors(self):
                 project=self.etl_extract_function.project,
                 service=self.etl_extract_function.name,  # it shared the name
                 role='roles/run.invoker',
-                member=pulumi.Output.concat('serviceAccount:', sa.email),
+                member=get_member_key(account),
             )

     @cached_property
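The IAM bindings above swap the hard-coded pulumi.Output.concat('serviceAccount:', sa.email) for get_member_key(account) from cpg_infra, since accessors can now hold both service accounts and per-dataset analysis groups, which need different IAM member prefixes. The diff does not show get_member_key itself; a hypothetical stand-in with the behaviour this change appears to rely on might look like the following (the name example_member_key and the group branch are assumptions, not cpg_infra's actual implementation):

import pulumi
import pulumi_gcp as gcp


def example_member_key(account) -> pulumi.Input[str]:
    """Hypothetical stand-in for cpg_infra's get_member_key."""
    if isinstance(account, gcp.serviceaccount.Account):
        # service accounts bind as 'serviceAccount:<email>'
        return pulumi.Output.concat('serviceAccount:', account.email)
    if isinstance(account, gcp.cloudidentity.Group):
        # Google Groups bind as 'group:<group email>'
        return account.group_key.apply(lambda key: f'group:{key.id}')
    # otherwise assume a ready-made member string was passed in
    return account
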
33 changes: 33 additions & 0 deletions metamist_infrastructure/etl_config.py
@@ -0,0 +1,33 @@
+import dataclasses
+from typing import Any
+
+from cpg_infra.config.deserializabledataclass import DeserializableDataclass
+
+
+@dataclasses.dataclass(frozen=True)
+class EtlConfig(DeserializableDataclass):
+    """ETL Config output type"""
+
+    @dataclasses.dataclass(frozen=True)
+    class EtlConfigType(DeserializableDataclass):
+        """ETL config type"""
+
+        parser_name: str
+        default_parameters: dict[str, Any] | None
+        users: list[str]
+
+    by_type: dict[str, 'EtlConfigType']
+
+    def to_dict(self) -> dict[str, dict[str, Any]]:
+        """
+        Convert the config to a dictionary
+        """
+        return {k: dataclasses.asdict(v) for k, v in self.by_type.items()}
+
+    @classmethod
+    def from_dict(cls, data: dict[str, dict[str, Any]]) -> 'EtlConfig':
+        """
+        Create an EtlConfig from a dictionary
+        """
+        by_type = {k: cls.EtlConfigType(**v) for k, v in data.items()}
+        return cls(by_type=by_type)
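
A quick usage sketch of the new dataclass, with illustrative values: to_dict produces the plain mapping that etl_configuration_secret_version serialises into Secret Manager, and from_dict rebuilds the typed config from it (assuming DeserializableDataclass keeps the default dataclass equality):

from metamist_infrastructure.etl_config import EtlConfig

config = EtlConfig(
    by_type={
        '/example/v1': EtlConfig.EtlConfigType(
            parser_name='example',
            default_parameters={'project': 'example-project'},
            users=['etl-example-accessor@example-project.iam.gserviceaccount.com'],
        )
    }
)

as_dict = config.to_dict()
# {'/example/v1': {'parser_name': 'example',
#                  'default_parameters': {'project': 'example-project'},
#                  'users': [...]}}

restored = EtlConfig.from_dict(as_dict)
assert restored == config  # round-trips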