Skip to content

Commit

Permalink
lib/bin/s3: add limit to upload-blobs (#1188)
Browse files Browse the repository at this point in the history
  • Loading branch information
alxndrsn committed Sep 19, 2024
1 parent b348912 commit 242f613
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 8 deletions.
14 changes: 12 additions & 2 deletions lib/bin/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { program, Argument } = require('commander');
const { program, Argument, InvalidArgumentError } = require('commander');

const { getCount, setFailedToPending, uploadPending } = require('../task/s3');

const positiveInt = raw => {
const parsed = Number(raw);
if (!Number.isInteger(parsed) || parsed < 1) {
throw new InvalidArgumentError('Must be a positive integer');
}
return parsed;
};

program.command('count-blobs')
.addArgument(new Argument('status').choices(['pending', 'in_progress', 'uploaded', 'failed']))
.action(getCount);
program.command('reset-failed-to-pending').action(setFailedToPending);
program.command('upload-pending').action(uploadPending);
program.command('upload-pending')
.argument('[limit]', 'maximum number of blobs to upload', positiveInt)
.action(uploadPending);
program.parse();
11 changes: 8 additions & 3 deletions lib/model/query/blobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,14 @@ const uploadBlobIfAvailable = async container => {
return res;
};

const s3UploadPending = () => async (container) => {
// eslint-disable-next-line no-await-in-loop
while (await uploadBlobIfAvailable(container));
const s3UploadPending = (limit) => async (container) => {
if (limit) {
// eslint-disable-next-line no-await-in-loop, no-param-reassign, no-plusplus
while (await uploadBlobIfAvailable(container) && --limit);
} else {
// eslint-disable-next-line no-await-in-loop
while (await uploadBlobIfAvailable(container));
}
};

module.exports = {
Expand Down
8 changes: 5 additions & 3 deletions lib/task/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ const setFailedToPending = withContainer(({ s3, Blobs }) => async () => {
console.log(`${count} blobs marked for re-uploading.`);
});

const uploadPending = withContainer(({ s3, Blobs }) => async () => {
const uploadPending = withContainer(({ s3, Blobs }) => async (limit) => {
assertEnabled(s3);
const count = await Blobs.s3CountByStatus('pending');

const pendingCount = await Blobs.s3CountByStatus('pending');
const count = limit ? Math.min(pendingCount, limit) : pendingCount;

const signals = ['SIGINT', 'SIGTERM'];

Expand All @@ -44,7 +46,7 @@ const uploadPending = withContainer(({ s3, Blobs }) => async () => {

try {
console.log(`Uploading ${count} blobs...`);
await Blobs.s3UploadPending();
await Blobs.s3UploadPending(limit);
console.log(`[${new Date().toISOString()}]`, 'Upload completed.');
} finally {
signals.forEach(s => process.removeListener(s, shutdownListener));
Expand Down
72 changes: 72 additions & 0 deletions test/integration/task/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,78 @@ describe('task: s3', () => {
global.s3.uploads.attempted.should.equal(1);
global.s3.uploads.successful.should.equal(1);
}));

describe('with limit', () => {
let originalLog;
let consoleLog;

beforeEach(() => {
// eslint-disable-next-line no-console
originalLog = console.log;
consoleLog = [];
// eslint-disable-next-line no-console
console.log = (...args) => consoleLog.push(args.map(String).join(' '));
});

afterEach(() => {
// eslint-disable-next-line no-console
console.log = originalLog;
});

it('should upload requested number of blobs, and ignore others', testTask(async (container) => {
// given
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });

// when
await uploadPending(6);

// then
consoleLog[0].should.deepEqual('Uploading 6 blobs...');
assertUploadCount(6);
}));

it('should not complain if blob count is less than limit', testTask(async (container) => {
// given
await aBlobExistsWith(container, { status: 'pending' });

// when
await uploadPending(1000000);

// then
consoleLog[0].should.deepEqual('Uploading 1 blobs...');
assertUploadCount(1);
}));

it('should upload all blobs if limit is zero', testTask(async (container) => {
// given
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });
await aBlobExistsWith(container, { status: 'pending' });

// when
await uploadPending(0);

// then
consoleLog[0].should.deepEqual('Uploading 10 blobs...');
assertUploadCount(10);
}));
});
});
});
});

0 comments on commit 242f613

Please sign in to comment.