fix: handle stream end #33

Merged
merged 1 commit on Jul 10, 2025

56 changes: 43 additions & 13 deletions src/bulk.rs
@@ -49,6 +49,25 @@ use crate::table::{Column, DataTypeExtension, Row, TableSchema, Value};
use crate::{error, Result};
use snafu::{ensure, OptionExt, ResultExt};

/// Default channel buffer size for streaming FlightData
/// This controls how many FlightData messages can be buffered in the channel
/// before blocking the sender. A larger buffer allows for better throughput
/// at the cost of memory usage.
///
/// Can be overridden by setting the GREPTIMEDB_CHANNEL_BUFFER_SIZE environment variable.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1024;

/// Get a configuration value from an environment variable, falling back to a default
fn get_env_or_default<T>(env_var: &str, default: T) -> T
where
T: std::str::FromStr,
{
std::env::var(env_var)
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(default)
}

pub type RequestId = i64;

/// High-level bulk inserter for `GreptimeDB`
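The helper and constant above are the whole configuration surface: the variable is read once, and anything unset or unparseable silently falls back to the default. A standalone sketch of that behaviour (the `main` wrapper is illustrative only, not part of the diff):

// Standalone sketch of the fallback behaviour of `get_env_or_default` above:
// values that are unset or fail to parse as the target type use the default.
fn get_env_or_default<T: std::str::FromStr>(env_var: &str, default: T) -> T {
    std::env::var(env_var)
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(default)
}

fn main() {
    // e.g. `export GREPTIMEDB_CHANNEL_BUFFER_SIZE=2048` before starting the process
    let size: usize = get_env_or_default("GREPTIMEDB_CHANNEL_BUFFER_SIZE", 1024);
    println!("channel buffer size: {size}");
}

Note the consequence of the `.ok()` chain: a malformed value such as `GREPTIMEDB_CHANNEL_BUFFER_SIZE=foo` degrades silently to the 1024 default rather than failing at startup.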
@@ -183,7 +202,11 @@ impl BulkStreamWriter {
.collect();

// Create a channel for streaming FlightData
let (sender, receiver) = mpsc::channel::<FlightData>(1000);
let channel_buffer_size = get_env_or_default(
"GREPTIMEDB_CHANNEL_BUFFER_SIZE",
DEFAULT_CHANNEL_BUFFER_SIZE,
);
let (sender, receiver) = mpsc::channel::<FlightData>(channel_buffer_size);

// Convert receiver to a stream and start the do_put operation
let flight_stream = receiver.boxed();
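The resolved buffer size bounds the mpsc channel whose receiver is boxed into the FlightData stream that feeds do_put. A minimal sketch of that wiring, assuming the `futures` mpsc channel (the placeholder `FlightData` struct and `main` are illustrative only):

use futures::{channel::mpsc, executor::block_on, StreamExt};

// Placeholder standing in for arrow_flight::FlightData in this sketch.
struct FlightData(Vec<u8>);

fn main() {
    let buffer = 1024; // e.g. resolved from GREPTIMEDB_CHANNEL_BUFFER_SIZE
    let (mut sender, receiver) = mpsc::channel::<FlightData>(buffer);

    // Boxing the receiver produces a `BoxStream<'static, FlightData>` that can
    // be handed to a gRPC client as the outbound request stream.
    let mut flight_stream = receiver.boxed();

    let _ = sender.try_send(FlightData(vec![0u8; 16]));
    drop(sender); // closing all senders terminates the stream

    block_on(async {
        while let Some(msg) = flight_stream.next().await {
            println!("would send {} bytes", msg.0.len());
        }
    });
}

A larger buffer lets the writer run further ahead of the network at the cost of holding more encoded batches in memory, which is the trade-off the new constant's doc comment describes.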
@@ -316,9 +339,7 @@ impl BulkStreamWriter {
() = drain_timeout => break,
next_option = self.response_stream.next() => {
match next_option {
Some(response) => {
self.handle_single_response(response?, &mut responses);
}
Some(response) => self.handle_single_response(response?, &mut responses),
None => return self.handle_stream_end(responses),
}
}
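With this change the drain loop has a single, explicit exit for an ended stream (`handle_stream_end`) instead of an inline block. Its overall shape, as a simplified sketch with assumed generic stream and error types (the real method routes each response through `handle_single_response`):

use std::time::Duration;
use futures::{Stream, StreamExt};

// Simplified drain loop: collect responses until either the grace period
// elapses or the server closes the stream.
async fn drain_responses<S, Resp, E>(mut stream: S, grace: Duration) -> Result<Vec<Resp>, E>
where
    S: Stream<Item = Result<Resp, E>> + Unpin,
{
    let mut responses = Vec::new();
    let drain_timeout = tokio::time::sleep(grace);
    tokio::pin!(drain_timeout);

    loop {
        tokio::select! {
            // Stop draining once the grace period is over.
            () = &mut drain_timeout => break,
            next_option = stream.next() => {
                match next_option {
                    Some(response) => responses.push(response?),
                    // Stream ended: in the PR this path delegates to
                    // `handle_stream_end` so pending requests are not lost.
                    None => break,
                }
            }
        }
    }
    Ok(responses)
}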
@@ -454,6 +475,19 @@ impl BulkStreamWriter {
Ok(responses)
}

/// Helper method to handle stream end during processing.
/// Returns `Ok(())` if there are no pending requests; otherwise returns an
/// error listing the pending request IDs.
fn handle_stream_end_during_processing(&self) -> Result<()> {
if !self.pending_requests.is_empty() {
let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
return error::StreamEndedWithPendingRequestsSnafu {
request_ids: pending_ids,
}
.fail();
}
Ok(())
}

/// Submit a record batch without waiting for response
/// Returns the `request_id` for later tracking
async fn submit_record_batch(&mut self, batch: RecordBatch) -> Result<RequestId> {
@@ -481,7 +515,7 @@ impl BulkStreamWriter {
Ok(Some(response)) => {
let _schema_response = response?;
}
Ok(None) => {}
Ok(None) => return error::StreamEndedSnafu.fail(),
Err(_) => {
return error::RequestTimeoutSnafu {
request_ids: vec![],
@@ -553,10 +587,8 @@ impl BulkStreamWriter {
// First, wait for at least one response (blocking)
let response_result = timeout(self.timeout, self.response_stream.next()).await;
match response_result {
Ok(Some(response)) => {
self.receive_response_and_remove_pending(response?);
}
Ok(None) => return Ok(()), // Stream ended
Ok(Some(response)) => self.receive_response_and_remove_pending(response?),
Ok(None) => return self.handle_stream_end_during_processing(),
Err(_) => {
let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
return error::RequestTimeoutSnafu {
@@ -574,10 +606,8 @@ impl BulkStreamWriter {
() = drain_timeout => break,
next_option = self.response_stream.next() => {
match next_option {
Some(response) => {
self.receive_response_and_remove_pending(response?);
}
None => break, // Stream ended
Some(response) => self.receive_response_and_remove_pending(response?),
None => return self.handle_stream_end_during_processing(),
}
}
}
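Across the hunks above the pattern is the same: the old code treated `Ok(None)`/`None` (server closed the response stream) as success, while the fix either fails with `StreamEnded` or first checks for pending requests. A condensed sketch of the three outcomes now distinguished when waiting on a single response (the error strings stand in for the crate's snafu variants):

use std::time::Duration;
use futures::{Stream, StreamExt};
use tokio::time::timeout;

// Waiting for one response while distinguishing the three outcomes the PR
// now handles separately: a response, an unexpected end of stream, a timeout.
async fn next_response<S, T>(stream: &mut S, wait: Duration) -> Result<T, &'static str>
where
    S: Stream<Item = T> + Unpin,
{
    match timeout(wait, stream.next()).await {
        Ok(Some(response)) => Ok(response),
        // Previously this fell through as if the request had succeeded; the
        // fix surfaces it (StreamEnded / StreamEndedWithPendingRequests).
        Ok(None) => Err("response stream ended unexpectedly"),
        Err(_elapsed) => Err("request timed out"),
    }
}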
10 changes: 10 additions & 0 deletions src/error.rs
@@ -118,6 +118,16 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Response stream ended unexpectedly with pending requests: {:?}",
request_ids
))]
StreamEndedWithPendingRequests {
request_ids: Vec<i64>,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Request timeout after {:?} for request IDs: {:?}",
timeout,
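For reference, snafu turns the new variant into a generated context selector with a `.fail()` constructor, which is how `handle_stream_end_during_processing` raises it in bulk.rs. A condensed, self-contained sketch of that pattern (the `location` field and the rest of the real error enum are omitted; `check_stream_end` is illustrative):

use snafu::Snafu;

type RequestId = i64;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Response stream ended unexpectedly with pending requests: {:?}",
        request_ids
    ))]
    StreamEndedWithPendingRequests { request_ids: Vec<RequestId> },
}

// The check added in bulk.rs boils down to: if anything is still pending when
// the stream ends, fail with the IDs; otherwise the end of stream is normal.
fn check_stream_end(pending: &[RequestId]) -> Result<(), Error> {
    if !pending.is_empty() {
        return StreamEndedWithPendingRequestsSnafu {
            request_ids: pending.to_vec(),
        }
        .fail();
    }
    Ok(())
}

fn main() {
    let err = check_stream_end(&[7, 9]).unwrap_err();
    println!("{err}");
}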