diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py
index 779751d6..c23ac470 100644
--- a/src/aleph/db/accessors/balances.py
+++ b/src/aleph/db/accessors/balances.py
@@ -154,7 +154,6 @@ async def update_balances(
     )
 
     # Convert floats to str to avoid having issue with float to decimal conversion
-
     records = [
         (address, chain.value, dapp or "", str(balance), eth_height, last_update)
         for address, balance in balances.items()
diff --git a/src/aleph/handlers/content/aggregate.py b/src/aleph/handlers/content/aggregate.py
index e3331544..744f1b94 100644
--- a/src/aleph/handlers/content/aggregate.py
+++ b/src/aleph/handlers/content/aggregate.py
@@ -1,3 +1,4 @@
+import asyncio
 import itertools
 import logging
 from typing import List, Sequence, Set, cast
@@ -87,8 +88,8 @@ async def _prepend_to_aggregate(
     aggregate: AggregateDb,
     elements: Sequence[AggregateElementDb],
 ):
-    # That will need to run in no IO blocking way
-    new_content = merge_aggregate_elements(elements)
+    # Run the merge in a thread so that a huge aggregate does not block the event loop
+    new_content = await asyncio.to_thread(merge_aggregate_elements, elements)
 
     await update_aggregate(
         session=session,
@@ -130,7 +131,7 @@ async def _update_aggregate(
 
     if not aggregate_metadata:
         LOGGER.info("%s/%s does not exist, creating it", key, owner)
-        content = merge_aggregate_elements(elements)
+        content = await asyncio.to_thread(merge_aggregate_elements, elements)
         await insert_aggregate(
             session=session,
             key=key,
@@ -239,5 +240,4 @@ async def forget_message(
         LOGGER.debug("Refreshing aggregate %s/%s...", owner, key)
         await refresh_aggregate(session=session, owner=owner, key=str(key))
 
-
     return set()
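For context, `asyncio.to_thread` runs a blocking callable in a worker thread and awaits its result, which is what keeps the merge off the event loop here. A minimal sketch of the pattern, using a hypothetical `merge_elements` in place of the project's `merge_aggregate_elements`:

```python
import asyncio
from typing import Any, Dict, Sequence


def merge_elements(elements: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
    # Purely synchronous, potentially CPU-heavy merge (illustrative stand-in).
    content: Dict[str, Any] = {}
    for element in elements:
        content.update(element)
    return content


async def build_aggregate(elements: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
    # Offload the merge to a worker thread so that large aggregates do not
    # stall other coroutines running on the event loop.
    return await asyncio.to_thread(merge_elements, elements)
```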
diff --git a/src/aleph/handlers/content/post.py b/src/aleph/handlers/content/post.py
index 667d99d0..1b8407e3 100644
--- a/src/aleph/handlers/content/post.py
+++ b/src/aleph/handlers/content/post.py
@@ -49,12 +49,8 @@ async def update_balances(session: AsyncDbSession, content: Mapping[str, Any]) -
     dapp = content.get("dapp")
 
     LOGGER.info("Updating balances for %s (dapp: %s)", chain, dapp)
-    print(f"Chain type: {type(chain)}, Chain value: {chain.value}, Full chain: {chain}")
 
     balances: Dict[str, float] = content["balances"]
-    print(f"Number of balances to update: {len(balances)}")
-    for addr, bal in list(balances.items())[:3]:  # Print first 3 for debug
-        print(f"  {addr}: {bal} bal type {type(bal)}")
 
     await update_balances_db(
         session=session,
diff --git a/src/aleph/services/p2p/http.py b/src/aleph/services/p2p/http.py
index 53b92d53..0a913e58 100644
--- a/src/aleph/services/p2p/http.py
+++ b/src/aleph/services/p2p/http.py
@@ -12,23 +12,14 @@
 LOGGER = logging.getLogger("P2P.HTTP")
 
-SESSIONS = dict()
-
-
-async def api_get_request(base_uri, method, timeout=1):
-    if timeout not in SESSIONS:
-        connector = aiohttp.TCPConnector(limit_per_host=5)
-        SESSIONS[timeout] = aiohttp.ClientSession(
-            read_timeout=timeout, connector=connector
-        )
+async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1):
     uri = f"{base_uri}/api/v0/{method}"
     try:
-        async with SESSIONS[timeout].get(uri) as resp:
+        async with session.get(uri) as resp:
             if resp.status != 200:
-                result = None
-            else:
-                result = await resp.json()
+                return None
+            return await resp.json()
     except (
         TimeoutError,
         asyncio.TimeoutError,
@@ -44,27 +35,96 @@
 
 
 async def get_peer_hash_content(
-    base_uri: str, item_hash: str, timeout: int = 1
+    session: aiohttp.ClientSession,
+    base_uri: str,
+    item_hash: str,
+    semaphore,
+    timeout: int = 1,
 ) -> Optional[bytes]:
-    result = None
-    item = await api_get_request(base_uri, f"storage/{item_hash}", timeout=timeout)
-    if item is not None and item["status"] == "success" and item["content"] is not None:
-        # TODO: IMPORTANT /!\ verify the hash of received data!
-        return base64.decodebytes(item["content"].encode("utf-8"))
-    else:
-        LOGGER.debug(f"can't get hash {item_hash}")
-
-    return result
+    async with (
+        semaphore
+    ):  # Cap the number of concurrent requests with the semaphore
+        result = None
+        item = await api_get_request(
+            session=session,
+            base_uri=base_uri,
+            method=f"storage/{item_hash}",
+            timeout=timeout,
+        )
+        if (
+            item is not None
+            and item["status"] == "success"
+            and item["content"] is not None
+        ):
+            # TODO: IMPORTANT /!\ verify the hash of received data!
+            return base64.decodebytes(item["content"].encode("utf-8"))
+        else:
+            LOGGER.debug(f"can't get hash {item_hash}")
+        return result
 
 
 async def request_hash(
     api_servers: Sequence[str], item_hash: str, timeout: int = 1
 ) -> Optional[bytes]:
+    """
+    Request a hash from the available API servers concurrently.
+    The first valid response wins; the remaining tasks are cancelled.
+    """
     uris: List[str] = sample(api_servers, k=len(api_servers))
+    semaphore = asyncio.Semaphore(5)
+    connector = aiohttp.TCPConnector(limit_per_host=5)
+    timeout_conf = aiohttp.ClientTimeout(total=timeout)
 
-    for uri in uris:
-        content = await get_peer_hash_content(uri, item_hash, timeout=timeout)
-        if content is not None:
-            return content
+    # Use a dedicated session for each get_peer_hash_content call
+    tasks = []
+    for url in uris:
+        session = aiohttp.ClientSession(connector=connector, timeout=timeout_conf)
+        tasks.append(
+            asyncio.create_task(
+                get_peer_hash_content_with_session(
+                    session=session,
+                    base_uri=url,
+                    item_hash=item_hash,
+                    semaphore=semaphore,
+                )
+            )
+        )
 
-    return None  # Nothing found...
+    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+    result = None
+    for t in done:
+        try:
+            data = t.result()
+        except Exception:
+            continue
+        if data:
+            result = data
+            break
+
+    for p in pending:
+        p.cancel()
+    await asyncio.gather(*pending, return_exceptions=True)
+
+    return result
+
+
+async def get_peer_hash_content_with_session(
+    session: aiohttp.ClientSession,
+    base_uri: str,
+    item_hash: str,
+    semaphore,
+    timeout: int = 1,
+) -> Optional[bytes]:
+    """Wrapper that ensures the session is properly closed."""
+    try:
+        async with session:
+            return await get_peer_hash_content(
+                session=session,
+                base_uri=base_uri,
+                item_hash=item_hash,
+                semaphore=semaphore,
+                timeout=timeout,
+            )
+    except Exception as e:
+        LOGGER.exception(f"Error in get_peer_hash_content_with_session: {e}")
+        return None
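For reference, the first-response-wins idea used in `request_hash` above can be expressed as a small helper. This is only a sketch with a hypothetical `first_valid_result` name (not part of this PR); it keeps consuming completed tasks until one yields data, then cancels whatever is still pending:

```python
import asyncio
from typing import Awaitable, Iterable, Optional, TypeVar

T = TypeVar("T")


async def first_valid_result(coros: Iterable[Awaitable[Optional[T]]]) -> Optional[T]:
    """Return the first truthy result from the given awaitables, cancel the rest."""
    pending = {asyncio.ensure_future(c) for c in coros}
    result: Optional[T] = None
    try:
        while pending and result is None:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                try:
                    data = task.result()
                except Exception:
                    continue  # A failed peer should not abort the whole lookup
                if data:
                    result = data
                    break
    finally:
        # Cancel and drain any tasks that are still running
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    return result
```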
diff --git a/src/aleph/services/p2p/jobs.py b/src/aleph/services/p2p/jobs.py
index afd5cbad..5222ef1d 100644
--- a/src/aleph/services/p2p/jobs.py
+++ b/src/aleph/services/p2p/jobs.py
@@ -1,8 +1,9 @@
 import asyncio
 import logging
 from dataclasses import dataclass
-from typing import Optional
+from typing import Optional, Sequence
 
+import aiohttp
 from aleph_p2p_client import AlephP2PServiceClient
 from configmanager import Config
 
@@ -55,9 +56,13 @@ async def reconnect_p2p_job(
         await asyncio.sleep(config.p2p.reconnect_delay.value)
 
 
-async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
+async def check_peer(
+    session: aiohttp.ClientSession, peer_uri: str, timeout: int = 1
+) -> PeerStatus:
     try:
-        version_info = await api_get_request(peer_uri, "version", timeout=timeout)
+        version_info = await api_get_request(
+            session=session, base_uri=peer_uri, method="version", timeout=timeout
+        )
         if version_info is not None:
             return PeerStatus(peer_uri=peer_uri, is_online=True, version=version_info)
 
@@ -67,6 +72,22 @@
     return PeerStatus(peer_uri=peer_uri, is_online=False, version=None)
 
 
+async def request_version(peers: Sequence[str], my_ip: str, timeout: int = 1):
+    connector = aiohttp.TCPConnector(limit_per_host=5)
+    timeout_conf = aiohttp.ClientTimeout(total=timeout)
+
+    async with aiohttp.ClientSession(
+        connector=connector, timeout=timeout_conf
+    ) as session:
+        jobs = []
+        for peer in peers:
+            if my_ip in peer:
+                continue
+
+            jobs.append(check_peer(session, peer))
+        return await asyncio.gather(*jobs)
+
+
 async def tidy_http_peers_job(
     config: Config, session_factory: AsyncDbSessionFactory, node_cache: NodeCache
 ) -> None:
@@ -77,7 +98,6 @@ async def tidy_http_peers_job(
     await asyncio.sleep(2)
 
     while True:
-        jobs = []
         try:
             async with session_factory() as session:
@@ -85,12 +105,7 @@
                     session=session, peer_type=PeerType.HTTP
                 )
 
-                for peer in peers:
-                    if my_ip in peer:
-                        continue
-
-                    jobs.append(check_peer(peer))
-                peer_statuses = await asyncio.gather(*jobs)
+                peer_statuses = await request_version(peers=peers, my_ip=my_ip)
 
                 for peer_status in peer_statuses:
                     peer_in_api_servers = await node_cache.has_api_server(
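As a point of comparison, `request_version` above follows the usual aiohttp pattern of sharing one `ClientSession` across concurrent probes and fanning them out with `asyncio.gather`. A self-contained sketch with hypothetical `fetch_version`/`probe_peers` names; the `/api/v0/version` path mirrors what `api_get_request` builds:

```python
import asyncio
from typing import Any, List, Optional, Sequence

import aiohttp


async def fetch_version(session: aiohttp.ClientSession, base_uri: str) -> Optional[Any]:
    # One GET per peer, reusing the shared session.
    try:
        async with session.get(f"{base_uri}/api/v0/version") as resp:
            if resp.status != 200:
                return None
            return await resp.json()
    except (aiohttp.ClientError, asyncio.TimeoutError):
        return None


async def probe_peers(peers: Sequence[str], timeout: float = 1.0) -> List[Optional[Any]]:
    # A single session (and connection pool) serves every probe and is closed
    # when the block exits; limit_per_host caps concurrency per peer.
    connector = aiohttp.TCPConnector(limit_per_host=5)
    timeout_conf = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout_conf) as session:
        return await asyncio.gather(*(fetch_version(session, peer) for peer in peers))
```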
diff --git a/src/aleph/services/storage/fileystem_engine.py b/src/aleph/services/storage/fileystem_engine.py
index 4a209f4c..50b29587 100644
--- a/src/aleph/services/storage/fileystem_engine.py
+++ b/src/aleph/services/storage/fileystem_engine.py
@@ -1,6 +1,9 @@
 from pathlib import Path
 from typing import Optional, Union
 
+import aiofiles
+import aiofiles.ospath
+
 from .engine import StorageEngine
 
 
@@ -16,19 +19,26 @@ def __init__(self, folder: Union[Path, str]):
 
     async def read(self, filename: str) -> Optional[bytes]:
         file_path = self.folder / filename
-        if not file_path.is_file():
+        if not await aiofiles.ospath.isfile(file_path):
             return None
-
-        return file_path.read_bytes()
+        async with aiofiles.open(file_path, "rb") as f:
+            return await f.read()
 
     async def write(self, filename: str, content: bytes):
         file_path = self.folder / filename
-        file_path.write_bytes(content)
+        async with aiofiles.open(file_path, "wb") as f:
+            await f.write(content)
 
     async def delete(self, filename: str):
         file_path = self.folder / filename
-        file_path.unlink(missing_ok=True)
+        async_unlink = aiofiles.ospath.wrap(
+            Path.unlink
+        )  # Wrap Path.unlink manually; aiofiles does not provide an unlink helper
+
+        await async_unlink(file_path, missing_ok=True)
 
     async def exists(self, filename: str) -> bool:
         file_path = self.folder / filename
-        return file_path.exists()
+        return await aiofiles.ospath.exists(
+            file_path
+        )  # Async wrapper that runs os.path.exists off the event loop
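For background, `aiofiles` wraps blocking file operations in a thread pool so they can be awaited. A minimal sketch of the read/delete pattern above, with hypothetical `read_file`/`delete_file` helpers; `asyncio.to_thread` is shown as a stdlib way to wrap calls that aiofiles does not cover out of the box:

```python
import asyncio
from pathlib import Path
from typing import Optional

import aiofiles
import aiofiles.ospath


async def read_file(path: Path) -> Optional[bytes]:
    # Both the existence check and the read run off the event loop.
    if not await aiofiles.ospath.isfile(path):
        return None
    async with aiofiles.open(path, "rb") as f:
        return await f.read()


async def delete_file(path: Path) -> None:
    # asyncio.to_thread is a stdlib alternative for synchronous calls
    # that aiofiles does not wrap, such as Path.unlink.
    await asyncio.to_thread(path.unlink, missing_ok=True)
```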