Skip to content

Add support for streaming query #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ ChDB::Database.open(':memory:') do |db|
json_data = db.query_with_format('SELECT 1 as a, 2 as b', 'JSON')
p json_data
end

# Execute streaming query
ChDB::Database.open(':memory:') do |db|
total_rows = 0
collected = []
db.send_query('SELECT * FROM numbers(200000)') do |chunk|
collected << chunk.buf
total_rows += chunk.rows_read
end
p total_rows # => 200000
end
```

## Thread Safety
Expand Down
3 changes: 3 additions & 0 deletions chdb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Gem::Specification.new do |s|
'ext/chdb/extconf.rb',
'ext/chdb/local_result.c',
'ext/chdb/local_result.h',
'ext/chdb/streaming_result.c',
'ext/chdb/streaming_result.h',
'lib/chdb.rb',
'lib/chdb/constants.rb',
'lib/chdb/data_path.rb',
Expand All @@ -62,6 +64,7 @@ Gem::Specification.new do |s|
'lib/chdb/result_set.rb',
'lib/chdb/sql_processor.rb',
'lib/chdb/statement.rb',
'lib/chdb/streaming_result.rb',
'lib/chdb/version_info.rb',
'lib/chdb/version.rb'
]
Expand Down
2 changes: 1 addition & 1 deletion dependencies.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
chdb:
version: "3.1.2"
version: "3.2.0"
2 changes: 2 additions & 0 deletions ext/chdb/chdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "connection.h"
#include "exception.h"
#include "local_result.h"
#include "streaming_result.h"

void Init_chdb_native()
{
Expand All @@ -15,6 +16,7 @@ void Init_chdb_native()
init_chdb_handle();
init_chdb_constants();
init_local_result();
init_streaming_result();
init_connection();

DEBUG_PRINT("chdb extension initialized successfully");
Expand Down
28 changes: 25 additions & 3 deletions ext/chdb/chdb_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ connect_chdb_func connect_chdb_ptr = NULL;
close_conn_func close_conn_ptr = NULL;
query_conn_func query_conn_ptr = NULL;
free_result_v2_func free_result_v2_ptr = NULL;
query_conn_streaming_func query_conn_streaming_ptr = NULL;
chdb_streaming_result_error_func chdb_streaming_result_error_ptr = NULL;
chdb_streaming_fetch_result_func chdb_streaming_fetch_result_ptr = NULL;
chdb_streaming_cancel_query_func chdb_streaming_cancel_query_ptr = NULL;
chdb_destroy_result_func chdb_destroy_result_ptr = NULL;

VALUE get_chdb_rb_path()
{
Expand Down Expand Up @@ -39,6 +44,11 @@ void init_chdb_handle()
close_conn_ptr = NULL;
query_conn_ptr = NULL;
free_result_v2_ptr = NULL;
query_conn_streaming_ptr = NULL;
chdb_streaming_result_error_ptr = NULL;
chdb_streaming_fetch_result_ptr = NULL;
chdb_streaming_cancel_query_ptr = NULL;
chdb_destroy_result_ptr = NULL;

chdb_handle = dlopen(RSTRING_PTR(lib_path), RTLD_LAZY | RTLD_GLOBAL);
if (!chdb_handle)
Expand All @@ -51,18 +61,30 @@ void init_chdb_handle()
close_conn_ptr = (close_conn_func)dlsym(chdb_handle, "close_conn");
query_conn_ptr = (query_conn_func)dlsym(chdb_handle, "query_conn");
free_result_v2_ptr = (free_result_v2_func)dlsym(chdb_handle, "free_result_v2");
query_conn_streaming_ptr = (query_conn_streaming_func)dlsym(chdb_handle, "query_conn_streaming");
chdb_streaming_result_error_ptr = (chdb_streaming_result_error_func)dlsym(chdb_handle, "chdb_streaming_result_error");
chdb_streaming_fetch_result_ptr = (chdb_streaming_fetch_result_func)dlsym(chdb_handle, "chdb_streaming_fetch_result");
chdb_streaming_cancel_query_ptr = (chdb_streaming_cancel_query_func)dlsym(chdb_handle, "chdb_streaming_cancel_query");
chdb_destroy_result_ptr = (chdb_destroy_result_func)dlsym(chdb_handle, "chdb_destroy_result");

if (!connect_chdb_ptr || !close_conn_ptr || !query_conn_ptr || !free_result_v2_ptr)
if (!connect_chdb_ptr || !close_conn_ptr || !query_conn_ptr || !free_result_v2_ptr ||
!query_conn_streaming_ptr || !chdb_streaming_result_error_ptr || !chdb_streaming_fetch_result_ptr ||
!chdb_streaming_cancel_query_ptr || !chdb_destroy_result_ptr)
{
close_chdb_handle();

rb_raise(cChDBError,
"Symbol loading failed: %s\nMissing functions: connect_chdb(%p) close_conn(%p) query_conn(%p), free_result_v2(%p)",
"Symbol loading failed: %s\nMissing functions: connect_chdb(%p), close_conn(%p), query_conn(%p), free_result_v2(%p), query_conn_streaming(%p), chdb_streaming_result_error(%p), chdb_streaming_fetch_result(%p), chdb_streaming_cancel_query(%p), chdb_destroy_result(%p)",
dlerror(),
(void*)connect_chdb_ptr,
(void*)close_conn_ptr,
(void*)query_conn_ptr,
(void*)free_result_v2_ptr);
(void*)free_result_v2_ptr,
(void*)query_conn_streaming_ptr,
(void*)chdb_streaming_result_error_ptr,
(void*)chdb_streaming_fetch_result_ptr,
(void*)chdb_streaming_cancel_query_ptr,
(void*)chdb_destroy_result_ptr);
}

rb_set_end_proc(close_chdb_handle, 0);
Expand Down
12 changes: 12 additions & 0 deletions ext/chdb/chdb_handle.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
#ifndef CHDB_HANDLE_H
#define CHDB_HANDLE_H

#include "include/chdb.h"

typedef struct chdb_conn **(*connect_chdb_func)(int, char**);
typedef void (*close_conn_func)(struct chdb_conn**);
typedef struct local_result_v2 *(*query_conn_func)(struct chdb_conn*, const char*, const char*);
typedef void (*free_result_v2_func)(struct local_result_v2*);
typedef chdb_streaming_result *(*query_conn_streaming_func)(struct chdb_conn*, const char*, const char*);
typedef const char *(*chdb_streaming_result_error_func)(chdb_streaming_result*);
typedef struct local_result_v2 *(*chdb_streaming_fetch_result_func)(struct chdb_conn*, chdb_streaming_result*);
typedef void (*chdb_streaming_cancel_query_func)(struct chdb_conn*, chdb_streaming_result*);
typedef void (*chdb_destroy_result_func)(chdb_streaming_result*);

extern connect_chdb_func connect_chdb_ptr;
extern close_conn_func close_conn_ptr;
extern query_conn_func query_conn_ptr;
extern free_result_v2_func free_result_v2_ptr;
extern query_conn_streaming_func query_conn_streaming_ptr;
extern chdb_streaming_result_error_func chdb_streaming_result_error_ptr;
extern chdb_streaming_fetch_result_func chdb_streaming_fetch_result_ptr;
extern chdb_streaming_cancel_query_func chdb_streaming_cancel_query_ptr;
extern chdb_destroy_result_func chdb_destroy_result_ptr;

extern void *chdb_handle;

Expand Down
81 changes: 81 additions & 0 deletions ext/chdb/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "exception.h"
#include "include/chdb.h"
#include "local_result.h"
#include "streaming_result.h"

void connection_free(void *ptr)
{
Expand Down Expand Up @@ -32,6 +33,9 @@ void init_connection()
rb_define_alloc_func(cConnection, connection_alloc);
rb_define_method(cConnection, "initialize", connection_initialize, 2);
rb_define_method(cConnection, "query", connection_query, 2);
rb_define_method(cConnection, "send_query", connection_streaming_query, 2);
rb_define_method(cConnection, "fetch_streaming_result", connection_streaming_fecth_result, 1);
rb_define_method(cConnection, "cancel_streaming_query", connection_streaming_cancel_query, 1);
rb_define_method(cConnection, "close", connection_close, 0);
}

Expand Down Expand Up @@ -105,6 +109,83 @@ VALUE connection_query(VALUE self, VALUE query, VALUE format)
return result_obj;
}

VALUE connection_streaming_query(VALUE self, VALUE query, VALUE format)
{
Connection *conn;
TypedData_Get_Struct(self, Connection, &ConnectionType, conn);

Check_Type(query, T_STRING);
Check_Type(format, T_STRING);

chdb_streaming_result *c_result = query_conn_streaming_ptr(
*conn->c_conn,
StringValueCStr(query),
StringValueCStr(format)
);

if (!c_result)
{
rb_raise(cChDBError, "Query failed with nil streaming result");
}

const char *error = chdb_streaming_result_error_ptr(c_result);
if (error)
{
VALUE error_message = rb_str_new_cstr(error);
chdb_destroy_result_ptr(c_result);
rb_raise(cChDBError, "CHDB error: %s", StringValueCStr(error_message));
}

VALUE result_obj = rb_class_new_instance(0, NULL, cStreamingResult);
StreamingResult *result;
TypedData_Get_Struct(result_obj, StreamingResult, &StreamingResultType, result);
result->c_result = c_result;

return result_obj;
}

VALUE connection_streaming_fecth_result(VALUE self, VALUE streaming_result)
{
Connection *conn;
TypedData_Get_Struct(self, Connection, &ConnectionType, conn);

StreamingResult *result;
TypedData_Get_Struct(streaming_result, StreamingResult, &StreamingResultType, result);

struct local_result_v2 *c_result = chdb_streaming_fetch_result_ptr(*conn->c_conn, result->c_result);

if (!c_result)
{
rb_raise(cChDBError, "Failed to fetch streaming result");
}

if (c_result->error_message)
{
VALUE error_message = rb_str_new_cstr(c_result->error_message);
free_result_v2_ptr(c_result);
rb_raise(cChDBError, "CHDB error: %s", StringValueCStr(error_message));
}

VALUE result_obj = rb_class_new_instance(0, NULL, cLocalResult);
LocalResult *local_result;
TypedData_Get_Struct(result_obj, LocalResult, &LocalResultType, local_result);
local_result->c_result = c_result;

return result_obj;
}

VALUE connection_streaming_cancel_query(VALUE self, VALUE streaming_result)
{
Connection *conn;
TypedData_Get_Struct(self, Connection, &ConnectionType, conn);

StreamingResult *result;
TypedData_Get_Struct(streaming_result, StreamingResult, &StreamingResultType, result);

chdb_streaming_cancel_query_ptr(*conn->c_conn, result->c_result);
return Qnil;
}

VALUE connection_close(VALUE self)
{
Connection *conn;
Expand Down
6 changes: 6 additions & 0 deletions ext/chdb/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ VALUE connection_initialize(VALUE self, VALUE argc, VALUE argv);

VALUE connection_query(VALUE self, VALUE query, VALUE format);

VALUE connection_streaming_query(VALUE self, VALUE query, VALUE format);

VALUE connection_streaming_fecth_result(VALUE self, VALUE streaming_result);

VALUE connection_streaming_cancel_query(VALUE self, VALUE streaming_result);

VALUE connection_close(VALUE self);

#endif
1 change: 0 additions & 1 deletion ext/chdb/local_result.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "local_result.h"

#include "constants.h"
#include "include/chdb.h"
#include "chdb_handle.h"

VALUE cLocalResult;
Expand Down
2 changes: 2 additions & 0 deletions ext/chdb/local_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <ruby.h>

#include "include/chdb.h"

typedef struct
{
struct local_result_v2 *c_result;
Expand Down
39 changes: 39 additions & 0 deletions ext/chdb/streaming_result.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "streaming_result.h"

#include "constants.h"
#include "chdb_handle.h"

VALUE cStreamingResult;

void streaming_result_free(void *ptr)
{
StreamingResult *result = (StreamingResult *)ptr;
DEBUG_PRINT("Freeing StreamingResult: %p", (void*)result);
if (result->c_result)
{
DEBUG_PRINT("Freeing chdb_streaming_result: %p", (void*)result->c_result);
chdb_destroy_result_ptr(result->c_result);
}
free(result);
}

const rb_data_type_t StreamingResultType =
{
"StreamingResult",
{NULL, streaming_result_free, NULL},
};

void init_streaming_result()
{
VALUE mChDB = rb_define_module("ChDB");
cStreamingResult = rb_define_class_under(mChDB, "StreamingResult", rb_cObject);
rb_define_alloc_func(cStreamingResult, streaming_result_alloc);
}

VALUE streaming_result_alloc(VALUE klass)
{
StreamingResult *result = ALLOC(StreamingResult);
DEBUG_PRINT("Allocating StreamingResult: %p", (void*)result);
result->c_result = NULL;
return rb_data_typed_object_wrap(klass, result, &StreamingResultType);
}
20 changes: 20 additions & 0 deletions ext/chdb/streaming_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef CHDB_STREAMING_RESULT_H
#define CHDB_STREAMING_RESULT_H

#include <ruby.h>

#include "include/chdb.h"

typedef struct
{
chdb_streaming_result *c_result;
} StreamingResult;

extern VALUE cStreamingResult;
extern const rb_data_type_t StreamingResultType;

void init_streaming_result();

VALUE streaming_result_alloc(VALUE klass);

#endif
18 changes: 15 additions & 3 deletions lib/chdb/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ def execute(sql, bind_vars = [], &block)
end
end

def execute2(sql, *bind_vars, &) # rubocop:disable Metrics/MethodLength
def execute2(sql, *bind_vars, &block) # rubocop:disable Metrics/MethodLength
prepare(sql) do |stmt|
result = stmt.execute(*bind_vars)
stmt.parse

if block_given?
if block
yield stmt.columns
result.each(&)
result.each(&block)
else
return result.each_with_object([stmt.columns]) do |row, arr|
arr << row
Expand All @@ -128,6 +128,18 @@ def query_with_format(sql, format = 'CSV', bind_vars = [])
end
end

def send_query(sql, format = 'CSV', bind_vars = [], &block)
prepare(sql) do |stmt|
result = stmt.send_query(bind_vars, format)

if block
result.each(&block)
else
result
end
end
end

def get_first_row(sql, *bind_vars)
execute(sql, *bind_vars).first
end
Expand Down
Loading