Fix: remove I/O Blocking code on message processing parts #820

Open

wants to merge 9 commits into base: 1yam-sync-db-to-async

1 change: 0 additions & 1 deletion src/aleph/db/accessors/balances.py
@@ -154,7 +154,6 @@ async def update_balances(
)

# Convert floats to str to avoid having issue with float to decimal conversion

records = [
(address, chain.value, dapp or "", str(balance), eth_height, last_update)
for address, balance in balances.items()
8 changes: 4 additions & 4 deletions src/aleph/handlers/content/aggregate.py
@@ -1,3 +1,4 @@
import asyncio
import itertools
import logging
from typing import List, Sequence, Set, cast
@@ -87,8 +88,8 @@ async def _prepend_to_aggregate(
aggregate: AggregateDb,
elements: Sequence[AggregateElementDb],
):
# That will need to run in no IO blocking way
new_content = merge_aggregate_elements(elements)
# Run the merge in a thread to avoid blocking the event loop if a huge aggregate is passed
new_content = await asyncio.to_thread(merge_aggregate_elements, elements)

await update_aggregate(
session=session,
@@ -130,7 +131,7 @@ async def _update_aggregate(
if not aggregate_metadata:
LOGGER.info("%s/%s does not exist, creating it", key, owner)

content = merge_aggregate_elements(elements)
content = await asyncio.to_thread(merge_aggregate_elements, elements)
await insert_aggregate(
session=session,
key=key,
@@ -239,5 +240,4 @@ async def forget_message(

LOGGER.debug("Refreshing aggregate %s/%s...", owner, key)
await refresh_aggregate(session=session, owner=owner, key=str(key))

return set()
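
For context, a minimal, self-contained sketch of the asyncio.to_thread pattern used above. The merge_elements helper and the sample data are hypothetical stand-ins for merge_aggregate_elements; the point is that the CPU-bound merge runs in a worker thread instead of blocking the event loop.

import asyncio

def merge_elements(elements: list[dict]) -> dict:
    # CPU-bound merge: later elements override earlier keys.
    merged: dict = {}
    for element in elements:
        merged.update(element)
    return merged

async def handle(elements: list[dict]) -> dict:
    # Offload the merge so the event loop keeps processing other messages.
    return await asyncio.to_thread(merge_elements, elements)

if __name__ == "__main__":
    print(asyncio.run(handle([{"a": 1}, {"a": 2, "b": 3}])))  # {'a': 2, 'b': 3}
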
4 changes: 0 additions & 4 deletions src/aleph/handlers/content/post.py
@@ -49,12 +49,8 @@ async def update_balances(session: AsyncDbSession, content: Mapping[str, Any]) -
dapp = content.get("dapp")

LOGGER.info("Updating balances for %s (dapp: %s)", chain, dapp)
print(f"Chain type: {type(chain)}, Chain value: {chain.value}, Full chain: {chain}")

balances: Dict[str, float] = content["balances"]
print(f"Number of balances to update: {len(balances)}")
for addr, bal in list(balances.items())[:3]: # Print first 3 for debug
print(f" {addr}: {bal} bal type {type(bal)}")

await update_balances_db(
session=session,
116 changes: 88 additions & 28 deletions src/aleph/services/p2p/http.py
@@ -12,23 +12,14 @@

LOGGER = logging.getLogger("P2P.HTTP")

SESSIONS = dict()


async def api_get_request(base_uri, method, timeout=1):
if timeout not in SESSIONS:
connector = aiohttp.TCPConnector(limit_per_host=5)
SESSIONS[timeout] = aiohttp.ClientSession(
read_timeout=timeout, connector=connector
)

async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1):
uri = f"{base_uri}/api/v0/{method}"
try:
async with SESSIONS[timeout].get(uri) as resp:
async with session.get(uri) as resp:
if resp.status != 200:
result = None
else:
result = await resp.json()
return None
return await resp.json()
Comment on lines +16 to +22
@nesitor (Member), Jul 17, 2025:

Here we ignore the timeout provided by the argument. If the session timeout is handled outside, we can simply remove the parameter; but if callers that do not manage their own timeout rely on this argument, we need to apply it inside the method to prevent issues.
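
A minimal sketch of one way to honor the argument, assuming callers keep passing timeout; aiohttp's session.get accepts a per-request timeout that overrides the session-level default:

async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1):
    uri = f"{base_uri}/api/v0/{method}"
    # The per-request timeout applies to this call only, regardless of the session default.
    async with session.get(uri, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
        if resp.status != 200:
            return None
        return await resp.json()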

except (
TimeoutError,
asyncio.TimeoutError,
@@ -44,27 +35,96 @@ async def api_get_request(base_uri, method, timeout=1)


async def get_peer_hash_content(
base_uri: str, item_hash: str, timeout: int = 1
session: aiohttp.ClientSession,
base_uri: str,
item_hash: str,
semaphore,
timeout: int = 1,
) -> Optional[bytes]:
result = None
item = await api_get_request(base_uri, f"storage/{item_hash}", timeout=timeout)
if item is not None and item["status"] == "success" and item["content"] is not None:
# TODO: IMPORTANT /!\ verify the hash of received data!
return base64.decodebytes(item["content"].encode("utf-8"))
else:
LOGGER.debug(f"can't get hash {item_hash}")

return result
async with (
semaphore
): # We use a semaphore to avoid making too many calls at the same time
result = None
item = await api_get_request(
session=session,
base_uri=base_uri,
method=f"storage/{item_hash}",
timeout=timeout,
)
if (
item is not None
and item["status"] == "success"
and item["content"] is not None
):
# TODO: IMPORTANT /!\ verify the hash of received data!
return base64.decodebytes(item["content"].encode("utf-8"))
else:
LOGGER.debug(f"can't get hash {item_hash}")
return result


async def request_hash(
api_servers: Sequence[str], item_hash: str, timeout: int = 1
) -> Optional[bytes]:
"""
Request a hash from the available API servers concurrently over the network.
We take the first valid response and cancel the remaining tasks.
"""
uris: List[str] = sample(api_servers, k=len(api_servers))
semaphore = asyncio.Semaphore(5)
connector = aiohttp.TCPConnector(limit_per_host=5)
timeout_conf = aiohttp.ClientTimeout(total=timeout)

for uri in uris:
content = await get_peer_hash_content(uri, item_hash, timeout=timeout)
if content is not None:
return content
# Use a dedicated session for each get_peer_hash_content call
tasks = []
for url in uris:
session = aiohttp.ClientSession(connector=connector, timeout=timeout_conf)
tasks.append(
asyncio.create_task(
get_peer_hash_content_with_session(
session=session,
base_uri=url,
item_hash=item_hash,
semaphore=semaphore,
)
)
)

return None # Nothing found...
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
result = None
for t in done:
try:
data = t.result()
except Exception:
continue
if data:
result = data
break

for p in pending:
p.cancel()
await asyncio.gather(*pending, return_exceptions=True)

return result


async def get_peer_hash_content_with_session(
session: aiohttp.ClientSession,
base_uri: str,
item_hash: str,
semaphore,
timeout: int = 1,
) -> Optional[bytes]:
"""Wrapper to ensure session is properly closed"""
try:
async with session:
return await get_peer_hash_content(
session=session,
base_uri=base_uri,
item_hash=item_hash,
semaphore=semaphore,
timeout=timeout,
)
except Exception as e:
LOGGER.exception(f"Error in get_peer_hash_content_with_session: {e}")
return None
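
For reference, a quick usage sketch of the new concurrent lookup; the server URLs and item hash below are hypothetical. asyncio.wait with FIRST_COMPLETED returns as soon as the fastest request finishes, and the remaining tasks are cancelled.

import asyncio

from aleph.services.p2p.http import request_hash

async def main():
    servers = ["http://peer-a.example.org:4024", "http://peer-b.example.org:4024"]
    # First peer to answer with valid content wins; the rest are cancelled.
    content = await request_hash(api_servers=servers, item_hash="QmExampleHash", timeout=2)
    if content is None:
        print("hash not found on any peer")
    else:
        print(f"fetched {len(content)} bytes")

asyncio.run(main())
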
35 changes: 25 additions & 10 deletions src/aleph/services/p2p/jobs.py
@@ -1,8 +1,9 @@
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional
from typing import Optional, Sequence

import aiohttp
from aleph_p2p_client import AlephP2PServiceClient
from configmanager import Config

@@ -55,9 +56,13 @@ async def reconnect_p2p_job(
await asyncio.sleep(config.p2p.reconnect_delay.value)


async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
async def check_peer(
session: aiohttp.ClientSession, peer_uri: str, timeout: int = 1
) -> PeerStatus:
try:
version_info = await api_get_request(peer_uri, "version", timeout=timeout)
version_info = await api_get_request(
session=session, base_uri=peer_uri, method="version", timeout=timeout
)
if version_info is not None:
return PeerStatus(peer_uri=peer_uri, is_online=True, version=version_info)

@@ -67,6 +72,22 @@ async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
return PeerStatus(peer_uri=peer_uri, is_online=False, version=None)


async def request_version(peers: Sequence[str], my_ip: str, timeout: int = 1):
connector = aiohttp.TCPConnector(limit_per_host=5)
timeout_conf = aiohttp.ClientTimeout(total=timeout)

async with aiohttp.ClientSession(
connector=connector, timeout=timeout_conf
) as session:
jobs = []
for peer in peers:
if my_ip in peer:
continue

jobs.append(check_peer(session, peer))
return await asyncio.gather(*jobs)


async def tidy_http_peers_job(
config: Config, session_factory: AsyncDbSessionFactory, node_cache: NodeCache
) -> None:
@@ -77,20 +98,14 @@ async def tidy_http_peers_job(
await asyncio.sleep(2)

while True:
jobs = []

try:
async with session_factory() as session:
peers = await get_all_addresses_by_peer_type(
session=session, peer_type=PeerType.HTTP
)

for peer in peers:
if my_ip in peer:
continue

jobs.append(check_peer(peer))
peer_statuses = await asyncio.gather(*jobs)
peer_statuses = await request_version(peers=peers, my_ip=my_ip)

for peer_status in peer_statuses:
peer_in_api_servers = await node_cache.has_api_server(
22 changes: 16 additions & 6 deletions src/aleph/services/storage/fileystem_engine.py
@@ -1,6 +1,9 @@
from pathlib import Path
from typing import Optional, Union

import aiofiles
import aiofiles.ospath

from .engine import StorageEngine


@@ -16,19 +19,26 @@ def __init__(self, folder: Union[Path, str]):
async def read(self, filename: str) -> Optional[bytes]:
file_path = self.folder / filename

if not file_path.is_file():
if not await aiofiles.ospath.isfile(file_path):
return None

return file_path.read_bytes()
async with aiofiles.open(file_path, "rb") as f:
return await f.read()

async def write(self, filename: str, content: bytes):
file_path = self.folder / filename
file_path.write_bytes(content)
async with aiofiles.open(file_path, "wb") as f:
await f.write(content)

async def delete(self, filename: str):
file_path = self.folder / filename
file_path.unlink(missing_ok=True)
async_unlink = aiofiles.ospath.wrap(
Path.unlink
) # We manually wrap unlink (not handled by aiofiles)

await async_unlink(file_path, missing_ok=True)

async def exists(self, filename: str) -> bool:
file_path = self.folder / filename
return file_path.exists()
return await aiofiles.ospath.exists(
file_path
) # This wraps the blocking .exists check into an async call
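
As an aside, if aiofiles.ospath.wrap is not available in the installed aiofiles version, the same non-blocking delete can be sketched with asyncio.to_thread (Python 3.9+). This is an alternative body for the delete method above, not what the PR uses:

import asyncio

async def delete(self, filename: str):
    file_path = self.folder / filename
    # Run the blocking unlink in the default thread pool executor.
    await asyncio.to_thread(file_path.unlink, missing_ok=True)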