Skip to content

Commit

Permalink
BCDA-8215: Remove completed job count application logic (#964)
Browse files Browse the repository at this point in the history
## 🎫 Ticket

https://jira.cms.gov/browse/BCDA-8215

## 🛠 Changes

- Remove all application logic referencing completed job count

## ℹ️ Context

This column, by it's own commented definition, is not a reliable
indicator of the number of actually completed sub-jobs for an export
job. It appears that its only use is in the in-progress job status
header to indicate roughly how far along the job processing is.

Instead of attempting to maintain this field, we should be relying on
the source of truth (the number of completed job keys). To avoid
confusion and to reduce code complexity, we are removing this column.

Before removing the column from the database, we must stop reading it
from the application.

## 🧪 Validation

- Unit testing
  • Loading branch information
kyeah committed Jul 8, 2024
1 parent 777b548 commit ec9c426
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 117 deletions.
8 changes: 7 additions & 1 deletion bcda/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,13 @@ func (h *Handler) JobStatus(w http.ResponseWriter, r *http.Request) {
logger.Error(job.Status)
h.RespWriter.Exception(r.Context(), w, http.StatusInternalServerError, responseutils.JobFailed, responseutils.DetailJobFailed)
case models.JobStatusPending, models.JobStatusInProgress:
w.Header().Set("X-Progress", job.StatusMessage())
completedJobKeyCount := utils.CountUniq(jobKeys, func(jobKey *models.JobKey) int64 {
if jobKey.QueJobID == nil {
return -1
}
return *jobKey.QueJobID
})
w.Header().Set("X-Progress", job.StatusMessage(completedJobKeyCount))
w.WriteHeader(http.StatusAccepted)
return
case models.JobStatusCompleted:
Expand Down
34 changes: 16 additions & 18 deletions bcda/api/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,15 +757,14 @@ func (s *RequestsTestSuite) TestJobStatusErrorHandling() {

mockSrv.On("GetJobAndKeys", testUtils.CtxMatcher, uint(1)).Return(
&models.Job{
ID: 1,
ACOID: uuid.NewRandom(),
RequestURL: requestUrl,
Status: tt.status,
TransactionTime: timestp,
JobCount: 100,
CompletedJobCount: 100,
CreatedAt: timestp,
UpdatedAt: timestp.Add(time.Duration(tt.timestampOffset)),
ID: 1,
ACOID: uuid.NewRandom(),
RequestURL: requestUrl,
Status: tt.status,
TransactionTime: timestp,
JobCount: 100,
CreatedAt: timestp,
UpdatedAt: timestp.Add(time.Duration(tt.timestampOffset)),
},
[]*models.JobKey{{
ID: 1,
Expand Down Expand Up @@ -890,15 +889,14 @@ func (s *RequestsTestSuite) TestJobFailedStatus() {
timestp := time.Now()
mockSrv.On("GetJobAndKeys", testUtils.CtxMatcher, uint(1)).Return(
&models.Job{
ID: 1,
ACOID: uuid.NewRandom(),
RequestURL: tt.requestUrl,
Status: tt.status,
TransactionTime: timestp,
JobCount: 100,
CompletedJobCount: 100,
CreatedAt: timestp,
UpdatedAt: timestp,
ID: 1,
ACOID: uuid.NewRandom(),
RequestURL: tt.requestUrl,
Status: tt.status,
TransactionTime: timestp,
JobCount: 100,
CreatedAt: timestp,
UpdatedAt: timestp,
},
[]*models.JobKey{{
ID: 1,
Expand Down
26 changes: 15 additions & 11 deletions bcda/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"fmt"
"strings"
"time"

"github.com/pborman/uuid"
Expand All @@ -24,20 +25,19 @@ var AllJobStatuses []JobStatus = []JobStatus{JobStatusPending, JobStatusInProgre

type JobStatus string
type Job struct {
ID uint
ACOID uuid.UUID `json:"aco_id"`
RequestURL string `json:"request_url"` // request_url
Status JobStatus `json:"status"` // status
TransactionTime time.Time // most recent data load transaction time from BFD
JobCount int
CompletedJobCount int
CreatedAt time.Time
UpdatedAt time.Time
ID uint
ACOID uuid.UUID `json:"aco_id"`
RequestURL string `json:"request_url"` // request_url
Status JobStatus `json:"status"` // status
TransactionTime time.Time // most recent data load transaction time from BFD
JobCount int
CreatedAt time.Time
UpdatedAt time.Time
}

func (j *Job) StatusMessage() string {
func (j *Job) StatusMessage(numCompletedJobKeys int) string {
if j.Status == JobStatusInProgress && j.JobCount > 0 {
pct := float64(j.CompletedJobCount) / float64(j.JobCount) * 100
pct := float64(numCompletedJobKeys) / float64(j.JobCount) * 100
return fmt.Sprintf("%s (%d%%)", j.Status, int(pct))
}

Expand All @@ -57,6 +57,10 @@ type JobKey struct {
ResourceType string
}

func (j *JobKey) IsError() bool {
return strings.Contains(j.FileName, "-error.ndjson")
}

// ACO represents an Accountable Care Organization.
type ACO struct {
ID uint
Expand Down
12 changes: 6 additions & 6 deletions bcda/models/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ func TestModelsTestSuite(t *testing.T) {
}

func (s *ModelsTestSuite) TestJobStatusMessage() {
j := Job{Status: constants.InProgress, JobCount: 25, CompletedJobCount: 6}
assert.Equal(s.T(), "In Progress (24%)", j.StatusMessage())
j := Job{Status: constants.InProgress, JobCount: 25}
assert.Equal(s.T(), "In Progress (24%)", j.StatusMessage(6))

j = Job{Status: constants.InProgress, JobCount: 0, CompletedJobCount: 0}
assert.Equal(s.T(), constants.InProgress, j.StatusMessage())
j = Job{Status: constants.InProgress, JobCount: 0}
assert.Equal(s.T(), constants.InProgress, j.StatusMessage(0))

j = Job{Status: JobStatusCompleted, JobCount: 25, CompletedJobCount: 25}
assert.Equal(s.T(), string(JobStatusCompleted), j.StatusMessage())
j = Job{Status: JobStatusCompleted, JobCount: 25}
assert.Equal(s.T(), string(JobStatusCompleted), j.StatusMessage(25))
}

func (s *ModelsTestSuite) TestACOBlacklist() {
Expand Down
11 changes: 5 additions & 6 deletions bcda/models/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (r *Repository) UpdateSuppressionFileImportStatus(ctx context.Context, file
return nil
}

var jobColumns []string = []string{"id", "aco_id", "request_url", "status", "transaction_time", "job_count", "completed_job_count", "created_at", "updated_at"}
var jobColumns []string = []string{"id", "aco_id", "request_url", "status", "transaction_time", "job_count", "created_at", "updated_at"}

func (r *Repository) GetJobs(ctx context.Context, acoID uuid.UUID, statuses ...models.JobStatus) ([]*models.Job, error) {
s := make([]interface{}, len(statuses))
Expand Down Expand Up @@ -425,7 +425,7 @@ func (r *Repository) GetJobByID(ctx context.Context, jobID uint) (*models.Job, e
)

err := r.QueryRowContext(ctx, query, args...).Scan(&j.ID, &j.ACOID, &j.RequestURL, &j.Status, &transactionTime,
&j.JobCount, &j.CompletedJobCount, &createdAt, &updatedAt)
&j.JobCount, &createdAt, &updatedAt)
j.TransactionTime, j.CreatedAt, j.UpdatedAt = transactionTime.Time, createdAt.Time, updatedAt.Time

if err != nil {
Expand All @@ -439,10 +439,10 @@ func (r *Repository) CreateJob(ctx context.Context, j models.Job) (uint, error)
// User raw builder since we need to retrieve the associated ID
ib := sqlFlavor.NewInsertBuilder().InsertInto("jobs")
ib.Cols("aco_id", "request_url", "status",
"transaction_time", "job_count", "completed_job_count",
"transaction_time", "job_count",
"created_at", "updated_at").
Values(j.ACOID, j.RequestURL, j.Status,
j.TransactionTime, j.JobCount, j.CompletedJobCount,
j.TransactionTime, j.JobCount,
sqlbuilder.Raw("NOW()"), sqlbuilder.Raw("NOW()"))

query, args := ib.Build()
Expand All @@ -465,7 +465,6 @@ func (r *Repository) UpdateJob(ctx context.Context, j models.Job) error {
ub.Assign("status", j.Status),
ub.Assign("transaction_time", j.TransactionTime),
ub.Assign("job_count", j.JobCount),
ub.Assign("completed_job_count", j.CompletedJobCount),
ub.Assign("updated_at", sqlbuilder.Raw("NOW()")),
)
ub.Where(ub.Equal("id", j.ID))
Expand Down Expand Up @@ -545,7 +544,7 @@ func (r *Repository) getJobs(ctx context.Context, query string, args ...interfac
for rows.Next() {
var j models.Job
if err = rows.Scan(&j.ID, &j.ACOID, &j.RequestURL, &j.Status, &transactionTime,
&j.JobCount, &j.CompletedJobCount, &createdAt, &updatedAt); err != nil {
&j.JobCount, &createdAt, &updatedAt); err != nil {
return nil, err
}
j.TransactionTime, j.CreatedAt, j.UpdatedAt = transactionTime.Time, createdAt.Time, updatedAt.Time
Expand Down
8 changes: 3 additions & 5 deletions bcda/models/postgres/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,9 @@ func (r *RepositoryTestSuite) TestJobsMethods() {

defer postgrestest.DeleteACO(r.T(), r.db, aco.UUID)

failed := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusFailed, JobCount: 10, CompletedJobCount: 20}
pending := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusPending, JobCount: 30, CompletedJobCount: 40}
completed := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusCompleted, JobCount: 40, CompletedJobCount: 60}
failed := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusFailed, JobCount: 10}
pending := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusPending, JobCount: 30}
completed := models.Job{ACOID: aco.UUID, RequestURL: reqURL, Status: models.JobStatusCompleted, JobCount: 40}

failed.ID, err = r.repository.CreateJob(ctx, failed)
assert.NoError(err)
Expand Down Expand Up @@ -755,7 +755,6 @@ func (r *RepositoryTestSuite) TestJobsMethods() {
// Account for time precision in postgres
failed.TransactionTime = time.Now().Round(time.Millisecond)
failed.JobCount = failed.JobCount + 1
failed.CompletedJobCount = failed.CompletedJobCount + 1
failed.Status = models.JobStatusArchived
assert.NoError(r.repository.UpdateJob(ctx, failed))

Expand All @@ -765,7 +764,6 @@ func (r *RepositoryTestSuite) TestJobsMethods() {
assert.Equal(failed.TransactionTime.UTC(), newFailed.TransactionTime.UTC())
assert.Equal(failed.Status, newFailed.Status)
assert.Equal(failed.JobCount, newFailed.JobCount)
assert.Equal(failed.CompletedJobCount, newFailed.CompletedJobCount)

// Verify that we did not modify other job
newCompleted, err := r.repository.GetJobByID(ctx, completed.ID)
Expand Down
16 changes: 16 additions & 0 deletions bcda/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,19 @@ func Dedup(slice []string) []string {

return newSlice
}

// Count the number of unique values in the slice based on given function
func CountUniq[S []E, E any, F comparable](arr S, f func(E) F) int {
var n int
var dupcheck = make(map[F]bool, n)

for _, val := range arr {
comparableVal := f(val)
if !dupcheck[comparableVal] {
dupcheck[comparableVal] = true
n++
}
}

return n
}
10 changes: 9 additions & 1 deletion bcda/utils/common_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package utils

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"testing"
)

type CommonTestSuite struct {
Expand All @@ -26,6 +27,13 @@ func (s *CommonTestSuite) TestDedup() {
assert.Len(s.T(), result, 3)
}

func (s *CommonTestSuite) TestCountUniq() {
firstLetter := func(s string) string { return string(s[0]) }
assert.Equal(s.T(), 0, CountUniq([]string{}, firstLetter))
assert.Equal(s.T(), 1, CountUniq([]string{"abc", "ab"}, firstLetter))
assert.Equal(s.T(), 2, CountUniq([]string{"abc", "bcd", "ab"}, firstLetter))
}

func TestCommonTestSuite(t *testing.T) {
suite.Run(t, new(CommonTestSuite))
}
5 changes: 1 addition & 4 deletions bcdaworker/queueing/manager/alr.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,6 @@ func (q *masterQueue) startAlrJob(queJob *que.Job) error {
return err
}

if err := q.repository.IncrementCompletedJobCount(ctx, jobArgs.ID); err != nil {
q.alrLog.Warnf("Failed to increment completed count %s", err.Error())
return err
}
jobComplete, err := q.isJobComplete(ctx, jobArgs.ID)
if err != nil {
q.alrLog.Warnf("Failed to check job completion %s", err)
Expand Down Expand Up @@ -223,5 +219,6 @@ func (q *masterQueue) isJobComplete(ctx context.Context, jobID uint) (bool, erro
Println("Excess number of jobs completed.")
return true, nil
}

return false, nil
}
3 changes: 1 addition & 2 deletions bcdaworker/queueing/manager/que_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ func TestStartAlrJob(t *testing.T) {
Status: models.JobStatusPending,
TransactionTime: time.Now(),
// JobCount is partitioned automatically, but it is done manually here
JobCount: 2,
CompletedJobCount: 0,
JobCount: 2,
}
id, err := r.CreateJob(ctx, job)
assert.NoError(t, err)
Expand Down
18 changes: 0 additions & 18 deletions bcdaworker/repository/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 2 additions & 23 deletions bcdaworker/repository/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *Repository) GetCCLFBeneficiaryByID(ctx context.Context, id uint) (*mode

func (r *Repository) GetJobByID(ctx context.Context, jobID uint) (*models.Job, error) {
sb := sqlFlavor.NewSelectBuilder()
sb.Select("id", "aco_id", "request_url", "status", "transaction_time", "job_count", "completed_job_count", "created_at", "updated_at")
sb.Select("id", "aco_id", "request_url", "status", "transaction_time", "job_count", "created_at", "updated_at")
sb.From("jobs").Where(sb.Equal("id", jobID))

query, args := sb.Build()
Expand All @@ -96,7 +96,7 @@ func (r *Repository) GetJobByID(ctx context.Context, jobID uint) (*models.Job, e
)

err := r.QueryRowContext(ctx, query, args...).Scan(&j.ID, &j.ACOID, &j.RequestURL, &j.Status, &transactionTime,
&j.JobCount, &j.CompletedJobCount, &createdAt, &updatedAt)
&j.JobCount, &createdAt, &updatedAt)
j.TransactionTime, j.CreatedAt, j.UpdatedAt = transactionTime.Time, createdAt.Time, updatedAt.Time

if err != nil {
Expand All @@ -121,27 +121,6 @@ func (r *Repository) UpdateJobStatusCheckStatus(ctx context.Context, jobID uint,
map[string]interface{}{"status": new})
}

func (r *Repository) IncrementCompletedJobCount(ctx context.Context, jobID uint) error {
ub := sqlFlavor.NewUpdateBuilder().Update("jobs")
ub.Set(ub.Incr("completed_job_count"), ub.Assign("updated_at", sqlbuilder.Raw("NOW()"))).
Where(ub.Equal("id", jobID))

query, args := ub.Build()
res, err := r.ExecContext(ctx, query, args...)
if err != nil {
return err
}
count, err := res.RowsAffected()
if err != nil {
return err
}
if count == 0 {
return fmt.Errorf("job %d not updated, no job found", jobID)
}

return nil
}

func (r *Repository) CreateJobKey(ctx context.Context, jobKey models.JobKey) error {
ib := sqlFlavor.NewInsertBuilder().InsertInto("job_keys")
ib.Cols("job_id", "que_job_id", "file_name", "resource_type").
Expand Down
14 changes: 2 additions & 12 deletions bcdaworker/repository/postgres/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (r *RepositoryTestSuite) TestJobsMethods() {
postgrestest.CreateACO(r.T(), r.db, aco)
defer postgrestest.DeleteACO(r.T(), r.db, aco.UUID)

failed := models.Job{ACOID: aco.UUID, Status: models.JobStatusFailed, CompletedJobCount: 1, TransactionTime: now}
completed := models.Job{ACOID: aco.UUID, Status: models.JobStatusCompleted, CompletedJobCount: 2, TransactionTime: now}
failed := models.Job{ACOID: aco.UUID, Status: models.JobStatusFailed, TransactionTime: now}
completed := models.Job{ACOID: aco.UUID, Status: models.JobStatusCompleted, TransactionTime: now}
postgrestest.CreateJobs(r.T(), r.db, &failed, &completed)

failed1, err := r.repository.GetJobByID(ctx, failed.ID)
Expand All @@ -117,12 +117,6 @@ func (r *RepositoryTestSuite) TestJobsMethods() {
failed.UpdatedAt = afterUpdate.UpdatedAt
assertJobsEqual(assert, failed, *afterUpdate)

assert.NoError(r.repository.IncrementCompletedJobCount(ctx, failed.ID))
afterUpdate, err = r.repository.GetJobByID(ctx, failed.ID)
assert.NoError(err)
assert.True(afterUpdate.UpdatedAt.After(failed.UpdatedAt))
assert.Equal(afterUpdate.CompletedJobCount, failed.CompletedJobCount+1)

// After all of these updates, the completed job should remain untouched
completed1, err := r.repository.GetJobByID(ctx, completed.ID)
assert.NoError(err)
Expand All @@ -141,10 +135,6 @@ func (r *RepositoryTestSuite) TestJobsMethods() {

err = r.repository.UpdateJobStatusCheckStatus(ctx, 0, models.JobStatusFailed, models.JobStatusArchived)
assert.EqualError(err, "job was not updated, no match found")

err = r.repository.IncrementCompletedJobCount(ctx, 0)
assert.EqualError(err, "job 0 not updated, no job found")

}

// TestJobKeysMethods validates the CRUD operations associated with the job_keys table
Expand Down
2 changes: 0 additions & 2 deletions bcdaworker/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ type jobRepository interface {
// UpdateJobStatusCheckStatus updates the particular job indicated by the jobID
// iff the Job's status field matches current.
UpdateJobStatusCheckStatus(ctx context.Context, jobID uint, current, new models.JobStatus) error

IncrementCompletedJobCount(ctx context.Context, jobID uint) error
}

type jobKeyRepository interface {
Expand Down
Loading

0 comments on commit ec9c426

Please sign in to comment.