Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dramatically improve postgresql performance #323

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/drivers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ var (
}
schemaMigrations = []string{
`ALTER TABLE kine MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`,
// Creating an empty migration to ensure that postgresql and mysql migrations match up
// with each other for a give value of KINE_SCHEMA_MIGRATION env var
``,
}
createDB = "CREATE DATABASE IF NOT EXISTS "
)
Expand Down Expand Up @@ -148,6 +151,9 @@ func setup(db *sql.DB) error {
if i >= int(schemaVersion) {
break
}
if stmt == "" {
continue
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
Expand Down
51 changes: 50 additions & 1 deletion pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pgsql
import (
"context"
"database/sql"
"fmt"
"net/url"
"os"
"regexp"
Expand Down Expand Up @@ -32,7 +33,7 @@ var (
`CREATE TABLE IF NOT EXISTS kine
(
id BIGSERIAL PRIMARY KEY,
name VARCHAR(630),
name text COLLATE "C",
created INTEGER,
deleted INTEGER,
create_revision BIGINT,
Expand All @@ -41,14 +42,19 @@ var (
value bytea,
old_value bytea
);`,

`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
`CREATE INDEX IF NOT EXISTS kine_list_query_index on kine(name, id DESC, deleted)`,
}
schemaMigrations = []string{
`ALTER TABLE kine ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE kine_id_seq AS BIGINT`,
// It is important to set the collation to "C" to ensure that LIKE and COMPARISON
// queries use the index.
`ALTER TABLE kine ALTER COLUMN name SET DATA TYPE TEXT USING name::TEXT COLLATE "C"`,
sambhav marked this conversation as resolved.
Show resolved Hide resolved
}
createDB = "CREATE DATABASE "
)
Expand All @@ -67,6 +73,41 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
if err != nil {
return nil, err
}
listSQL := `
SELECT
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
(SELECT MAX(crkv.prev_revision) AS prev_revision FROM kine AS crkv WHERE crkv.name = 'compact_rev_key'),
maxkv.*
FROM (
SELECT DISTINCT ON (name)
kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value
FROM
kine AS kv
WHERE
kv.name LIKE ?
%s
ORDER BY kv.name, theid DESC
) AS maxkv
WHERE
maxkv.deleted = 0 OR ?
ORDER BY maxkv.name, maxkv.theid DESC
`

countSQL := `
SELECT
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
COUNT(c.theid)
FROM (
SELECT DISTINCT ON (name)
kv.id AS theid, kv.deleted
FROM kine AS kv
WHERE
kv.name LIKE ?
%s
ORDER BY kv.name, theid DESC
) AS c
WHERE c.deleted = 0 OR ?
`
dialect.GetSizeSQL = `SELECT pg_total_relation_size('kine')`
dialect.CompactSQL = `
DELETE FROM kine AS kv
Expand All @@ -85,6 +126,11 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
kd.id <= $2
) AS ks
WHERE kv.id = ks.id`
dialect.GetCurrentSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ?"))
dialect.ListRevisionStartSQL = q(fmt.Sprintf(listSQL, "AND kv.id <= ?"))
dialect.GetRevisionAfterSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ? AND kv.id <= ?"))
dialect.CountCurrentSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ?"))
dialect.CountRevisionSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ? AND kv.id <= ?"))
dialect.FillRetryDuration = time.Millisecond + 5
dialect.InsertRetry = func(err error) bool {
if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == "kine_pkey" {
Expand Down Expand Up @@ -134,6 +180,9 @@ func setup(db *sql.DB) error {
if i >= int(schemaVersion) {
break
}
if stmt == "" {
continue
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
return err
Expand Down