BCDA-8227: Remove usage of completed_job_count in ALR jobs (#966)
## 🎫 Ticket

https://jira.cms.gov/browse/BCDA-8227

## 🛠 Changes

- Replace `completed_job_count` with a count of unique job keys (based on `job_key.que_job_id`)
- Avoid re-processing an ALR job if its que job was already processed (see the sketch below)
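The second change amounts to checking, before doing any work, whether a job key already exists for this (parent job, que job) pair. A rough sketch of that check, using the `repository.Repository` methods and `ErrJobKeyNotFound` sentinel that appear in the `alr.go` diff below (the `alreadyProcessed` helper is hypothetical and only for illustration; `startAlrJob` performs the check inline):

```go
// alreadyProcessed reports whether this que job has already produced a job key
// for the parent job, meaning a previous worker finished it and the work can be
// skipped. Hypothetical helper for illustration; startAlrJob inlines this check.
func alreadyProcessed(ctx context.Context, r repository.Repository, jobID uint, queJobID int64) (bool, error) {
	_, err := r.GetJobKey(ctx, jobID, queJobID)
	switch {
	case err == nil:
		// A job key exists, so this que job was already processed; skip it.
		return true, nil
	case errors.Is(err, repository.ErrJobKeyNotFound):
		// No job key yet: this is the first attempt, so go ahead and process it.
		return false, nil
	default:
		// Unexpected database error; surface it so the que job is retried.
		return false, fmt.Errorf("failed to search for job keys in database: %w", err)
	}
}
```

When the check reports the que job was already processed, `startAlrJob` returns `nil`, which signals que-go to drop the job from the queue instead of re-running `ProcessAlrJob`.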

## ℹ️ Context

As part of #964, I discovered that `completed_job_count` was still being
used by ALR jobs. This applies the same fix that #962 made for the
"normal" export jobs.

Note that a single ALR job can produce many different job keys. To
simplify counting how many que jobs have been processed, I added a new
`GetUniqueJobKeyCount` repository method.
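In other words, the new method boils down to a `COUNT(DISTINCT que_job_id)` over the `job_keys` rows for the parent job. A minimal sketch of the equivalent query using `database/sql` directly, assuming the Postgres placeholder style and `job_keys` columns shown in the `postgres/repository.go` diff below (the real method builds the same statement with the repo's SQL builder):

```go
// uniqueJobKeyCount sketches what GetUniqueJobKeyCount computes: the number of
// distinct que jobs that have written at least one job key for the parent job.
// Hypothetical standalone version; the real method lives on *Repository.
func uniqueJobKeyCount(ctx context.Context, db *sql.DB, jobID uint) (int, error) {
	const query = `SELECT COUNT(DISTINCT que_job_id) FROM job_keys WHERE job_id = $1`
	var count int
	if err := db.QueryRowContext(ctx, query, jobID).Scan(&count); err != nil {
		return -1, err
	}
	return count, nil
}
```

`isJobComplete` then compares this count against the parent job's `JobCount` instead of the old `completed_job_count` column.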

For some historical context, the ALR endpoints are not enabled in
deployed environments; see #853 for more details. Rather than deciding
now whether to remove the ALR code entirely, I am limiting these changes
to removing the use of `completed_job_count`.

## 🧪 Validation

Unit Testing
kyeah committed Jul 2, 2024
1 parent 0d9986b commit a4b8a28
Showing 7 changed files with 104 additions and 38 deletions.
47 changes: 28 additions & 19 deletions bcdaworker/queueing/manager/alr.go
@@ -64,7 +64,7 @@ func checkIfCancelled(ctx context.Context, r repository.Repository,
// startALRJob is the Job that the worker will run from the pool. This function
// has been written here (alr.go) to separate from beneficiary FHIR workflow.
// This job is handled by the same worker pool that works on beneficiary.
func (q *masterQueue) startAlrJob(job *que.Job) error {
func (q *masterQueue) startAlrJob(queJob *que.Job) error {

// Creating Context for possible cancellation; used by checkIfCancelled fn
ctx := context.Background()
@@ -73,13 +73,13 @@ func (q *masterQueue) startAlrJob(job *que.Job) error {

// Unmarshall JSON that contains the job details
var jobArgs models.JobAlrEnqueueArgs
err := json.Unmarshal(job.Args, &jobArgs)
err := json.Unmarshal(queJob.Args, &jobArgs)
// If we cannot unmarshall this, it would be very problematic.
if err != nil {
// TODO: perhaps just fail the job?
// Currently, we retry it
q.alrLog.Warnf("Failed to unmarhall job.Args '%s' %s.",
job.Args, err)
queJob.Args, err)
}

// Validate the job like bcdaworker/worker/worker.go#L43
@@ -90,7 +90,7 @@ func (q *masterQueue) startAlrJob(job *que.Job) error {
if errors.Is(err, repository.ErrJobNotFound) {
// Parent job is not found
// If parent job is not found reach maxretry, fail the job
if job.ErrorCount >= q.MaxRetry {
if queJob.ErrorCount >= q.MaxRetry {
q.alrLog.Errorf("No job found for ID: %d acoID: %s. Retries exhausted. Removing job from queue.", jobArgs.ID,
jobArgs.CMSID)
// By returning a nil error response, we're singaling to que-go to remove this job from the jobqueue.
@@ -100,28 +100,38 @@ func (q *masterQueue) startAlrJob(job *que.Job) error {
return fmt.Errorf("could not retrieve job from database")
}
// Else that job just doesn't exist
return fmt.Errorf("failed to valiate job: %w", err)
return fmt.Errorf("failed to validate job: %w", err)
}
// If the job was cancelled...
if alrJobs.Status == models.JobStatusCancelled {
q.alrLog.Warnf("ALR big job has been cancelled, worker will not be tasked for %s",
job.Args)
queJob.Args)
return nil
}
// If the job has been failed by a previous worker...
if alrJobs.Status == models.JobStatusFailed {
q.alrLog.Warnf("ALR big job has been failed, worker will not be tasked for %s",
job.Args)
queJob.Args)
return nil
}
// if the que job was already processed...
_, err = q.repository.GetJobKey(ctx, uint(jobArgs.ID), queJob.ID)
if err == nil {
// If there was no error, we found a job key and can avoid re-processing the job.
q.alrLog.Warnf("ALR que job (que_jobs.id) %d has already been processed, worker will not be tasked for %s", queJob.ID, queJob.Args)
return nil
}
if !errors.Is(err, repository.ErrJobKeyNotFound) {
return fmt.Errorf("Failed to search for job keys in database: %w", err)
}
// End of validation

// Check if the job was cancelled
go checkIfCancelled(ctx, q.repository, cancel, jobArgs.ID, 15)

// Before moving forward, check if this job has failed before
// If it has reached the maxRetry, stop the parent job
if job.ErrorCount > q.MaxRetry {
if queJob.ErrorCount > q.MaxRetry {
// Fail the job - ALL OR NOTHING
err = q.repository.UpdateJobStatus(ctx, jobArgs.ID, models.JobStatusFailed)
if err != nil {
@@ -142,16 +152,16 @@ func (q *masterQueue) startAlrJob(job *que.Job) error {
// don't fail the job.
if !errors.Is(err, repository.ErrJobNotUpdated) {
q.alrLog.Warnf("Failed to update job status '%s' %s.",
job.Args, err)
queJob.Args, err)
return err
}
}

// Run ProcessAlrJob, which is the meat of the whole operation
err = q.alrWorker.ProcessAlrJob(ctx, jobArgs)
err = q.alrWorker.ProcessAlrJob(ctx, queJob.ID, jobArgs)
if err != nil {
// This means the job did not finish
q.alrLog.Warnf("Failed to complete job.Args '%s' %s", job.Args, err)
q.alrLog.Warnf("Failed to complete job.Args '%s' %s", queJob.Args, err)
// Re-enqueue the job
return err
}
@@ -177,7 +187,7 @@ func (q *masterQueue) startAlrJob(job *que.Job) error {
err = q.repository.UpdateJobStatus(ctx, jobArgs.ID, models.JobStatusCompleted)
if err != nil {
// This means the job did not finish for various reason
q.alrLog.Warnf("Failed to update job to complete for '%s' %s", job.Args, err)
q.alrLog.Warnf("Failed to update job to complete for '%s' %s", queJob.Args, err)
// Re-enqueue the job
return err
}
@@ -201,16 +211,15 @@ func (q *masterQueue) isJobComplete(ctx context.Context, jobID uint) (bool, erro
return false, nil
}

// Possible source of error
//completedCount, err := q.repository.GetJobKeyCount(ctx, jobID)
//if err != nil {
//return false, fmt.Errorf("failed to get job key count: %w", err)
//}
completedCount, err := q.repository.GetUniqueJobKeyCount(ctx, jobID)
if err != nil {
return false, fmt.Errorf("failed to get job key count: %w", err)
}

if j.CompletedJobCount >= j.JobCount {
if completedCount >= j.JobCount {
q.alrLog.WithFields(logrus.Fields{
"jobID": j.ID,
"jobCount": j.JobCount, "completedJobCount": j.CompletedJobCount}).
"jobCount": j.JobCount, "completedJobCount": completedCount}).
Println("Excess number of jobs completed.")
return true, nil
}
10 changes: 9 additions & 1 deletion bcdaworker/queueing/manager/que_test.go
@@ -262,16 +262,24 @@ func TestStartAlrJob(t *testing.T) {
// Since the worker is tested by BFD, it is not tested here
// and we jump straight to the work
err = master.startAlrJob(&que.Job{
ID: rand.Int63(),
Args: jobArgsJson,
})
assert.NoError(t, err)

// Check job is in progress
alrJob, err := r.GetJobByID(ctx, id)
assert.NoError(t, err)
assert.Equal(t, models.JobStatusInProgress, alrJob.Status)

err = master.startAlrJob(&que.Job{
ID: rand.Int63(),
Args: jobArgsJson2,
})
assert.NoError(t, err)

// Check job is complete
alrJob, err := r.GetJobByID(ctx, id)
alrJob, err = r.GetJobByID(ctx, id)
assert.NoError(t, err)
assert.Equal(t, models.JobStatusCompleted, alrJob.Status)
}
28 changes: 28 additions & 0 deletions bcdaworker/repository/mock_repository.go

Generated file; diff not rendered.

12 changes: 12 additions & 0 deletions bcdaworker/repository/postgres/repository.go
@@ -178,6 +178,18 @@ func (r *Repository) GetJobKeyCount(ctx context.Context, jobID uint) (int, error
return count, nil
}

func (r *Repository) GetUniqueJobKeyCount(ctx context.Context, jobID uint) (int, error) {
sb := sqlFlavor.NewSelectBuilder().Select("COUNT(DISTINCT que_job_id)").From("job_keys")
sb.Where(sb.Equal("job_id", jobID))

query, args := sb.Build()
var count int
if err := r.QueryRowContext(ctx, query, args...).Scan(&count); err != nil {
return -1, err
}
return count, nil
}

func (r *Repository) GetJobKey(ctx context.Context, jobID uint, queJobID int64) (*models.JobKey, error) {
sb := sqlFlavor.NewSelectBuilder().Select("id").From("job_keys")
sb.Where(sb.And(sb.Equal("job_id", jobID), sb.Equal("que_job_id", queJobID)))
1 change: 1 addition & 0 deletions bcdaworker/repository/repository.go
@@ -39,6 +39,7 @@ type jobKeyRepository interface {
CreateJobKey(ctx context.Context, jobKey models.JobKey) error
CreateJobKeys(ctx context.Context, jobKeys []models.JobKey) error
GetJobKeyCount(ctx context.Context, jobID uint) (int, error)
GetUniqueJobKeyCount(ctx context.Context, jobID uint) (int, error)
GetJobKey(ctx context.Context, jobID uint, queJobID int64) (*models.JobKey, error)
}

39 changes: 23 additions & 16 deletions bcdaworker/worker/alr.go
@@ -59,7 +59,7 @@ func NewAlrWorker(db *sql.DB) AlrWorker {
}

func goWriterV1(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, fileMap map[string]*os.File,
result chan error, resourceTypes []string, id uint) {
result chan error, resourceTypes []string, id uint, queJobID int64) {

writerPool := make([]*bufio.Writer, len(fileMap))

@@ -105,20 +105,23 @@ func goWriterV1(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file
}

// update the jobs keys
var jobKeys []models.JobKey
for resource, path := range fileMap {
filename := filepath.Base(path.Name())
jk := models.JobKey{JobID: id, FileName: filename, ResourceType: resource}
if err := a.Repository.CreateJobKey(ctx, jk); err != nil {
result <- fmt.Errorf(constants.JobKeyCreateErr, err)
return
}
jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: filename, ResourceType: resource}
jobKeys = append(jobKeys, jk)
}

if err := a.Repository.CreateJobKeys(ctx, jobKeys); err != nil {
result <- fmt.Errorf(constants.JobKeyCreateErr, err)
return
}

result <- nil
}

func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, fileMap map[string]*os.File,
result chan error, resourceTypes []string, id uint) {
result chan error, resourceTypes []string, id uint, queJobID int64) {

writerPool := make([]*bufio.Writer, len(fileMap))

@@ -164,13 +167,16 @@ func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file
}

// update the jobs keys
var jobKeys []models.JobKey
for resource, path := range fileMap {
filename := filepath.Base(path.Name())
jk := models.JobKey{JobID: id, FileName: filename, ResourceType: resource}
if err := a.Repository.CreateJobKey(ctx, jk); err != nil {
result <- fmt.Errorf(constants.JobKeyCreateErr, err)
return
}
jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: filename, ResourceType: resource}
jobKeys = append(jobKeys, jk)
}

if err := a.Repository.CreateJobKeys(ctx, jobKeys); err != nil {
result <- fmt.Errorf(constants.JobKeyCreateErr, err)
return
}

result <- nil
@@ -184,6 +190,7 @@ func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file
// ProcessAlrJob is a function called by the Worker to serve ALR data to users
func (a *AlrWorker) ProcessAlrJob(
ctx context.Context,
queJobID int64,
jobArgs models.JobAlrEnqueueArgs,
) error {

@@ -200,10 +207,10 @@ func (a *AlrWorker) ProcessAlrJob(
return err
}

// If we did not have an ALR data to write, we'll write a specific file name that indicates that the
// If we did not have any ALR data to write, we'll write a specific file name that indicates that
// there is no data associated with this job.
if len(alrModels) == 0 {
jk := models.JobKey{JobID: id, FileName: models.BlankFileName, ResourceType: "ALR"}
jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: models.BlankFileName, ResourceType: "ALR"}
if err := a.Repository.CreateJobKey(ctx, jk); err != nil {
return fmt.Errorf(constants.JobKeyCreateErr, err)
}
@@ -245,9 +252,9 @@ func (a *AlrWorker) ProcessAlrJob(
// Reason for a go routine is to not block when writing, since disk writing is
// generally slower than memory access. We are streaming to keep mem lower.
if jobArgs.BBBasePath == "/v1/fhir" {
go goWriterV1(ctx, a, c, fileMap, result, resources[:], id)
go goWriterV1(ctx, a, c, fileMap, result, resources[:], id, queJobID)
} else {
go goWriterV2(ctx, a, c, fileMap, result, resources[:], id)
go goWriterV2(ctx, a, c, fileMap, result, resources[:], id, queJobID)
}

// Marshall into JSON and send it over the channel
5 changes: 3 additions & 2 deletions bcdaworker/worker/alr_test.go
@@ -3,6 +3,7 @@ package worker
import (
"context"
"database/sql"
"math/rand"
"os"
"testing"

@@ -65,10 +66,10 @@ func (s *AlrWorkerTestSuite) TestNewAlrWorker() {
// Test ProcessAlrJob
func (s *AlrWorkerTestSuite) TestProcessAlrJob() {
ctx := context.Background()
err := s.alrWorker.ProcessAlrJob(ctx, s.jobArgs[0])
err := s.alrWorker.ProcessAlrJob(ctx, rand.Int63(), s.jobArgs[0])
// Check Job is processed with no errors
assert.NoError(s.T(), err)
err = s.alrWorker.ProcessAlrJob(ctx, s.jobArgs[1])
err = s.alrWorker.ProcessAlrJob(ctx, rand.Int63(), s.jobArgs[1])
// Check Job is processed with no errors
assert.NoError(s.T(), err)
}
