
Refactor: Use Async DB instead of Sync #817


Open · wants to merge 17 commits into main
6 changes: 3 additions & 3 deletions src/aleph/api_entrypoint.py
@@ -7,7 +7,7 @@
 
 import aleph.config
 from aleph.chains.signature_verifier import SignatureVerifier
-from aleph.db.connection import make_engine, make_session_factory
+from aleph.db.connection import make_async_engine, make_async_session_factory
 from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.services.p2p import init_p2p_client
@@ -34,12 +34,12 @@ async def configure_aiohttp_app(
     with sentry_sdk.start_transaction(name="init-api-server"):
         p2p_client = await init_p2p_client(config, service_name="api-server-aiohttp")
 
-        engine = make_engine(
+        engine = make_async_engine(
             config,
             echo=config.logging.level.value == logging.DEBUG,
             application_name="aleph-api",
         )
-        session_factory = make_session_factory(engine)
+        session_factory = make_async_session_factory(engine)
 
         node_cache = NodeCache(
             redis_host=config.redis.host.value, redis_port=config.redis.port.value
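
For context, make_async_engine and make_async_session_factory presumably wrap SQLAlchemy's asyncio extension, which also requires an async driver in the DSN (e.g. postgresql+asyncpg://). A minimal sketch under those assumptions; only the two function names come from the diff, the config keys and DSN construction are hypothetical:

from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)


def make_async_engine(config, echo: bool = False, application_name: str = "aleph") -> AsyncEngine:
    # Hypothetical DSN construction; the real helper derives it from the node config.
    dsn = (
        f"postgresql+asyncpg://{config.postgres.user.value}@"
        f"{config.postgres.host.value}:{config.postgres.port.value}/aleph"
    )
    return create_async_engine(
        dsn,
        echo=echo,
        # asyncpg receives application_name through server_settings.
        connect_args={"server_settings": {"application_name": application_name}},
    )


def make_async_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    # expire_on_commit=False keeps ORM objects readable after an awaited commit.
    return async_sessionmaker(engine, expire_on_commit=False)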
4 changes: 2 additions & 2 deletions src/aleph/chains/bsc.py
@@ -5,13 +5,13 @@
 from aleph.chains.chain_data_service import PendingTxPublisher
 from aleph.chains.indexer_reader import AlephIndexerReader
 from aleph.types.chain_sync import ChainEventType
-from aleph.types.db_session import DbSessionFactory
+from aleph.types.db_session import AsyncDbSessionFactory
 
 
 class BscConnector(ChainReader):
     def __init__(
         self,
-        session_factory: DbSessionFactory,
+        session_factory: AsyncDbSessionFactory,
         pending_tx_publisher: PendingTxPublisher,
     ):
         self.indexer_reader = AlephIndexerReader(
26 changes: 13 additions & 13 deletions src/aleph/chains/chain_data_service.py
@@ -31,22 +31,22 @@
 from aleph.storage import StorageService
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.chain_sync import ChainSyncProtocol
-from aleph.types.db_session import DbSession, DbSessionFactory
+from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory
 from aleph.types.files import FileType
 from aleph.utils import get_sha256
 
 
 class ChainDataService:
     def __init__(
         self,
-        session_factory: DbSessionFactory,
+        session_factory: AsyncDbSessionFactory,
         storage_service: StorageService,
     ):
         self.session_factory = session_factory
         self.storage_service = storage_service
 
     async def prepare_sync_event_payload(
-        self, session: DbSession, messages: List[MessageDb]
+        self, session: AsyncDbSession, messages: List[MessageDb]
     ) -> OffChainSyncEventPayload:
         """
         Returns the payload of a sync event to be published on chain.
@@ -129,22 +129,22 @@ async def _get_tx_messages_off_chain_protocol(
         LOGGER.info("Got bulk data with %d items" % len(messages))
         if config.ipfs.enabled.value:
             try:
-                with self.session_factory() as session:
+                async with self.session_factory() as session:
                     # Some chain data files are duplicated, and can be treated in parallel,
                     # hence the upsert.
-                    upsert_file(
+                    await upsert_file(
                         session=session,
                         file_hash=sync_file_content.hash,
                         file_type=FileType.FILE,
                         size=len(sync_file_content.raw_value),
                     )
-                    upsert_tx_file_pin(
+                    await upsert_tx_file_pin(
                         session=session,
                         file_hash=file_hash,
                         tx_hash=tx.hash,
                         created=utc_now(),
                     )
-                    session.commit()
+                    await session.commit()
 
                     # Some IPFS fetches can take a while, hence the large timeout.
                     await asyncio.wait_for(
@@ -246,17 +246,17 @@ def __init__(self, pending_tx_exchange: aio_pika.abc.AbstractExchange):
         self.pending_tx_exchange = pending_tx_exchange
 
     @staticmethod
-    def add_pending_tx(session: DbSession, tx: ChainTxDb):
-        upsert_chain_tx(session=session, tx=tx)
-        upsert_pending_tx(session=session, tx_hash=tx.hash)
+    async def add_pending_tx(session: AsyncDbSession, tx: ChainTxDb):
+        await upsert_chain_tx(session=session, tx=tx)
+        await upsert_pending_tx(session=session, tx_hash=tx.hash)
 
     async def publish_pending_tx(self, tx: ChainTxDb):
         message = aio_pika.Message(body=tx.hash.encode("utf-8"))
         await self.pending_tx_exchange.publish(
             message=message, routing_key=f"{tx.chain.value}.{tx.publisher}.{tx.hash}"
         )
 
-    async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
+    async def add_and_publish_pending_tx(self, session: AsyncDbSession, tx: ChainTxDb):
         """
         Add an event published on one of the supported chains.
         Adds the tx to the database, creates a pending tx entry in the pending tx table
@@ -265,8 +265,8 @@ async def add_and_publish_pending_tx(self, session: AsyncDbSession, tx: ChainTxDb):
         Note that this function commits changes to the database for consistency
         between the DB and the message queue.
         """
-        self.add_pending_tx(session=session, tx=tx)
-        session.commit()
+        await self.add_pending_tx(session=session, tx=tx)
+        await session.commit()
         await self.publish_pending_tx(tx)
 
     @classmethod
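
The AsyncDbSession and AsyncDbSessionFactory names imported throughout this PR are presumably thin aliases over SQLAlchemy's async types, mirroring the old DbSession/DbSessionFactory aliases. A plausible sketch of aleph/types/db_session.py after this change (the alias targets are an assumption):

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# Async counterparts of the old DbSession / DbSessionFactory aliases.
AsyncDbSession = AsyncSession
AsyncDbSessionFactory = async_sessionmaker[AsyncSession]

With these aliases, every accessor that takes a session becomes a coroutine, `with factory() as session` becomes `async with`, and `session.commit()` must be awaited, which is exactly the mechanical rewrite visible in the hunks above.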
4 changes: 2 additions & 2 deletions src/aleph/chains/connector.py
@@ -5,7 +5,7 @@
 from aleph_message.models import Chain
 from configmanager import Config
 
-from aleph.types.db_session import DbSessionFactory
+from aleph.types.db_session import AsyncDbSessionFactory
 
 from .abc import ChainReader, ChainWriter
 from .bsc import BscConnector
@@ -29,7 +29,7 @@ class ChainConnector:
 
     def __init__(
         self,
-        session_factory: DbSessionFactory,
+        session_factory: AsyncDbSessionFactory,
         pending_tx_publisher: PendingTxPublisher,
         chain_data_service: ChainDataService,
     ):
28 changes: 14 additions & 14 deletions src/aleph/chains/ethereum.py
@@ -23,7 +23,7 @@
 from aleph.schemas.chains.tx_context import TxContext
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.chain_sync import ChainEventType
-from aleph.types.db_session import DbSessionFactory
+from aleph.types.db_session import AsyncDbSessionFactory
 from aleph.utils import run_in_executor
 
 from .abc import ChainWriter
@@ -77,7 +77,7 @@ class EthereumVerifier(EVMVerifier):
 class EthereumConnector(ChainWriter):
     def __init__(
         self,
-        session_factory: DbSessionFactory,
+        session_factory: AsyncDbSessionFactory,
         pending_tx_publisher: PendingTxPublisher,
         chain_data_service: ChainDataService,
     ):
@@ -93,8 +93,8 @@ def __init__(
 
     async def get_last_height(self, sync_type: ChainEventType) -> int:
         """Returns the last height for which we already have the ethereum data."""
-        with self.session_factory() as session:
-            last_height = get_last_height(
+        async with self.session_factory() as session:
+            last_height = await get_last_height(
                 session=session, chain=Chain.ETH, sync_type=sync_type
             )
 
@@ -209,15 +209,15 @@ async def _request_transactions(
         # block height to do next requests from there.
         last_height = event_data.blockNumber
         if last_height:
-            with self.session_factory() as session:
-                upsert_chain_sync_status(
+            async with self.session_factory() as session:
+                await upsert_chain_sync_status(
                     session=session,
                     chain=Chain.ETH,
                     sync_type=ChainEventType.SYNC,
                     height=last_height,
                     update_datetime=utc_now(),
                 )
-                session.commit()
+                await session.commit()
 
     async def fetch_ethereum_sync_events(self, config: Config):
         last_stored_height = await self.get_last_height(sync_type=ChainEventType.SYNC)
@@ -236,11 +236,11 @@ async def fetch_ethereum_sync_events(self, config: Config):
             config, web3, contract, abi, last_stored_height
         ):
             tx = ChainTxDb.from_sync_tx_context(tx_context=context, tx_data=jdata)
-            with self.session_factory() as session:
+            async with self.session_factory() as session:
                 await self.pending_tx_publisher.add_and_publish_pending_tx(
                     session=session, tx=tx
                 )
-                session.commit()
+                await session.commit()
 
     async def fetch_sync_events_task(self, config: Config):
         while True:
@@ -295,10 +295,10 @@ async def packer(self, config: Config):
         i = 0
         gas_price = web3.eth.generate_gas_price()
         while True:
-            with self.session_factory() as session:
+            async with self.session_factory() as session:
                 # Wait for sync operations to complete
-                if (count_pending_txs(session=session, chain=Chain.ETH)) or (
-                    count_pending_messages(session=session, chain=Chain.ETH)
+                if (await count_pending_txs(session=session, chain=Chain.ETH)) or (
+                    await count_pending_messages(session=session, chain=Chain.ETH)
                 ) > 1000:
                     await asyncio.sleep(30)
                     continue
@@ -317,7 +317,7 @@ async def packer(self, config: Config):
                 nonce = web3.eth.get_transaction_count(account.address)
 
                 messages = list(
-                    get_unconfirmed_messages(
+                    await get_unconfirmed_messages(
                         session=session, limit=10000, chain=Chain.ETH
                     )
                 )
@@ -332,7 +332,7 @@ async def packer(self, config: Config):
                     )
                 )
                 # Required to apply update to the files table in get_chaindata
-                session.commit()
+                await session.commit()
                 response = await run_in_executor(
                     None,
                     self._broadcast_content,
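
The DB accessors awaited above (get_last_height, upsert_chain_sync_status, count_pending_txs, get_unconfirmed_messages, ...) must themselves become coroutines. A sketch of the pattern for one of them, assuming SQLAlchemy 2.0 select() style and a hypothetical ChainSyncStatusDb model; the real accessor may differ:

from typing import Optional

from aleph_message.models import Chain
from sqlalchemy import select

from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import AsyncDbSession


async def get_last_height(
    session: AsyncDbSession, chain: Chain, sync_type: ChainEventType
) -> Optional[int]:
    # AsyncSession.execute() is awaited; the Result object it returns is synchronous.
    result = await session.execute(
        select(ChainSyncStatusDb.height).where(  # ChainSyncStatusDb: hypothetical model
            (ChainSyncStatusDb.chain == chain)
            & (ChainSyncStatusDb.sync_type == sync_type)
        )
    )
    return result.scalar_one_or_none()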
22 changes: 12 additions & 10 deletions src/aleph/chains/indexer_reader.py
@@ -38,7 +38,7 @@
 from aleph.toolkit.range import MultiRange, Range
 from aleph.toolkit.timestamp import timestamp_to_datetime
 from aleph.types.chain_sync import ChainEventType, ChainSyncProtocol
-from aleph.types.db_session import DbSession, DbSessionFactory
+from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory
 
 LOGGER = logging.getLogger(__name__)
 
@@ -246,7 +246,7 @@ class AlephIndexerReader:
     def __init__(
         self,
         chain: Chain,
-        session_factory: DbSessionFactory,
+        session_factory: AsyncDbSessionFactory,
         pending_tx_publisher: PendingTxPublisher,
     ):
         self.chain = chain
@@ -257,7 +257,7 @@ def __init__(
 
     async def fetch_range(
         self,
-        session: DbSession,
+        session: AsyncDbSession,
         indexer_client: AlephIndexerClient,
         chain: Chain,
         event_type: ChainEventType,
@@ -295,7 +295,9 @@ async def fetch_range(
                 LOGGER.info("%d new txs", len(txs))
                 # Events are listed in reverse order in the indexer response
                 for tx in txs:
-                    self.pending_tx_publisher.add_pending_tx(session=session, tx=tx)
+                    await self.pending_tx_publisher.add_pending_tx(
+                        session=session, tx=tx
+                    )
 
                 if nb_events_fetched >= limit:
                     last_event_datetime = txs[-1].datetime
@@ -320,7 +322,7 @@ async def fetch_range(
                 str(synced_range),
             )
 
-            add_indexer_range(
+            await add_indexer_range(
                 session=session,
                 chain=chain,
                 event_type=event_type,
@@ -329,7 +331,7 @@ async def fetch_range(
 
             # Committing periodically reduces the size of DB transactions for large numbers
             # of events.
-            session.commit()
+            await session.commit()
 
             # Now that the txs are committed to the DB, add them to the pending tx message queue
             for tx in txs:
@@ -347,7 +349,7 @@ async def fetch_range(
 
     async def fetch_new_events(
         self,
-        session: DbSession,
+        session: AsyncDbSession,
         indexer_url: str,
         smart_contract_address: str,
         event_type: ChainEventType,
@@ -372,7 +374,7 @@ async def fetch_new_events(
             ]
         )
 
-        multirange_to_sync = get_missing_indexer_datetime_multirange(
+        multirange_to_sync = await get_missing_indexer_datetime_multirange(
            session=session,
            chain=self.chain,
            event_type=event_type,
@@ -399,14 +401,14 @@ async def fetcher(
     ):
         while True:
             try:
-                with self.session_factory() as session:
+                async with self.session_factory() as session:
                     await self.fetch_new_events(
                         session=session,
                         indexer_url=indexer_url,
                         smart_contract_address=smart_contract_address,
                         event_type=event_type,
                     )
-                    session.commit()
+                    await session.commit()
             except Exception:
                 LOGGER.exception(
                     "An unexpected exception occurred, "
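
Putting the pieces together, a minimal end-to-end usage of the new factory might look as follows. make_async_engine and make_async_session_factory come from the diff; the config loading and the accessor import path are illustrative assumptions:

import asyncio

from aleph_message.models import Chain

import aleph.config
from aleph.db.accessors.chains import get_last_height  # import path is an assumption
from aleph.db.connection import make_async_engine, make_async_session_factory
from aleph.types.chain_sync import ChainEventType


async def main():
    config = aleph.config.app_config  # illustrative; load it however the node does
    engine = make_async_engine(config, application_name="example")
    session_factory = make_async_session_factory(engine)
    async with session_factory() as session:
        height = await get_last_height(
            session=session, chain=Chain.ETH, sync_type=ChainEventType.SYNC
        )
        print(f"Last synced ETH height: {height}")
    await engine.dispose()  # AsyncEngine.dispose() is a coroutine


asyncio.run(main())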