-
Notifications
You must be signed in to change notification settings - Fork 22
Refactor: Use Async DB instead of Sync #817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would make sense that @amalcaraz review this parts (mostly the update part)
f113cd1
to
64c1b98
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except the bunch of comments I made, everything looks fine for me, but before merging it to main, we have to do a lot of testing on staging ensuring we don't create race conditions or other side effects.
select( | ||
AggregateDb.key, | ||
AggregateDb.content, | ||
AggregateDb.creation_datetime.label("created"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to ensure that this is the same equivalent
select(AggregateDb.key, AggregateDb.content) | ||
.filter(where_clause) | ||
.order_by(AggregateDb.key) | ||
) | ||
result = query.all() | ||
result = (await session.execute(query)).all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
if dapp: | ||
# For some reason asyncpg don't handle it if dapp is None | ||
query = query.where(AlephBalanceDb.dapp == dapp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if dapp: | |
# For some reason asyncpg don't handle it if dapp is None | |
query = query.where(AlephBalanceDb.dapp == dapp) | |
if dapp: | |
# For some reason asyncpg don't handle it if dapp is None | |
query = query.where(AlephBalanceDb.dapp == dapp) | |
else: | |
query = query.where(AlephBalanceDb.dapp is null) |
This is not correct, as if dapp is None don't filter it, we need to equal to None or Null in DB.
select_stmt += " WHERE address = ANY(:addresses)" | ||
parameters = {"addresses": list(addresses)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is totally equivalent, maybe we need to maintain IN
operator?
|
||
return { | ||
column.name: getattr(self, column.name) | ||
for column in self.__table__.columns | ||
if column.name not in exclude_set | ||
if column.name not in exclude_set and column.name not in insp.unloaded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why added that condition?
@@ -49,9 +49,14 @@ def update_balances(session: DbSession, content: Mapping[str, Any]) -> None: | |||
dapp = content.get("dapp") | |||
|
|||
LOGGER.info("Updating balances for %s (dapp: %s)", chain, dapp) | |||
print(f"Chain type: {type(chain)}, Chain value: {chain.value}, Full chain: {chain}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This print should be removed
print(f"Number of balances to update: {len(balances)}") | ||
for addr, bal in list(balances.items())[:3]: # Print first 3 for debug | ||
print(f" {addr}: {bal} bal type {type(bal)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, these one as are debug messages I think
return None | ||
|
||
# TODO: fix this pydanticV2 issue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's exactly the issue?
To bring back message parallelization we need to use AsyncDb instead of sync one
Related Clickup or Jira tickets : ALEPH-526
Self proofreading checklist
Changes
This pull request updates the database session handling across multiple files to use asynchronous methods and factories, improving compatibility with modern async workflows. The changes primarily involve replacing synchronous session handling with asynchronous equivalents and updating method calls accordingly.
Transition to Asynchronous Database Sessions:
Updated session factories and types: Replaced
DbSessionFactory
andDbSession
withAsyncDbSessionFactory
andAsyncDbSession
across various modules (src/aleph/api_entrypoint.py
,src/aleph/chains/bsc.py
,src/aleph/chains/ethereum.py
,src/aleph/chains/nuls2.py
,src/aleph/chains/connector.py
,src/aleph/chains/indexer_reader.py
,src/aleph/chains/chain_data_service.py
). [1] [2] [3] [4] [5] [6] [7]Replaced synchronous session management: Updated methods to use
async with
for session handling instead ofwith
, ensuring proper handling of asynchronous database operations. This change affects methods likeadd_pending_tx
,upsert_chain_sync_status
, and others (src/aleph/chains/chain_data_service.py
,src/aleph/chains/ethereum.py
,src/aleph/chains/indexer_reader.py
). [1] [2] [3]Migration of Database Operations to Async:
Async database operations: Replaced synchronous calls to database methods (e.g.,
session.commit()
,upsert_file
) with their asynchronous counterparts (await session.commit()
,await upsert_file
). This ensures compatibility with async workflows (src/aleph/chains/chain_data_service.py
,src/aleph/chains/ethereum.py
,src/aleph/chains/indexer_reader.py
). [1] [2] [3]Async helper methods: Updated helper methods like
get_last_height
,count_pending_txs
, andget_unconfirmed_messages
to useawait
for asynchronous execution (src/aleph/chains/ethereum.py
,src/aleph/chains/indexer_reader.py
). [1] [2] [3]Consistency Across Chain Modules:
src/aleph/chains/bsc.py
,src/aleph/chains/ethereum.py
,src/aleph/chains/nuls2.py
,src/aleph/chains/connector.py
). This ensures uniformity in database interaction patterns. [1] [2] [3] [4]These updates pave the way for better scalability and performance in asynchronous environments, aligning the codebase with modern best practices for database interaction.
Notes
This PR is the first parts of message parallelization, but this parts could have side effects intense testing will be needed
For some reason on local test in some case (rare case) it's gave me an error about psycopg2 but it's shouldn't be used since we use async engine that use asyncpg, not sure if it only on testing parts or if could happen in prod