Skip to content

Commit

Permalink
[Code] Add duration for queued tasks (#31885)
Browse files Browse the repository at this point in the history
  • Loading branch information
mw-ding authored Feb 26, 2019
1 parent 46b9627 commit 22d64d9
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 4 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/code/server/__tests__/clone_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ describe('clone_worker_tests', () => {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
});

assert.ok(newInstanceSpy.calledOnce);
Expand Down Expand Up @@ -160,6 +161,7 @@ describe('clone_worker_tests', () => {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
},
{
uri: 'github.com/Microsoft/TypeScript-Node-Starter',
Expand Down Expand Up @@ -197,6 +199,7 @@ describe('clone_worker_tests', () => {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
});

// Expect EsClient index to be called to update the progress to 0.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/code/server/lib/esqueue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function formatJobObject(job) {
type: job._type,
id: job._id,
// Expose the payload of the job even when the job failed/timeout
payload: job._source.payload.payload,
...job._source.payload,
};
}

Expand Down
26 changes: 23 additions & 3 deletions x-pack/plugins/code/server/queue/abstract_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ export abstract class AbstractWorker implements Worker {

// Assemble jobs, for now most of the job object construction should be the same.
public createJob(payload: any, options: any): Job {
const timestamp = moment().valueOf();
if (options.timeout !== undefined || options.timeout !== null) {
return {
payload,
options,
timestamp,
};
} else {
return {
Expand All @@ -37,6 +39,7 @@ export abstract class AbstractWorker implements Worker {
...options,
timeout: this.getTimeoutMs(payload),
},
timestamp,
};
}
}
Expand Down Expand Up @@ -101,17 +104,29 @@ export abstract class AbstractWorker implements Worker {
}

public async onJobCompleted(job: Job, res: WorkerResult) {
this.log.info(`${this.id} job completed with result ${JSON.stringify(res)}`);
this.log.info(
`${this.id} job completed with result ${JSON.stringify(
res
)} in ${this.workerTaskDurationSeconds(job)} seconds.`
);
return await this.updateProgress(res.uri, WorkerReservedProgress.COMPLETED);
}

public async onJobExecutionError(res: any) {
this.log.error(`${this.id} job execution error ${JSON.stringify(res)}.`);
this.log.error(
`${this.id} job execution error ${JSON.stringify(res)} in ${this.workerTaskDurationSeconds(
res.job
)} seconds.`
);
return await this.updateProgress(res.job.payload.uri, WorkerReservedProgress.ERROR);
}

public async onJobTimeOut(res: any) {
this.log.error(`${this.id} job timed out ${JSON.stringify(res)}`);
this.log.error(
`${this.id} job timed out ${JSON.stringify(res)} in ${this.workerTaskDurationSeconds(
res.job
)} seconds.`
);
return await this.updateProgress(res.job.payload.uri, WorkerReservedProgress.TIMEOUT);
}

Expand All @@ -126,4 +141,9 @@ export abstract class AbstractWorker implements Worker {
// Set to 1 hour by default. Override this function for sub classes if necessary.
return moment.duration(1, 'hour').asMilliseconds();
}

private workerTaskDurationSeconds(job: Job) {
const diff = moment().diff(moment(job.timestamp));
return moment.duration(diff).asSeconds();
}
}
3 changes: 3 additions & 0 deletions x-pack/plugins/code/server/queue/delete_worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ test('Execute delete job.', async () => {
uri: 'github.com/elastic/kibana',
},
options: {},
timestamp: 0,
});

expect(cancelIndexJobSpy.calledOnce).toBeTruthy();
Expand Down Expand Up @@ -115,6 +116,7 @@ test('On delete job enqueued.', async () => {
uri: 'github.com/elastic/kibana',
},
options: {},
timestamp: 0,
});

expect(indexSpy.calledOnce).toBeTruthy();
Expand Down Expand Up @@ -144,6 +146,7 @@ test('On delete job completed.', async () => {
uri: 'github.com/elastic/kibana',
},
options: {},
timestamp: 0,
},
{
uri: 'github.com/elastic/kibana',
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/code/server/queue/index_worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ test('Execute index job.', async () => {
},
options: {},
cancellationToken: cToken,
timestamp: 0,
});

expect(cancelIndexJobSpy.calledOnce).toBeTruthy();
Expand Down Expand Up @@ -119,6 +120,7 @@ test('Execute index job and then cancel.', async () => {
},
options: {},
cancellationToken: cToken,
timestamp: 0,
});

// Cancel the index job.
Expand Down Expand Up @@ -153,6 +155,7 @@ test('On index job enqueued.', async () => {
uri: 'github.com/elastic/kibana',
},
options: {},
timestamp: 0,
});

expect(indexSpy.calledOnce).toBeTruthy();
Expand Down Expand Up @@ -180,6 +183,7 @@ test('On index job completed.', async () => {
uri: 'github.com/elastic/kibana',
},
options: {},
timestamp: 0,
},
{
uri: 'github.com/elastic/kibana',
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/code/server/queue/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import { CancellationToken } from '../lib/esqueue';
export interface Job {
payload: any;
options: any;
timestamp: number;
cancellationToken?: CancellationToken;
}
1 change: 1 addition & 0 deletions x-pack/plugins/code/server/queue/update_worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ test('Execute update job', async () => {
uri: 'mockrepo',
},
options: {},
timestamp: 0,
});

expect(newInstanceSpy.calledOnce).toBeTruthy();
Expand Down

0 comments on commit 22d64d9

Please sign in to comment.