Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Commit

Permalink
Implement DB batch insertion instead of one insertion per event
Browse files Browse the repository at this point in the history
Use db batching when inserting events instead of one insertion per
event. This offers a much more scalable database interaction with
reduced round-trip latency.

Later on we may look into batching further across multiple `POST`
requests if batches sent by the clients are too small still.
  • Loading branch information
masih committed Mar 2, 2023
1 parent ca29b07 commit 788a228
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"

"github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

Expand Down Expand Up @@ -100,9 +102,8 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http

logger = logger.With("total", len(batch.Events))
ctx := req.Context()
errs := make([]error, 0, len(batch.Events))
var batchQuery pgx.Batch
for _, event := range batch.Events {
// TODO: use db batching; this will not be performant at scale.
query := `
INSERT INTO retrieval_events(
retrieval_id,
Expand All @@ -117,7 +118,7 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
_, err := r.db.Exec(ctx, query,
batchQuery.Queue(query,
event.RetrievalId.String(),
event.InstanceId,
event.Cid,
Expand All @@ -127,21 +128,25 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http
event.EventName,
event.EventTime,
event.EventDetails,
)
if err != nil {
logger.Errorw("Failed to insert retrieval event", "event", event, "err", err)
errs = append(errs, err)
continue
}
logger.Debug("Saved retrieval event")
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
logger.Warnw("Retrieval event insertion did not affect any rows", "event", event, "rowsAffected", rowsAffected)
default:
logger.Debugw("Inserted event successfully", "event", event, "rowsAffected", rowsAffected)
}
return nil
})
}

if len(errs) != 0 {
batchResult := r.db.SendBatch(ctx, &batchQuery)
if err := batchResult.Close(); err != nil {
http.Error(res, "", http.StatusInternalServerError)
logger.Infow("At least one retrieval event insertion failed", "failed", len(errs))
logger.Infow("At least one retrieval event insertion failed", "err", err)
return
} else {
logger.Infow("Successfully submitted batch event insertion")
}
logger.Infow("Successfully inserted events")
}

func (r *EventRecorder) Shutdown(ctx context.Context) error {
Expand Down

0 comments on commit 788a228

Please sign in to comment.