diff --git a/CMakeLists.txt b/CMakeLists.txt index c39d6ae..a94e575 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,20 @@ add_custom_command(TARGET day1 find_package(CURL) if(CURL_FOUND) - add_executable(input_downloader src/utils/input_downloader.cpp) - target_link_libraries(input_downloader PRIVATE ${CURL_LIBRARIES}) + + set(UCORO_HDRS + "${CMAKE_CURRENT_SOURCE_DIR}/include/ucoro/coro.hpp") + add_library(ucoro INTERFACE ${UCORO_HDRS}) + target_include_directories(ucoro INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include) + target_compile_options(ucoro INTERFACE -fcoroutines) + + if(UNIX) + add_executable(input_downloader src/utils/input_downloader_coro.cpp) + target_link_libraries(input_downloader PRIVATE ${CURL_LIBRARIES} ucoro) + set_property(TARGET input_downloader PROPERTY CXX_STANDARD 20) + endif() + + # An example to use CURL in an async mode + add_executable(non_blocking_curl src/utils/non_blocking_curl.cpp) + target_link_libraries(non_blocking_curl PRIVATE ${CURL_LIBRARIES}) endif() \ No newline at end of file diff --git a/include/ucoro/coro.hpp b/include/ucoro/coro.hpp new file mode 100644 index 0000000..7a62589 --- /dev/null +++ b/include/ucoro/coro.hpp @@ -0,0 +1,153 @@ +#pragma once + +#include +#include +#include +#include + +namespace ucoro { + +template +requires (!std::is_reference_v && !std::is_pointer_v) +class [[nodiscard]] Coro final +{ +public: + class FinalAwaitable; + class Awaitable; + + class promise_type final + { + public: + Coro get_return_object() { return Coro{std::coroutine_handle::from_promise(*this)}; } + std::suspend_always initial_suspend() noexcept { return {}; } + + void return_value(T&& new_value) { + std::cerr << "\nPromiseType::return_value"; + m_value = std::move(new_value); + } + + // an exception escapes the body of the coroutine. + void unhandled_exception() { + std::terminate(); + } + + FinalAwaitable final_suspend() noexcept { return FinalAwaitable{}; } + + void continuation(std::coroutine_handle<> continuation) { + m_continuation = continuation; + } + + std::coroutine_handle<> continuation() const { + return m_continuation; + } + + T&& value() && { + return std::move(m_value); + } + + private: + T m_value{}; + + std::coroutine_handle<> m_continuation{nullptr}; + }; + + explicit Coro(std::coroutine_handle handle) + : m_handle{handle} {} + + ~Coro() { + if(m_handle) + m_handle.destroy(); + } + + Coro(const Coro&) = delete; + Coro& operator=(const Coro&) = delete; + + Coro(Coro&& rhs) noexcept + : m_handle(std::move(rhs.m_handle)) { + rhs.m_handle = nullptr; + } + + Coro& operator=(Coro&& rhs) noexcept { + if(&rhs != this) { + if(m_handle) { + m_handle.destroy(); + } + m_handle = std::exchange(rhs.m_handle, nullptr); + } + return *this; + } + + Awaitable operator co_await() && noexcept; + +private: + std::coroutine_handle m_handle; +}; + +template +class Coro::FinalAwaitable final { +public: + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend(std::coroutine_handle::promise_type> coroutine) noexcept + { + std::cerr << "\nFinalAwaitable::await_suspend done? " << coroutine.done() + << " continuation nullptr? " << (coroutine.promise().continuation() == nullptr); + // If there is a continuation call it, otherwise this is the end of the line. + // auto& promise = coroutine.promise(); + // if (promise.continuation() != nullptr) + // { + // return promise.continuation(); + // } + // else + // { + // return std::noop_coroutine(); + // } + return coroutine.promise().continuation(); + } + + void await_resume() noexcept { + std::cerr << "\nFinalAwaitable::await_resume"; + } +}; + +template +class Coro::Awaitable final +{ + using promise_type = typename Coro::promise_type; +public: + explicit Awaitable(std::coroutine_handle handler) noexcept + : m_handler{std::move(handler)} {} + + bool await_ready() const { + return !m_handler || m_handler.done(); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle continuation) { + std::cerr << "\nAwaitableBase::await_suspend"; + // Store the continuation in the task's promise so that the final_suspend() + // knows to resume this coroutine when the task completes. + + m_handler.promise().continuation(continuation); + + // Then we resume the task's coroutine, which is currently suspended + // at the initial-suspend-point (ie. at the open curly brace). + return m_handler; + } + + T&& await_resume() const { + std::cerr << "\nResponseAwaitable::await_resume"; + return std::move(m_handler.promise()).value(); + } + +protected: + std::coroutine_handle m_handler; +}; + + +template +typename Coro::Awaitable Coro::operator co_await() && noexcept { + return typename Coro::Awaitable{m_handle}; +} + + +} //< namespace ucoro \ No newline at end of file diff --git a/src/utils/non_blocking_curl.cpp b/src/utils/non_blocking_curl.cpp new file mode 100644 index 0000000..1e031bc --- /dev/null +++ b/src/utils/non_blocking_curl.cpp @@ -0,0 +1,70 @@ +#include +#include +#include +#include + +size_t write_data(void *ptr, size_t size, size_t nmemb, FILE *stream) { + return fwrite(ptr, size, nmemb, stream); +} + +int main(void) { + CURL *easy_handle; + CURLM *multi_handle; + CURLMcode mcode; + int still_running = 1; + + const char *url = "https://example.com/file.txt"; + const char *output_filename = "downloaded_file.txt"; + + FILE *fp = fopen(output_filename, "wb"); + if (!fp) { + fprintf(stderr, "Failed to open file for writing\n"); + return 1; + } + + curl_global_init(CURL_GLOBAL_DEFAULT); + easy_handle = curl_easy_init(); + + curl_easy_setopt(easy_handle, CURLOPT_URL, url); + curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, write_data); + curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, fp); + curl_easy_setopt(easy_handle, CURLOPT_FOLLOWLOCATION, 1L); + + multi_handle = curl_multi_init(); + curl_multi_add_handle(multi_handle, easy_handle); + + + while (still_running) { + printf("\nmulti perform"); + curl_multi_perform(multi_handle, &still_running); + int numfds; + printf("\ncurl_multi_wait started"); + mcode = curl_multi_wait(multi_handle, NULL, 0, 10000, &numfds); + printf("\ncurl_multi_wait finished"); + if (mcode != CURLM_OK) { + fprintf(stderr, "curl_multi_wait() failed: %s\n", curl_multi_strerror(mcode)); + break; + } + } + + // Check for any errors + CURLMsg *msg; + int msgs_left; + while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { + if (msg->data.result != CURLE_OK) { + fprintf(stderr, "Download failed: %s\n", curl_easy_strerror(msg->data.result)); + } else { + printf("Download completed successfully.\n"); + } + } + } + + curl_multi_remove_handle(multi_handle, easy_handle); + curl_easy_cleanup(easy_handle); + curl_multi_cleanup(multi_handle); + curl_global_cleanup(); + fclose(fp); + + return 0; +}