Skip to content

Commit

Permalink
refactor: Fir 33557 remove account v 1 fb 2 0 support in python sdk (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stepansergeevitch authored Jun 26, 2024
1 parent a1288a7 commit 25c9a70
Show file tree
Hide file tree
Showing 73 changed files with 1,222 additions and 2,552 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
with:
firebolt-client-id: ${{ secrets.FIREBOLT_CLIENT_ID_STG_NEW_IDN }}
firebolt-client-secret: ${{ secrets.FIREBOLT_CLIENT_SECRET_STG_NEW_IDN }}
account: ${{ vars.FIREBOLT_ACCOUNT_V2 }}
account: ${{ vars.FIREBOLT_ACCOUNT }}
api-endpoint: "api.staging.firebolt.io"

- name: Run integration tests
Expand Down
98 changes: 13 additions & 85 deletions src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,10 @@
_firebolt_account_info_cache,
_firebolt_system_engine_cache,
)
from firebolt.common.constants import (
DEFAULT_TIMEOUT_SECONDS,
ENGINE_STATUS_RUNNING_LIST,
)
from firebolt.utils.exception import (
ConfigurationError,
ConnectionClosedError,
EngineNotRunningError,
InterfaceError,
)
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
from firebolt.utils.exception import ConfigurationError, ConnectionClosedError
from firebolt.utils.usage_tracker import get_user_agent_header
from firebolt.utils.util import (
Timer,
fix_url_schema,
validate_engine_name_and_url_v1,
)
from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1


class Connection(BaseConnection):
Expand Down Expand Up @@ -65,7 +53,6 @@ class Connection(BaseConnection):
"engine_url",
"api_endpoint",
"_is_closed",
"_system_engine_connection",
"client_class",
"cursor_type",
)
Expand All @@ -76,19 +63,18 @@ def __init__(
database: Optional[str],
client: AsyncClient,
cursor_type: Type[Cursor],
system_engine_connection: Optional["Connection"],
api_endpoint: str,
init_parameters: Optional[Dict[str, Any]] = None,
):
super().__init__()
self.api_endpoint = api_endpoint
self.engine_url = engine_url
self.database = database
self.cursor_type = cursor_type
self._cursors: List[Cursor] = []
self._system_engine_connection = system_engine_connection
self._client = client
self.init_parameters = init_parameters
self.init_parameters = init_parameters or {}
if database:
self.init_parameters["database"] = database

def cursor(self, **kwargs: Any) -> Cursor:
if self.closed:
Expand Down Expand Up @@ -120,9 +106,6 @@ async def aclose(self) -> None:
await self._client.aclose()
self._is_closed = True

if self._system_engine_connection:
await self._system_engine_connection.aclose()

async def __aexit__(
self, exc_type: type, exc_val: Exception, exc_tb: TracebackType
) -> None:
Expand Down Expand Up @@ -221,90 +204,36 @@ async def connect_v2(
client = AsyncClientV2(
auth=auth,
account_name=account_name,
base_url=system_engine_url,
api_endpoint=api_endpoint,
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
headers={"User-Agent": user_agent_header},
)
# Don't use context manager since this will be stored
# and used in a resulting connection
system_engine_connection = Connection(

async with Connection(
system_engine_url,
database,
None,
client,
CursorV2,
None,
api_endpoint,
system_engine_params,
)
) as system_engine_connection:

account_version = await system_engine_connection._client._account_version
if account_version == 2:
cursor = system_engine_connection.cursor()
if database:
await cursor.execute(f'USE DATABASE "{database}"')
if engine_name:
await cursor.execute(f'USE ENGINE "{engine_name}"')
# Ensure cursors created from this conection are using the same starting
# Ensure cursors created from this connection are using the same starting
# database and engine
return Connection(
cursor.engine_url,
cursor.database,
client,
client.clone(),
CursorV2,
system_engine_connection,
api_endpoint,
cursor.parameters,
)

if not engine_name:
return system_engine_connection

else:
try:
cursor = system_engine_connection.cursor()
assert isinstance(cursor, CursorV2) # Mypy check
with Timer("[PERFORMANCE] Resolving engine name "):
(
engine_url,
status,
attached_db,
) = await cursor._get_engine_url_status_db(engine_name)

if status not in ENGINE_STATUS_RUNNING_LIST:
raise EngineNotRunningError(engine_name)

if database is not None and database != attached_db:
raise InterfaceError(
f"Engine {engine_name} is attached to {attached_db} "
f"instead of {database}"
)
elif database is None:
database = attached_db

assert engine_url is not None

engine_url = fix_url_schema(engine_url)
client = AsyncClientV2(
auth=auth,
account_name=account_name,
base_url=engine_url,
api_endpoint=api_endpoint,
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
headers={"User-Agent": user_agent_header},
)
return Connection(
engine_url,
database,
client,
CursorV2,
system_engine_connection,
api_endpoint,
)
except: # noqa
await system_engine_connection.aclose()
raise


async def connect_v1(
auth: Auth,
Expand Down Expand Up @@ -357,9 +286,8 @@ async def connect_v1(
client = AsyncClientV1(
auth=auth,
account_name=account_name,
base_url=engine_url,
api_endpoint=api_endpoint,
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
headers={"User-Agent": user_agent_header},
)
return Connection(engine_url, database, client, CursorV1, None, api_endpoint)
return Connection(engine_url, database, client, CursorV1, api_endpoint)
60 changes: 9 additions & 51 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
import time
from abc import ABCMeta, abstractmethod
from functools import wraps
Expand Down Expand Up @@ -41,11 +42,9 @@
check_not_closed,
check_query_executed,
)
from firebolt.common.constants import ENGINE_STATUS_RUNNING_LIST
from firebolt.utils.exception import (
EngineNotRunningError,
FireboltDatabaseError,
FireboltEngineError,
OperationalError,
ProgrammingError,
)
Expand Down Expand Up @@ -84,8 +83,6 @@ def __init__(
self._client = client
self.connection = connection
self.engine_url = connection.engine_url
if connection.database:
self.database = connection.database
if connection.init_parameters:
self._update_set_parameters(connection.init_parameters)

Expand Down Expand Up @@ -149,7 +146,6 @@ async def _parse_response_headers(self, headers: Headers) -> None:
)
self._update_set_parameters(params)
self.engine_url = endpoint
self._client.base_url = URL(endpoint)

if headers.get(RESET_SESSION_HEADER):
self.flush_parameters()
Expand Down Expand Up @@ -380,17 +376,12 @@ async def _api_request(
self._set_parameters to be ignored.
"""
parameters = parameters or {}
account_version = await self._client._account_version
if use_set_parameters:
parameters = {**(self._set_parameters or {}), **parameters}
if self.parameters:
parameters = {**self.parameters, **parameters}
# Engines v2 will have account context in the URL
if self.connection._is_system and account_version == 1:
assert isinstance(self._client, AsyncClientV2)
parameters["account_id"] = await self._client.account_id
return await self._client.request(
url=f"/{path}" if path else "",
url=os.path.join(self.engine_url, path or ""),
method="POST",
params=parameters,
content=query,
Expand All @@ -404,17 +395,9 @@ async def is_db_available(self, database_name: str) -> bool:
connection (firebolt.async_db.connection.Connection)
database_name (str): Name of a database
"""
system_engine = self.connection._system_engine_connection or self.connection
with system_engine.cursor() as cursor:
return (
await cursor.execute(
"""
SELECT 1 FROM information_schema.databases WHERE database_name=?
""",
[database_name],
)
> 0
)
# For v2 accounts if we're connected it automatically
# means the database is available
return True

async def is_engine_running(self, engine_url: str) -> bool:
"""
Expand All @@ -424,34 +407,9 @@ async def is_engine_running(self, engine_url: str) -> bool:
connection (firebolt.async_db.connection.Connection): connection.
engine_url (str): URL of the engine
"""

if self.connection._is_system:
# System engine is always running
return True

assert self.connection._system_engine_connection is not None # Type check
system_cursor = self.connection._system_engine_connection.cursor()
assert isinstance(system_cursor, CursorV2) # Type check, should always be true
(
_,
status,
_,
) = await system_cursor._get_engine_url_status_db(self.engine_name)
return status in ENGINE_STATUS_RUNNING_LIST

async def _get_engine_url_status_db(self, engine_name: str) -> Tuple[str, str, str]:
await self.execute(
"""
SELECT url, attached_to, status FROM information_schema.engines
WHERE engine_name=?
""",
[engine_name],
)
row = await self.fetchone()
if row is None:
raise FireboltEngineError(f"Engine with name {engine_name} doesn't exist")
engine_url, database, status = row
return str(engine_url), str(status), str(database) # Mypy check
# For v2 accounts we don't have the engine context,
# so we can't check if it's running
return True


class CursorV1(Cursor):
Expand Down Expand Up @@ -493,7 +451,7 @@ async def _api_request(
if self.parameters:
parameters = {**self.parameters, **parameters}
return await self._client.request(
url=f"/{path}",
url=os.path.join(self.engine_url, path or ""),
method="POST",
params={
**(parameters or dict()),
Expand Down
Loading

0 comments on commit 25c9a70

Please sign in to comment.