From 3f4b674848e6abd633a48f28d446058c3fcf7a69 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 8 Jul 2022 11:09:46 +0100 Subject: [PATCH] Include rollups in monthlies failed metric as well as monthlies created from scratch --- archives/archives.go | 43 ++++++++++++---------------------------- archives/utils.go | 47 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/archives/archives.go b/archives/archives.go index c0c9e00..35bda2e 100644 --- a/archives/archives.go +++ b/archives/archives.go @@ -770,19 +770,21 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s } // RollupOrgArchives rolls up monthly archives from our daily archives -func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { +func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { ctx, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType}) - created := make([]*Archive, 0, 1) // get our missing monthly archives archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, err + return nil, nil, err } + created := make([]*Archive, 0, len(archives)) + failed := make([]*Archive, 0, 1) + // build them from rollups for _, archive := range archives { log := log.WithFields(logrus.Fields{"start_date": archive.StartDate}) @@ -791,6 +793,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType) if err != nil { log.WithError(err).Error("error building monthly archive") + failed = append(failed, archive) continue } @@ -798,6 +801,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) if err != nil { log.WithError(err).Error("error writing archive to s3") + failed = append(failed, archive) continue } } @@ -805,6 +809,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s err = WriteArchiveToDB(ctx, db, archive) if err != nil { log.WithError(err).Error("error writing record to db") + failed = append(failed, archive) continue } @@ -820,7 +825,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s created = append(created, archive) } - return created, nil + return created, failed, nil } const setArchiveDeleted = ` @@ -829,21 +834,6 @@ SET needs_deletion = FALSE, deleted_on = $2 WHERE id = $1 ` -// helper method to safely execute an IN query in the passed in transaction -func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error { - q, vs, err := sqlx.In(query, ids) - if err != nil { - return err - } - q = tx.Rebind(q) - - _, err = tx.ExecContext(ctx, q, vs...) - if err != nil { - tx.Rollback() - } - return err -} - var deleteTransactionSize = 100 // DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created @@ -909,12 +899,14 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3 log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org") } - monthliesRolledUp, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) + rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) if err != nil { return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives") } - monthliesCreated = append(monthliesCreated, monthliesRolledUp...) + monthliesCreated = append(monthliesCreated, rollupsCreated...) + monthliesFailed = append(monthliesFailed, rollupsFailed...) + monthliesFailed = removeDuplicates(monthliesFailed) // don't double report monthlies that fail being built from db and rolled up from dailies // finally delete any archives not yet actually archived var deleted []*Archive @@ -997,12 +989,3 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error { return nil } - -// counts the records in the given archives -func countRecords(as []*Archive) int { - n := 0 - for _, a := range as { - n += a.RecordCount - } - return n -} diff --git a/archives/utils.go b/archives/utils.go index d50db23..0a853c0 100644 --- a/archives/utils.go +++ b/archives/utils.go @@ -1,5 +1,52 @@ package archives +import ( + "context" + "fmt" + "time" + + "github.com/jmoiron/sqlx" +) + +// helper method to safely execute an IN query in the passed in transaction +func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error { + q, vs, err := sqlx.In(query, ids) + if err != nil { + return err + } + q = tx.Rebind(q) + + _, err = tx.ExecContext(ctx, q, vs...) + if err != nil { + tx.Rollback() + } + return err +} + +// counts the records in the given archives +func countRecords(as []*Archive) int { + n := 0 + for _, a := range as { + n += a.RecordCount + } + return n +} + +// removes duplicates from a slice of archives +func removeDuplicates(as []*Archive) []*Archive { + unique := make([]*Archive, 0, len(as)) + seen := make(map[string]bool) + + for _, a := range as { + key := fmt.Sprintf("%s:%s:%s", a.ArchiveType, a.Period, a.StartDate.Format(time.RFC3339)) + if !seen[key] { + unique = append(unique, a) + seen[key] = true + } + } + return unique +} + // chunks a slice of in64 IDs func chunkIDs(ids []int64, size int) [][]int64 { chunks := make([][]int64, 0, len(ids)/size+1)