Skip to content

Commit

Permalink
Merge pull request #2002 from joto/single-query-per-exec
Browse files Browse the repository at this point in the history
Don't do multi-statement queries
  • Loading branch information
lonvia authored Jul 17, 2023
2 parents 1cfb05b + 7529244 commit d34c23b
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 83 deletions.
170 changes: 91 additions & 79 deletions src/middle-pgsql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,22 @@ static std::string build_sql(options_t const &options, std::string const &templ)
: ""));
}

static std::vector<std::string>
build_sql(options_t const &options, std::vector<std::string> const &templs)
{
std::vector<std::string> out;

for (auto const &templ : templs) {
out.push_back(build_sql(options, templ));
}

return out;
}

middle_pgsql_t::table_desc::table_desc(options_t const &options,
table_sql const &ts)
: m_create_table(build_sql(options, ts.create_table)),
m_prepare_query(build_sql(options, ts.prepare_query)),
m_prepare_queries(build_sql(options, ts.prepare_queries)),
m_copy_target(std::make_shared<db_target_descr_t>())
{
m_copy_target->name = build_sql(options, ts.name);
Expand Down Expand Up @@ -161,7 +173,9 @@ void middle_pgsql_t::table_desc::build_index(std::string const &conninfo) const
pg_conn_t const db_connection{conninfo};

log_info("Building index on table '{}'", name());
db_connection.exec(m_create_fw_dep_indexes);
for (auto const &query : m_create_fw_dep_indexes) {
db_connection.exec(query);
}
}

/**
Expand Down Expand Up @@ -1307,7 +1321,7 @@ void middle_pgsql_t::update_users_table()

m_db_connection.exec("PREPARE insert_user(int8, text) AS"
" INSERT INTO {}\"{}\" (id, name) VALUES ($1, $2)"
" ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id;\n",
" ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id",
m_users_table.schema(), m_users_table.name());

for (auto const &[id, name] : m_users) {
Expand Down Expand Up @@ -1361,7 +1375,7 @@ static table_sql sql_for_users(middle_pgsql_options const &store_options)
sql.create_table = "CREATE TABLE {schema}\"{prefix}_users\" ("
" id INT4 PRIMARY KEY {using_tablespace},"
" name TEXT NOT NULL"
") {data_tablespace};\n";
") {data_tablespace}";
}

return sql;
Expand All @@ -1379,15 +1393,15 @@ static table_sql sql_for_nodes_format1(bool create_table)
" id int8 PRIMARY KEY {using_tablespace},"
" lat int4 NOT NULL,"
" lon int4 NOT NULL"
") {data_tablespace};\n";
") {data_tablespace}";

sql.prepare_query =
sql.prepare_queries = {
"PREPARE get_node_list(int8[]) AS"
" SELECT id, lon, lat FROM {schema}\"{prefix}_nodes\""
" WHERE id = ANY($1::int8[]);\n"
" WHERE id = ANY($1::int8[])",
"PREPARE get_node(int8) AS"
" SELECT id, lon, lat FROM {schema}\"{prefix}_nodes\""
" WHERE id = $1;\n";
" WHERE id = $1"};
}

return sql;
Expand All @@ -1407,15 +1421,15 @@ static table_sql sql_for_nodes_format2(middle_pgsql_options const &options)
" lon int4 NOT NULL,"
"{attribute_columns_definition}"
" tags jsonb NOT NULL"
") {data_tablespace};\n";
") {data_tablespace}";

sql.prepare_query =
sql.prepare_queries = {
"PREPARE get_node_list(int8[]) AS"
" SELECT id, lon, lat FROM {schema}\"{prefix}_nodes\""
" WHERE id = ANY($1::int8[]);\n"
" WHERE id = ANY($1::int8[])",
"PREPARE get_node(int8) AS"
" SELECT id, lon, lat FROM {schema}\"{prefix}_nodes\""
" WHERE id = $1;\n";
" WHERE id = $1"};
}

return sql;
Expand All @@ -1431,32 +1445,32 @@ static table_sql sql_for_ways_format1(uint8_t way_node_index_id_shift)
" id int8 PRIMARY KEY {using_tablespace},"
" nodes int8[] NOT NULL,"
" tags text[]"
") {data_tablespace};\n";
") {data_tablespace}";

sql.prepare_query = "PREPARE get_way(int8) AS"
" SELECT nodes, tags"
" FROM {schema}\"{prefix}_ways\" WHERE id = $1;\n"
"PREPARE get_way_list(int8[]) AS"
" SELECT id, nodes, tags"
" FROM {schema}\"{prefix}_ways\""
" WHERE id = ANY($1::int8[]);\n";
sql.prepare_queries = {"PREPARE get_way(int8) AS"
" SELECT nodes, tags"
" FROM {schema}\"{prefix}_ways\" WHERE id = $1",
"PREPARE get_way_list(int8[]) AS"
" SELECT id, nodes, tags"
" FROM {schema}\"{prefix}_ways\""
" WHERE id = ANY($1::int8[])"};

if (way_node_index_id_shift == 0) {
sql.create_fw_dep_indexes =
sql.create_fw_dep_indexes = {
"CREATE INDEX ON {schema}\"{prefix}_ways\" USING GIN (nodes)"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};
} else {
sql.create_fw_dep_indexes =
sql.create_fw_dep_indexes = {
"CREATE OR REPLACE FUNCTION"
" {schema}\"{prefix}_index_bucket\"(int8[])"
" RETURNS int8[] AS $$\n"
" RETURNS int8[] AS $$"
" SELECT ARRAY(SELECT DISTINCT"
" unnest($1) >> {way_node_index_id_shift})\n"
"$$ LANGUAGE SQL IMMUTABLE;\n"
" unnest($1) >> {way_node_index_id_shift})"
"$$ LANGUAGE SQL IMMUTABLE",
"CREATE INDEX \"{prefix}_ways_nodes_bucket_idx\""
" ON {schema}\"{prefix}_ways\""
" USING GIN ({schema}\"{prefix}_index_bucket\"(nodes))"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};
}

return sql;
Expand All @@ -1473,36 +1487,35 @@ static table_sql sql_for_ways_format2(middle_pgsql_options const &options)
"{attribute_columns_definition}"
" nodes int8[] NOT NULL,"
" tags jsonb NOT NULL"
") {data_tablespace};\n";

sql.prepare_query = "PREPARE get_way(int8) AS"
" SELECT nodes, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_ways\" o"
" {users_table_access}"
" WHERE o.id = $1;\n";

sql.prepare_query += "PREPARE get_way_list(int8[]) AS"
" SELECT o.id, nodes, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_ways\" o"
" {users_table_access}"
" WHERE o.id = ANY($1::int8[]);\n";
") {data_tablespace}";

sql.prepare_queries = {"PREPARE get_way(int8) AS"
" SELECT nodes, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_ways\" o"
" {users_table_access}"
" WHERE o.id = $1",
"PREPARE get_way_list(int8[]) AS"
" SELECT o.id, nodes, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_ways\" o"
" {users_table_access}"
" WHERE o.id = ANY($1::int8[])"};

if (options.way_node_index_id_shift == 0) {
sql.create_fw_dep_indexes =
sql.create_fw_dep_indexes = {
"CREATE INDEX ON {schema}\"{prefix}_ways\" USING GIN (nodes)"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};
} else {
sql.create_fw_dep_indexes =
sql.create_fw_dep_indexes = {
"CREATE OR REPLACE FUNCTION"
" {schema}\"{prefix}_index_bucket\"(int8[])"
" RETURNS int8[] AS $$\n"
" RETURNS int8[] AS $$"
" SELECT ARRAY(SELECT DISTINCT"
" unnest($1) >> {way_node_index_id_shift})\n"
"$$ LANGUAGE SQL IMMUTABLE;\n"
" unnest($1) >> {way_node_index_id_shift})"
"$$ LANGUAGE SQL IMMUTABLE",
"CREATE INDEX \"{prefix}_ways_nodes_bucket_idx\""
" ON {schema}\"{prefix}_ways\""
" USING GIN ({schema}\"{prefix}_index_bucket\"(nodes))"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};
}

return sql;
Expand All @@ -1521,15 +1534,15 @@ static table_sql sql_for_relations_format1()
" parts int8[],"
" members text[],"
" tags text[]"
") {data_tablespace};\n";
") {data_tablespace}";

sql.prepare_query = "PREPARE get_rel(int8) AS"
" SELECT members, tags"
" FROM {schema}\"{prefix}_rels\" WHERE id = $1;\n";
sql.prepare_queries = {"PREPARE get_rel(int8) AS"
" SELECT members, tags"
" FROM {schema}\"{prefix}_rels\" WHERE id = $1"};

sql.create_fw_dep_indexes =
sql.create_fw_dep_indexes = {
"CREATE INDEX ON {schema}\"{prefix}_rels\" USING GIN (parts)"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};

return sql;
}
Expand All @@ -1540,36 +1553,35 @@ static table_sql sql_for_relations_format2()

sql.name = "{prefix}_rels";

sql.create_table += "CREATE {unlogged} TABLE {schema}\"{prefix}_rels\" ("
" id int8 PRIMARY KEY {using_tablespace},"
"{attribute_columns_definition}"
" members jsonb NOT NULL,"
" tags jsonb NOT NULL"
") {data_tablespace};\n"

"CREATE OR REPLACE FUNCTION"
" {schema}\"{prefix}_member_ids\"(jsonb, char)"
" RETURNS int8[] AS $$\n"
" SELECT array_agg((el->>'ref')::int8)"
" FROM jsonb_array_elements($1) AS el"
" WHERE el->>'type' = $2\n"
"$$ LANGUAGE SQL IMMUTABLE;\n";

sql.prepare_query = "PREPARE get_rel(int8) AS"
" SELECT members, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_rels\" o"
" {users_table_access}"
" WHERE o.id = $1;\n";

sql.create_fw_dep_indexes =
sql.create_table = "CREATE {unlogged} TABLE {schema}\"{prefix}_rels\" ("
" id int8 PRIMARY KEY {using_tablespace},"
"{attribute_columns_definition}"
" members jsonb NOT NULL,"
" tags jsonb NOT NULL"
") {data_tablespace}";

sql.prepare_queries = {"PREPARE get_rel(int8) AS"
" SELECT members, tags{attribute_columns_use}"
" FROM {schema}\"{prefix}_rels\" o"
" {users_table_access}"
" WHERE o.id = $1"};

sql.create_fw_dep_indexes = {
"CREATE OR REPLACE FUNCTION"
" {schema}\"{prefix}_member_ids\"(jsonb, char)"
" RETURNS int8[] AS $$"
" SELECT array_agg((el->>'ref')::int8)"
" FROM jsonb_array_elements($1) AS el"
" WHERE el->>'type' = $2"
"$$ LANGUAGE SQL IMMUTABLE",
"CREATE INDEX \"{prefix}_rels_node_members\""
" ON {schema}\"{prefix}_rels\" USING GIN"
" (({schema}\"{prefix}_member_ids\"(members, 'N'::char)))"
" WITH (fastupdate = off) {index_tablespace};\n"
" WITH (fastupdate = off) {index_tablespace}",
"CREATE INDEX \"{prefix}_rels_way_members\""
" ON {schema}\"{prefix}_rels\" USING GIN"
" (({schema}\"{prefix}_member_ids\"(members, 'W'::char)))"
" WITH (fastupdate = off) {index_tablespace};\n";
" WITH (fastupdate = off) {index_tablespace}"};

return sql;
}
Expand Down Expand Up @@ -1659,8 +1671,8 @@ middle_pgsql_t::get_query_instance()

// We use a connection per table to enable the use of COPY
for (auto &table : m_tables) {
if (!table.m_prepare_query.empty()) {
mid->exec_sql(table.m_prepare_query);
for (auto const &query : table.m_prepare_queries) {
mid->exec_sql(query);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/middle-pgsql.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ struct table_sql
{
std::string name;
std::string create_table;
std::string prepare_query;
std::string create_fw_dep_indexes;
std::vector<std::string> prepare_queries;
std::vector<std::string> create_fw_dep_indexes;
};

struct middle_pgsql_t : public middle_t
Expand Down Expand Up @@ -150,8 +150,8 @@ struct middle_pgsql_t : public middle_t
void build_index(std::string const &conninfo) const;

std::string m_create_table;
std::string m_prepare_query;
std::string m_create_fw_dep_indexes;
std::vector<std::string> m_prepare_queries;
std::vector<std::string> m_create_fw_dep_indexes;

void task_set(std::future<std::chrono::microseconds> &&future)
{
Expand Down
6 changes: 6 additions & 0 deletions tests/test-middle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,9 @@ TEMPLATE_TEST_CASE("middle: change nodes in way", "", options_slim_default,
check_way_nodes(mid, way21.id(), {&node11, &node12});

REQUIRE_FALSE(dependency_manager.has_pending());

mid->stop();
mid->wait();
}

// From now on use append mode to not destroy the data we just added.
Expand Down Expand Up @@ -1235,6 +1238,9 @@ TEMPLATE_TEST_CASE("middle: change nodes in relation", "", options_slim_default,
mid->relation(rel30);
mid->relation(rel31);
mid->after_relations();

mid->stop();
mid->wait();
}

// From now on use append mode to not destroy the data we just added.
Expand Down

0 comments on commit d34c23b

Please sign in to comment.