From d6608e54098f7ace5e379ed215d9471e36e3226c Mon Sep 17 00:00:00 2001 From: Michael Franklin Date: Tue, 13 Aug 2024 10:21:16 +1000 Subject: [PATCH 1/5] Better ETL accessors (soon to action, analysis group) --- metamist_infrastructure/driver.py | 264 ++++++++++++++------------ metamist_infrastructure/etl_config.py | 23 +++ 2 files changed, 163 insertions(+), 124 deletions(-) create mode 100644 metamist_infrastructure/etl_config.py diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index e8897591c..63665ab92 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -14,6 +14,7 @@ from cpg_infra.plugin import CpgInfrastructurePlugin from cpg_infra.utils import archive_folder +from metamist_infrastructure.etl_config import EtlConfig from metamist_infrastructure.slack_notification import ( SlackNotification, SlackNotificationConfig, @@ -21,10 +22,10 @@ ) # this gets moved around during the pip install -ETL_FOLDER = Path(__file__).parent / 'etl' +ETL_FOLDER = Path(__file__).parent / "etl" # ETL_FOLDER = Path(__file__).parent.parent / 'etl' -PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / 'bq_schema.json' -PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json' +PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / "bq_schema.json" +PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / "bq_log_schema.json" def append_private_repositories_to_requirements( @@ -34,13 +35,13 @@ def append_private_repositories_to_requirements( Append private repositories to requirements.txt """ - with open(filename, encoding='utf-8') as file: + with open(filename, encoding="utf-8") as file: file_content = file.read() if private_repo_url and private_repos: file_content = ( file_content # original content - + f'\n--extra-index-url {private_repo_url}\n' # private repo url - + '\n'.join(private_repos) # private repositories + + f"\n--extra-index-url {private_repo_url}\n" # private repo url + + "\n".join(private_repos) # private repositories ) return pulumi.StringAsset(file_content) @@ -56,14 +57,17 @@ def main(self): # todo, eventually configure metamist cloud run server # to be deployed here, but for now it's manually deployed + def on_group_finalisation(self): + """ """ + # do this later on self._setup_etl() @cached_property def _svc_cloudresourcemanager(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-cloudresourcemanager-service', - service='cloudresourcemanager.googleapis.com', + "metamist-cloudresourcemanager-service", + service="cloudresourcemanager.googleapis.com", disable_on_destroy=False, project=self.config.metamist.gcp.project, ) @@ -72,8 +76,8 @@ def _svc_cloudresourcemanager(self): def _svc_iam(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-iam-service', - service='iam.googleapis.com', + "metamist-iam-service", + service="iam.googleapis.com", disable_on_destroy=False, project=self.config.metamist.gcp.project, opts=pulumi.resource.ResourceOptions( @@ -85,8 +89,8 @@ def _svc_iam(self): def _svc_functions(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-cloudfunctions-service', - service='cloudfunctions.googleapis.com', + "metamist-cloudfunctions-service", + service="cloudfunctions.googleapis.com", project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -95,8 +99,8 @@ def _svc_functions(self): def _svc_pubsub(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-pubsub-service', - service='pubsub.googleapis.com', + "metamist-pubsub-service", + 
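# Illustrative sketch only, not part of the patch: the
# append_private_repositories_to_requirements helper above simply appends an
# extra index URL and the private package names to the existing
# requirements.txt content before wrapping it in a pulumi.StringAsset.
# The URL and package names below are invented examples.
example_requirements = 'google-cloud-bigquery==3.11.4\n'
example_repo_url = 'https://example-registry.pkg.dev/example-project/python/simple/'  # hypothetical
example_private_repos = ['example-private-parser==1.0.0']  # hypothetical
assembled = (
    example_requirements  # original content
    + f'\n--extra-index-url {example_repo_url}\n'  # private repo url
    + '\n'.join(example_private_repos)  # private repositories
)
# For these inputs the helper would return pulumi.StringAsset(assembled).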
service="pubsub.googleapis.com", project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -105,8 +109,8 @@ def _svc_pubsub(self): def _svc_scheduler(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-cloudscheduler-service', - service='cloudscheduler.googleapis.com', + "metamist-cloudscheduler-service", + service="cloudscheduler.googleapis.com", project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -115,8 +119,8 @@ def _svc_scheduler(self): def _svc_build(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-cloudbuild-service', - service='cloudbuild.googleapis.com', + "metamist-cloudbuild-service", + service="cloudbuild.googleapis.com", project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -125,8 +129,8 @@ def _svc_build(self): def _svc_bigquery(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-bigquery-service', - service='bigquery.googleapis.com', + "metamist-bigquery-service", + service="bigquery.googleapis.com", project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -135,8 +139,8 @@ def _svc_bigquery(self): def _svc_secretmanager(self): assert self.config.metamist return gcp.projects.Service( - 'metamist-secretmanager-service', - service='secretmanager.googleapis.com', + "metamist-secretmanager-service", + service="secretmanager.googleapis.com", disable_on_destroy=False, opts=pulumi.resource.ResourceOptions( depends_on=[self._svc_cloudresourcemanager] @@ -153,8 +157,8 @@ def source_bucket(self): assert self.config.gcp assert self.config.metamist return gcp.storage.Bucket( - 'metamist-source-bucket', - name=f'{self.config.gcp.dataset_storage_prefix}metamist-source-bucket', + "metamist-source-bucket", + name=f"{self.config.gcp.dataset_storage_prefix}metamist-source-bucket", location=self.config.gcp.region, project=self.config.metamist.gcp.project, uniform_bucket_level_access=True, @@ -166,8 +170,8 @@ def _etl_function_account(self, f_name: str): """ assert self.config.metamist return gcp.serviceaccount.Account( - f'metamist-etl-{f_name}service-account', - account_id=f'metamist-etl-{f_name}sa', + f"metamist-etl-{f_name}service-account", + account_id=f"metamist-etl-{f_name}sa", project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], @@ -177,17 +181,17 @@ def _etl_function_account(self, f_name: str): @cached_property def etl_service_account(self): """Service account to run notification functionality and other services""" - return self._etl_function_account('') + return self._etl_function_account("") @cached_property def etl_load_service_account(self): """Service account to run load/transform functionality""" - return self._etl_function_account('load-') + return self._etl_function_account("load-") @cached_property def etl_extract_service_account(self): """Service account to run extract functionality""" - return self._etl_function_account('extract-') + return self._etl_function_account("extract-") @cached_property def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]: @@ -197,14 +201,13 @@ def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]: assert self.config.metamist.etl.accessors return { name: gcp.serviceaccount.Account( - f'metamist-etl-accessor-{name}', - account_id=f'metamist-etl-{name}', + f"metamist-etl-accessor-{name}", + account_id=f"metamist-etl-{name}", project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], ), ) - # keys only 
for name in self.config.metamist.etl.accessors } @@ -218,8 +221,8 @@ def etl_configuration_secret(self): assert self.config.metamist return gcp.secretmanager.Secret( - 'metamist-etl-accessor-configuration-secret', - secret_id='accessor-configuration', + "metamist-etl-accessor-configuration-secret", + secret_id="accessor-configuration", replication=gcp.secretmanager.SecretReplicationArgs( user_managed=gcp.secretmanager.SecretReplicationUserManagedArgs( replicas=[ @@ -244,10 +247,23 @@ def etl_configuration_secret_version(self): k: v.to_dict() for k, v in self.config.metamist.etl.accessors.items() } - def map_accessors_to_new_body(arg): - accessors: dict[str, str] = dict(arg) + def map_accessors_to_new_body(_accessors_map): + assert self.config.metamist + assert self.config.metamist.etl + + accessors: dict[str, str] = dict(_accessors_map) # dict[gcp.serviceaccount.Account: dict[str, ]] remapped = {accessors[k]: v for k, v in etl_accessor_config.items()} + EtlConfig( + by_type={ + t: EtlConfig.EtlConfigType( + parser_name=config.parser_name, + users=[accessors[u] for u in config.accessors], + default_parameters=config.default_parameters, + ) + for t, config in self.config.metamist.etl.by_type.items() + } + ) return json.dumps(remapped) etl_accessors_emails: dict[str, pulumi.Output[str]] = { @@ -257,7 +273,7 @@ def map_accessors_to_new_body(arg): map_accessors_to_new_body ) return gcp.secretmanager.SecretVersion( - 'metamist-etl-accessor-configuration', + "metamist-etl-accessor-configuration", secret=self.etl_configuration_secret.id, secret_data=remapped_with_id, ) @@ -267,12 +283,12 @@ def _setup_etl_configuration_secret_value(self): assert self.config.metamist gcp.secretmanager.SecretIamMember( - 'metamist-etl-accessor-configuration-access', + "metamist-etl-accessor-configuration-access", project=self.config.metamist.gcp.project, secret_id=self.etl_configuration_secret.id, - role='roles/secretmanager.secretAccessor', + role="roles/secretmanager.secretAccessor", member=pulumi.Output.concat( - 'serviceAccount:', self.etl_load_service_account.email + "serviceAccount:", self.etl_load_service_account.email ), ) @@ -283,7 +299,7 @@ def etl_pubsub_topic(self): """ assert self.config.metamist return gcp.pubsub.Topic( - 'metamist-etl-topic', + "metamist-etl-topic", project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) @@ -295,18 +311,18 @@ def etl_pubsub_dead_letters_topic(self): """ assert self.config.metamist topic = gcp.pubsub.Topic( - 'metamist-etl-dead-letters-topic', + "metamist-etl-dead-letters-topic", project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) # give publisher permission to service account gcp.pubsub.TopicIAMPolicy( - 'metamist-etl-dead-letters-topic-iam-policy', + "metamist-etl-dead-letters-topic-iam-policy", project=self.config.metamist.gcp.project, topic=topic.name, policy_data=self.prepare_service_account_policy_data( - 'roles/pubsub.publisher' + "roles/pubsub.publisher" ), ) @@ -321,14 +337,14 @@ def etl_pubsub_push_subscription(self): assert self.config.metamist subscription = gcp.pubsub.Subscription( - 'metamist-etl-subscription', + "metamist-etl-subscription", topic=self.etl_pubsub_topic.name, ack_deadline_seconds=30, expiration_policy=gcp.pubsub.SubscriptionExpirationPolicyArgs( - ttl='', # never expire + ttl="", # never expire ), retry_policy=gcp.pubsub.SubscriptionRetryPolicyArgs( - minimum_backoff='10s', # 10 seconds backoff + minimum_backoff="10s", # 10 seconds backoff 
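# Illustrative sketch only, not part of the patch: the EtlConfig constructed
# above would serialise (to_dict + json.dumps) into a payload of roughly this
# shape. The ETL type, parser name, parameters and service-account email
# below are invented examples.
example_etl_config = {
    'bbv': {  # hypothetical ETL type
        'parser_name': '/bbv/v1',  # hypothetical parser identifier
        'default_parameters': {'project': 'example-project'},  # hypothetical
        'users': ['metamist-etl-bbv@example-project.iam.gserviceaccount.com'],
    },
}
# json.dumps(example_etl_config) is roughly what the accessor-configuration
# secret version stores; the load function later checks the 'users' list for
# a requested type before instantiating its parser.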
), dead_letter_policy=gcp.pubsub.SubscriptionDeadLetterPolicyArgs( dead_letter_topic=self.etl_pubsub_dead_letters_topic.id, @@ -340,7 +356,7 @@ def etl_pubsub_push_subscription(self): service_account_email=self.etl_extract_service_account.email, ), attributes={ - 'x-goog-version': 'v1', + "x-goog-version": "v1", }, ), project=self.config.metamist.gcp.project, @@ -354,11 +370,11 @@ def etl_pubsub_push_subscription(self): # give subscriber permission to service account gcp.pubsub.SubscriptionIAMPolicy( - 'metamist-etl-pubsub-topic-subscription-policy', + "metamist-etl-pubsub-topic-subscription-policy", project=self.config.metamist.gcp.project, subscription=subscription.name, policy_data=self.prepare_service_account_policy_data( - 'roles/pubsub.subscriber' + "roles/pubsub.subscriber" ), ) @@ -372,7 +388,7 @@ def etl_pubsub_dead_letter_subscription(self): assert self.config.metamist return gcp.pubsub.Subscription( - 'metamist-etl-dead-letter-subscription', + "metamist-etl-dead-letter-subscription", topic=self.etl_pubsub_dead_letters_topic.name, project=self.config.metamist.gcp.project, ack_deadline_seconds=20, @@ -386,14 +402,14 @@ def etl_bigquery_dataset(self): assert self.config.gcp assert self.config.metamist return gcp.bigquery.Dataset( - 'metamist-etl-bigquery-dataset', - dataset_id='metamist', - friendly_name='metamist bigquery dataset', - description='Metamist related bigquery tables', + "metamist-etl-bigquery-dataset", + dataset_id="metamist", + friendly_name="metamist bigquery dataset", + description="Metamist related bigquery tables", location=self.config.gcp.region, # default_table_expiration_ms=3600000, labels={ - 'project': 'metamist', + "project": "metamist", }, project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( @@ -409,10 +425,10 @@ def _setup_bq_table(self, schema_file_name: Path, table_id: str, name_suffix: st schema = f.read() etl_table = gcp.bigquery.Table( - f'metamist-etl-bigquery-table{name_suffix}', + f"metamist-etl-bigquery-table{name_suffix}", table_id=table_id, dataset_id=self.etl_bigquery_dataset.dataset_id, - labels={'project': 'metamist'}, + labels={"project": "metamist"}, schema=schema, project=self.config.metamist.gcp.project, # docs say: Note: On newer versions of the provider, @@ -427,14 +443,14 @@ def etl_bigquery_table(self): Bigquery table to contain the etl data, for compatibility with the old etl, we do not suffix table name """ - return self._setup_bq_table(PATH_TO_ETL_BQ_SCHEMA, 'etl-data', '') + return self._setup_bq_table(PATH_TO_ETL_BQ_SCHEMA, "etl-data", "") @cached_property def etl_bigquery_log_table(self): """ Bigquery table to contain the etl logs, append '-logs' as resource name """ - return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, 'etl-logs', '-logs') + return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, "etl-logs", "-logs") def prepare_service_account_policy_data(self, role): """ @@ -453,9 +469,9 @@ def prepare_service_account_policy_data(self, role): role=role, members=[ pulumi.Output.concat( - 'serviceAccount:service-', + "serviceAccount:service-", project.number, - '@gcp-sa-pubsub.iam.gserviceaccount.com', + "@gcp-sa-pubsub.iam.gserviceaccount.com", ) # type: ignore ], ) @@ -470,54 +486,54 @@ def _setup_etl(self): # give the etl_load/extract service_accounts ability to read/write to bq table gcp.bigquery.DatasetAccess( - 'metamist-etl-bq-dataset-extract-service-access', + "metamist-etl-bq-dataset-extract-service-access", project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, 
- role='WRITER', + role="WRITER", user_by_email=self.etl_extract_service_account.email, ) gcp.bigquery.DatasetAccess( - 'metamist-etl-bq-dataset-load-service-access', + "metamist-etl-bq-dataset-load-service-access", project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, - role='WRITER', + role="WRITER", user_by_email=self.etl_load_service_account.email, ) # give the etl_load_service_account ability to execute bigquery jobs gcp.projects.IAMMember( - 'metamist-etl-bq-job-user-role', + "metamist-etl-bq-job-user-role", project=self.config.metamist.gcp.project, - role='roles/bigquery.jobUser', + role="roles/bigquery.jobUser", member=pulumi.Output.concat( - 'serviceAccount:', self.etl_load_service_account.email + "serviceAccount:", self.etl_load_service_account.email ), ) # give the etl_extract_service_account ability to push to pub/sub gcp.projects.IAMMember( - 'metamist-etl-extract-editor-role', + "metamist-etl-extract-editor-role", project=self.config.metamist.gcp.project, - role='roles/editor', + role="roles/editor", member=pulumi.Output.concat( - 'serviceAccount:', self.etl_extract_service_account.email + "serviceAccount:", self.etl_extract_service_account.email ), ) # give the etl_load_service_account ability to push to pub/sub gcp.projects.IAMMember( - 'metamist-etl-load-editor-role', + "metamist-etl-load-editor-role", project=self.config.metamist.gcp.project, - role='roles/editor', + role="roles/editor", member=pulumi.Output.concat( - 'serviceAccount:', self.etl_load_service_account.email + "serviceAccount:", self.etl_load_service_account.email ), ) # give the etl_load_service_account ability # to access accessor-configuration in secretmanager gcp.projects.IAMMember( - 'metamist-etl-load-secret-accessor-role', + "metamist-etl-load-secret-accessor-role", project=self.config.metamist.gcp.project, - role='roles/secretmanager.secretAccessor', + role="roles/secretmanager.secretAccessor", member=pulumi.Output.concat( - 'serviceAccount:', self.etl_load_service_account.email + "serviceAccount:", self.etl_load_service_account.email ), ) @@ -526,14 +542,14 @@ def _setup_etl(self): # if not present functions could not be deployed project = gcp.organizations.get_project() robot_account = pulumi.Output.concat( - 'serviceAccount:service-', + "serviceAccount:service-", project.number, - '@serverless-robot-prod.iam.gserviceaccount.com', + "@serverless-robot-prod.iam.gserviceaccount.com", ) gcp.projects.IAMMember( - 'metamist-etl-robot-service-agent-role', + "metamist-etl-robot-service-agent-role", project=self.config.metamist.gcp.project, - role='roles/run.serviceAgent', + role="roles/run.serviceAgent", member=robot_account, ) @@ -567,7 +583,7 @@ def _setup_etl_pubsub(self): def etl_extract_function(self): """etl_extract_function""" return self._etl_function( - 'extract', + "extract", self.etl_extract_service_account, ) @@ -579,7 +595,7 @@ def etl_load_function(self): we would need to wrap it with an apply function as the private repo url is a Pulumi Output """ return self._private_repo_url().apply( - lambda url: self._etl_function('load', self.etl_load_service_account, url) + lambda url: self._etl_function("load", self.etl_load_service_account, url) ) def _private_repo_url(self): @@ -615,7 +631,7 @@ def _private_repo_url(self): self.infrastructure.gcp_python_registry.project, self.infrastructure.gcp_python_registry.name, ).apply( - lambda args: f'https://{args[0]}-python.pkg.dev/{args[1]}/{args[2]}/simple/' + lambda args: 
f"https://{args[0]}-python.pkg.dev/{args[1]}/{args[2]}/simple/" ) def _etl_external_function( @@ -629,12 +645,12 @@ def _etl_external_function( Create External Function with custom audiences """ return gcp.cloudrunv2.Service( - f'metamist-etl-{f_name}-external', - name=f'metamist-etl-{f_name}-external', + f"metamist-etl-{f_name}-external", + name=f"metamist-etl-{f_name}-external", project=self.config.metamist.gcp.project, location=self.config.gcp.region, custom_audiences=custom_audiences, - ingress='INGRESS_TRAFFIC_ALL', + ingress="INGRESS_TRAFFIC_ALL", template=gcp.cloudrunv2.ServiceTemplateArgs( containers=[ gcp.cloudrunv2.ServiceTemplateContainerArgs( @@ -643,8 +659,8 @@ def _etl_external_function( cpu_idle=True, startup_cpu_boost=True, limits={ - 'cpu': '1', - 'memory': '2Gi', + "cpu": "1", + "memory": "2Gi", }, ), envs=[ @@ -660,7 +676,7 @@ def _etl_external_function( max_instance_count=1, min_instance_count=0, ), - timeout='540s', + timeout="540s", service_account=sa.email, max_instance_request_concurrency=1, ), @@ -671,28 +687,28 @@ def _etl_get_env(self) -> dict: Commnon environment to all the etl functions and services """ return { - 'BIGQUERY_TABLE': pulumi.Output.concat( + "BIGQUERY_TABLE": pulumi.Output.concat( self.etl_bigquery_table.project, - '.', + ".", self.etl_bigquery_table.dataset_id, - '.', + ".", self.etl_bigquery_table.table_id, ), - 'BIGQUERY_LOG_TABLE': pulumi.Output.concat( + "BIGQUERY_LOG_TABLE": pulumi.Output.concat( self.etl_bigquery_log_table.project, - '.', + ".", self.etl_bigquery_log_table.dataset_id, - '.', + ".", self.etl_bigquery_log_table.table_id, ), - 'PUBSUB_TOPIC': self.etl_pubsub_topic.id, - 'NOTIFICATION_PUBSUB_TOPIC': ( + "PUBSUB_TOPIC": self.etl_pubsub_topic.id, + "NOTIFICATION_PUBSUB_TOPIC": ( self.etl_slack_notification_topic.id if self.etl_slack_notification_topic - else '' + else "" ), - 'SM_ENVIRONMENT': self.config.metamist.etl.environment, - 'CONFIGURATION_SECRET': self.etl_configuration_secret_version.id, + "SM_ENVIRONMENT": self.config.metamist.etl.environment, + "CONFIGURATION_SECRET": self.etl_configuration_secret_version.id, } def _etl_function( @@ -716,8 +732,8 @@ def _etl_function( # include private repos and metamist package # metamist package is only temprary ones to avoid circular dependencies extra_assets = { - 'requirements.txt': append_private_repositories_to_requirements( - filename=f'{str(path_to_func_folder.absolute())}/requirements.txt', + "requirements.txt": append_private_repositories_to_requirements( + filename=f"{str(path_to_func_folder.absolute())}/requirements.txt", private_repo_url=private_repo_url, private_repos=self.config.metamist.etl.private_repo_packages, ), @@ -732,14 +748,14 @@ def _etl_function( # Create the single Cloud Storage object, # which contains the source code source_archive_object = gcp.storage.BucketObject( - f'metamist-etl-{f_name}-source-code', + f"metamist-etl-{f_name}-source-code", # updating the source archive object does not trigger the cloud # function to actually updating the source because # it's based on the name, # allow Pulumi to create a new name each time it gets updated bucket=self.source_bucket.name, source=archive, - opts=pulumi.ResourceOptions(replace_on_changes=['*']), + opts=pulumi.ResourceOptions(replace_on_changes=["*"]), ) # prepare custom audience_list @@ -753,15 +769,15 @@ def _etl_function( ) fxn = gcp.cloudfunctionsv2.Function( - f'metamist-etl-{f_name}', - name=f'metamist-etl-{f_name}', + f"metamist-etl-{f_name}", + name=f"metamist-etl-{f_name}", 
build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs( - runtime='python311', - entry_point=f'etl_{f_name}', + runtime="python311", + entry_point=f"etl_{f_name}", environment_variables={}, # this one is set on an output, so specifying it keeps the function # from being updated, or appearing to update - docker_repository=f'projects/{self.config.metamist.gcp.project}/locations/australia-southeast1/repositories/gcf-artifacts', + docker_repository=f"projects/{self.config.metamist.gcp.project}/locations/australia-southeast1/repositories/gcf-artifacts", source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs( storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs( bucket=self.source_bucket.name, @@ -772,11 +788,11 @@ def _etl_function( service_config=gcp.cloudfunctionsv2.FunctionServiceConfigArgs( max_instance_count=1, # Keep max instances to 1 to avoid racing conditions min_instance_count=0, - available_memory='2Gi', - available_cpu='1', + available_memory="2Gi", + available_cpu="1", timeout_seconds=540, environment_variables=self._etl_get_env(), - ingress_settings='ALLOW_ALL', + ingress_settings="ALLOW_ALL", all_traffic_on_latest_revision=True, service_account_email=sa.email, ), @@ -813,21 +829,21 @@ def _etl_function( def _setup_metamist_etl_accessors(self): for name, sa in self.etl_accessors.items(): gcp.cloudfunctionsv2.FunctionIamMember( - f'metamist-etl-accessor-{name}', + f"metamist-etl-accessor-{name}", location=self.etl_extract_function.location, project=self.etl_extract_function.project, cloud_function=self.etl_extract_function.name, - role='roles/cloudfunctions.invoker', - member=pulumi.Output.concat('serviceAccount:', sa.email), + role="roles/cloudfunctions.invoker", + member=pulumi.Output.concat("serviceAccount:", sa.email), ) gcp.cloudrun.IamMember( - f'metamist-etl-run-accessor-{name}', + f"metamist-etl-run-accessor-{name}", location=self.etl_extract_function.location, project=self.etl_extract_function.project, service=self.etl_extract_function.name, # it shared the name - role='roles/run.invoker', - member=pulumi.Output.concat('serviceAccount:', sa.email), + role="roles/run.invoker", + member=pulumi.Output.concat("serviceAccount:", sa.email), ) @cached_property @@ -853,11 +869,11 @@ def etl_slack_notification(self): notification = SlackNotification( slack_config=slack_config, - topic_name='metamist-etl-notification', + topic_name="metamist-etl-notification", func_to_monitor=[ - 'metamist-etl-notification-func', - 'metamist-etl-extract', - 'metamist-etl-load', + "metamist-etl-notification-func", + "metamist-etl-extract", + "metamist-etl-load", ], notification_type=SlackNotificationType.NOTIFICATION, depends_on=[ diff --git a/metamist_infrastructure/etl_config.py b/metamist_infrastructure/etl_config.py new file mode 100644 index 000000000..64b6957de --- /dev/null +++ b/metamist_infrastructure/etl_config.py @@ -0,0 +1,23 @@ +import dataclasses +from typing import Any + +from cpg_infra.config.deserializabledataclass import DeserializableDataclass + + +@dataclasses.dataclass(frozen=True) +class EtlConfig(DeserializableDataclass): + @dataclasses.dataclass(frozen=True) + class EtlConfigType(DeserializableDataclass): + parser_name: str + default_parameters: dict[str, Any] | None + users: list[str] + + by_type: dict[str, "EtlConfigType"] + + def to_dict(self) -> dict[str, dict[str, Any]]: + return {k: dataclasses.asdict(v) for k, v in self.by_type.items()} + + @classmethod + def from_dict(cls, data: dict[str, dict[str, Any]]) -> "EtlConfig": + by_type = {k: 
cls.EtlConfigType(**v) for k, v in data.items()} + return cls(by_type=by_type) From d293e40964d12a9b2464fe7e1a397f20b36ccad0 Mon Sep 17 00:00:00 2001 From: Michael Franklin Date: Mon, 26 Aug 2024 13:43:54 +1000 Subject: [PATCH 2/5] Apply pre-commit after rebase --- metamist_infrastructure/driver.py | 246 +++++++++++++------------- metamist_infrastructure/etl_config.py | 14 +- 2 files changed, 136 insertions(+), 124 deletions(-) diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 63665ab92..c695a3826 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -22,10 +22,10 @@ ) # this gets moved around during the pip install -ETL_FOLDER = Path(__file__).parent / "etl" +ETL_FOLDER = Path(__file__).parent / 'etl' # ETL_FOLDER = Path(__file__).parent.parent / 'etl' -PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / "bq_schema.json" -PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / "bq_log_schema.json" +PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / 'bq_schema.json' +PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json' def append_private_repositories_to_requirements( @@ -35,13 +35,13 @@ def append_private_repositories_to_requirements( Append private repositories to requirements.txt """ - with open(filename, encoding="utf-8") as file: + with open(filename, encoding='utf-8') as file: file_content = file.read() if private_repo_url and private_repos: file_content = ( file_content # original content - + f"\n--extra-index-url {private_repo_url}\n" # private repo url - + "\n".join(private_repos) # private repositories + + f'\n--extra-index-url {private_repo_url}\n' # private repo url + + '\n'.join(private_repos) # private repositories ) return pulumi.StringAsset(file_content) @@ -58,7 +58,9 @@ def main(self): # to be deployed here, but for now it's manually deployed def on_group_finalisation(self): - """ """ + """ + This method is called once all groups have been finalised + """ # do this later on self._setup_etl() @@ -66,8 +68,8 @@ def on_group_finalisation(self): def _svc_cloudresourcemanager(self): assert self.config.metamist return gcp.projects.Service( - "metamist-cloudresourcemanager-service", - service="cloudresourcemanager.googleapis.com", + 'metamist-cloudresourcemanager-service', + service='cloudresourcemanager.googleapis.com', disable_on_destroy=False, project=self.config.metamist.gcp.project, ) @@ -76,8 +78,8 @@ def _svc_cloudresourcemanager(self): def _svc_iam(self): assert self.config.metamist return gcp.projects.Service( - "metamist-iam-service", - service="iam.googleapis.com", + 'metamist-iam-service', + service='iam.googleapis.com', disable_on_destroy=False, project=self.config.metamist.gcp.project, opts=pulumi.resource.ResourceOptions( @@ -89,8 +91,8 @@ def _svc_iam(self): def _svc_functions(self): assert self.config.metamist return gcp.projects.Service( - "metamist-cloudfunctions-service", - service="cloudfunctions.googleapis.com", + 'metamist-cloudfunctions-service', + service='cloudfunctions.googleapis.com', project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -99,8 +101,8 @@ def _svc_functions(self): def _svc_pubsub(self): assert self.config.metamist return gcp.projects.Service( - "metamist-pubsub-service", - service="pubsub.googleapis.com", + 'metamist-pubsub-service', + service='pubsub.googleapis.com', project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -109,8 +111,8 @@ def _svc_pubsub(self): def _svc_scheduler(self): assert self.config.metamist return gcp.projects.Service( - 
"metamist-cloudscheduler-service", - service="cloudscheduler.googleapis.com", + 'metamist-cloudscheduler-service', + service='cloudscheduler.googleapis.com', project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -119,8 +121,8 @@ def _svc_scheduler(self): def _svc_build(self): assert self.config.metamist return gcp.projects.Service( - "metamist-cloudbuild-service", - service="cloudbuild.googleapis.com", + 'metamist-cloudbuild-service', + service='cloudbuild.googleapis.com', project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -129,8 +131,8 @@ def _svc_build(self): def _svc_bigquery(self): assert self.config.metamist return gcp.projects.Service( - "metamist-bigquery-service", - service="bigquery.googleapis.com", + 'metamist-bigquery-service', + service='bigquery.googleapis.com', project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @@ -139,8 +141,8 @@ def _svc_bigquery(self): def _svc_secretmanager(self): assert self.config.metamist return gcp.projects.Service( - "metamist-secretmanager-service", - service="secretmanager.googleapis.com", + 'metamist-secretmanager-service', + service='secretmanager.googleapis.com', disable_on_destroy=False, opts=pulumi.resource.ResourceOptions( depends_on=[self._svc_cloudresourcemanager] @@ -157,8 +159,8 @@ def source_bucket(self): assert self.config.gcp assert self.config.metamist return gcp.storage.Bucket( - "metamist-source-bucket", - name=f"{self.config.gcp.dataset_storage_prefix}metamist-source-bucket", + 'metamist-source-bucket', + name=f'{self.config.gcp.dataset_storage_prefix}metamist-source-bucket', location=self.config.gcp.region, project=self.config.metamist.gcp.project, uniform_bucket_level_access=True, @@ -170,8 +172,8 @@ def _etl_function_account(self, f_name: str): """ assert self.config.metamist return gcp.serviceaccount.Account( - f"metamist-etl-{f_name}service-account", - account_id=f"metamist-etl-{f_name}sa", + f'metamist-etl-{f_name}service-account', + account_id=f'metamist-etl-{f_name}sa', project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], @@ -181,17 +183,17 @@ def _etl_function_account(self, f_name: str): @cached_property def etl_service_account(self): """Service account to run notification functionality and other services""" - return self._etl_function_account("") + return self._etl_function_account('') @cached_property def etl_load_service_account(self): """Service account to run load/transform functionality""" - return self._etl_function_account("load-") + return self._etl_function_account('load-') @cached_property def etl_extract_service_account(self): """Service account to run extract functionality""" - return self._etl_function_account("extract-") + return self._etl_function_account('extract-') @cached_property def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]: @@ -201,8 +203,8 @@ def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]: assert self.config.metamist.etl.accessors return { name: gcp.serviceaccount.Account( - f"metamist-etl-accessor-{name}", - account_id=f"metamist-etl-{name}", + f'metamist-etl-accessor-{name}', + account_id=f'metamist-etl-{name}', project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], @@ -221,8 +223,8 @@ def etl_configuration_secret(self): assert self.config.metamist return gcp.secretmanager.Secret( - "metamist-etl-accessor-configuration-secret", - secret_id="accessor-configuration", + 'metamist-etl-accessor-configuration-secret', + 
secret_id='accessor-configuration', replication=gcp.secretmanager.SecretReplicationArgs( user_managed=gcp.secretmanager.SecretReplicationUserManagedArgs( replicas=[ @@ -273,7 +275,7 @@ def map_accessors_to_new_body(_accessors_map): map_accessors_to_new_body ) return gcp.secretmanager.SecretVersion( - "metamist-etl-accessor-configuration", + 'metamist-etl-accessor-configuration', secret=self.etl_configuration_secret.id, secret_data=remapped_with_id, ) @@ -283,12 +285,12 @@ def _setup_etl_configuration_secret_value(self): assert self.config.metamist gcp.secretmanager.SecretIamMember( - "metamist-etl-accessor-configuration-access", + 'metamist-etl-accessor-configuration-access', project=self.config.metamist.gcp.project, secret_id=self.etl_configuration_secret.id, - role="roles/secretmanager.secretAccessor", + role='roles/secretmanager.secretAccessor', member=pulumi.Output.concat( - "serviceAccount:", self.etl_load_service_account.email + 'serviceAccount:', self.etl_load_service_account.email ), ) @@ -299,7 +301,7 @@ def etl_pubsub_topic(self): """ assert self.config.metamist return gcp.pubsub.Topic( - "metamist-etl-topic", + 'metamist-etl-topic', project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) @@ -311,18 +313,18 @@ def etl_pubsub_dead_letters_topic(self): """ assert self.config.metamist topic = gcp.pubsub.Topic( - "metamist-etl-dead-letters-topic", + 'metamist-etl-dead-letters-topic', project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) # give publisher permission to service account gcp.pubsub.TopicIAMPolicy( - "metamist-etl-dead-letters-topic-iam-policy", + 'metamist-etl-dead-letters-topic-iam-policy', project=self.config.metamist.gcp.project, topic=topic.name, policy_data=self.prepare_service_account_policy_data( - "roles/pubsub.publisher" + 'roles/pubsub.publisher' ), ) @@ -337,14 +339,14 @@ def etl_pubsub_push_subscription(self): assert self.config.metamist subscription = gcp.pubsub.Subscription( - "metamist-etl-subscription", + 'metamist-etl-subscription', topic=self.etl_pubsub_topic.name, ack_deadline_seconds=30, expiration_policy=gcp.pubsub.SubscriptionExpirationPolicyArgs( - ttl="", # never expire + ttl='', # never expire ), retry_policy=gcp.pubsub.SubscriptionRetryPolicyArgs( - minimum_backoff="10s", # 10 seconds backoff + minimum_backoff='10s', # 10 seconds backoff ), dead_letter_policy=gcp.pubsub.SubscriptionDeadLetterPolicyArgs( dead_letter_topic=self.etl_pubsub_dead_letters_topic.id, @@ -356,7 +358,7 @@ def etl_pubsub_push_subscription(self): service_account_email=self.etl_extract_service_account.email, ), attributes={ - "x-goog-version": "v1", + 'x-goog-version': 'v1', }, ), project=self.config.metamist.gcp.project, @@ -370,11 +372,11 @@ def etl_pubsub_push_subscription(self): # give subscriber permission to service account gcp.pubsub.SubscriptionIAMPolicy( - "metamist-etl-pubsub-topic-subscription-policy", + 'metamist-etl-pubsub-topic-subscription-policy', project=self.config.metamist.gcp.project, subscription=subscription.name, policy_data=self.prepare_service_account_policy_data( - "roles/pubsub.subscriber" + 'roles/pubsub.subscriber' ), ) @@ -388,7 +390,7 @@ def etl_pubsub_dead_letter_subscription(self): assert self.config.metamist return gcp.pubsub.Subscription( - "metamist-etl-dead-letter-subscription", + 'metamist-etl-dead-letter-subscription', topic=self.etl_pubsub_dead_letters_topic.name, project=self.config.metamist.gcp.project, ack_deadline_seconds=20, @@ -402,14 
+404,14 @@ def etl_bigquery_dataset(self): assert self.config.gcp assert self.config.metamist return gcp.bigquery.Dataset( - "metamist-etl-bigquery-dataset", - dataset_id="metamist", - friendly_name="metamist bigquery dataset", - description="Metamist related bigquery tables", + 'metamist-etl-bigquery-dataset', + dataset_id='metamist', + friendly_name='metamist bigquery dataset', + description='Metamist related bigquery tables', location=self.config.gcp.region, # default_table_expiration_ms=3600000, labels={ - "project": "metamist", + 'project': 'metamist', }, project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( @@ -425,10 +427,10 @@ def _setup_bq_table(self, schema_file_name: Path, table_id: str, name_suffix: st schema = f.read() etl_table = gcp.bigquery.Table( - f"metamist-etl-bigquery-table{name_suffix}", + f'metamist-etl-bigquery-table{name_suffix}', table_id=table_id, dataset_id=self.etl_bigquery_dataset.dataset_id, - labels={"project": "metamist"}, + labels={'project': 'metamist'}, schema=schema, project=self.config.metamist.gcp.project, # docs say: Note: On newer versions of the provider, @@ -443,14 +445,14 @@ def etl_bigquery_table(self): Bigquery table to contain the etl data, for compatibility with the old etl, we do not suffix table name """ - return self._setup_bq_table(PATH_TO_ETL_BQ_SCHEMA, "etl-data", "") + return self._setup_bq_table(PATH_TO_ETL_BQ_SCHEMA, 'etl-data', '') @cached_property def etl_bigquery_log_table(self): """ Bigquery table to contain the etl logs, append '-logs' as resource name """ - return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, "etl-logs", "-logs") + return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, 'etl-logs', '-logs') def prepare_service_account_policy_data(self, role): """ @@ -469,9 +471,9 @@ def prepare_service_account_policy_data(self, role): role=role, members=[ pulumi.Output.concat( - "serviceAccount:service-", + 'serviceAccount:service-', project.number, - "@gcp-sa-pubsub.iam.gserviceaccount.com", + '@gcp-sa-pubsub.iam.gserviceaccount.com', ) # type: ignore ], ) @@ -486,54 +488,54 @@ def _setup_etl(self): # give the etl_load/extract service_accounts ability to read/write to bq table gcp.bigquery.DatasetAccess( - "metamist-etl-bq-dataset-extract-service-access", + 'metamist-etl-bq-dataset-extract-service-access', project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, - role="WRITER", + role='WRITER', user_by_email=self.etl_extract_service_account.email, ) gcp.bigquery.DatasetAccess( - "metamist-etl-bq-dataset-load-service-access", + 'metamist-etl-bq-dataset-load-service-access', project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, - role="WRITER", + role='WRITER', user_by_email=self.etl_load_service_account.email, ) # give the etl_load_service_account ability to execute bigquery jobs gcp.projects.IAMMember( - "metamist-etl-bq-job-user-role", + 'metamist-etl-bq-job-user-role', project=self.config.metamist.gcp.project, - role="roles/bigquery.jobUser", + role='roles/bigquery.jobUser', member=pulumi.Output.concat( - "serviceAccount:", self.etl_load_service_account.email + 'serviceAccount:', self.etl_load_service_account.email ), ) # give the etl_extract_service_account ability to push to pub/sub gcp.projects.IAMMember( - "metamist-etl-extract-editor-role", + 'metamist-etl-extract-editor-role', project=self.config.metamist.gcp.project, - role="roles/editor", + role='roles/editor', member=pulumi.Output.concat( - "serviceAccount:", 
self.etl_extract_service_account.email + 'serviceAccount:', self.etl_extract_service_account.email ), ) # give the etl_load_service_account ability to push to pub/sub gcp.projects.IAMMember( - "metamist-etl-load-editor-role", + 'metamist-etl-load-editor-role', project=self.config.metamist.gcp.project, - role="roles/editor", + role='roles/editor', member=pulumi.Output.concat( - "serviceAccount:", self.etl_load_service_account.email + 'serviceAccount:', self.etl_load_service_account.email ), ) # give the etl_load_service_account ability # to access accessor-configuration in secretmanager gcp.projects.IAMMember( - "metamist-etl-load-secret-accessor-role", + 'metamist-etl-load-secret-accessor-role', project=self.config.metamist.gcp.project, - role="roles/secretmanager.secretAccessor", + role='roles/secretmanager.secretAccessor', member=pulumi.Output.concat( - "serviceAccount:", self.etl_load_service_account.email + 'serviceAccount:', self.etl_load_service_account.email ), ) @@ -542,14 +544,14 @@ def _setup_etl(self): # if not present functions could not be deployed project = gcp.organizations.get_project() robot_account = pulumi.Output.concat( - "serviceAccount:service-", + 'serviceAccount:service-', project.number, - "@serverless-robot-prod.iam.gserviceaccount.com", + '@serverless-robot-prod.iam.gserviceaccount.com', ) gcp.projects.IAMMember( - "metamist-etl-robot-service-agent-role", + 'metamist-etl-robot-service-agent-role', project=self.config.metamist.gcp.project, - role="roles/run.serviceAgent", + role='roles/run.serviceAgent', member=robot_account, ) @@ -583,7 +585,7 @@ def _setup_etl_pubsub(self): def etl_extract_function(self): """etl_extract_function""" return self._etl_function( - "extract", + 'extract', self.etl_extract_service_account, ) @@ -595,7 +597,7 @@ def etl_load_function(self): we would need to wrapp it around with apply funciton as private repo url is Pulumi Output """ return self._private_repo_url().apply( - lambda url: self._etl_function("load", self.etl_load_service_account, url) + lambda url: self._etl_function('load', self.etl_load_service_account, url) ) def _private_repo_url(self): @@ -631,7 +633,7 @@ def _private_repo_url(self): self.infrastructure.gcp_python_registry.project, self.infrastructure.gcp_python_registry.name, ).apply( - lambda args: f"https://{args[0]}-python.pkg.dev/{args[1]}/{args[2]}/simple/" + lambda args: f'https://{args[0]}-python.pkg.dev/{args[1]}/{args[2]}/simple/' ) def _etl_external_function( @@ -645,12 +647,12 @@ def _etl_external_function( Create External Function with custom audiences """ return gcp.cloudrunv2.Service( - f"metamist-etl-{f_name}-external", - name=f"metamist-etl-{f_name}-external", + f'metamist-etl-{f_name}-external', + name=f'metamist-etl-{f_name}-external', project=self.config.metamist.gcp.project, location=self.config.gcp.region, custom_audiences=custom_audiences, - ingress="INGRESS_TRAFFIC_ALL", + ingress='INGRESS_TRAFFIC_ALL', template=gcp.cloudrunv2.ServiceTemplateArgs( containers=[ gcp.cloudrunv2.ServiceTemplateContainerArgs( @@ -659,8 +661,8 @@ def _etl_external_function( cpu_idle=True, startup_cpu_boost=True, limits={ - "cpu": "1", - "memory": "2Gi", + 'cpu': '1', + 'memory': '2Gi', }, ), envs=[ @@ -676,7 +678,7 @@ def _etl_external_function( max_instance_count=1, min_instance_count=0, ), - timeout="540s", + timeout='540s', service_account=sa.email, max_instance_request_concurrency=1, ), @@ -687,28 +689,28 @@ def _etl_get_env(self) -> dict: Commnon environment to all the etl functions and services """ return { - 
"BIGQUERY_TABLE": pulumi.Output.concat( + 'BIGQUERY_TABLE': pulumi.Output.concat( self.etl_bigquery_table.project, - ".", + '.', self.etl_bigquery_table.dataset_id, - ".", + '.', self.etl_bigquery_table.table_id, ), - "BIGQUERY_LOG_TABLE": pulumi.Output.concat( + 'BIGQUERY_LOG_TABLE': pulumi.Output.concat( self.etl_bigquery_log_table.project, - ".", + '.', self.etl_bigquery_log_table.dataset_id, - ".", + '.', self.etl_bigquery_log_table.table_id, ), - "PUBSUB_TOPIC": self.etl_pubsub_topic.id, - "NOTIFICATION_PUBSUB_TOPIC": ( + 'PUBSUB_TOPIC': self.etl_pubsub_topic.id, + 'NOTIFICATION_PUBSUB_TOPIC': ( self.etl_slack_notification_topic.id if self.etl_slack_notification_topic - else "" + else '' ), - "SM_ENVIRONMENT": self.config.metamist.etl.environment, - "CONFIGURATION_SECRET": self.etl_configuration_secret_version.id, + 'SM_ENVIRONMENT': self.config.metamist.etl.environment, + 'CONFIGURATION_SECRET': self.etl_configuration_secret_version.id, } def _etl_function( @@ -732,8 +734,8 @@ def _etl_function( # include private repos and metamist package # metamist package is only temprary ones to avoid circular dependencies extra_assets = { - "requirements.txt": append_private_repositories_to_requirements( - filename=f"{str(path_to_func_folder.absolute())}/requirements.txt", + 'requirements.txt': append_private_repositories_to_requirements( + filename=f'{str(path_to_func_folder.absolute())}/requirements.txt', private_repo_url=private_repo_url, private_repos=self.config.metamist.etl.private_repo_packages, ), @@ -748,14 +750,14 @@ def _etl_function( # Create the single Cloud Storage object, # which contains the source code source_archive_object = gcp.storage.BucketObject( - f"metamist-etl-{f_name}-source-code", + f'metamist-etl-{f_name}-source-code', # updating the source archive object does not trigger the cloud # function to actually updating the source because # it's based on the name, # allow Pulumi to create a new name each time it gets updated bucket=self.source_bucket.name, source=archive, - opts=pulumi.ResourceOptions(replace_on_changes=["*"]), + opts=pulumi.ResourceOptions(replace_on_changes=['*']), ) # prepare custom audience_list @@ -769,15 +771,15 @@ def _etl_function( ) fxn = gcp.cloudfunctionsv2.Function( - f"metamist-etl-{f_name}", - name=f"metamist-etl-{f_name}", + f'metamist-etl-{f_name}', + name=f'metamist-etl-{f_name}', build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs( - runtime="python311", - entry_point=f"etl_{f_name}", + runtime='python311', + entry_point=f'etl_{f_name}', environment_variables={}, # this one is set on an output, so specifying it keeps the function # from being updated, or appearing to update - docker_repository=f"projects/{self.config.metamist.gcp.project}/locations/australia-southeast1/repositories/gcf-artifacts", + docker_repository=f'projects/{self.config.metamist.gcp.project}/locations/australia-southeast1/repositories/gcf-artifacts', source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs( storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs( bucket=self.source_bucket.name, @@ -788,11 +790,11 @@ def _etl_function( service_config=gcp.cloudfunctionsv2.FunctionServiceConfigArgs( max_instance_count=1, # Keep max instances to 1 to avoid racing conditions min_instance_count=0, - available_memory="2Gi", - available_cpu="1", + available_memory='2Gi', + available_cpu='1', timeout_seconds=540, environment_variables=self._etl_get_env(), - ingress_settings="ALLOW_ALL", + ingress_settings='ALLOW_ALL', 
all_traffic_on_latest_revision=True, service_account_email=sa.email, ), @@ -829,21 +831,21 @@ def _etl_function( def _setup_metamist_etl_accessors(self): for name, sa in self.etl_accessors.items(): gcp.cloudfunctionsv2.FunctionIamMember( - f"metamist-etl-accessor-{name}", + f'metamist-etl-accessor-{name}', location=self.etl_extract_function.location, project=self.etl_extract_function.project, cloud_function=self.etl_extract_function.name, - role="roles/cloudfunctions.invoker", - member=pulumi.Output.concat("serviceAccount:", sa.email), + role='roles/cloudfunctions.invoker', + member=pulumi.Output.concat('serviceAccount:', sa.email), ) gcp.cloudrun.IamMember( - f"metamist-etl-run-accessor-{name}", + f'metamist-etl-run-accessor-{name}', location=self.etl_extract_function.location, project=self.etl_extract_function.project, service=self.etl_extract_function.name, # it shared the name - role="roles/run.invoker", - member=pulumi.Output.concat("serviceAccount:", sa.email), + role='roles/run.invoker', + member=pulumi.Output.concat('serviceAccount:', sa.email), ) @cached_property @@ -869,11 +871,11 @@ def etl_slack_notification(self): notification = SlackNotification( slack_config=slack_config, - topic_name="metamist-etl-notification", + topic_name='metamist-etl-notification', func_to_monitor=[ - "metamist-etl-notification-func", - "metamist-etl-extract", - "metamist-etl-load", + 'metamist-etl-notification-func', + 'metamist-etl-extract', + 'metamist-etl-load', ], notification_type=SlackNotificationType.NOTIFICATION, depends_on=[ diff --git a/metamist_infrastructure/etl_config.py b/metamist_infrastructure/etl_config.py index 64b6957de..d7e800e65 100644 --- a/metamist_infrastructure/etl_config.py +++ b/metamist_infrastructure/etl_config.py @@ -6,18 +6,28 @@ @dataclasses.dataclass(frozen=True) class EtlConfig(DeserializableDataclass): + """ETL Config output type""" + @dataclasses.dataclass(frozen=True) class EtlConfigType(DeserializableDataclass): + """ETL config type""" + parser_name: str default_parameters: dict[str, Any] | None users: list[str] - by_type: dict[str, "EtlConfigType"] + by_type: dict[str, 'EtlConfigType'] def to_dict(self) -> dict[str, dict[str, Any]]: + """ + Convert the config to a dictionary + """ return {k: dataclasses.asdict(v) for k, v in self.by_type.items()} @classmethod - def from_dict(cls, data: dict[str, dict[str, Any]]) -> "EtlConfig": + def from_dict(cls, data: dict[str, dict[str, Any]]) -> 'EtlConfig': + """ + Create an EtlConfig from a dictionary + """ by_type = {k: cls.EtlConfigType(**v) for k, v in data.items()} return cls(by_type=by_type) From 06e1d6536ec38afa93074c6c6fb5747bd0cb5b40 Mon Sep 17 00:00:00 2001 From: Michael Franklin Date: Mon, 26 Aug 2024 14:22:58 +1000 Subject: [PATCH 3/5] Docstring --- metamist_infrastructure/driver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index c695a3826..0323b75d8 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -61,7 +61,7 @@ def on_group_finalisation(self): """ This method is called once all groups have been finalised """ - # do this later on + # we set-up the etl here, as we need to know the etl accessors self._setup_etl() @cached_property From a57a4e9a59f60dbe210a3a7080bdf818e079d32d Mon Sep 17 00:00:00 2001 From: Michael Franklin Date: Wed, 28 Aug 2024 13:54:06 +1000 Subject: [PATCH 4/5] Add analysis group to etl accessors --- etl/load/main.py | 38 ++++++---------- 
metamist_infrastructure/driver.py | 74 ++++++++++++++++++++++++------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/etl/load/main.py b/etl/load/main.py index b3806957c..08e2f1c9b 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -352,17 +352,10 @@ def get_parser_instance( # check that submitting_user has access to parser accessor_config: dict[ - str, + Literal['by_type'], dict[ - Literal['parsers'], - list[ - dict[ - Literal['name'] - | Literal['parser_name'] - | Literal['default_parameters'], - Any, - ] - ], + str, + dict[Literal['parser_name', 'default_parameters', 'users'], Any], ], ] = get_accessor_config() @@ -371,26 +364,21 @@ def get_parser_instance( f'Submitting user {submitting_user} is not allowed to access any parsers' ) - # find the config - etl_accessor_config = next( - ( - accessor_config - for accessor_config in accessor_config[submitting_user].get('parsers', []) - if accessor_config['name'].strip(STRIP_CHARS) - == request_type.strip(STRIP_CHARS) - ), - None, - ) - if not etl_accessor_config: + parser_config = accessor_config['by_type'].get(request_type) + if not parser_config: + return None, ( + f'Submitting user {submitting_user} requested type {request_type}, ' + 'but was not available' + ) + + if submitting_user not in parser_config.get('users', []): return None, ( f'Submitting user {submitting_user} is not allowed to access {request_type}' ) - parser_name = (etl_accessor_config.get('parser_name') or request_type).strip( - STRIP_CHARS - ) + parser_name = (parser_config.get('parser_name') or request_type).strip(STRIP_CHARS) - init_params.update(etl_accessor_config.get('default_parameters', {})) + init_params.update(parser_config.get('default_parameters', {})) parser_map = prepare_parser_map() diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 0323b75d8..448afb6b8 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -11,6 +11,7 @@ import pulumi import pulumi_gcp as gcp +from cpg_infra.abstraction.gcp import get_member_key from cpg_infra.plugin import CpgInfrastructurePlugin from cpg_infra.utils import archive_folder @@ -245,33 +246,58 @@ def etl_configuration_secret_version(self): assert self.config.metamist.etl assert self.config.metamist.etl.accessors - etl_accessor_config = { - k: v.to_dict() for k, v in self.config.metamist.etl.accessors.items() - } - - def map_accessors_to_new_body(_accessors_map): + def map_accessors_to_new_body(accessors_by_type: dict[str, list[str]]) -> str: assert self.config.metamist assert self.config.metamist.etl - accessors: dict[str, str] = dict(_accessors_map) - # dict[gcp.serviceaccount.Account: dict[str, ]] - remapped = {accessors[k]: v for k, v in etl_accessor_config.items()} - EtlConfig( + config = EtlConfig( by_type={ t: EtlConfig.EtlConfigType( parser_name=config.parser_name, - users=[accessors[u] for u in config.accessors], + users=accessors_by_type.get(t, []), default_parameters=config.default_parameters, ) for t, config in self.config.metamist.etl.by_type.items() } ) - return json.dumps(remapped) + retval = json.dumps(config.to_dict()) + pulumi.warn(retval) + return retval + # do this in a few hits as pretty sure pulumi.Output collects don't chain etl_accessors_emails: dict[str, pulumi.Output[str]] = { k: v.email for k, v in self.etl_accessors.items() } - remapped_with_id = pulumi.Output.all(**etl_accessors_emails).apply( + datasets_to_lookup_analysis_members_for = set( + t.analysis_group_dataset + for t in 
self.config.metamist.etl.by_type.values() + if t.analysis_group_dataset + ) + members_for_datasets: dict[str, pulumi.Output[str]] = { + dataset: self.infrastructure.group_provider.resolve_group_members( + self.infrastructure.dataset_infrastructures[dataset] + .clouds['gcp'] + .analysis_group + ) + for dataset in datasets_to_lookup_analysis_members_for + } + + pulumi_outputs_by_etl_type: dict[str, pulumi.Output[list[str]]] = {} + + for by_type, by_type_config in self.config.metamist.etl.by_type.items(): + members: list[pulumi.Output[str]] = [] + + for etl_member in by_type_config.accessors: + members.append(etl_accessors_emails[etl_member]) + + if by_type_config.analysis_group_dataset: + members.extend( + members_for_datasets[by_type_config.analysis_group_dataset] + ) + # turns list[Pulumi.Output[str]] into Pulumi.Output[list[str]] + pulumi_outputs_by_etl_type[by_type] = pulumi.Output.all(*members) + + remapped_with_id = pulumi.Output.all(**pulumi_outputs_by_etl_type).apply( map_accessors_to_new_body ) return gcp.secretmanager.SecretVersion( @@ -829,14 +855,32 @@ def _etl_function( return fxn def _setup_metamist_etl_accessors(self): - for name, sa in self.etl_accessors.items(): + assert self.config.metamist + assert self.config.metamist.etl + + accessors = dict(self.etl_accessors) + # grab datasets + datasets_to_lookup_analysis_members_for = set( + t.analysis_group_dataset + for t in self.config.metamist.etl.by_type.values() + if t.analysis_group_dataset + ) + + for dataset in datasets_to_lookup_analysis_members_for: + accessors[f'{dataset}-analysis-group'] = ( + self.infrastructure.dataset_infrastructures[dataset] + .clouds['gcp'] + .analysis_group + ) + + for name, account in accessors: gcp.cloudfunctionsv2.FunctionIamMember( f'metamist-etl-accessor-{name}', location=self.etl_extract_function.location, project=self.etl_extract_function.project, cloud_function=self.etl_extract_function.name, role='roles/cloudfunctions.invoker', - member=pulumi.Output.concat('serviceAccount:', sa.email), + member=get_member_key(account), ) gcp.cloudrun.IamMember( @@ -845,7 +889,7 @@ def _setup_metamist_etl_accessors(self): project=self.etl_extract_function.project, service=self.etl_extract_function.name, # it shared the name role='roles/run.invoker', - member=pulumi.Output.concat('serviceAccount:', sa.email), + member=get_member_key(account), ) @cached_property From 76ad901303201d2181369de6198f1edc4c76f32a Mon Sep 17 00:00:00 2001 From: Michael Franklin Date: Wed, 28 Aug 2024 13:59:53 +1000 Subject: [PATCH 5/5] No red lines usually means it's fine --- metamist_infrastructure/driver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 448afb6b8..5b8d9ded8 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -873,7 +873,7 @@ def _setup_metamist_etl_accessors(self): .analysis_group ) - for name, account in accessors: + for name, account in accessors.items(): gcp.cloudfunctionsv2.FunctionIamMember( f'metamist-etl-accessor-{name}', location=self.etl_extract_function.location,