Skip to content

Commit

Permalink
Test against Elastic 7 and 8
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 13, 2024
1 parent 334038a commit b5c323e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 31 deletions.
27 changes: 20 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ jobs:
test:
name: Test
runs-on: ubuntu-latest
strategy:
matrix:
elastic-port: [9207, 9208]

services:
postgres:
Expand All @@ -17,13 +20,19 @@ jobs:
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
elastic:
image: elasticsearch:7.17.9
ports:
- 9200:9200
- 9300:9300
env:
discovery.type: single-node
elastic7:
image: elasticsearch:7.17.9
ports:
- 9207:9200
env:
discovery.type: single-node
elastic8:
image: elasticsearch:8.13.4
ports:
- 9208:9200
env:
discovery.type: single-node
xpack.security.enabled: false

steps:
- name: Checkout code
Expand All @@ -34,6 +43,10 @@ jobs:
with:
go-version: ${{ env.go-version }}

- name: Setup environment
run: |
echo "INDEXER_ELASTIC_URL=http://localhost:${{ matrix.elastic-port }}" >> "$GITHUB_ENV"
- name: Run tests
run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./...

Expand Down
31 changes: 19 additions & 12 deletions indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,47 @@ import (

"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
indexer "github.com/nyaruka/rp-indexer/v9"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const elasticURL = "http://localhost:9200"
const aliasName = "indexer_test"

func setup(t *testing.T) *sql.DB {
func setup(t *testing.T) (*indexer.Config, *sql.DB) {
cfg := indexer.NewDefaultConfig()
cfg.DB = "postgres://indexer_test:temba@localhost:5432/indexer_test?sslmode=disable"
cfg.ElasticURL = os.Getenv("INDEXER_ELASTIC_URL")
if cfg.ElasticURL == "" {
cfg.ElasticURL = "http://localhost:9200"
}

testDB, err := os.ReadFile("../testdb.sql")
require.NoError(t, err)

db, err := sql.Open("postgres", "postgres://indexer_test:temba@localhost:5432/indexer_test?sslmode=disable")
db, err := sql.Open("postgres", cfg.DB)
require.NoError(t, err)

_, err = db.Exec(string(testDB))
require.NoError(t, err)

// delete all indexes with our alias prefix
existing := elasticRequest(t, http.MethodGet, "/_aliases", nil)
existing := elasticRequest(t, cfg, http.MethodGet, "/_aliases", nil)

for name := range existing {
if strings.HasPrefix(name, aliasName) {
elasticRequest(t, http.MethodDelete, "/"+name, nil)
elasticRequest(t, cfg, http.MethodDelete, "/"+name, nil)
}
}

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return db
return cfg, db
}

func assertQuery(t *testing.T, query []byte, expected []int64, msgAndArgs ...interface{}) {
results := elasticRequest(t, http.MethodPost, "/"+aliasName+"/_search",
func assertQuery(t *testing.T, cfg *indexer.Config, query []byte, expected []int64, msgAndArgs ...interface{}) {
results := elasticRequest(t, cfg, http.MethodPost, "/"+aliasName+"/_search",
map[string]any{"query": json.RawMessage(query), "sort": []map[string]any{{"id": "asc"}}},
)
hits := results["hits"].(map[string]any)["hits"].([]any)
Expand All @@ -63,8 +70,8 @@ func assertQuery(t *testing.T, query []byte, expected []int64, msgAndArgs ...int
assert.Equal(t, expected, actual, msgAndArgs...)
}

func assertIndexesWithPrefix(t *testing.T, prefix string, expected []string) {
all := elasticRequest(t, http.MethodGet, "/_aliases", nil)
func assertIndexesWithPrefix(t *testing.T, cfg *indexer.Config, prefix string, expected []string) {
all := elasticRequest(t, cfg, http.MethodGet, "/_aliases", nil)

actual := []string{}
for name := range all {
Expand All @@ -82,12 +89,12 @@ func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expe
assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch")
}

func elasticRequest(t *testing.T, method, path string, data map[string]any) map[string]any {
func elasticRequest(t *testing.T, cfg *indexer.Config, method, path string, data map[string]any) map[string]any {
var body io.Reader
if data != nil {
body = bytes.NewReader(jsonx.MustMarshal(data))
}
req, err := httpx.NewRequest(method, elasticURL+path, body, map[string]string{"Content-Type": "application/json"})
req, err := httpx.NewRequest(method, cfg.ElasticURL+path, body, map[string]string{"Content-Type": "application/json"})
require.NoError(t, err)

trace, err := httpx.DoTrace(http.DefaultClient, req, nil, nil, -1)
Expand Down
24 changes: 12 additions & 12 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ var contactQueryTests = []struct {
}

func TestContacts(t *testing.T) {
db := setup(t)
cfg, db := setup(t)

ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
ix1 := indexers.NewContactIndexer(cfg.ElasticURL, aliasName, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

dbModified, err := ix1.GetDBLastModified(context.Background(), db)
Expand All @@ -87,10 +87,10 @@ func TestContacts(t *testing.T) {
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0)

assertIndexerStats(t, ix1, 9, 0)
assertIndexesWithPrefix(t, aliasName, []string{expectedIndexName})
assertIndexesWithPrefix(t, cfg, aliasName, []string{expectedIndexName})

for _, tc := range contactQueryTests {
assertQuery(t, []byte(tc.query), tc.expected, "query mismatch for %s", tc.query)
assertQuery(t, cfg, []byte(tc.query), tc.expected, "query mismatch for %s", tc.query)
}

lastModified, err := ix1.GetESLastModified(indexName)
Expand All @@ -112,21 +112,21 @@ func TestContacts(t *testing.T) {

time.Sleep(1 * time.Second)

assertIndexesWithPrefix(t, aliasName, []string{expectedIndexName})
assertIndexesWithPrefix(t, cfg, aliasName, []string{expectedIndexName})

// should only match new john, old john is gone
assertQuery(t, []byte(`{"match": {"name": {"query": "john"}}}`), []int64{2})
assertQuery(t, cfg, []byte(`{"match": {"name": {"query": "john"}}}`), []int64{2})

// 3 is no longer in our group
assertQuery(t, []byte(`{"match": {"group_ids": {"query": 4}}}`), []int64{1})
assertQuery(t, cfg, []byte(`{"match": {"group_ids": {"query": 4}}}`), []int64{1})

// change John's name to Eric..
_, err = db.Exec(`
UPDATE contacts_contact SET name = 'Eric', modified_on = '2020-08-20 14:00:00+00' where id = 2;`)
require.NoError(t, err)

// and simulate another indexer doing a parallel rebuild
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
ix2 := indexers.NewContactIndexer(cfg.ElasticURL, aliasName, 2, 1, 4)

indexName2, err := ix2.Index(db, true, false)
assert.NoError(t, err)
Expand All @@ -136,20 +136,20 @@ func TestContacts(t *testing.T) {
time.Sleep(1 * time.Second)

// check we have a new index but the old index is still around
assertIndexesWithPrefix(t, aliasName, []string{expectedIndexName, expectedIndexName + "_1"})
assertIndexesWithPrefix(t, cfg, aliasName, []string{expectedIndexName, expectedIndexName + "_1"})

// and the alias points to the new index
assertQuery(t, []byte(`{"match": {"name": {"query": "eric"}}}`), []int64{2})
assertQuery(t, cfg, []byte(`{"match": {"name": {"query": "eric"}}}`), []int64{2})

// simulate another indexer doing a parallel rebuild with cleanup
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
ix3 := indexers.NewContactIndexer(cfg.ElasticURL, aliasName, 2, 1, 4)
indexName3, err := ix3.Index(db, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)

// check we cleaned up indexes besides the new one
assertIndexesWithPrefix(t, aliasName, []string{expectedIndexName + "_2"})
assertIndexesWithPrefix(t, cfg, aliasName, []string{expectedIndexName + "_2"})

// check that the original indexer now indexes against the new index
indexName, err = ix1.Index(db, false, false)
Expand Down

0 comments on commit b5c323e

Please sign in to comment.