Skip to content

Commit

Permalink
feat: Notices on transactions (#2443)
Browse files Browse the repository at this point in the history
Emit notices on transaction commands warning about our incomplete
transaction support.

psql:
```
sean@Seans-Air glaredb % psql postgres://localhost:6543
psql (15.4, server 15.1)
Type "help" for help.

sean=> begin;
WARNING:  GlareDB does not support proper transactional semantics. Do not rely on transactions for correctness. Transactions are stubbed out to enable compatability with existing Postgres tools.
BEGIN
sean=> 
```

CLI (colorful):
<img width="871" alt="Screenshot 2024-01-18 at 2 26 37 PM"
src="https://github.com/GlareDB/glaredb/assets/4040560/a6e590b4-8fbb-456f-a9a5-34aea29b1143">


Some pg protocol tests added to assert the notices.
  • Loading branch information
scsmithr authored Jan 18, 2024
1 parent 690b74f commit 52870e9
Show file tree
Hide file tree
Showing 23 changed files with 429 additions and 108 deletions.
11 changes: 10 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,16 @@ and asserts that we receive the appropriate backend messages. These tests ensure
postgres protocol compatability as well as allowing us to assert the contents of
error and notice messages.
Test cases can be found in `./testdata/pgprototest`.
Test cases can be found in `./testdata/pgprototest` and
`./testdata/pgprototest_glaredb`.
The `pgprototest` directory is for test cases to assert that GlareDB matches
Postgres exactly, and the expected output should be generated from an actual
Postgres instance.
The `pgprototest_glaredb` directory contains test cases that do match Postgres
output exactly either because of an incomplete feature, or differing behavior.
The expected output for these tests need to be hand-crafted.
Tests can be ran with the `pgprototest` command:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datafusion_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing = { workspace = true }
thiserror.workspace = true
decimal = { path = "../decimal" }
protogen = { path = "../protogen" }
pgrepr = { path = "../pgrepr" }
futures = { workspace = true }
parking_lot = "0.12.1"
bson = "2.7.0"
Expand Down
2 changes: 2 additions & 0 deletions crates/datafusion_ext/src/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use constants::*;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::config::{ConfigExtension, ExtensionOptions};
use datafusion::scalar::ScalarValue;
use pgrepr::notice::NoticeSeverity;
use utils::*;

use datafusion::variable::{VarProvider, VarType};
Expand Down Expand Up @@ -79,6 +80,7 @@ impl SessionVars {
datestyle: String,
transaction_isolation: String,
search_path: Vec<String>,
client_min_messages: NoticeSeverity,
enable_debug_datasources: bool,
force_catalog_refresh: bool,
glaredb_version: String,
Expand Down
10 changes: 10 additions & 0 deletions crates/datafusion_ext/src/vars/constants.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;

use pgrepr::notice::NoticeSeverity;

// TODO: Decide proper postgres version to spoof/support
pub(super) const SERVER_VERSION: ServerVar<str> = ServerVar {
name: "server_version",
Expand Down Expand Up @@ -74,6 +76,14 @@ pub(super) static SEARCH_PATH: Lazy<ServerVar<[String]>> = Lazy::new(|| ServerVa
description: "Search path for schemas",
});

pub(super) const CLIENT_MIN_MESSAGES: ServerVar<NoticeSeverity> = ServerVar {
name: "client_min_messages",
value: &NoticeSeverity::Notice,
group: "postgres",
user_configurable: true,
description: "Controls which messages are sent to the client, defaults NOTICE",
};

pub(super) static GLAREDB_VERSION_OWNED: Lazy<String> =
Lazy::new(|| format!("v{}", env!("CARGO_PKG_VERSION")));
pub(super) static GLAREDB_VERSION: Lazy<ServerVar<str>> = Lazy::new(|| ServerVar {
Expand Down
7 changes: 7 additions & 0 deletions crates/datafusion_ext/src/vars/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::config::ConfigEntry;
use datafusion::error::Result;
use datafusion::variable::VarType;
use pgrepr::notice::NoticeSeverity;
use std::borrow::Borrow;

use super::constants::*;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub struct SessionVarsInner {
pub datestyle: SessionVar<str>,
pub transaction_isolation: SessionVar<str>,
pub search_path: SessionVar<[String]>,
pub client_min_messages: SessionVar<NoticeSeverity>,
pub enable_debug_datasources: SessionVar<bool>,
pub force_catalog_refresh: SessionVar<bool>,
pub glaredb_version: SessionVar<str>,
Expand Down Expand Up @@ -82,6 +84,8 @@ impl SessionVarsInner {
Ok(&self.transaction_isolation)
} else if name.eq_ignore_ascii_case(SEARCH_PATH.name) {
Ok(&self.search_path)
} else if name.eq_ignore_ascii_case(CLIENT_MIN_MESSAGES.name) {
Ok(&self.client_min_messages)
} else if name.eq_ignore_ascii_case(ENABLE_DEBUG_DATASOURCES.name) {
Ok(&self.enable_debug_datasources)
} else if name.eq_ignore_ascii_case(FORCE_CATALOG_REFRESH.name) {
Expand Down Expand Up @@ -139,6 +143,8 @@ impl SessionVarsInner {
self.transaction_isolation.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(SEARCH_PATH.name) {
self.search_path.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(CLIENT_MIN_MESSAGES.name) {
self.client_min_messages.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(ENABLE_DEBUG_DATASOURCES.name) {
self.enable_debug_datasources.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(FORCE_CATALOG_REFRESH.name) {
Expand Down Expand Up @@ -214,6 +220,7 @@ impl Default for SessionVarsInner {
datestyle: SessionVar::new(&DATESTYLE),
transaction_isolation: SessionVar::new(&TRANSACTION_ISOLATION),
search_path: SessionVar::new(&SEARCH_PATH),
client_min_messages: SessionVar::new(&CLIENT_MIN_MESSAGES),
enable_debug_datasources: SessionVar::new(&ENABLE_DEBUG_DATASOURCES),
force_catalog_refresh: SessionVar::new(&FORCE_CATALOG_REFRESH),
glaredb_version: SessionVar::new(&GLAREDB_VERSION),
Expand Down
13 changes: 13 additions & 0 deletions crates/datafusion_ext/src/vars/value.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use pgrepr::notice::NoticeSeverity;

use super::*;

pub trait Value: ToOwned + std::fmt::Debug {
fn try_parse(s: &str) -> Option<Self::Owned>;
fn format(&self) -> String;
Expand Down Expand Up @@ -111,3 +114,13 @@ impl Value for Dialect {
}
}
}

impl Value for NoticeSeverity {
fn try_parse(s: &str) -> Option<NoticeSeverity> {
NoticeSeverity::from_str(s).ok()
}

fn format(&self) -> String {
self.to_string()
}
}
20 changes: 20 additions & 0 deletions crates/glaredb/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use pgrepr::format::Format;
use pgrepr::notice::NoticeSeverity;
use reedline::{FileBackedHistory, Reedline, Signal};
use std::collections::HashMap;

Expand Down Expand Up @@ -160,6 +161,25 @@ impl LocalSession {
Ok(_) => {}
Err(e) => println!("Error: {e}"),
};

// Print out notices as needed.
//
// Note this isn't being called in the above `execute`
// function since that can be called in a
// non-interactive fashion which and having notice
// messages interspersed with the output would be
// annoying.
for notice in self.sess.take_notices() {
eprintln!(
"{}: {}",
match notice.severity {
s @ (NoticeSeverity::Warning | NoticeSeverity::Error) =>
s.to_string().red(),
other => other.to_string().blue(),
},
notice.message
);
}
}
},
Ok(Signal::CtrlD) => break,
Expand Down
4 changes: 3 additions & 1 deletion crates/pgprototest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ mod proto;
#[clap(about = "Data driven postgres protocol testing", long_about = None)]
struct Cli {
/// The directory containing the test files.
///
/// Maybe specified multiple times to run tests from multiple directories.
#[clap(long)]
dir: String,
dir: Vec<String>,
/// Address of the postgres compatible server.
#[clap(long)]
addr: String,
Expand Down
14 changes: 10 additions & 4 deletions crates/pgprototest/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,15 @@ impl TryFrom<(char, Message)> for SerializedMessage {
.collect()?,
})?,
),
Message::NoticeResponse(_msg) => {
("NoticeResponse", serde_json::to_string(&NoticeResponse {})?)
}
Message::NoticeResponse(msg) => (
"NoticeResponse",
serde_json::to_string(&ErrorResponse {
fields: msg
.fields()
.map(|field| Ok(field.value().to_string()))
.collect()?,
})?,
),
_ => return Err(anyhow!("unhandle message, type identifier: {}", id)),
};
Ok(SerializedMessage {
Expand Down Expand Up @@ -177,5 +183,5 @@ pub struct ErrorResponse {

#[derive(Serialize)]
pub struct NoticeResponse {
// TODO: Fill me in. Currently we don't assert notices.
pub fields: Vec<String>,
}
34 changes: 19 additions & 15 deletions crates/pgprototest/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,33 @@ use std::time::{Duration, Instant};
/// Each file will open a unique connection, with each test case in that file
/// being ran sequentially using that connection.
pub fn walk(
dir: String,
dirs: Vec<String>,
addr: String,
options: HashMap<String, String>,
password: Option<String>,
timeout: Duration,
verbose: bool,
) {
datadriven::walk(&dir, |file| {
let mut conn = PgConn::connect(&addr, &options, &password, timeout).unwrap();
file.run(|testcase| {
if verbose {
println!();
println!("--- TESTCASE ({}) ---", testcase.directive);
println!("{}", testcase.input);
}
for dir in dirs {
datadriven::walk(&dir, |file| {
let mut conn = PgConn::connect(&addr, &options, &password, timeout).unwrap();
file.run(|testcase| {
if verbose {
println!();
println!("--- TESTCASE ({}) ---", testcase.directive);
println!("{}", testcase.input);
}

match testcase.directive.as_str() {
"send" => run_send(&mut conn, &testcase.args, &testcase.input, verbose),
"until" => run_until(&mut conn, &testcase.args, &testcase.input, timeout, verbose),
unknown => panic!("unknown directive: {}", unknown),
}
match testcase.directive.as_str() {
"send" => run_send(&mut conn, &testcase.args, &testcase.input, verbose),
"until" => {
run_until(&mut conn, &testcase.args, &testcase.input, timeout, verbose)
}
unknown => panic!("unknown directive: {}", unknown),
}
});
});
});
}
}

/// Run a "send" directive.
Expand Down
3 changes: 3 additions & 0 deletions crates/pgrepr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum PgReprError {

#[error("Internal error: {0}")]
InternalError(String),

#[error("{0}")]
String(String),
}

pub type Result<T, E = PgReprError> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions crates/pgrepr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod error;
pub mod format;
pub mod notice;
pub mod oid;
pub mod reader;
pub mod scalar;
Expand Down
Loading

0 comments on commit 52870e9

Please sign in to comment.