From 5bd3e25a16302c4deb826bee456fca5a68d3bde7 Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:14:42 +0200
Subject: [PATCH 1/9] fix: remove debug print

---
 src/aleph/handlers/content/post.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/aleph/handlers/content/post.py b/src/aleph/handlers/content/post.py
index 667d99d0..1b8407e3 100644
--- a/src/aleph/handlers/content/post.py
+++ b/src/aleph/handlers/content/post.py
@@ -49,12 +49,8 @@ async def update_balances(session: AsyncDbSession, content: Mapping[str, Any]) -
     dapp = content.get("dapp")
 
     LOGGER.info("Updating balances for %s (dapp: %s)", chain, dapp)
-    print(f"Chain type: {type(chain)}, Chain value: {chain.value}, Full chain: {chain}")
 
     balances: Dict[str, float] = content["balances"]
-    print(f"Number of balances to update: {len(balances)}")
-    for addr, bal in list(balances.items())[:3]:  # Print first 3 for debug
-        print(f"  {addr}: {bal} bal type {type(bal)}")
 
     await update_balances_db(
         session=session,

From 87c9260a7d28a360c6abddbababe802e2d7b0381 Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:17:59 +0200
Subject: [PATCH 2/9] Fix: merge_aggregate_elements is I/O blocking, use
 asyncio.to_thread to avoid blocking the event loop on big aggregates

---
 src/aleph/handlers/content/aggregate.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/aleph/handlers/content/aggregate.py b/src/aleph/handlers/content/aggregate.py
index e3331544..744f1b94 100644
--- a/src/aleph/handlers/content/aggregate.py
+++ b/src/aleph/handlers/content/aggregate.py
@@ -1,3 +1,4 @@
+import asyncio
 import itertools
 import logging
 from typing import List, Sequence, Set, cast
@@ -87,8 +88,8 @@ async def _prepend_to_aggregate(
     aggregate: AggregateDb,
     elements: Sequence[AggregateElementDb],
 ):
-    # That will need to run in no IO blocking way
-    new_content = merge_aggregate_elements(elements)
+    # Run in a thread to avoid blocking the event loop if the aggregate is huge
+    new_content = await asyncio.to_thread(merge_aggregate_elements, elements)
 
     await update_aggregate(
         session=session,
@@ -130,7 +131,7 @@ async def _update_aggregate(
 
     if not aggregate_metadata:
         LOGGER.info("%s/%s does not exist, creating it", key, owner)
-        content = merge_aggregate_elements(elements)
+        content = await asyncio.to_thread(merge_aggregate_elements, elements)
         await insert_aggregate(
             session=session,
             key=key,
@@ -239,5 +240,4 @@ async def forget_message(
 
             LOGGER.debug("Refreshing aggregate %s/%s...", owner, key)
             await refresh_aggregate(session=session, owner=owner, key=str(key))
 
-    return set()

From 9afddedeec839da330e9ba4140aa381b6a640aec Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:18:39 +0200
Subject: [PATCH 3/9] fix: use aiofiles in the filesystem engine to avoid
 blocking I/O

---
 .../services/storage/fileystem_engine.py | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/src/aleph/services/storage/fileystem_engine.py b/src/aleph/services/storage/fileystem_engine.py
index 4a209f4c..6412ecdd 100644
--- a/src/aleph/services/storage/fileystem_engine.py
+++ b/src/aleph/services/storage/fileystem_engine.py
@@ -1,6 +1,9 @@
 from pathlib import Path
 from typing import Optional, Union
 
+import aiofiles
+import aiofiles.ospath
+
 from .engine import StorageEngine
 
 
@@ -16,19 +19,26 @@ def __init__(self, folder: Union[Path, str]):
     async def read(self, filename: str) -> Optional[bytes]:
         file_path = self.folder / filename
 
-        if not file_path.is_file():
+        if not await aiofiles.ospath.isfile(file_path):
             return None
-
-        return file_path.read_bytes()
+        async with aiofiles.open(file_path, "rb") as f:
+            return await f.read()
 
     async def write(self, filename: str, content: bytes):
         file_path = self.folder / filename
-        file_path.write_bytes(content)
+        async with aiofiles.open(file_path, "wb") as f:
+            await f.write(content)
 
     async def delete(self, filename: str):
         file_path = self.folder / filename
-        file_path.unlink(missing_ok=True)
+        async_unlink = aiofiles.ospath.wrap(
+            Path.unlink
+        )  # We manually warp unlink (not handle by aiofiles)
+
+        await async_unlink(async_unlink(file_path, missing_ok=True))
 
     async def exists(self, filename: str) -> bool:
         file_path = self.folder / filename
-        return file_path.exists()
+        return await aiofiles.ospath.exists(
+            file_path
+        )  # aiofiles.ospath wraps .exists into an async call

From 93c89f3548c5f05ecd6e8c14b17d688a774afe49 Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:46:33 +0200
Subject: [PATCH 4/9] fix: typo error

---
 src/aleph/services/storage/fileystem_engine.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/aleph/services/storage/fileystem_engine.py b/src/aleph/services/storage/fileystem_engine.py
index 6412ecdd..50b29587 100644
--- a/src/aleph/services/storage/fileystem_engine.py
+++ b/src/aleph/services/storage/fileystem_engine.py
@@ -33,9 +33,9 @@ async def delete(self, filename: str):
         file_path = self.folder / filename
         async_unlink = aiofiles.ospath.wrap(
             Path.unlink
-        )  # We manually warp unlink (not handle by aiofiles)
+        )  # We manually wrap unlink (not handled by aiofiles)
 
-        await async_unlink(async_unlink(file_path, missing_ok=True))
+        await async_unlink(file_path, missing_ok=True)
 
     async def exists(self, filename: str) -> bool:
         file_path = self.folder / filename

From dd62e38e3f162e86e8cf77742b93882ccf9ce720 Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:47:23 +0200
Subject: [PATCH 5/9] Refactor: request hashes from peers in a more efficient
 way

---
 src/aleph/services/p2p/http.py | 89 +++++++++++++++++++++++-----------
 1 file changed, 62 insertions(+), 27 deletions(-)

diff --git a/src/aleph/services/p2p/http.py b/src/aleph/services/p2p/http.py
index 53b92d53..a6f0c862 100644
--- a/src/aleph/services/p2p/http.py
+++ b/src/aleph/services/p2p/http.py
@@ -12,23 +12,14 @@
 LOGGER = logging.getLogger("P2P.HTTP")
 
 
-SESSIONS = dict()
-
-
-async def api_get_request(base_uri, method, timeout=1):
-    if timeout not in SESSIONS:
-        connector = aiohttp.TCPConnector(limit_per_host=5)
-        SESSIONS[timeout] = aiohttp.ClientSession(
-            read_timeout=timeout, connector=connector
-        )
+async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1):
     uri = f"{base_uri}/api/v0/{method}"
 
     try:
-        async with SESSIONS[timeout].get(uri) as resp:
+        async with session.get(uri) as resp:
             if resp.status != 200:
-                result = None
-            else:
-                result = await resp.json()
+                return None
+            return await resp.json()
     except (
         TimeoutError,
         asyncio.TimeoutError,
@@ -44,17 +35,32 @@ async def get_peer_hash_content(
-    base_uri: str, item_hash: str, timeout: int = 1
+    session: aiohttp.ClientSession,
+    base_uri: str,
+    item_hash: str,
+    semaphore,
+    timeout: int = 1,
 ) -> Optional[bytes]:
-    result = None
-    item = await api_get_request(base_uri, f"storage/{item_hash}", timeout=timeout)
-    if item is not None and item["status"] == "success" and item["content"] is not None:
-        # TODO: IMPORTANT /!\ verify the hash of received data!
-        return base64.decodebytes(item["content"].encode("utf-8"))
-    else:
-        LOGGER.debug(f"can't get hash {item_hash}")
-
-    return result
+    async with (
+        semaphore
+    ):  # We use a semaphore to avoid making too many calls at the same time
+        result = None
+        item = await api_get_request(
+            session=session,
+            base_uri=base_uri,
+            method=f"storage/{item_hash}",
+            timeout=timeout,
+        )
+        if (
+            item is not None
+            and item["status"] == "success"
+            and item["content"] is not None
+        ):
+            # TODO: IMPORTANT /!\ verify the hash of received data!
+            return base64.decodebytes(item["content"].encode("utf-8"))
+        else:
+            LOGGER.debug(f"can't get hash {item_hash}")
+            return result
 
 
 async def request_hash(
     api_servers: Sequence[str], item_hash: str, timeout: int = 1
 ) -> Optional[bytes]:
     uris: List[str] = sample(api_servers, k=len(api_servers))
 
-    for uri in uris:
-        content = await get_peer_hash_content(uri, item_hash, timeout=timeout)
-        if content is not None:
-            return content
+    # Avoid too much request at the same time
+    semaphore = asyncio.Semaphore(5)
+
+    connector = aiohttp.TCPConnector(limit_per_host=5)
+    timeout_conf = aiohttp.ClientTimeout(total=timeout)
+
+    async with aiohttp.ClientSession(
+        connector=connector, timeout=timeout_conf
+    ) as session:
+        # Use Task instead of
+        tasks = [
+            asyncio.create_task(
+                get_peer_hash_content(
+                    session=session,
+                    base_uri=url,
+                    item_hash=item_hash,
+                    semaphore=semaphore,
+                )
+            )
+            for url in uris
+        ]
+
+        for completed_task in asyncio.as_completed(tasks):
+            try:
+                result = await completed_task
+                if result:
+                    # We cancel other Task
+                    for task in tasks:
+                        if not task.done():
+                            task.cancel()
+                    return result
+            except Exception:
+                continue
 
     return None  # Nothing found...

From 1222f504af67ec7d382313fb03a90b49d9199f77 Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:48:12 +0200
Subject: [PATCH 6/9] Refactor: request peer info in a more efficient way

---
 src/aleph/services/p2p/jobs.py | 36 ++++++++++++++++++++++++----------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/src/aleph/services/p2p/jobs.py b/src/aleph/services/p2p/jobs.py
index afd5cbad..eea4783b 100644
--- a/src/aleph/services/p2p/jobs.py
+++ b/src/aleph/services/p2p/jobs.py
@@ -1,8 +1,9 @@
 import asyncio
 import logging
 from dataclasses import dataclass
-from typing import Optional
+from typing import Optional, Sequence
 
+import aiohttp
 from aleph_p2p_client import AlephP2PServiceClient
 from configmanager import Config
 
@@ -55,9 +56,13 @@ async def reconnect_p2p_job(
         await asyncio.sleep(config.p2p.reconnect_delay.value)
 
 
-async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
+async def check_peer(
+    session: aiohttp.ClientSession, peer_uri: str, timeout: int = 1
+) -> PeerStatus:
     try:
-        version_info = await api_get_request(peer_uri, "version", timeout=timeout)
+        version_info = await api_get_request(
+            session, peer_uri, "version", timeout=timeout
+        )
         if version_info is not None:
             return PeerStatus(peer_uri=peer_uri, is_online=True, version=version_info)
 
@@ -67,6 +72,23 @@ async def check_peer(
     return PeerStatus(peer_uri=peer_uri, is_online=False, version=None)
 
 
+async def request_version(peers: Sequence[str], my_ip: str, timeout: int = 1):
+    jobs = []
+    connector = aiohttp.TCPConnector(limit_per_host=5)
+    timeout_conf = aiohttp.ClientTimeout(total=timeout)
+
+    async with aiohttp.ClientSession(
+        connector=connector, timeout=timeout_conf
+    ) as session:
+        for peer in peers:
+            if my_ip in peer:
+                continue
+
+            jobs.append(check_peer(session, peer))
+
+    return await asyncio.gather(*jobs)
+
+
 async def tidy_http_peers_job(
     config: Config, session_factory: AsyncDbSessionFactory, node_cache: NodeCache
 ) -> None:
@@ -77,7 +99,6 @@ async def tidy_http_peers_job(
     await asyncio.sleep(2)
 
     while True:
-        jobs = []
 
        try:
            async with session_factory() as session:
@@ -85,12 +106,7 @@
                     session=session, peer_type=PeerType.HTTP
                 )
 
-                for peer in peers:
-                    if my_ip in peer:
-                        continue
-
-                    jobs.append(check_peer(peer))
-                peer_statuses = await asyncio.gather(*jobs)
+                peer_statuses = await request_version(peers=peers, my_ip=my_ip)
 
                 for peer_status in peer_statuses:
                     peer_in_api_servers = await node_cache.has_api_server(

From feb0dcea545545ee97fc35a4caf0d666cf1dbf2b Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 8 Jul 2025 14:48:23 +0200
Subject: [PATCH 7/9] fix: lint issue

---
 src/aleph/db/accessors/balances.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py
index 779751d6..c23ac470 100644
--- a/src/aleph/db/accessors/balances.py
+++ b/src/aleph/db/accessors/balances.py
@@ -154,7 +154,6 @@ async def update_balances(
     )
 
     # Convert floats to str to avoid having issue with float to decimal conversion
-
     records = [
         (address, chain.value, dapp or "", str(balance), eth_height, last_update)
         for address, balance in balances.items()

From 083f74c29e5e70431924f9b71aa9edf8614ddccb Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 15 Jul 2025 11:23:13 +0200
Subject: [PATCH 8/9] Refactor: request_hash in p2p http to fetch hashes
 concurrently

---
 src/aleph/services/p2p/http.py | 71 +++++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 23 deletions(-)

diff --git a/src/aleph/services/p2p/http.py b/src/aleph/services/p2p/http.py
index a6f0c862..0a913e58 100644
--- a/src/aleph/services/p2p/http.py
+++ b/src/aleph/services/p2p/http.py
@@ -66,40 +66,65 @@ async def request_hash(
     api_servers: Sequence[str], item_hash: str, timeout: int = 1
 ) -> Optional[bytes]:
+    """
+    Request a hash from the available API servers concurrently over the network.
+    We take the first valid response and cancel the other tasks.
+    """
     uris: List[str] = sample(api_servers, k=len(api_servers))
-
-    # Avoid too much request at the same time
     semaphore = asyncio.Semaphore(5)
-
     connector = aiohttp.TCPConnector(limit_per_host=5)
     timeout_conf = aiohttp.ClientTimeout(total=timeout)
 
-    async with aiohttp.ClientSession(
-        connector=connector, timeout=timeout_conf
-    ) as session:
-        # Use Task instead of
-        tasks = [
+    # Use a dedicated session for each get_peer_hash_content call
+    tasks = []
+    for url in uris:
+        session = aiohttp.ClientSession(connector=connector, timeout=timeout_conf)
+        tasks.append(
             asyncio.create_task(
-                get_peer_hash_content(
+                get_peer_hash_content_with_session(
                     session=session,
                     base_uri=url,
                     item_hash=item_hash,
                     semaphore=semaphore,
                 )
             )
-            for url in uris
-        ]
+        )
+
+    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+    result = None
+    for t in done:
+        try:
+            data = t.result()
+        except Exception:
+            continue
+        if data:
+            result = data
+            break
 
-        for completed_task in asyncio.as_completed(tasks):
-            try:
-                result = await completed_task
-                if result:
-                    # We cancel other Task
-                    for task in tasks:
-                        if not task.done():
-                            task.cancel()
-                    return result
-            except Exception:
-                continue
+    for p in pending:
+        p.cancel()
+    await asyncio.gather(*pending, return_exceptions=True)
 
-    return None  # Nothing found...
+    return result
+
+
+async def get_peer_hash_content_with_session(
+    session: aiohttp.ClientSession,
+    base_uri: str,
+    item_hash: str,
+    semaphore,
+    timeout: int = 1,
+) -> Optional[bytes]:
+    """Wrapper to ensure the session is properly closed"""
+    try:
+        async with session:
+            return await get_peer_hash_content(
+                session=session,
+                base_uri=base_uri,
+                item_hash=item_hash,
+                semaphore=semaphore,
+                timeout=timeout,
+            )
+    except Exception as e:
+        LOGGER.exception(f"Error in get_peer_hash_content_with_session: {e}")
+        return None

From 0aa24605156ff3f3f2acaca07b1c39e0ba9a13ef Mon Sep 17 00:00:00 2001
From: 1yam
Date: Tue, 15 Jul 2025 11:24:56 +0200
Subject: [PATCH 9/9] Fix: `Closed HTTP sessions` error in request_version in
 p2p jobs

---
 src/aleph/services/p2p/jobs.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/aleph/services/p2p/jobs.py b/src/aleph/services/p2p/jobs.py
index eea4783b..5222ef1d 100644
--- a/src/aleph/services/p2p/jobs.py
+++ b/src/aleph/services/p2p/jobs.py
@@ -61,7 +61,7 @@ async def check_peer(
 ) -> PeerStatus:
     try:
         version_info = await api_get_request(
-            session, peer_uri, "version", timeout=timeout
+            session=session, base_uri=peer_uri, method="version", timeout=timeout
         )
         if version_info is not None:
             return PeerStatus(peer_uri=peer_uri, is_online=True, version=version_info)
@@ -73,20 +73,19 @@ async def check_peer(
     return PeerStatus(peer_uri=peer_uri, is_online=False, version=None)
 
 
 async def request_version(peers: Sequence[str], my_ip: str, timeout: int = 1):
-    jobs = []
     connector = aiohttp.TCPConnector(limit_per_host=5)
     timeout_conf = aiohttp.ClientTimeout(total=timeout)
 
     async with aiohttp.ClientSession(
         connector=connector, timeout=timeout_conf
     ) as session:
+        jobs = []
         for peer in peers:
             if my_ip in peer:
                 continue
 
             jobs.append(check_peer(session, peer))
-
-    return await asyncio.gather(*jobs)
+        return await asyncio.gather(*jobs)
 
 
 async def tidy_http_peers_job(