-
Notifications
You must be signed in to change notification settings - Fork 22
Fix: remove I/O Blocking code on message processing parts #820
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1yam-sync-db-to-async
Are you sure you want to change the base?
Conversation
this PR replace #819 |
… to avoid blocking in case of big file
a869f13
to
feb0dce
Compare
async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1): | ||
uri = f"{base_uri}/api/v0/{method}" | ||
try: | ||
async with SESSIONS[timeout].get(uri) as resp: | ||
async with session.get(uri) as resp: | ||
if resp.status != 200: | ||
result = None | ||
else: | ||
result = await resp.json() | ||
return None | ||
return await resp.json() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we are ignoring the timeout provided by the argument. If the session timeout is handled outside just removing the parameter, but if the argument is used by other methods that don't handle the timeout from their own, we need to use it inside the method to prevent issues.
This PR remove I/O blocking operations on parts used by message processing to bring back message parallelization
Related Clickup or Jira tickets : ALEPH-526
Self proofreading checklist
Changes
This pull request introduces significant changes to improve asynchronous handling across multiple modules, enhance efficiency, and ensure non-blocking operations. Key updates include replacing synchronous file and I/O operations with asynchronous alternatives, introducing semaphores to limit concurrent requests, and refactoring code to use shared
aiohttp.ClientSession
instances for HTTP requests.Asynchronous Enhancements:
src/aleph/handlers/content/aggregate.py
: Updated_prepend_to_aggregate
and_update_aggregate
to useasyncio.to_thread
for non-blocking execution ofmerge_aggregate_elements
. [1] [2]src/aleph/services/storage/fileystem_engine.py
: Replaced synchronous file operations with asynchronous equivalents usingaiofiles
, includingread
,write
,delete
, andexists
methods.HTTP Request Optimization:
src/aleph/services/p2p/http.py
: Refactoredapi_get_request
to use a sharedaiohttp.ClientSession
instead of maintaining separate session instances, improving resource management. Added semaphores to limit concurrent calls inget_peer_hash_content
andrequest_hash
. [1] [2] [3]src/aleph/services/p2p/jobs.py
: Introducedrequest_version
to handle peer version checks usingaiohttp.ClientSession
and asynchronous tasks. Refactoredtidy_http_peers_job
to utilizerequest_version
for better scalability. [1] [2]Code Cleanup:
src/aleph/handlers/content/post.py
: Removed debug print statements fromupdate_balances
for cleaner logging.These changes collectively enhance the system's scalability, responsiveness, and maintainability by leveraging asynchronous programming patterns effectively.
How to test
This PR doesn't add any breaking change, it's just running things in Non I/O blockign way