Skip to content

Implemented utils classes to imlpement coroutines #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ add_custom_command(TARGET day1

find_package(CURL)
if(CURL_FOUND)
add_executable(input_downloader src/utils/input_downloader.cpp)
target_link_libraries(input_downloader PRIVATE ${CURL_LIBRARIES})

set(UCORO_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/include/ucoro/coro.hpp")
add_library(ucoro INTERFACE ${UCORO_HDRS})
target_include_directories(ucoro INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_options(ucoro INTERFACE -fcoroutines)

if(UNIX)
add_executable(input_downloader src/utils/input_downloader_coro.cpp)
target_link_libraries(input_downloader PRIVATE ${CURL_LIBRARIES} ucoro)
set_property(TARGET input_downloader PROPERTY CXX_STANDARD 20)
endif()

# An example to use CURL in an async mode
add_executable(non_blocking_curl src/utils/non_blocking_curl.cpp)
target_link_libraries(non_blocking_curl PRIVATE ${CURL_LIBRARIES})
endif()
153 changes: 153 additions & 0 deletions include/ucoro/coro.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#pragma once

#include <coroutine>
#include <concepts>
#include <utility>
#include <iostream>

namespace ucoro {

template<typename T>
requires (!std::is_reference_v<T> && !std::is_pointer_v<T>)
class [[nodiscard]] Coro final
{
public:
class FinalAwaitable;
class Awaitable;

class promise_type final
{
public:
Coro get_return_object() { return Coro{std::coroutine_handle<promise_type>::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }

void return_value(T&& new_value) {
std::cerr << "\nPromiseType::return_value";
m_value = std::move(new_value);
}

// an exception escapes the body of the coroutine.
void unhandled_exception() {
std::terminate();
}

FinalAwaitable final_suspend() noexcept { return FinalAwaitable{}; }

void continuation(std::coroutine_handle<> continuation) {
m_continuation = continuation;
}

std::coroutine_handle<> continuation() const {
return m_continuation;
}

T&& value() && {
return std::move(m_value);
}

private:
T m_value{};

std::coroutine_handle<> m_continuation{nullptr};
};

explicit Coro(std::coroutine_handle<promise_type> handle)
: m_handle{handle} {}

~Coro() {
if(m_handle)
m_handle.destroy();
}

Coro(const Coro&) = delete;
Coro& operator=(const Coro&) = delete;

Coro(Coro&& rhs) noexcept
: m_handle(std::move(rhs.m_handle)) {
rhs.m_handle = nullptr;
}

Coro& operator=(Coro&& rhs) noexcept {
if(&rhs != this) {
if(m_handle) {
m_handle.destroy();
}
m_handle = std::exchange(rhs.m_handle, nullptr);
}
return *this;
}

Awaitable operator co_await() && noexcept;

private:
std::coroutine_handle<promise_type> m_handle;
};

template<typename T>
class Coro<T>::FinalAwaitable final {
public:
bool await_ready() const noexcept { return false; }

std::coroutine_handle<> await_suspend(std::coroutine_handle<typename Coro<T>::promise_type> coroutine) noexcept
{
std::cerr << "\nFinalAwaitable::await_suspend done? " << coroutine.done()
<< " continuation nullptr? " << (coroutine.promise().continuation() == nullptr);
// If there is a continuation call it, otherwise this is the end of the line.
// auto& promise = coroutine.promise();
// if (promise.continuation() != nullptr)
// {
// return promise.continuation();
// }
// else
// {
// return std::noop_coroutine();
// }
return coroutine.promise().continuation();
}

void await_resume() noexcept {
std::cerr << "\nFinalAwaitable::await_resume";
}
};

template<typename T>
class Coro<T>::Awaitable final
{
using promise_type = typename Coro<T>::promise_type;
public:
explicit Awaitable(std::coroutine_handle<promise_type> handler) noexcept
: m_handler{std::move(handler)} {}

bool await_ready() const {
return !m_handler || m_handler.done();
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> continuation) {
std::cerr << "\nAwaitableBase::await_suspend";
// Store the continuation in the task's promise so that the final_suspend()
// knows to resume this coroutine when the task completes.

m_handler.promise().continuation(continuation);

// Then we resume the task's coroutine, which is currently suspended
// at the initial-suspend-point (ie. at the open curly brace).
return m_handler;
}

T&& await_resume() const {
std::cerr << "\nResponseAwaitable::await_resume";
return std::move(m_handler.promise()).value();
}

protected:
std::coroutine_handle<promise_type> m_handler;
};


template<typename T>
typename Coro<T>::Awaitable Coro<T>::operator co_await() && noexcept {
return typename Coro<T>::Awaitable{m_handle};
}


} //< namespace ucoro
70 changes: 70 additions & 0 deletions src/utils/non_blocking_curl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>

size_t write_data(void *ptr, size_t size, size_t nmemb, FILE *stream) {
return fwrite(ptr, size, nmemb, stream);
}

int main(void) {
CURL *easy_handle;
CURLM *multi_handle;
CURLMcode mcode;
int still_running = 1;

const char *url = "https://example.com/file.txt";
const char *output_filename = "downloaded_file.txt";

FILE *fp = fopen(output_filename, "wb");
if (!fp) {
fprintf(stderr, "Failed to open file for writing\n");
return 1;
}

curl_global_init(CURL_GLOBAL_DEFAULT);
easy_handle = curl_easy_init();

curl_easy_setopt(easy_handle, CURLOPT_URL, url);
curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, write_data);
curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, fp);
curl_easy_setopt(easy_handle, CURLOPT_FOLLOWLOCATION, 1L);

multi_handle = curl_multi_init();
curl_multi_add_handle(multi_handle, easy_handle);


while (still_running) {
printf("\nmulti perform");
curl_multi_perform(multi_handle, &still_running);
int numfds;
printf("\ncurl_multi_wait started");
mcode = curl_multi_wait(multi_handle, NULL, 0, 10000, &numfds);
printf("\ncurl_multi_wait finished");
if (mcode != CURLM_OK) {
fprintf(stderr, "curl_multi_wait() failed: %s\n", curl_multi_strerror(mcode));
break;
}
}

// Check for any errors
CURLMsg *msg;
int msgs_left;
while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
if (msg->data.result != CURLE_OK) {
fprintf(stderr, "Download failed: %s\n", curl_easy_strerror(msg->data.result));
} else {
printf("Download completed successfully.\n");
}
}
}

curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
curl_multi_cleanup(multi_handle);
curl_global_cleanup();
fclose(fp);

return 0;
}