Skip to content

Commit

Permalink
Merge pull request #82 from nyaruka/created_vs_updated
Browse files Browse the repository at this point in the history
Split up created and updated contacts in logging
  • Loading branch information
rowanseymour authored Jun 10, 2024
2 parents 2db16bc + 3340f1d commit a88111a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
17 changes: 11 additions & 6 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,23 @@ type indexResponse struct {
}

// indexes the batch of contacts
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}

createdCount, deletedCount, conflictedCount := 0, 0, 0
createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0

for _, item := range response.Items {
if item.Index.ID != "" {
slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status)
if item.Index.Status == 200 || item.Index.Status == 201 {
if item.Index.Status == 200 {
updatedCount++
} else if item.Index.Status == 201 {
createdCount++
} else if item.Index.Status == 409 {
conflictedCount++
Expand All @@ -298,8 +301,10 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
slog.Error("unparsed item in response")
}
}
slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount)
return createdCount, deletedCount, nil

slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount)

return createdCount, updatedCount, deletedCount, nil
}

// our response for finding the last modified document
Expand Down
27 changes: 16 additions & 11 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error

// now index our docs
start := time.Now()
indexed, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
if err != nil {
return "", fmt.Errorf("error indexing documents: %w", err)
}

i.recordComplete(indexed, deleted, time.Since(start))
i.recordComplete(created+updated, deleted, time.Since(start))

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
Expand Down Expand Up @@ -153,8 +153,8 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
`

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) {
totalFetched, totalCreated, totalDeleted := 0, 0, 0
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0

var modifiedOn time.Time
var contactJSON string
Expand All @@ -168,18 +168,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
batchStart := time.Now() // start time for this batch
batchFetched := 0 // contacts fetched in this batch
batchCreated := 0 // contacts created in ES
batchUpdated := 0 // contacts updated in ES
batchDeleted := 0 // contacts deleted in ES
batchESTime := time.Duration(0) // time spent indexing for this batch

indexSubBatch := func(b *bytes.Buffer) error {
t := time.Now()
created, deleted, err := i.indexBatch(index, b.Bytes())
created, updated, deleted, err := i.indexBatch(index, b.Bytes())
if err != nil {
return err
}

batchESTime += time.Since(t)
batchCreated += created
batchUpdated += updated
batchDeleted += deleted
b.Reset()
return nil
Expand All @@ -191,17 +193,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

// no more rows? return
if err == sql.ErrNoRows {
return 0, 0, nil
return 0, 0, 0, nil
}
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
defer rows.Close()

for rows.Next() {
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}

batchFetched++
Expand All @@ -224,21 +226,22 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
// write to elastic search in batches
if batchFetched%i.batchSize == 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
return 0, 0, 0, err
}
}
}

if subBatch.Len() > 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
return 0, 0, 0, err
}
}

rows.Close()

totalFetched += batchFetched
totalCreated += batchCreated
totalUpdated += batchUpdated
totalDeleted += batchDeleted

totalTime := time.Since(start)
Expand All @@ -249,10 +252,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
"rate", batchRate,
"batch_fetched", batchFetched,
"batch_created", batchCreated,
"batch_updated", batchUpdated,
"batch_elapsed", batchTime,
"batch_elapsed_es", batchESTime,
"total_fetched", totalFetched,
"total_created", totalCreated,
"total_updated", totalUpdated,
"total_elapsed", totalTime,
)

Expand All @@ -269,7 +274,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
}
}

return totalCreated, totalDeleted, nil
return totalCreated, totalUpdated, totalDeleted, nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
Expand Down

0 comments on commit a88111a

Please sign in to comment.