feat(backend): discovery function #85

Merged 28 commits into main from neta/discovery-function on Jun 16, 2021

@NetaNir NetaNir commented Jun 15, 2021

A discovery function which triggers every 5 minutes and reads the npm CouchDB _changes endpoint. For every change, if the package concerned is a jsii construct, the package version tarball is copied from the npmjs registry to a staging S3 bucket, and a message with details of the change is posted to an SQS queue.

Every change to the registry is posted sequentially to the CouchDB _changes endpoint. A change example:

  {
  seq: 3299,
  id: "package_name",
  changes: [
    {
      rev: "1-81a5c9873827084aaab78b22424cfaa8",
    },
  ],
  doc: {
    _id: "package_name",
    _rev: "1-81a5c9873827084aaab78b22424cfaa8",
    name: "package_name",
    description: "...",
    "dist-tags": {
      latest: "0.0.1-beta4",
    },
    versions: {
      "0.0.1-beta1": {},
      "0.0.1-beta2": {},
      "0.0.1-beta3": {},
      "0.0.1-beta4": {}
    },
    readme: "# NOTE\nThis is beautiful README the I wrote cause I needed something for the PR, but it can have all the things markup support",
    time: {
    modified: "2021-06-15T00:59:56.624Z",
    created: "2018-02-02T03:50:11.983Z",
    "0.0.1-beta1": "2018-02-02T03:50:11.983Z",
    "0.0.1-beta2": "2018-02-02T04:04:09.226Z",
    "0.0.1-beta3": "2018-02-20T04:18:16.096Z",
    "0.0.1-beta4": "2018-02-20T06:26:18.341Z",
    },
    keywords: [
      "json",
      "schema",
    ],
    repository: {
      type: "",
      url: "",
    },
    readmeFilename: "README.md",
  },
}

See an example query: https://replicate.npmjs.com/registry/_changes?descending=true&limit=10&include_docs=true

The seq is the change sequence number, the id is the package name.
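As a rough illustration of how such a record might be consumed, the sketch below types only the fields visible in the example above and picks the version with the most recent publish timestamp from the time map. The ChangeRecord interface and the latestPublishedVersion helper are hypothetical names introduced for this illustration; they are not taken from the PR.

```typescript
// Hypothetical, partial typing of a _changes record. Only fields that appear
// in the example above are listed; seq is numeric in that example.
interface ChangeRecord {
  seq: number;
  id: string;
  doc: {
    name?: string;
    versions?: Record<string, unknown>;
    time?: Record<string, string>;
  };
}

// Returns the version with the most recent publish timestamp, or undefined if
// the document lists no versions. The 'created' and 'modified' entries of the
// time map are skipped because only keys found in `versions` are looked up.
function latestPublishedVersion(change: ChangeRecord): string | undefined {
  const versions = Object.keys(change.doc.versions ?? {});
  const time = change.doc.time ?? {};
  let latest: string | undefined;
  let latestTime = -Infinity;
  for (const version of versions) {
    const published = time[version];
    if (published === undefined) continue;
    const publishedAt = Date.parse(published);
    if (publishedAt > latestTime) {
      latestTime = publishedAt;
      latest = version;
    }
  }
  return latest;
}
```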

A few notes on the implementation:

  • The _changes endpoint is designed to be consumed as a never-ending stream. Since we are using a Lambda function with a fixed execution time, the function reads the latest changes in batches and then exits. This holds only when reading from the latest change, not in the backfill scenario, in which there are more changes than the function can handle in its lifetime. In the meantime, we accept that if the function times out while a batch is being processed, that batch will be processed again, because the latest seq is stored in the S3 bucket only at the end of every processed batch.

  • Test
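The checkpointing behavior described in the first note can be sketched as follows. This is a simplified, synchronous, in-memory model and not the PR's implementation: MarkerStore stands in for the marker object kept in the S3 bucket, and processBatches, Change and handle are hypothetical names. The point it illustrates is that the marker only advances after a whole batch succeeds, so an interrupted batch is read again on the next run.

```typescript
// Hypothetical in-memory stand-in for the marker object stored in S3.
interface MarkerStore {
  load(): number | undefined;
  save(seq: number): void;
}

interface Change {
  seq: number;
  id: string;
}

// Processes batches in order, skipping changes at or before the stored marker.
// The marker is saved only after every change in a batch has been handled, so
// a failure mid-batch leaves the marker at the end of the previous batch.
function processBatches(
  batches: Change[][],
  store: MarkerStore,
  handle: (change: Change) => void,
): void {
  const since = store.load() ?? 0;
  for (const batch of batches) {
    let last: number | undefined;
    for (const change of batch) {
      if (change.seq <= since) continue; // already processed in an earlier run
      handle(change); // may throw, for example on a simulated timeout
      last = change.seq;
    }
    if (last !== undefined) {
      store.save(last);
    }
  }
}
```

With this shape, delivery is at-least-once: whatever consumes the resulting messages has to tolerate seeing the same change twice.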

Resources:

closes #3


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

@NetaNir NetaNir marked this pull request as draft June 15, 2021 07:36
Comment on lines 1 to 2

import * as console from 'console';
Contributor

Suggested change
import * as console from 'console';
import * as console from 'console';

import * as AWS from 'aws-sdk';
import * as Nano from 'nano';

const TIMEOUT_MILLISECONDS = 10000;
Contributor

Suggested change
const TIMEOUT_MILLISECONDS = 10000;
const TIMEOUT_MILLISECONDS = 10_000;


const TIMEOUT_MILLISECONDS = 10000;
const CONSTRUCT_KEYWORDS = ['cdk', 'aws-cdk', 'cdk8s', 'cdktf'];
const MARKER_FILE_NAME = 'marker';
Contributor

Might want to give this a more descriptive name, e.g: npmjs-last-transaction-id or something like that.

* npm registry API docs: https://github.com/npm/registry/blob/master/docs/REGISTRY-API.md
* @param context a Lambda execution context
*/
export async function handler(context: Context) {
Contributor

Is the first argument to a lambda function not the input payload? I don't reckon the context makes it in first position...
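For reference, Node.js Lambda handlers receive the event payload as the first argument and the context object as the second. The sketch below uses a local Context interface with a single field instead of the real type definitions, so that it stays self-contained; the handler body is purely illustrative.

```typescript
// Minimal local stand-in for the Lambda context type; only the field used here.
interface Context {
  awsRequestId: string;
}

// The event payload comes first, the context second.
async function handler(event: unknown, context: Context): Promise<string> {
  return `request ${context.awsRequestId} received ${JSON.stringify(event)}`;
}
```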

export async function handler(context: Context) {
s3 = new AWS.S3();
sqs = new AWS.SQS();
functionContext = context;
Contributor

I would rather not store the Context instance out of the function (it should never survive between executions).

'Lambda-Run-Id': functionContext.awsRequestId,
},
}).promise().then(() => ok(), ko);
}).on('error', function (error) {
Contributor

Suggested change
}).on('error', function (error) {
}).on('error', (error) => {

Comment on lines 206 to 208
}).promise().catch(function(error) {
throw Error(`Failed sending message to SQS\n ${error}`);
});
Contributor

Suggested change
}).promise().catch(function(error) {
throw Error(`Failed sending message to SQS\n ${error}`);
});
}).promise();

};

await sqs.sendMessage({
MessageBody: JSON.stringify(message),
Contributor

Suggested change
MessageBody: JSON.stringify(message),
MessageBody: JSON.stringify(message, null, 2),

* @param pkgJason
*/
function isJsiiModule(pkgJason: VersionInfo): boolean {
return (pkgJason?.jsii !== undefined);
Contributor

Suggested change
return (pkgJason?.jsii !== undefined);
return pkgJason?.jsii != null;
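The difference between the two checks shows up when the jsii key is present but set to null: the strict comparison against undefined reports true, while the loose comparison against null reports false. A small sketch, using a trimmed-down and hypothetical VersionInfo shape:

```typescript
// Hypothetical, trimmed-down shape of a package version's metadata.
interface VersionInfo {
  jsii?: unknown;
}

// Loose inequality against null is false for both null and undefined, so a
// `jsii: null` entry is treated the same as a missing `jsii` key.
function isJsiiModule(pkgJson?: VersionInfo | null): boolean {
  return pkgJson?.jsii != null;
}

// The stricter variant from the original code, kept for comparison.
function isJsiiModuleStrict(pkgJson?: VersionInfo | null): boolean {
  return pkgJson?.jsii !== undefined;
}
```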

* @returns returns true if the package.json contains a jsii clause, false otherwise
* @param pkgJason
*/
function isJsiiModule(pkgJason: VersionInfo): boolean {
Contributor

pkgJason 🤣

Contributor Author

I miss him!

@RomainMuller RomainMuller force-pushed the neta/discovery-function branch 2 times, most recently from e9114ad to 29ee913 Compare June 15, 2021 16:32
Automation and others added 2 commits June 15, 2021 16:36
const NPM_REPLICA_REGISTRY_URL = 'https://replicate.npmjs.com/';

/**
* The S3 prefix for storing change that we failed processing
*/
const FAILED_CHANGE_PREFIX = 'failed';

let s3: AWS.S3;
let sqs: AWS.SQS;
let s3: AWS.S3 | undefined;
Contributor Author

Why would we allow undefined here? The clients are required, and this only makes it so we need to add ! to every reference of s3/sqs

Contributor

When we add unit tests for this, we'll need to be able to reset the clients so that we can use a new mock for each test. This implies allowing undefined.

One trick that @iliapolo suggested to me before is to actually vend those clients from a s3Client and sqsClient function, which lazily creates a new client if necessary... So that becomes the only place where a null-check has to exist.
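A minimal sketch of that trick, assuming a placeholder FakeS3Client class in place of the real SDK client so the example stays self-contained. The resetClients helper is a hypothetical addition for tests; only the s3Client function name comes from the comment above.

```typescript
// Stand-in for an SDK client such as AWS.S3; the real class is deliberately
// not imported here.
class FakeS3Client {}

let s3: FakeS3Client | undefined;

// The only place that has to deal with `undefined`: the client is created
// lazily on first use and reused afterwards.
function s3Client(): FakeS3Client {
  if (s3 === undefined) {
    s3 = new FakeS3Client();
  }
  return s3;
}

// Hypothetical test hook: drops the cached client so that the next call to
// s3Client() builds a fresh one (for example, a fresh mock).
function resetClients(): void {
  s3 = undefined;
}
```

Callers then use s3Client() everywhere instead of touching the variable, so no non-null assertions are needed at the call sites.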

throw new Error('The NOTIFICATION_QUEUE_URL environment variable is not set');
}
queueUrl = process.env.QUEUE_URL;

AWS.config.update({
region: process.env.AWS_REGION,
Contributor Author

why?

Contributor

This is the default behavior...

@@ -91,17 +89,17 @@ export async function handler(context: Context) {
// ignores changes which are not package update, see https://github.com/cdklabs/construct-hub/issues/3#issuecomment-858246275
.filter(change => change.doc.name)
.map(change => {
return processPackageUpdate(change)
return processPackageUpdate(change, context)
Contributor Author

Why not store this in a global variable, as we did for the AWS SDK clients? Passing it to every function call seems cumbersome.

Contributor

Unlike the clients, which can be shared between executions, the context is distinct for each execution, and should not survive across executions. Saving it to a global variable is actually risky IMO.
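One way to picture the risk: module-level state persists while an execution environment is reused, so any work that runs later than expected reads whatever context was stored most recently. The sketch below is a simplified model, not Lambda itself; the deferred queue and both handler variants are hypothetical.

```typescript
interface Context {
  awsRequestId: string;
}

// Anti-pattern under discussion: module-level state outlives one invocation.
let functionContext: Context | undefined;

// Hypothetical queue of work that is scheduled during an invocation and runs
// later, possibly after another invocation has started.
const deferred: Array<() => string> = [];

function handlerWithGlobal(context: Context): void {
  functionContext = context;
  // Reads the global when it eventually runs, not when it was queued.
  deferred.push(() => functionContext?.awsRequestId ?? 'none');
}

function handlerWithParameter(context: Context): void {
  // Closes over this invocation's own context.
  deferred.push(() => context.awsRequestId);
}
```

In this model, two calls to handlerWithGlobal followed by running the queue report the second request id for both entries, whereas handlerWithParameter keeps each request id attached to its own work.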


@NetaNir NetaNir marked this pull request as ready for review June 16, 2021 03:04
Comment on lines 68 to 70
} catch (error) {
console.log(`Failed to load marker for bucket: ${stagingBucket}, will start read from latest`);
}
Contributor Author

You are right, the cheapest thing will be to actually kill the execution. Updated.

Given that the SDK has a built-in retry mechanism, if we fail we should exit.

Comment on lines 114 to 115
db.changesReader.stop();
return Promise.reject(new Error(`Error while processing batch, marker will not be updated, exiting.\n${error}`));
Contributor Author

@NetaNir NetaNir Jun 16, 2021

I have changed the .then()/.catch() style a bit, but only so it will be easier to understand the below.

Any type of error thrown from this point will cause an UnhandledPromiseRejectionWarning:

(node:78472) UnhandledPromiseRejectionWarning: Error: Error while processing batch, marker will not be updated, exiting.
CredentialsError: <...>

This goes for both return Promise.reject(...) and throw Error(). This is because both produce a Promise which is unhandled. Since this code is actually executing in the on(batch, ...) callback, the Promise needs to be handled in the library code (I think?), which it is not, causing the Promise to go unhandled.

This does not have an actual impact on the function execution, as calling db.changesReader.stop() will terminate the program execution; it will just print this ugly warning to the log.

Thoughts?
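One possible way around the warning, assuming the library invokes the callback through a plain event emitter that ignores listener return values: wrap the subscription in a promise owned by the caller, and forward failures from the async listener to that promise's reject. The TinyEmitter class below is a stand-in written for this sketch, not the library's reader, and consume is a hypothetical helper that settles after the first batch for brevity.

```typescript
type Listener<T> = (value: T) => unknown;

// Tiny stand-in for an event emitter: it calls listeners and ignores whatever
// they return, including promises.
class TinyEmitter<T> {
  private readonly listeners: Listener<T>[] = [];

  on(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  emit(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Wraps the subscription in a promise so that a failure inside the async batch
// processor rejects a promise the caller actually awaits, instead of one that
// nobody holds a reference to.
function consume<T>(
  emitter: TinyEmitter<T>,
  processBatch: (batch: T) => Promise<void>,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    emitter.on((batch) => {
      processBatch(batch).then(resolve, reject);
    });
  });
}
```

A handler written this way can simply await consume(...) inside a try/catch and stop the reader in the catch block.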

@RomainMuller

Contributor

@eladb eladb left a comment

Missing unit tests for runtime code and construct

public constructor(scope: Construct, id: string, props: DiscoveryFunctionProps) {
super(scope, id);

const lambda = new Handler(this, 'Default', {
Contributor

What's the monitoring story for this function?

const stagingBucket = new s3.Bucket(this, 'StagingBucket', {
lifecycleRules: [
{
prefix: 'packages', // delete the staged tarball after 30 days
Contributor

The prefix should be some const that the runtime code uses as well or passed as an environment variable

let s3: AWS.S3 | undefined;
let sqs: AWS.SQS | undefined;

let stagingBucket: string;
Contributor

Why are all of these defined outside of the function closure?

Comment on lines 61 to 66
await s3.getObject({
Bucket: stagingBucket,
Key: MARKER_FILE_NAME,
}).promise().then(function(data) {
marker = data.Body?.toString('utf-8');
});
Contributor

It's not a good practice to mix async with then():

Suggested change
await s3.getObject({
Bucket: stagingBucket,
Key: MARKER_FILE_NAME,
}).promise().then(function(data) {
marker = data.Body?.toString('utf-8');
});
marker = (await s3.getObject({
Bucket: stagingBucket,
Key: MARKER_FILE_NAME,
}).promise()).Body?.toString('utf-8');
});

marker = data.Body?.toString('utf-8');
});
} catch (error) {
throw new Error(`Failed to load marker for bucket: ${stagingBucket}, exiting`);
Contributor

What happens the first time?
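One possible answer, not necessarily what the PR ended up doing: treat a missing marker object as "first run" and re-throw every other failure. The sketch uses a hypothetical MarkerReader interface in place of the S3 client and assumes the missing-object error carries the code NoSuchKey.

```typescript
// Minimal stand-in for the one storage call used here (hypothetical interface).
interface MarkerReader {
  getObject(key: string): Promise<string>;
}

// Returns the stored marker, or undefined when no marker exists yet (first
// run). Any other failure is re-thrown so that the invocation fails loudly.
async function loadMarker(reader: MarkerReader, key: string): Promise<string | undefined> {
  try {
    return await reader.getObject(key);
  } catch (error) {
    // Assumption: a missing object is reported with the code 'NoSuchKey'.
    if ((error as { code?: string }).code === 'NoSuchKey') {
      return undefined;
    }
    throw error;
  }
}
```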

'Lambda-Log-Stream': context.logStreamName,
'Lambda-Run-Id': context.awsRequestId,
},
}).promise().then(() => (void undefined))));
Contributor

What's going on here?

I would highly recommend avoiding resolving promises explicitly and sticking to a straightforward await-based style, which is easier to reason about.

async function copyPackageToS3(versionInfo: VersionInfo, key: string, context: Context): Promise<Buffer> {
console.log(`uploading tarball to s3, key: ${key}`);
return new Promise<Buffer>((ok, ko) => {
https.get(versionInfo.dist.tarball, (response) => {
Contributor

I believe we have some nice async http library already

# Conflicts:
#	src/__tests__/__snapshots__/construct-hub.test.ts.snap
#	src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap
@RomainMuller RomainMuller merged commit e097e11 into main Jun 16, 2021
@RomainMuller RomainMuller deleted the neta/discovery-function branch June 16, 2021 12:15
Development

Successfully merging this pull request may close these issues.

Implement: Discovery Function
3 participants