From 885f46b7c9bd42fa80afa93b27b9e85eccd290f1 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 3 Apr 2025 00:50:55 +0800 Subject: [PATCH 1/5] feat: Add support for streaming query --- dependencies.yml | 2 +- ext/chdb/chdb.c | 2 + ext/chdb/chdb_handle.c | 28 ++++- ext/chdb/chdb_handle.h | 12 ++ ext/chdb/connection.c | 81 +++++++++++++ ext/chdb/connection.h | 6 + ext/chdb/local_result.c | 1 - ext/chdb/local_result.h | 2 + ext/chdb/streaming_result.c | 39 +++++++ ext/chdb/streaming_result.h | 20 ++++ lib/chdb/database.rb | 18 ++- lib/chdb/result_set.rb | 40 +++++++ lib/chdb/statement.rb | 20 +++- lib/chdb/streaming_result.rb | 8 ++ spec/chdb/database_spec.rb | 212 ++++++++++++++++++++++++----------- spec/chdb/statement_spec.rb | 10 +- 16 files changed, 420 insertions(+), 81 deletions(-) create mode 100644 ext/chdb/streaming_result.c create mode 100644 ext/chdb/streaming_result.h create mode 100644 lib/chdb/streaming_result.rb diff --git a/dependencies.yml b/dependencies.yml index f31a6f3..78cc84a 100644 --- a/dependencies.yml +++ b/dependencies.yml @@ -1,2 +1,2 @@ chdb: - version: "3.1.2" \ No newline at end of file + version: "3.2.0" \ No newline at end of file diff --git a/ext/chdb/chdb.c b/ext/chdb/chdb.c index 3372de0..28298c7 100644 --- a/ext/chdb/chdb.c +++ b/ext/chdb/chdb.c @@ -6,6 +6,7 @@ #include "connection.h" #include "exception.h" #include "local_result.h" +#include "streaming_result.h" void Init_chdb_native() { @@ -15,6 +16,7 @@ void Init_chdb_native() init_chdb_handle(); init_chdb_constants(); init_local_result(); + init_streaming_result(); init_connection(); DEBUG_PRINT("chdb extension initialized successfully"); diff --git a/ext/chdb/chdb_handle.c b/ext/chdb/chdb_handle.c index 06e3fc8..f8f0ba2 100644 --- a/ext/chdb/chdb_handle.c +++ b/ext/chdb/chdb_handle.c @@ -10,6 +10,11 @@ connect_chdb_func connect_chdb_ptr = NULL; close_conn_func close_conn_ptr = NULL; query_conn_func query_conn_ptr = NULL; free_result_v2_func free_result_v2_ptr = NULL; +query_conn_streaming_func query_conn_streaming_ptr = NULL; +chdb_streaming_result_error_func chdb_streaming_result_error_ptr = NULL; +chdb_streaming_fetch_result_func chdb_streaming_fetch_result_ptr = NULL; +chdb_streaming_cancel_query_func chdb_streaming_cancel_query_ptr = NULL; +chdb_destroy_result_func chdb_destroy_result_ptr = NULL; VALUE get_chdb_rb_path() { @@ -39,6 +44,11 @@ void init_chdb_handle() close_conn_ptr = NULL; query_conn_ptr = NULL; free_result_v2_ptr = NULL; + query_conn_streaming_ptr = NULL; + chdb_streaming_result_error_ptr = NULL; + chdb_streaming_fetch_result_ptr = NULL; + chdb_streaming_cancel_query_ptr = NULL; + chdb_destroy_result_ptr = NULL; chdb_handle = dlopen(RSTRING_PTR(lib_path), RTLD_LAZY | RTLD_GLOBAL); if (!chdb_handle) @@ -51,18 +61,30 @@ void init_chdb_handle() close_conn_ptr = (close_conn_func)dlsym(chdb_handle, "close_conn"); query_conn_ptr = (query_conn_func)dlsym(chdb_handle, "query_conn"); free_result_v2_ptr = (free_result_v2_func)dlsym(chdb_handle, "free_result_v2"); + query_conn_streaming_ptr = (query_conn_streaming_func)dlsym(chdb_handle, "query_conn_streaming"); + chdb_streaming_result_error_ptr = (chdb_streaming_result_error_func)dlsym(chdb_handle, "chdb_streaming_result_error"); + chdb_streaming_fetch_result_ptr = (chdb_streaming_fetch_result_func)dlsym(chdb_handle, "chdb_streaming_fetch_result"); + chdb_streaming_cancel_query_ptr = (chdb_streaming_cancel_query_func)dlsym(chdb_handle, "chdb_streaming_cancel_query"); + chdb_destroy_result_ptr = (chdb_destroy_result_func)dlsym(chdb_handle, "chdb_destroy_result"); - if (!connect_chdb_ptr || !close_conn_ptr || !query_conn_ptr || !free_result_v2_ptr) + if (!connect_chdb_ptr || !close_conn_ptr || !query_conn_ptr || !free_result_v2_ptr || + !query_conn_streaming_ptr || !chdb_streaming_result_error_ptr || !chdb_streaming_fetch_result_ptr || + !chdb_streaming_cancel_query_ptr || !chdb_destroy_result_ptr) { close_chdb_handle(); rb_raise(cChDBError, - "Symbol loading failed: %s\nMissing functions: connect_chdb(%p) close_conn(%p) query_conn(%p), free_result_v2(%p)", + "Symbol loading failed: %s\nMissing functions: connect_chdb(%p), close_conn(%p), query_conn(%p), free_result_v2(%p), query_conn_streaming(%p), chdb_streaming_result_error(%p), chdb_streaming_fetch_result(%p), chdb_streaming_cancel_query(%p), chdb_destroy_result(%p)", dlerror(), (void*)connect_chdb_ptr, (void*)close_conn_ptr, (void*)query_conn_ptr, - (void*)free_result_v2_ptr); + (void*)free_result_v2_ptr, + (void*)query_conn_streaming_ptr, + (void*)chdb_streaming_result_error_ptr, + (void*)chdb_streaming_fetch_result_ptr, + (void*)chdb_streaming_cancel_query_ptr, + (void*)chdb_destroy_result_ptr); } rb_set_end_proc(close_chdb_handle, 0); diff --git a/ext/chdb/chdb_handle.h b/ext/chdb/chdb_handle.h index b68d63c..adb8002 100644 --- a/ext/chdb/chdb_handle.h +++ b/ext/chdb/chdb_handle.h @@ -1,15 +1,27 @@ #ifndef CHDB_HANDLE_H #define CHDB_HANDLE_H +#include "include/chdb.h" + typedef struct chdb_conn **(*connect_chdb_func)(int, char**); typedef void (*close_conn_func)(struct chdb_conn**); typedef struct local_result_v2 *(*query_conn_func)(struct chdb_conn*, const char*, const char*); typedef void (*free_result_v2_func)(struct local_result_v2*); +typedef chdb_streaming_result *(*query_conn_streaming_func)(struct chdb_conn*, const char*, const char*); +typedef const char *(*chdb_streaming_result_error_func)(chdb_streaming_result*); +typedef struct local_result_v2 *(*chdb_streaming_fetch_result_func)(struct chdb_conn*, chdb_streaming_result*); +typedef void (*chdb_streaming_cancel_query_func)(struct chdb_conn*, chdb_streaming_result*); +typedef void (*chdb_destroy_result_func)(chdb_streaming_result*); extern connect_chdb_func connect_chdb_ptr; extern close_conn_func close_conn_ptr; extern query_conn_func query_conn_ptr; extern free_result_v2_func free_result_v2_ptr; +extern query_conn_streaming_func query_conn_streaming_ptr; +extern chdb_streaming_result_error_func chdb_streaming_result_error_ptr; +extern chdb_streaming_fetch_result_func chdb_streaming_fetch_result_ptr; +extern chdb_streaming_cancel_query_func chdb_streaming_cancel_query_ptr; +extern chdb_destroy_result_func chdb_destroy_result_ptr; extern void *chdb_handle; diff --git a/ext/chdb/connection.c b/ext/chdb/connection.c index 3e2e0df..bbcad78 100644 --- a/ext/chdb/connection.c +++ b/ext/chdb/connection.c @@ -5,6 +5,7 @@ #include "exception.h" #include "include/chdb.h" #include "local_result.h" +#include "streaming_result.h" void connection_free(void *ptr) { @@ -32,6 +33,9 @@ void init_connection() rb_define_alloc_func(cConnection, connection_alloc); rb_define_method(cConnection, "initialize", connection_initialize, 2); rb_define_method(cConnection, "query", connection_query, 2); + rb_define_method(cConnection, "send_query", connection_streaming_query, 2); + rb_define_method(cConnection, "fetch_streaming_result", connection_streaming_fecth_result, 1); + rb_define_method(cConnection, "cancel_streaming_query", connection_streaming_cancel_query, 1); rb_define_method(cConnection, "close", connection_close, 0); } @@ -105,6 +109,83 @@ VALUE connection_query(VALUE self, VALUE query, VALUE format) return result_obj; } +VALUE connection_streaming_query(VALUE self, VALUE query, VALUE format) +{ + Connection *conn; + TypedData_Get_Struct(self, Connection, &ConnectionType, conn); + + Check_Type(query, T_STRING); + Check_Type(format, T_STRING); + + chdb_streaming_result *c_result = query_conn_streaming_ptr( + *conn->c_conn, + StringValueCStr(query), + StringValueCStr(format) + ); + + if (!c_result) + { + rb_raise(cChDBError, "Query failed with nil streaming result"); + } + + const char *error = chdb_streaming_result_error_ptr(c_result); + if (error) + { + VALUE error_message = rb_str_new_cstr(error); + chdb_destroy_result_ptr(c_result); + rb_raise(cChDBError, "CHDB error: %s", StringValueCStr(error_message)); + } + + VALUE result_obj = rb_class_new_instance(0, NULL, cStreamingResult); + StreamingResult *result; + TypedData_Get_Struct(result_obj, StreamingResult, &StreamingResultType, result); + result->c_result = c_result; + + return result_obj; +} + +VALUE connection_streaming_fecth_result(VALUE self, VALUE streaming_result) +{ + Connection *conn; + TypedData_Get_Struct(self, Connection, &ConnectionType, conn); + + StreamingResult *result; + TypedData_Get_Struct(streaming_result, StreamingResult, &StreamingResultType, result); + + struct local_result_v2 *c_result = chdb_streaming_fetch_result_ptr(*conn->c_conn, result->c_result); + + if (!c_result) + { + rb_raise(cChDBError, "Failed to fetch streaming result"); + } + + if (c_result->error_message) + { + VALUE error_message = rb_str_new_cstr(c_result->error_message); + free_result_v2_ptr(c_result); + rb_raise(cChDBError, "CHDB error: %s", StringValueCStr(error_message)); + } + + VALUE result_obj = rb_class_new_instance(0, NULL, cLocalResult); + LocalResult *local_result; + TypedData_Get_Struct(result_obj, LocalResult, &LocalResultType, local_result); + local_result->c_result = c_result; + + return result_obj; +} + +VALUE connection_streaming_cancel_query(VALUE self, VALUE streaming_result) +{ + Connection *conn; + TypedData_Get_Struct(self, Connection, &ConnectionType, conn); + + StreamingResult *result; + TypedData_Get_Struct(streaming_result, StreamingResult, &StreamingResultType, result); + + chdb_streaming_cancel_query_ptr(*conn->c_conn, result->c_result); + return Qnil; +} + VALUE connection_close(VALUE self) { Connection *conn; diff --git a/ext/chdb/connection.h b/ext/chdb/connection.h index a22e07d..0778fbf 100644 --- a/ext/chdb/connection.h +++ b/ext/chdb/connection.h @@ -18,6 +18,12 @@ VALUE connection_initialize(VALUE self, VALUE argc, VALUE argv); VALUE connection_query(VALUE self, VALUE query, VALUE format); +VALUE connection_streaming_query(VALUE self, VALUE query, VALUE format); + +VALUE connection_streaming_fecth_result(VALUE self, VALUE streaming_result); + +VALUE connection_streaming_cancel_query(VALUE self, VALUE streaming_result); + VALUE connection_close(VALUE self); #endif diff --git a/ext/chdb/local_result.c b/ext/chdb/local_result.c index b00e175..a8787f5 100644 --- a/ext/chdb/local_result.c +++ b/ext/chdb/local_result.c @@ -1,7 +1,6 @@ #include "local_result.h" #include "constants.h" -#include "include/chdb.h" #include "chdb_handle.h" VALUE cLocalResult; diff --git a/ext/chdb/local_result.h b/ext/chdb/local_result.h index e41dff3..b63ba79 100644 --- a/ext/chdb/local_result.h +++ b/ext/chdb/local_result.h @@ -3,6 +3,8 @@ #include +#include "include/chdb.h" + typedef struct { struct local_result_v2 *c_result; diff --git a/ext/chdb/streaming_result.c b/ext/chdb/streaming_result.c new file mode 100644 index 0000000..56bffa7 --- /dev/null +++ b/ext/chdb/streaming_result.c @@ -0,0 +1,39 @@ +#include "streaming_result.h" + +#include "constants.h" +#include "chdb_handle.h" + +VALUE cStreamingResult; + +void streaming_result_free(void *ptr) +{ + StreamingResult *result = (StreamingResult *)ptr; + DEBUG_PRINT("Freeing StreamingResult: %p", (void*)result); + if (result->c_result) + { + DEBUG_PRINT("Freeing chdb_streaming_result: %p", (void*)result->c_result); + chdb_destroy_result_ptr(result->c_result); + } + free(result); +} + +const rb_data_type_t StreamingResultType = +{ + "StreamingResult", + {NULL, streaming_result_free, NULL}, +}; + +void init_streaming_result() +{ + VALUE mChDB = rb_define_module("ChDB"); + cStreamingResult = rb_define_class_under(mChDB, "StreamingResult", rb_cObject); + rb_define_alloc_func(cStreamingResult, streaming_result_alloc); +} + +VALUE streaming_result_alloc(VALUE klass) +{ + StreamingResult *result = ALLOC(StreamingResult); + DEBUG_PRINT("Allocating StreamingResult: %p", (void*)result); + result->c_result = NULL; + return rb_data_typed_object_wrap(klass, result, &StreamingResultType); +} diff --git a/ext/chdb/streaming_result.h b/ext/chdb/streaming_result.h new file mode 100644 index 0000000..8ec26a8 --- /dev/null +++ b/ext/chdb/streaming_result.h @@ -0,0 +1,20 @@ +#ifndef CHDB_STREAMING_RESULT_H +#define CHDB_STREAMING_RESULT_H + +#include + +#include "include/chdb.h" + +typedef struct +{ + chdb_streaming_result *c_result; +} StreamingResult; + +extern VALUE cStreamingResult; +extern const rb_data_type_t StreamingResultType; + +void init_streaming_result(); + +VALUE streaming_result_alloc(VALUE klass); + +#endif diff --git a/lib/chdb/database.rb b/lib/chdb/database.rb index c0fb78a..47622b7 100644 --- a/lib/chdb/database.rb +++ b/lib/chdb/database.rb @@ -94,14 +94,14 @@ def execute(sql, bind_vars = [], &block) end end - def execute2(sql, *bind_vars, &) # rubocop:disable Metrics/MethodLength + def execute2(sql, *bind_vars, &block) # rubocop:disable Metrics/MethodLength prepare(sql) do |stmt| result = stmt.execute(*bind_vars) stmt.parse - if block_given? + if block yield stmt.columns - result.each(&) + result.each(&block) else return result.each_with_object([stmt.columns]) do |row, arr| arr << row @@ -128,6 +128,18 @@ def query_with_format(sql, format = 'CSV', bind_vars = []) end end + def send_query(sql, format = 'CSV', bind_vars = [], &block) + prepare(sql) do |stmt| + result = stmt.send_query(bind_vars, format) + + if block + result.each(&block) + else + result + end + end + end + def get_first_row(sql, *bind_vars) execute(sql, *bind_vars).first end diff --git a/lib/chdb/result_set.rb b/lib/chdb/result_set.rb index cd7eb33..37f430e 100644 --- a/lib/chdb/result_set.rb +++ b/lib/chdb/result_set.rb @@ -54,4 +54,44 @@ def next_hash class HashResultSet < ResultSet # :nodoc: alias next next_hash end + + class StreamingResultSet + include Enumerable + + def initialize(db, streaming_result) + @db = db + @streaming_result = streaming_result + @done = false + end + + def eof? + @done + end + + def next + return nil if @done + + result = @db.conn.fetch_streaming_result(@streaming_result) + + if result.nil? || result.rows_read == 0 + @done = true + nil + else + result + end + end + + def each + while (node = self.next) + yield node + end + end + + def cancel + return nil if @done + + @db.conn.cancel_streaming_query(@streaming_result) + @done = true + end + end end diff --git a/lib/chdb/statement.rb b/lib/chdb/statement.rb index 45523fe..f2381c0 100644 --- a/lib/chdb/statement.rb +++ b/lib/chdb/statement.rb @@ -8,6 +8,7 @@ require 'chdb/chdb_native' end require 'chdb/local_result' +require 'chdb/streaming_result' require 'chdb/result_set' require 'chdb/result_handler' require 'chdb/parameter_binding' @@ -71,6 +72,21 @@ def execute_with_format(*bind_vars, format) @result.buf end + def send_query(*bind_vars, format) + reset! if @executed + + bind_params(*bind_vars) unless bind_vars.empty? + + my_processed_sql = process_sql + streaming_result = @connection.conn.send_query(my_processed_sql, format) + streaming_result.output_format = format + + results = StreamingResultSet.new(@connection, streaming_result) + + yield results if block_given? + results + end + def reset! @executed = false @parsed = false @@ -78,7 +94,7 @@ def reset! @bind_vars.clear @parsed_data.clear @columns.clear - @results = nil + @result = nil end def step @@ -101,7 +117,7 @@ def parse end @parsed = true - @results = nil + @result = nil end private diff --git a/lib/chdb/streaming_result.rb b/lib/chdb/streaming_result.rb new file mode 100644 index 0000000..2b380dc --- /dev/null +++ b/lib/chdb/streaming_result.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +module ChDB + # Represents the local result of a ChDB operation. + class StreamingResult + attr_accessor :output_format + end +end diff --git a/spec/chdb/database_spec.rb b/spec/chdb/database_spec.rb index f6e2bc5..8ff5f18 100644 --- a/spec/chdb/database_spec.rb +++ b/spec/chdb/database_spec.rb @@ -11,12 +11,12 @@ # FileUtils.rm_rf(test_db_path2) if Dir.exist?(test_db_path2) # end - # after do - # FileUtils.remove_entry(test_db_path) if Dir.exist?(test_db_path) + # after do + # FileUtils.remove_entry(test_db_path) if Dir.exist?(test_db_path) # FileUtils.remove_entry(test_db_path2) if Dir.exist?(test_db_path2) # end - def create_empty_table(db) + def create_empty_table(db) db.execute('CREATE DATABASE IF NOT EXISTS test') db.execute('DROP TABLE IF EXISTS test.test_table') db.execute("CREATE TABLE test.test_table( @@ -26,9 +26,9 @@ def create_empty_table(db) ORDER BY id") end - def create_test_table(db) - create_empty_table(db) - + def create_test_table(db) + create_empty_table(db) + { 1 => "Alice", 2 => "Bob" @@ -47,12 +47,12 @@ def create_test_table(db) it 'open database without block' do db = ChDB::Database.open("file:#{test_db_path}") expect(db).to be_a(ChDB::Database) - expect(db.closed?).to be false + expect(db.closed?).to be false expect(db.readonly?).to be false expect(db.results_as_hash).to be false db.close() - expect(db.closed?).to be true - end + expect(db.closed?).to be true + end it 'auto-closes database with block' do db = nil @@ -65,7 +65,7 @@ def create_test_table(db) end expect(db.closed?).to be true end - + it 'raises error when open database' do db1 = ChDB::Database.open(test_db_path, results_as_hash: true) expect { ChDB::Database.new(test_db_path, results_as_hash: true) }.to raise_error(ChDB::InternalException, /Existing database/) @@ -75,11 +75,11 @@ def create_test_table(db) result = db2.execute('SELECT 1 AS value') expect(result).to eq([["1"]]) db2.close() - + db3 = ChDB::Database.new(test_db_path) expect { ChDB::Database.new(test_db_path, results_as_hash: true) }.to raise_error(ChDB::InternalException, /Existing database/) db3.close() - + ChDB::Database.open(test_db_path) do |database| db = database expect(db).to be_a(ChDB::Database) @@ -87,10 +87,10 @@ def create_test_table(db) expect(db.readonly?).to be false expect(db.results_as_hash).to be false end - db4 = ChDB::Database.new(test_db_path) + db4 = ChDB::Database.new(test_db_path) db4.close() end - + it 'open, close, open database' do db = ChDB::Database.open(test_db_path) create_test_table(db) @@ -122,37 +122,37 @@ def create_test_table(db) result = db.execute('SELECT 1 AS value') expect(result).to eq([{ 'value' => '1' }]) end - + ChDB::Database.open(test_db_path) do |db| result = db.execute("SELECT number FROM system.numbers LIMIT 3") expect(result).to eq([["0"], ["1"], ["2"]]) end end - + it 'handles positional parameters' do ChDB::Database.open(test_db_path) do |db| result = db.execute("SELECT ? * ? AS product", [6, 7]) expect(result).to eq([["42"]]) end end - + it 'processes different data types' do ChDB::Database.open(test_db_path) do |db| result = db.execute( "SELECT ?, ?, ?", - ["O'Reilly", 3.14, false] + ["O'Reilly", 3.14, false] ) expect(result).to eq([["O'Reilly", '3.14', '0']]) end end - + it 'raises error when parameter count mismatch' do ChDB::Database.open(test_db_path) do |db| expect { db.execute("SELECT ? + ?", [10]) }.to raise_error(ChDB::SQLException) end - + ChDB::Database.open(test_db_path) do |db| expect { db.execute("SELECT ? + ?", [10, 11, 22]) @@ -201,51 +201,51 @@ def create_test_table(db) expect(result).to eq([%w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]]) db.close end - + it 'multi threads query' do ChDB::Database.open(test_db_path) do |db| create_test_table(db) - + expected_results = [%w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]] thread_count = 5 results = Array.new(thread_count, nil) threads = [] - + thread_count.times do |i| threads << Thread.new do results[i] = db.execute("SELECT * FROM test.test_table ORDER BY id") end end - + threads.each(&:join) - + results.each do |result| expect(result).to eq(expected_results) end end end - + it 'multi threads query with params' do ChDB::Database.open(test_db_path) do |db| create_test_table(db) - + queries = [ { sql: "SELECT name FROM test.test_table WHERE id = ? ORDER BY id", params: [1], expected: [["Alice"]] }, { sql: "SELECT id FROM test.test_table WHERE name = ? ORDER BY id", params: ["Bob"], expected: [["2"]] }, { sql: "SELECT * FROM test.test_table WHERE id > ? ORDER BY id", params: [2], expected: [%w[3 Charlie], %w[4 David]] } ] - + results = [] threads = [] - - queries.each_with_index do |q, idx| + + queries.each_with_index do |q, idx| threads << Thread.new do - results[idx] = db.execute(q[:sql], q[:params]) + results[idx] = db.execute(q[:sql], q[:params]) end end - + threads.each(&:join) - + expect(results[0]).to eq(queries[0][:expected]) expect(results[1]).to eq(queries[1][:expected]) expect(results[2]).to eq(queries[2][:expected]) @@ -265,28 +265,28 @@ def create_test_table(db) collected << row end expect(collected).to eq([]) + end end - end it 'returns query results with simple query' do ChDB::Database.open(test_db_path, results_as_hash: true) do |db| result = db.execute2('SELECT 1 AS value') expect(result).to eq([['value'], { 'value' => '1' }]) end - + ChDB::Database.open(test_db_path) do |db| result = db.execute2("SELECT number FROM system.numbers LIMIT 2") expect(result).to eq([["number"], ["0"], ["1"]]) end end - + it 'handles loose parameters' do ChDB::Database.open(test_db_path) do |db| result = db.execute2("SELECT ? || ? AS combined", "Hello", "World") expect(result).to eq([["combined"], ["HelloWorld"]]) end end - + it 'processes array parameters with splat' do ChDB::Database.open(test_db_path) do |db| params = [41, 42] @@ -294,7 +294,7 @@ def create_test_table(db) expect(result[1]).to eq(['41', '42']) end end - + it 'yields headers and rows with block' do headers = [] collected = [] @@ -307,14 +307,14 @@ def create_test_table(db) expect(headers).to eq(["number", "plus(number, 1)"]) expect(collected.size).to eq(3) end - + it 'returns hash results when results_as_hash enabled' do ChDB::Database.open(test_db_path, results_as_hash: true) do |db| result = db.execute2("SELECT 1 AS value") expect(result).to eq([["value"], { "value" => "1" }]) end end - + it 'raises error with invalid parameter types' do ChDB::Database.open(test_db_path) do |db| expect { @@ -364,51 +364,51 @@ def create_test_table(db) expect(result).to eq([%w[id name], %w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]]) db.close end - + it 'multi threads query' do ChDB::Database.open(test_db_path) do |db| create_test_table(db) - + expected_results = [%w[id name], %w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]] thread_count = 5 results = Array.new(thread_count, nil) threads = [] - + thread_count.times do |i| threads << Thread.new do results[i] = db.execute2("SELECT * FROM test.test_table ORDER BY id") end end - + threads.each(&:join) - + results.each do |result| expect(result).to eq(expected_results) end end end - + it 'multi threads query with params' do ChDB::Database.open(test_db_path) do |db| create_test_table(db) - + queries = [ { sql: "SELECT name FROM test.test_table WHERE id = ? ORDER BY id", params: [1], expected: [['name'], ['Alice']] }, { sql: "SELECT id FROM test.test_table WHERE name = ? ORDER BY id", params: ["Bob"], expected: [['id'], ["2"]] }, { sql: "SELECT * FROM test.test_table WHERE id > ? ORDER BY id", params: [2], expected: [%w[id name], %w[3 Charlie], %w[4 David]] } ] - + results = [] threads = [] - - queries.each_with_index do |q, idx| + + queries.each_with_index do |q, idx| threads << Thread.new do - results[idx] = db.execute2(q[:sql], q[:params]) + results[idx] = db.execute2(q[:sql], q[:params]) end end - + threads.each(&:join) - + expect(results[0]).to eq(queries[0][:expected]) expect(results[1]).to eq(queries[1][:expected]) expect(results[2]).to eq(queries[2][:expected]) @@ -422,13 +422,13 @@ def create_test_table(db) create_test_table(db) result = db.query('SELECT * FROM test.test_table ORDER BY id') expect(result.to_a).to eq([%w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]]) - + result = db.query('SELECT * FROM test.test_table WHERE id != ? AND name != ? ORDER BY id', [0, 'Jack']) expect(result.to_a).to eq([%w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]]) db.close end - + it 'query empty table' do db = ChDB::Database.new(test_db_path) create_empty_table(db) @@ -444,12 +444,12 @@ def create_test_table(db) create_test_table(db) result = db.query_with_format('SELECT * FROM test.test_table ORDER BY id') expect(result).to eq("1,\"Alice\"\n2,\"Bob\"\n3,\"Charlie\"\n4,\"David\"\n") - + result = db.query_with_format('SELECT * FROM test.test_table WHERE id > ? ORDER BY id', 'CSV', [0]) expect(result).to eq("1,\"Alice\"\n2,\"Bob\"\n3,\"Charlie\"\n4,\"David\"\n") db.close end - + it 'query with format and empty table' do db = ChDB::Database.new(test_db_path) create_empty_table(db) @@ -459,6 +459,86 @@ def create_test_table(db) end end + describe '#streaming query' do + it 'streaming query' do + ChDB::Database.open(test_db_path) do |db| + collected = [] + total_rows = 0 + db.execute('SELECT * FROM numbers(200000)') do |row| + total_rows += 1 + end + expect(total_rows).to eq 200000 + + total_rows = 0 + collected = [] + db.send_query('SELECT * FROM numbers(200000)', 'JSON') do |chunk| + # p chunk.buf + collected << chunk + total_rows += chunk.rows_read + end + + expect(collected.size).to be > 1 + expect(total_rows).to eq 200000 + + total_rows = 0 + collected = [] + db.execute('SELECT * FROM numbers(200000)') do |row| + total_rows += 1 + end + expect(total_rows).to eq 200000 + + total_rows = 0 + collected = [] + db.send_query('SELECT * FROM numbers(?)', 'JSON', 200000) do |chunk| + # p chunk.buf + collected << chunk + total_rows += chunk.rows_read + end + + expect(collected.size).to be > 1 + expect(total_rows).to eq 200000 + + total_rows = 0 + collected = [] + result = db.send_query('SELECT * FROM numbers(?)', 'JSON', 200000) + while (chunk = result.next) + collected << chunk + total_rows += chunk.rows_read + end + + expect(collected.size).to be > 1 + expect(total_rows).to eq 200000 + end + end + + it 'cancel streaming query' do + ChDB::Database.open(test_db_path) do |db| + collected = [] + total_rows = 0 + result = db.send_query('SELECT * FROM numbers(200000)') + expect(result.eof?).to be false + + while (chunk = result.next) + collected << chunk + total_rows += chunk.rows_read + result.cancel + end + + expect(collected.size).to eq(1) + expect(total_rows).to be > 1 + expect(result.eof?).to be true + end + end + + it 'streaming query with exception' do + ChDB::Database.open(test_db_path) do |db| + expect { + db.send_query('SELECT * FROM numbers(200000);SELECT * FROM numbers(200000)') + }.to raise_error(Exception) + end + end + end + describe '#get_first_row' do it 'get first row' do db = ChDB::Database.new(test_db_path) @@ -475,7 +555,7 @@ def create_test_table(db) expect(result).to eq({ 'id' => '1', 'name' => 'Alice' }) db.close end - + it 'get first row with empty table' do db = ChDB::Database.new(test_db_path) create_empty_table(db) @@ -509,7 +589,7 @@ def create_test_table(db) expect(result).to eq('1') db.close end - + it 'get first value with empty table' do db = ChDB::Database.new(test_db_path) create_empty_table(db) @@ -517,7 +597,7 @@ def create_test_table(db) expect(result).to eq(nil) db.close end - + it 'get first value with empty table and hash' do db = ChDB::Database.new(test_db_path, results_as_hash: true) create_empty_table(db) @@ -533,24 +613,24 @@ def create_test_table(db) stmt = db.prepare('SELECT ? AS value') result = stmt.execute(42) expect(result.to_a).to eq([['42']]) - + result = stmt.execute(55) expect(result.to_a).to eq([['55']]) - + create_test_table(db) stmt = db.prepare('SELECT * FROM test.test_table WHERE id != ? AND name != ? ORDER BY id') result = stmt.execute([0, 'Jack']) expect(result.to_a).to eq([%w[1 Alice], %w[2 Bob], %w[3 Charlie], %w[4 David]]) - + result = stmt.execute([2, 'Bob']) expect(result.to_a).to eq([%w[1 Alice], %w[3 Charlie], %w[4 David]]) - + result = stmt.execute([false, 'Alice']) expect(result.to_a).to eq([%w[2 Bob], %w[3 Charlie], %w[4 David]]) - + result = stmt.execute([true, 'Jack']) expect(result.to_a).to eq([%w[2 Bob], %w[3 Charlie], %w[4 David]]) - + result = stmt.execute([nil, 'xx']) expect(result.to_a).to eq([]) end diff --git a/spec/chdb/statement_spec.rb b/spec/chdb/statement_spec.rb index 5dd5353..7b5aaf6 100644 --- a/spec/chdb/statement_spec.rb +++ b/spec/chdb/statement_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'json' +require 'json' require 'spec_helper' RSpec.describe ChDB::Statement do @@ -62,7 +62,7 @@ expect { stmt.execute() }.not_to raise_error - + stmt = db.prepare("SHOW TABLES") expect(stmt.execute().to_a).to satisfy { |result| result.any? { |row| row == ['statement_test'] } @@ -72,7 +72,7 @@ expect { stmt.execute() }.not_to raise_error - + stmt = db.prepare('SHOW TABLES') expect(stmt.execute().to_a).not_to include(['statement_test']) end @@ -83,11 +83,11 @@ stmt = db.prepare("SELECT number FROM system.numbers LIMIT 3") expect(stmt.execute!).to eq([['0'], ['1'], ['2']]) end - + it 'return hash array when results_as_hash enabled' do db.close() hash_db = ChDB::Database.new(test_db_path, results_as_hash: true) - + begin stmt = hash_db.prepare("SELECT 1 AS value, 'hello' AS greeting") result = stmt.execute! From b4039e9836d1a610d4f747aba46b27c1129171e0 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 8 May 2025 02:09:49 +0800 Subject: [PATCH 2/5] refactor: Format code with RuboCop --- chdb.gemspec | 3 +++ ext/chdb/chdb_handle.c | 4 ++-- ext/chdb/connection.c | 8 ++++---- lib/chdb/result_set.rb | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/chdb.gemspec b/chdb.gemspec index 24f19f2..b98dbf7 100644 --- a/chdb.gemspec +++ b/chdb.gemspec @@ -51,6 +51,8 @@ Gem::Specification.new do |s| 'ext/chdb/extconf.rb', 'ext/chdb/local_result.c', 'ext/chdb/local_result.h', + 'ext/chdb/streaming_result.c', + 'ext/chdb/streaming_result.h', 'lib/chdb.rb', 'lib/chdb/constants.rb', 'lib/chdb/data_path.rb', @@ -62,6 +64,7 @@ Gem::Specification.new do |s| 'lib/chdb/result_set.rb', 'lib/chdb/sql_processor.rb', 'lib/chdb/statement.rb', + 'lib/chdb/streaming_result.rb', 'lib/chdb/version_info.rb', 'lib/chdb/version.rb' ] diff --git a/ext/chdb/chdb_handle.c b/ext/chdb/chdb_handle.c index f8f0ba2..23d9d74 100644 --- a/ext/chdb/chdb_handle.c +++ b/ext/chdb/chdb_handle.c @@ -68,8 +68,8 @@ void init_chdb_handle() chdb_destroy_result_ptr = (chdb_destroy_result_func)dlsym(chdb_handle, "chdb_destroy_result"); if (!connect_chdb_ptr || !close_conn_ptr || !query_conn_ptr || !free_result_v2_ptr || - !query_conn_streaming_ptr || !chdb_streaming_result_error_ptr || !chdb_streaming_fetch_result_ptr || - !chdb_streaming_cancel_query_ptr || !chdb_destroy_result_ptr) + !query_conn_streaming_ptr || !chdb_streaming_result_error_ptr || !chdb_streaming_fetch_result_ptr || + !chdb_streaming_cancel_query_ptr || !chdb_destroy_result_ptr) { close_chdb_handle(); diff --git a/ext/chdb/connection.c b/ext/chdb/connection.c index bbcad78..ce3b7ac 100644 --- a/ext/chdb/connection.c +++ b/ext/chdb/connection.c @@ -118,10 +118,10 @@ VALUE connection_streaming_query(VALUE self, VALUE query, VALUE format) Check_Type(format, T_STRING); chdb_streaming_result *c_result = query_conn_streaming_ptr( - *conn->c_conn, - StringValueCStr(query), - StringValueCStr(format) - ); + *conn->c_conn, + StringValueCStr(query), + StringValueCStr(format) + ); if (!c_result) { diff --git a/lib/chdb/result_set.rb b/lib/chdb/result_set.rb index 37f430e..f6fde2b 100644 --- a/lib/chdb/result_set.rb +++ b/lib/chdb/result_set.rb @@ -73,7 +73,7 @@ def next result = @db.conn.fetch_streaming_result(@streaming_result) - if result.nil? || result.rows_read == 0 + if result.nil? || result.rows_read.zero? @done = true nil else From b820b448c0aae4ef45e0dae3ca6ae788779c6e15 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 8 May 2025 02:11:30 +0800 Subject: [PATCH 3/5] chore: upgrade version --- lib/chdb/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/chdb/version.rb b/lib/chdb/version.rb index 23ea99a..da69d66 100644 --- a/lib/chdb/version.rb +++ b/lib/chdb/version.rb @@ -2,5 +2,5 @@ module ChDB # (String) the version of the chdb gem, e.g. "0.1.0" - VERSION = '0.1.0' + VERSION = '0.2.0.rc.1' end From 7372eea96ffc469ce00d7cbcfdd2f9d5d01573ef Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 8 May 2025 02:23:38 +0800 Subject: [PATCH 4/5] chore: update README --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 8b5292f..16fef9f 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,17 @@ ChDB::Database.open(':memory:') do |db| json_data = db.query_with_format('SELECT 1 as a, 2 as b', 'JSON') p json_data end + +# Execute streaming query +ChDB::Database.open(':memory:') do |db| + total_rows = 0 + collected = [] + db.send_query('SELECT * FROM numbers(200000)') do |chunk| + collected << chunk + total_rows += chunk.rows_read + end + p total_rows # => 200000 +end ``` ## Thread Safety From 9b0f7f8590fad98530049cadb5cf1837a5873f76 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 8 May 2025 02:25:37 +0800 Subject: [PATCH 5/5] chore: update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 16fef9f..14cf40f 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,7 @@ ChDB::Database.open(':memory:') do |db| total_rows = 0 collected = [] db.send_query('SELECT * FROM numbers(200000)') do |chunk| - collected << chunk + collected << chunk.buf total_rows += chunk.rows_read end p total_rows # => 200000