-
Notifications
You must be signed in to change notification settings - Fork 4
/
config.py
97 lines (69 loc) · 3.05 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from dataclasses import field
from typing import List, Optional, Set
from pydantic import model_validator
from pydantic.dataclasses import dataclass
from metaphor.common.base_config import BaseConfig
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.filter import DatasetFilter
from metaphor.common.sql.process_query.config import ProcessQueryConfig
from metaphor.common.tag_matcher import TagMatcher
from metaphor.common.utils import must_set_exactly_one
# Page size for the Cloud Logging "list_entries" API call; the API caps it at 1000.
# See https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
DEFAULT_QUERY_LOG_FETCH_SIZE = 1000
@dataclass(config=ConnectorConfig)
class BigQueryCredentials:
    """Credentials used to connect to BigQuery.

    Mirrors the fields of a Google Cloud service-account JSON key file, so a
    key file's contents can be supplied inline instead of via a file path.
    """

    # Project ID to use
    project_id: str

    # Private key ID
    private_key_id: str

    # Private key value (PEM contents from the key file)
    private_key: str

    # Client email contained in the key file
    client_email: str

    # Client ID contained in the key file
    client_id: str

    # Credential type; service-account keys use "service_account"
    type: str = "service_account"

    # OAuth2 authorization endpoint (standard Google default)
    auth_uri: str = "https://accounts.google.com/o/oauth2/auth"

    # OAuth2 token endpoint (standard Google default)
    token_uri: str = "https://oauth2.googleapis.com/token"
@dataclass(config=ConnectorConfig)
class BigQueryQueryLogConfig:
    """Settings that control fetching of query logs from BigQuery."""

    # Number of days back of query logs to fetch; if 0, don't fetch query logs
    lookback_days: int = 1

    # Query log filter to exclude certain usernames
    excluded_usernames: Set[str] = field(default_factory=set)

    # Exclude queries issued by service accounts
    exclude_service_accounts: bool = False

    # The number of query logs to fetch from BigQuery in one batch
    fetch_size: int = DEFAULT_QUERY_LOG_FETCH_SIZE

    # Fetch the full query SQL from the job API if it's truncated in the
    # audit metadata log
    fetch_job_query_if_truncated: bool = True

    # Config to control query processing
    # NOTE: pass the class itself as the factory — wrapping it in a lambda
    # (as before) adds nothing.
    process_query: ProcessQueryConfig = field(default_factory=ProcessQueryConfig)
@dataclass(config=ConnectorConfig)
class BigQueryRunConfig(BaseConfig):
    """Top-level run configuration for the BigQuery connector.

    Exactly one of ``key_path`` or ``credentials`` must be set (enforced by
    the ``have_key_path_or_credentials`` validator below).
    """

    # List of project IDs to extract metadata from
    project_ids: List[str]

    # Path to service account's JSON key file
    key_path: Optional[str] = None

    # The credentials from the BigQuery JSON key file
    credentials: Optional[BigQueryCredentials] = None

    # Use a different project ID to run BigQuery jobs if set
    job_project_id: Optional[str] = None

    # Max number of concurrent requests to bigquery or logging API, default is 5
    max_concurrency: int = 5

    # Include or exclude specific databases/schemas/tables
    filter: DatasetFilter = field(default_factory=DatasetFilter)

    # How tags should be assigned to datasets
    # (use the `list` builtin directly as the factory instead of `lambda: []`)
    tag_matchers: List[TagMatcher] = field(default_factory=list)

    # configs for fetching query logs
    query_log: BigQueryQueryLogConfig = field(
        default_factory=BigQueryQueryLogConfig
    )

    @model_validator(mode="after")
    def have_key_path_or_credentials(self) -> "BigQueryRunConfig":
        """Require exactly one of ``key_path`` / ``credentials`` to be set."""
        must_set_exactly_one(self.__dict__, ["key_path", "credentials"])
        return self