feat(backend): discovery function #85
import * as console from 'console';
import * as AWS from 'aws-sdk';
import * as Nano from 'nano';
const TIMEOUT_MILLISECONDS = 10000;
Suggested change:
- const TIMEOUT_MILLISECONDS = 10000;
+ const TIMEOUT_MILLISECONDS = 10_000;
const TIMEOUT_MILLISECONDS = 10000;
const CONSTRUCT_KEYWORDS = ['cdk', 'aws-cdk', 'cdk8s', 'cdktf'];
const MARKER_FILE_NAME = 'marker';
Might want to give this a more descriptive name, e.g. `npmjs-last-transaction-id` or something like that.
* npm registry API docs: https://github.com/npm/registry/blob/master/docs/REGISTRY-API.md
* @param context a Lambda execution context
*/
export async function handler(context: Context) {
Is the first argument to a lambda function not the input payload? I don't reckon the context makes it in first position...
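For reference, a Node.js Lambda handler receives the invocation payload first and the context second. A minimal sketch, assuming a scheduled (EventBridge) trigger; `LambdaContext` is a hypothetical stand-in for the `Context` type from `@types/aws-lambda`, declared locally so the snippet is self-contained. The discovery function does not need the event, so it can simply ignore it; the sketch only logs it to show where it arrives.

```typescript
// Hypothetical stand-in for `Context` from `@types/aws-lambda`; only the
// members relevant to this PR are declared here.
interface LambdaContext {
  readonly awsRequestId: string;
  readonly logStreamName: string;
  getRemainingTimeInMillis(): number;
}

// Lambda passes the event payload first (for a scheduled rule, the
// EventBridge event) and the per-invocation context second.
export async function handler(event: unknown, context: LambdaContext): Promise<string> {
  console.log(`Event: ${JSON.stringify(event)}`);
  console.log(`Remaining time: ${context.getRemainingTimeInMillis()} ms`);
  return context.awsRequestId;
}
```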
export async function handler(context: Context) {
s3 = new AWS.S3();
sqs = new AWS.SQS();
functionContext = context;
I would rather not store the `Context` instance outside of the function (it should never survive between executions).
'Lambda-Run-Id': functionContext.awsRequestId,
},
}).promise().then(() => ok(), ko);
}).on('error', function (error) {
Suggested change:
- }).on('error', function (error) {
+ }).on('error', (error) => {
}).promise().catch(function(error) {
throw Error(`Failed sending message to SQS\n ${error}`);
});
Suggested change:
- }).promise().catch(function(error) {
-   throw Error(`Failed sending message to SQS\n ${error}`);
- });
+ }).promise();
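Dropping the `.catch()` lets the original SDK error propagate unchanged to the caller, keeping its stack and its `code` property instead of flattening it into a new message. A minimal sketch; `SendMessage`, `notify` and `notifyWrapped` are hypothetical, with `SendMessage` standing in for `sqs.sendMessage(...).promise()`.

```typescript
// Hypothetical stand-in for `sqs.sendMessage(params).promise()`.
type SendMessage = (body: string) => Promise<void>;

// Awaiting without a catch: any rejection reaches the caller as-is.
async function notify(send: SendMessage, message: object): Promise<void> {
  await send(JSON.stringify(message));
}

// For comparison: wrap-and-rethrow replaces the original error object
// (and its `code` property) with a new, generic Error.
async function notifyWrapped(send: SendMessage, message: object): Promise<void> {
  await send(JSON.stringify(message)).catch((error) => {
    throw Error(`Failed sending message to SQS\n ${error}`);
  });
}
```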
};

await sqs.sendMessage({
MessageBody: JSON.stringify(message),
Suggested change:
- MessageBody: JSON.stringify(message),
+ MessageBody: JSON.stringify(message, null, 2),
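With `JSON.stringify(value, null, 2)`, each nesting level is indented by two spaces, which makes message bodies easier to read in the SQS console at the cost of a slightly larger payload. A small sketch; the `PackageMessage` shape is hypothetical and is not the PR's actual message format.

```typescript
// Hypothetical message shape, for illustration only.
interface PackageMessage {
  name: string;
  version: string;
}

function toMessageBody(message: PackageMessage): string {
  // `null`: no replacer function; `2`: indent each level by two spaces.
  return JSON.stringify(message, null, 2);
}
```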
* @param pkgJason
*/
function isJsiiModule(pkgJason: VersionInfo): boolean {
return (pkgJason?.jsii !== undefined);
Suggested change:
- return (pkgJason?.jsii !== undefined);
+ return pkgJason?.jsii != null;
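The loose `!= null` comparison is true for both `null` and `undefined`, so a `package.json` containing `"jsii": null` is not mistaken for a jsii module, while `!== undefined` would accept it. A self-contained sketch; the `VersionInfo` here is a minimal hypothetical stand-in for the PR's type.

```typescript
// Minimal hypothetical stand-in for the PR's `VersionInfo`; only `jsii` matters here.
interface VersionInfo {
  jsii?: unknown;
}

// `!= null` rejects both `undefined` (key absent) and an explicit `null`.
function isJsiiModule(pkgJson: VersionInfo | undefined): boolean {
  return pkgJson?.jsii != null;
}
```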
* @returns returns true if the package.json contains a jsii clause, false otherwise
* @param pkgJason
*/
function isJsiiModule(pkgJason: VersionInfo): boolean {
`pkgJason` 🤣
I miss him!
# Conflicts: # .projen/tasks.json # package.json # yarn.lock
Updates to the discovery function draft PR
const NPM_REPLICA_REGISTRY_URL = 'https://replicate.npmjs.com/';

/**
* The S3 prefix for storing changes that we failed to process
*/
const FAILED_CHANGE_PREFIX = 'failed';

let s3: AWS.S3;
let sqs: AWS.SQS;
let s3: AWS.S3 | undefined;
Why would we allow `undefined` here? The clients are required, and this only means we need to add `!` to every reference of `s3`/`sqs`.
When we add unit tests for this, we'll need to be able to reset the clients so that we can use a new mock for each test. This implies allowing `undefined`.
One trick that @iliapolo suggested to me before is to vend those clients from `s3Client` and `sqsClient` functions, which lazily create a new client if necessary... That way, this becomes the only place where a null check has to exist.
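A minimal sketch of the lazy-client idea. `s3Client()` is the name from the comment; `resetClients()` and `FakeS3` are hypothetical, with `FakeS3` standing in for `AWS.S3` so the snippet is self-contained (the real factory would call `new AWS.S3()`). An `sqsClient()` would follow the same pattern.

```typescript
// Hypothetical stand-in for `AWS.S3`; counts constructions for illustration.
class FakeS3 {
  static instances = 0;
  constructor() {
    FakeS3.instances++;
  }
}

// Module-level cache: reused across warm Lambda invocations.
let s3: FakeS3 | undefined;

// The only place that deals with `undefined`: callers always get a client.
export function s3Client(): FakeS3 {
  if (s3 == null) {
    s3 = new FakeS3();
  }
  return s3;
}

// Hypothetical test hook: drops the cached client so the next call to
// `s3Client()` builds a fresh one (e.g. after installing a new mock).
export function resetClients(): void {
  s3 = undefined;
}
```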
throw new Error('The NOTIFICATION_QUEUE_URL environment variable is not set');
}
queueUrl = process.env.QUEUE_URL;

AWS.config.update({
region: process.env.AWS_REGION,
why?
This is the default behavior...
@@ -91,17 +89,17 @@ export async function handler(context: Context) {
// ignores changes which are not package update, see https://github.com/cdklabs/construct-hub/issues/3#issuecomment-858246275
.filter(change => change.doc.name)
.map(change => {
- return processPackageUpdate(change)
+ return processPackageUpdate(change, context)
Why not store this in a global variable, as we did for the AWS SDK clients? Passing it to every function call seems cumbersome.
Unlike the clients, which can be shared between executions, the `context` is distinct for each execution and should not survive across executions. Saving it to a global variable is actually risky IMO.
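A sketch of the parameter-passing alternative: the handler receives the per-invocation context and hands it down explicitly, so nothing invocation-specific stays in module scope when the next warm invocation starts. `RequestContext`, `Change`, `handleBatch` and this `processPackageUpdate` are simplified hypothetical versions, not the PR's code; the `Package-Name` attribute is illustrative, while the two `Lambda-*` attributes mirror the PR.

```typescript
// Hypothetical stand-in for the Lambda `Context` fields used as message attributes.
interface RequestContext {
  readonly awsRequestId: string;
  readonly logStreamName: string;
}

// Hypothetical, simplified CouchDB change entry.
interface Change {
  readonly id: string;
  readonly doc: { readonly name?: string };
}

// Context is an explicit parameter: each call sees the context of the
// invocation that triggered it, never a stale one from a previous run.
function processPackageUpdate(change: Change, context: RequestContext): Record<string, string> {
  return {
    'Package-Name': change.id,
    'Lambda-Log-Stream': context.logStreamName,
    'Lambda-Run-Id': context.awsRequestId,
  };
}

export function handleBatch(changes: readonly Change[], context: RequestContext): Record<string, string>[] {
  return changes
    // Skip changes that are not package updates (no `doc.name`).
    .filter((change) => change.doc.name)
    .map((change) => processPackageUpdate(change, context));
}
```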
…t-hub into neta/discovery-function
} catch (error) {
console.log(`Failed to load marker for bucket: ${stagingBucket}, will start read from latest`);
}
You are right, the cheapest thing will be to actually kill the execution. Updated.
Given that the SDK has a built-in retry mechanism, if we still fail, we should exit.
db.changesReader.stop();
return Promise.reject(new Error(`Error while processing batch, marker will not be updated, exiting.\n${error}`));
I have changed the `.then()`/`.catch()` style a bit, but only to make the following easier to understand.
Any type of error thrown from this point will cause an `UnhandledPromiseRejectionWarning`:
(node:78472) UnhandledPromiseRejectionWarning: Error: Error while processing batch, marker will not be updated, exiting.
CredentialsError: <...>
This goes for both `return Promise.reject(...)` and `throw Error()`. This is because both return a `Promise` which is unhandled. Since this code is actually executing in the `on('batch', ...)` callback, the `Promise` needs to be resolved in the library code (I think?), which it is not, causing the `Promise` to go unhandled.
This does not have an actual impact on the function execution, as calling `db.changesReader.stop()` will terminate the program execution; it will just print this ugly warning to the log.
Thoughts?
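One way to avoid the unhandled rejection is to not let the event callback return a rejected promise at all, and instead settle an outer promise that the handler awaits. A minimal sketch, assuming a plain Node.js `EventEmitter` as a stand-in for nano's `changesReader`; the `'batch'` event name mirrors the discussion, while the `'end'` event, `consumeBatches` and `processBatch` are assumptions for illustration. In the real code, the catch branch would also call `db.changesReader.stop()`.

```typescript
import { EventEmitter } from 'events';

type Batch = readonly string[];

// Wraps an emitter that fires 'batch' events in a promise that rejects on the
// first failed batch, so the error surfaces in the awaiting handler instead of
// as an UnhandledPromiseRejectionWarning.
function consumeBatches(
  reader: EventEmitter,
  processBatch: (batch: Batch) => Promise<void>,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    reader.on('batch', (batch: Batch) => {
      // The rejection is handled here, never returned to the emitter.
      processBatch(batch).catch((error) => {
        reader.removeAllListeners('batch'); // stand-in for changesReader.stop()
        reject(new Error(`Error while processing batch, marker will not be updated, exiting.\n${error}`));
      });
    });
    // Assumed completion event for this sketch.
    reader.once('end', () => resolve());
  });
}
```

The handler can then `await consumeBatches(...)` inside a normal `try`/`catch`, which keeps the control flow in one place.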
…t-hub into neta/discovery-function
Missing unit tests for runtime code and construct
public constructor(scope: Construct, id: string, props: DiscoveryFunctionProps) {
super(scope, id);

const lambda = new Handler(this, 'Default', {
What's the monitoring story for this function?
src/construct-hub.ts
const stagingBucket = new s3.Bucket(this, 'StagingBucket', {
lifecycleRules: [
{
prefix: 'packages', // delete the staged tarball after 30 days
The prefix should be a constant that the runtime code also uses, or it should be passed as an environment variable.
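A sketch of sharing the prefix between the construct and the runtime code. The shared-module layout, the `STAGING_KEY_PREFIX` name and the `stagingKey()` helper are hypothetical; the value `'packages'` matches the lifecycle rule above.

```typescript
// Hypothetical shared module, imported by both the construct and the Lambda
// handler, so the lifecycle rule and the upload keys cannot drift apart.
export const STAGING_KEY_PREFIX = 'packages';

// Hypothetical helper: builds the S3 key under which a staged tarball is stored.
// The layout after the prefix is illustrative only.
export function stagingKey(packageName: string, version: string): string {
  return `${STAGING_KEY_PREFIX}/${packageName}/v${version}/package.tgz`;
}

// Construct side (sketch, not compiled here):
//   lifecycleRules: [{ prefix: STAGING_KEY_PREFIX, expiration: Duration.days(30) }]
// Or pass it to the function as an environment variable:
//   environment: { STAGING_KEY_PREFIX },
```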
let s3: AWS.S3 | undefined;
let sqs: AWS.SQS | undefined;

let stagingBucket: string;
Why are all of these defined outside of the function closure?
await s3.getObject({
Bucket: stagingBucket,
Key: MARKER_FILE_NAME,
}).promise().then(function(data) {
marker = data.Body?.toString('utf-8');
});
It's not good practice to mix `async`/`await` with `.then()`:
Suggested change:
- await s3.getObject({
-   Bucket: stagingBucket,
-   Key: MARKER_FILE_NAME,
- }).promise().then(function(data) {
-   marker = data.Body?.toString('utf-8');
- });
+ marker = (await s3.getObject({
+   Bucket: stagingBucket,
+   Key: MARKER_FILE_NAME,
+ }).promise()).Body?.toString('utf-8');
marker = data.Body?.toString('utf-8');
});
} catch (error) {
throw new Error(`Failed to load marker for bucket: ${stagingBucket}, exiting`);
What happens the first time?
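One way to handle the first run is to treat a missing marker object as "no marker yet" and fail only on other errors. A minimal sketch, assuming the aws-sdk v2 convention that a missing key surfaces as an error whose `code` is `'NoSuchKey'`; `GetObject` is a hypothetical stand-in for `s3.getObject(...).promise()`, and returning `undefined` to mean "start from the latest change" is an assumption about the desired behavior.

```typescript
// Hypothetical stand-in for the parts of `s3.getObject(...).promise()` used here.
type GetObject = (bucket: string, key: string) => Promise<{ Body?: Buffer }>;

// Returns the stored marker, or `undefined` on the very first run (no marker
// object yet). Any other failure aborts the execution, as discussed earlier.
async function loadMarker(getObject: GetObject, bucket: string, key: string): Promise<string | undefined> {
  try {
    const data = await getObject(bucket, key);
    return data.Body?.toString('utf-8');
  } catch (error) {
    // Assumed aws-sdk v2 error code for a missing object.
    if ((error as { code?: string }).code === 'NoSuchKey') {
      console.log(`No marker found in bucket ${bucket}, starting from the latest change`);
      return undefined;
    }
    throw new Error(`Failed to load marker for bucket: ${bucket}, exiting\n${error}`);
  }
}
```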
'Lambda-Log-Stream': context.logStreamName,
'Lambda-Run-Id': context.awsRequestId,
},
}).promise().then(() => (void undefined))));
What's going on here?
I would highly recommend avoiding resolving promises explicitly and sticking to a straightforward `await`-based style, which is easier to reason about.
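A sketch of the `await`-based alternative: an `async` callback passed to `map` already yields `Promise<void>` values, so `Promise.all` can be awaited directly without `.then(() => (void undefined))`. `SendMessage` and `sendAll` are hypothetical, with `SendMessage` standing in for `sqs.sendMessage(...).promise()`.

```typescript
// Hypothetical stand-in for `sqs.sendMessage(params).promise()`, which
// resolves to a result object that the caller does not need here.
type SendMessage = (body: string) => Promise<{ MessageId: string }>;

async function sendAll(send: SendMessage, messages: readonly object[]): Promise<void> {
  // Each async callback returns Promise<void>; no explicit void mapping needed.
  await Promise.all(messages.map(async (message) => {
    await send(JSON.stringify(message));
  }));
}
```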
async function copyPackageToS3(versionInfo: VersionInfo, key: string, context: Context): Promise<Buffer> {
console.log(`uploading tarball to s3, key: ${key}`);
return new Promise<Buffer>((ok, ko) => {
https.get(versionInfo.dist.tarball, (response) => {
I believe we already have some nice async HTTP library.
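If no such library is at hand, Node's built-in `https.get` can be wrapped once so the rest of the code stays `await`-based. A minimal sketch; `fetchBuffer` is a hypothetical helper that picks `http` or `https` from the URL, rejects on non-2xx status codes, and does not follow redirects (an assumption about what the registry tarball URLs need).

```typescript
import * as http from 'http';
import * as https from 'https';

// Downloads a URL into memory. Network errors and non-2xx statuses surface
// as a rejected promise instead of an 'error' event.
function fetchBuffer(url: string): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const onResponse = (response: http.IncomingMessage): void => {
      const status = response.statusCode ?? 0;
      if (status < 200 || status >= 300) {
        response.resume(); // discard the body so the socket is released
        reject(new Error(`GET ${url} failed with status ${status}`));
        return;
      }
      const chunks: Buffer[] = [];
      response.on('data', (chunk: Buffer) => chunks.push(chunk));
      response.on('end', () => resolve(Buffer.concat(chunks)));
      response.on('error', reject);
    };
    const request = url.startsWith('https:') ? https.get(url, onResponse) : http.get(url, onResponse);
    request.on('error', reject);
  });
}
```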
# Conflicts: # src/__tests__/__snapshots__/construct-hub.test.ts.snap # src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap
A discovery function that is triggered every 5 minutes and reads the npm CouchDB `_changes` endpoint. For every change, if the package concerned is a jsii construct, the package version tarball is copied from the npmjs registry to a staging S3 bucket, and a message with the details of the change is posted to an SQS queue.

Every change to the registry is posted sequentially to the CouchDB `_changes` endpoint. A change example:
The `seq` is the change sequence number, and the `id` is the package name.

A few notes on the implementation:

The `_changes` endpoint is designed to be consumed as a never-ending stream. Since we are using a Lambda function with a fixed execution time, the function reads the latest changes in batches and exits. This only works when reading from the latest change, not in the backfill scenario, in which there are more changes than the function can handle in its lifetime. In the meantime, we accept that if the function times out while a batch is being processed, that batch will be processed again. This is because the latest `seq` is stored in the S3 bucket only at the end of every processed batch.
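The batch-and-checkpoint behavior described in the notes can be modeled in a few lines. This is an illustrative sketch, not the PR's code: `Change`, the in-memory `MarkerStore` and `runOnce` are hypothetical, `seq` is modeled as a number for simplicity, and the timeout is simulated by throwing mid-batch. It shows why an interrupted batch is processed again (at-least-once delivery): the marker only advances after a whole batch succeeds.

```typescript
// Hypothetical, simplified `_changes` entry: `seq` is the change sequence
// number and `id` is the package name.
interface Change {
  readonly seq: number;
  readonly id: string;
}

// Hypothetical in-memory stand-in for the marker object stored in S3.
class MarkerStore {
  value = 0;
}

// Processes all changes after the stored marker, in batches. The marker is
// written only after a whole batch succeeds, so a failure (or timeout)
// mid-batch leaves it pointing at the end of the previous batch.
function runOnce(
  feed: readonly Change[],
  store: MarkerStore,
  batchSize: number,
  handle: (change: Change) => void,
): void {
  const pending = feed.filter((change) => change.seq > store.value);
  for (let i = 0; i < pending.length; i += batchSize) {
    const batch = pending.slice(i, i + batchSize);
    batch.forEach((change) => handle(change));
    store.value = batch[batch.length - 1].seq; // checkpoint after the batch
  }
}
```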
Resources:
closes #3
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license