diff --git a/bcdaworker/queueing/manager/alr.go b/bcdaworker/queueing/manager/alr.go index f95595850..40107555f 100644 --- a/bcdaworker/queueing/manager/alr.go +++ b/bcdaworker/queueing/manager/alr.go @@ -64,7 +64,7 @@ func checkIfCancelled(ctx context.Context, r repository.Repository, // startALRJob is the Job that the worker will run from the pool. This function // has been written here (alr.go) to separate from beneficiary FHIR workflow. // This job is handled by the same worker pool that works on beneficiary. -func (q *masterQueue) startAlrJob(job *que.Job) error { +func (q *masterQueue) startAlrJob(queJob *que.Job) error { // Creating Context for possible cancellation; used by checkIfCancelled fn ctx := context.Background() @@ -73,13 +73,13 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { // Unmarshall JSON that contains the job details var jobArgs models.JobAlrEnqueueArgs - err := json.Unmarshal(job.Args, &jobArgs) + err := json.Unmarshal(queJob.Args, &jobArgs) // If we cannot unmarshall this, it would be very problematic. if err != nil { // TODO: perhaps just fail the job? // Currently, we retry it q.alrLog.Warnf("Failed to unmarhall job.Args '%s' %s.", - job.Args, err) + queJob.Args, err) } // Validate the job like bcdaworker/worker/worker.go#L43 @@ -90,7 +90,7 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { if errors.Is(err, repository.ErrJobNotFound) { // Parent job is not found // If parent job is not found reach maxretry, fail the job - if job.ErrorCount >= q.MaxRetry { + if queJob.ErrorCount >= q.MaxRetry { q.alrLog.Errorf("No job found for ID: %d acoID: %s. Retries exhausted. Removing job from queue.", jobArgs.ID, jobArgs.CMSID) // By returning a nil error response, we're singaling to que-go to remove this job from the jobqueue. @@ -100,20 +100,30 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { return fmt.Errorf("could not retrieve job from database") } // Else that job just doesn't exist - return fmt.Errorf("failed to valiate job: %w", err) + return fmt.Errorf("failed to validate job: %w", err) } // If the job was cancelled... if alrJobs.Status == models.JobStatusCancelled { q.alrLog.Warnf("ALR big job has been cancelled, worker will not be tasked for %s", - job.Args) + queJob.Args) return nil } // If the job has been failed by a previous worker... if alrJobs.Status == models.JobStatusFailed { q.alrLog.Warnf("ALR big job has been failed, worker will not be tasked for %s", - job.Args) + queJob.Args) return nil } + // if the que job was already processed... + _, err = q.repository.GetJobKey(ctx, uint(jobArgs.ID), queJob.ID) + if err == nil { + // If there was no error, we found a job key and can avoid re-processing the job. + q.alrLog.Warnf("ALR que job (que_jobs.id) %d has already been processed, worker will not be tasked for %s", queJob.ID, queJob.Args) + return nil + } + if !errors.Is(err, repository.ErrJobKeyNotFound) { + return fmt.Errorf("Failed to search for job keys in database: %w", err) + } // End of validation // Check if the job was cancelled @@ -121,7 +131,7 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { // Before moving forward, check if this job has failed before // If it has reached the maxRetry, stop the parent job - if job.ErrorCount > q.MaxRetry { + if queJob.ErrorCount > q.MaxRetry { // Fail the job - ALL OR NOTHING err = q.repository.UpdateJobStatus(ctx, jobArgs.ID, models.JobStatusFailed) if err != nil { @@ -142,16 +152,16 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { // don't fail the job. if !errors.Is(err, repository.ErrJobNotUpdated) { q.alrLog.Warnf("Failed to update job status '%s' %s.", - job.Args, err) + queJob.Args, err) return err } } // Run ProcessAlrJob, which is the meat of the whole operation - err = q.alrWorker.ProcessAlrJob(ctx, jobArgs) + err = q.alrWorker.ProcessAlrJob(ctx, queJob.ID, jobArgs) if err != nil { // This means the job did not finish - q.alrLog.Warnf("Failed to complete job.Args '%s' %s", job.Args, err) + q.alrLog.Warnf("Failed to complete job.Args '%s' %s", queJob.Args, err) // Re-enqueue the job return err } @@ -177,7 +187,7 @@ func (q *masterQueue) startAlrJob(job *que.Job) error { err = q.repository.UpdateJobStatus(ctx, jobArgs.ID, models.JobStatusCompleted) if err != nil { // This means the job did not finish for various reason - q.alrLog.Warnf("Failed to update job to complete for '%s' %s", job.Args, err) + q.alrLog.Warnf("Failed to update job to complete for '%s' %s", queJob.Args, err) // Re-enqueue the job return err } @@ -201,16 +211,15 @@ func (q *masterQueue) isJobComplete(ctx context.Context, jobID uint) (bool, erro return false, nil } - // Possible source of error - //completedCount, err := q.repository.GetJobKeyCount(ctx, jobID) - //if err != nil { - //return false, fmt.Errorf("failed to get job key count: %w", err) - //} + completedCount, err := q.repository.GetUniqueJobKeyCount(ctx, jobID) + if err != nil { + return false, fmt.Errorf("failed to get job key count: %w", err) + } - if j.CompletedJobCount >= j.JobCount { + if completedCount >= j.JobCount { q.alrLog.WithFields(logrus.Fields{ "jobID": j.ID, - "jobCount": j.JobCount, "completedJobCount": j.CompletedJobCount}). + "jobCount": j.JobCount, "completedJobCount": completedCount}). Println("Excess number of jobs completed.") return true, nil } diff --git a/bcdaworker/queueing/manager/que_test.go b/bcdaworker/queueing/manager/que_test.go index 3db744938..da2cc8c5b 100644 --- a/bcdaworker/queueing/manager/que_test.go +++ b/bcdaworker/queueing/manager/que_test.go @@ -262,16 +262,24 @@ func TestStartAlrJob(t *testing.T) { // Since the worker is tested by BFD, it is not tested here // and we jump straight to the work err = master.startAlrJob(&que.Job{ + ID: rand.Int63(), Args: jobArgsJson, }) assert.NoError(t, err) + + // Check job is in progress + alrJob, err := r.GetJobByID(ctx, id) + assert.NoError(t, err) + assert.Equal(t, models.JobStatusInProgress, alrJob.Status) + err = master.startAlrJob(&que.Job{ + ID: rand.Int63(), Args: jobArgsJson2, }) assert.NoError(t, err) // Check job is complete - alrJob, err := r.GetJobByID(ctx, id) + alrJob, err = r.GetJobByID(ctx, id) assert.NoError(t, err) assert.Equal(t, models.JobStatusCompleted, alrJob.Status) } diff --git a/bcdaworker/repository/mock_repository.go b/bcdaworker/repository/mock_repository.go index a3bbbee6e..c19d18cd6 100644 --- a/bcdaworker/repository/mock_repository.go +++ b/bcdaworker/repository/mock_repository.go @@ -200,6 +200,34 @@ func (_m *MockRepository) GetJobKeyCount(ctx context.Context, jobID uint) (int, return r0, r1 } +// GetUniqueJobKeyCount provides a mock function with given fields: ctx, jobID +func (_m *MockRepository) GetUniqueJobKeyCount(ctx context.Context, jobID uint) (int, error) { + ret := _m.Called(ctx, jobID) + + if len(ret) == 0 { + panic("no return value specified for GetUniqueJobKeyCount") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint) (int, error)); ok { + return rf(ctx, jobID) + } + if rf, ok := ret.Get(0).(func(context.Context, uint) int); ok { + r0 = rf(ctx, jobID) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { + r1 = rf(ctx, jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // IncrementCompletedJobCount provides a mock function with given fields: ctx, jobID func (_m *MockRepository) IncrementCompletedJobCount(ctx context.Context, jobID uint) error { ret := _m.Called(ctx, jobID) diff --git a/bcdaworker/repository/postgres/repository.go b/bcdaworker/repository/postgres/repository.go index bbf87af83..5b791a337 100644 --- a/bcdaworker/repository/postgres/repository.go +++ b/bcdaworker/repository/postgres/repository.go @@ -178,6 +178,18 @@ func (r *Repository) GetJobKeyCount(ctx context.Context, jobID uint) (int, error return count, nil } +func (r *Repository) GetUniqueJobKeyCount(ctx context.Context, jobID uint) (int, error) { + sb := sqlFlavor.NewSelectBuilder().Select("COUNT(DISTINCT que_job_id)").From("job_keys") + sb.Where(sb.Equal("job_id", jobID)) + + query, args := sb.Build() + var count int + if err := r.QueryRowContext(ctx, query, args...).Scan(&count); err != nil { + return -1, err + } + return count, nil +} + func (r *Repository) GetJobKey(ctx context.Context, jobID uint, queJobID int64) (*models.JobKey, error) { sb := sqlFlavor.NewSelectBuilder().Select("id").From("job_keys") sb.Where(sb.And(sb.Equal("job_id", jobID), sb.Equal("que_job_id", queJobID))) diff --git a/bcdaworker/repository/repository.go b/bcdaworker/repository/repository.go index 92de478b1..e43c4b57e 100644 --- a/bcdaworker/repository/repository.go +++ b/bcdaworker/repository/repository.go @@ -39,6 +39,7 @@ type jobKeyRepository interface { CreateJobKey(ctx context.Context, jobKey models.JobKey) error CreateJobKeys(ctx context.Context, jobKeys []models.JobKey) error GetJobKeyCount(ctx context.Context, jobID uint) (int, error) + GetUniqueJobKeyCount(ctx context.Context, jobID uint) (int, error) GetJobKey(ctx context.Context, jobID uint, queJobID int64) (*models.JobKey, error) } diff --git a/bcdaworker/worker/alr.go b/bcdaworker/worker/alr.go index e7f0779b9..a11c5faa3 100644 --- a/bcdaworker/worker/alr.go +++ b/bcdaworker/worker/alr.go @@ -59,7 +59,7 @@ func NewAlrWorker(db *sql.DB) AlrWorker { } func goWriterV1(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, fileMap map[string]*os.File, - result chan error, resourceTypes []string, id uint) { + result chan error, resourceTypes []string, id uint, queJobID int64) { writerPool := make([]*bufio.Writer, len(fileMap)) @@ -105,20 +105,23 @@ func goWriterV1(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file } // update the jobs keys + var jobKeys []models.JobKey for resource, path := range fileMap { filename := filepath.Base(path.Name()) - jk := models.JobKey{JobID: id, FileName: filename, ResourceType: resource} - if err := a.Repository.CreateJobKey(ctx, jk); err != nil { - result <- fmt.Errorf(constants.JobKeyCreateErr, err) - return - } + jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: filename, ResourceType: resource} + jobKeys = append(jobKeys, jk) + } + + if err := a.Repository.CreateJobKeys(ctx, jobKeys); err != nil { + result <- fmt.Errorf(constants.JobKeyCreateErr, err) + return } result <- nil } func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, fileMap map[string]*os.File, - result chan error, resourceTypes []string, id uint) { + result chan error, resourceTypes []string, id uint, queJobID int64) { writerPool := make([]*bufio.Writer, len(fileMap)) @@ -164,13 +167,16 @@ func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file } // update the jobs keys + var jobKeys []models.JobKey for resource, path := range fileMap { filename := filepath.Base(path.Name()) - jk := models.JobKey{JobID: id, FileName: filename, ResourceType: resource} - if err := a.Repository.CreateJobKey(ctx, jk); err != nil { - result <- fmt.Errorf(constants.JobKeyCreateErr, err) - return - } + jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: filename, ResourceType: resource} + jobKeys = append(jobKeys, jk) + } + + if err := a.Repository.CreateJobKeys(ctx, jobKeys); err != nil { + result <- fmt.Errorf(constants.JobKeyCreateErr, err) + return } result <- nil @@ -184,6 +190,7 @@ func goWriterV2(ctx context.Context, a *AlrWorker, c chan *alr.AlrFhirBulk, file // ProcessAlrJob is a function called by the Worker to serve ALR data to users func (a *AlrWorker) ProcessAlrJob( ctx context.Context, + queJobID int64, jobArgs models.JobAlrEnqueueArgs, ) error { @@ -200,10 +207,10 @@ func (a *AlrWorker) ProcessAlrJob( return err } - // If we did not have an ALR data to write, we'll write a specific file name that indicates that the + // If we did not have any ALR data to write, we'll write a specific file name that indicates that // there is no data associated with this job. if len(alrModels) == 0 { - jk := models.JobKey{JobID: id, FileName: models.BlankFileName, ResourceType: "ALR"} + jk := models.JobKey{JobID: id, QueJobID: &queJobID, FileName: models.BlankFileName, ResourceType: "ALR"} if err := a.Repository.CreateJobKey(ctx, jk); err != nil { return fmt.Errorf(constants.JobKeyCreateErr, err) } @@ -245,9 +252,9 @@ func (a *AlrWorker) ProcessAlrJob( // Reason for a go routine is to not block when writing, since disk writing is // generally slower than memory access. We are streaming to keep mem lower. if jobArgs.BBBasePath == "/v1/fhir" { - go goWriterV1(ctx, a, c, fileMap, result, resources[:], id) + go goWriterV1(ctx, a, c, fileMap, result, resources[:], id, queJobID) } else { - go goWriterV2(ctx, a, c, fileMap, result, resources[:], id) + go goWriterV2(ctx, a, c, fileMap, result, resources[:], id, queJobID) } // Marshall into JSON and send it over the channel diff --git a/bcdaworker/worker/alr_test.go b/bcdaworker/worker/alr_test.go index 9860c884d..477f20fb3 100644 --- a/bcdaworker/worker/alr_test.go +++ b/bcdaworker/worker/alr_test.go @@ -3,6 +3,7 @@ package worker import ( "context" "database/sql" + "math/rand" "os" "testing" @@ -65,10 +66,10 @@ func (s *AlrWorkerTestSuite) TestNewAlrWorker() { // Test ProcessAlrJob func (s *AlrWorkerTestSuite) TestProcessAlrJob() { ctx := context.Background() - err := s.alrWorker.ProcessAlrJob(ctx, s.jobArgs[0]) + err := s.alrWorker.ProcessAlrJob(ctx, rand.Int63(), s.jobArgs[0]) // Check Job is processed with no errors assert.NoError(s.T(), err) - err = s.alrWorker.ProcessAlrJob(ctx, s.jobArgs[1]) + err = s.alrWorker.ProcessAlrJob(ctx, rand.Int63(), s.jobArgs[1]) // Check Job is processed with no errors assert.NoError(s.T(), err) }