diff --git a/functions/accept-order.js b/functions/accept-order.js index b004626..2e934f7 100644 --- a/functions/accept-order.js +++ b/functions/accept-order.js @@ -1,26 +1,24 @@ -'use strict'; +const kinesis = require('../lib/kinesis') +const log = require('../lib/log') +const correlationIds = require('../lib/correlation-ids') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const kinesis = require('../lib/kinesis'); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); +const streamName = process.env.order_events_stream -const streamName = process.env.order_events_stream; +const handler = async (event, context) => { + let req = JSON.parse(event.body) + log.debug(`request body is valid JSON`, { requestBody: event.body }) -const handler = co.wrap(function* (event, context, cb) { - let req = JSON.parse(event.body); - log.debug(`request body is valid JSON`, { requestBody: event.body }); + let restaurantName = req.restaurantName + let orderId = req.orderId + let userEmail = req.userEmail - let restaurantName = req.restaurantName; - let orderId = req.orderId; - let userEmail = req.userEmail; + correlationIds.set('order-id', orderId) + correlationIds.set('restaurant-name', restaurantName) + correlationIds.set('user-email', userEmail) - correlationIds.set('order-id', orderId); - correlationIds.set('restaurant-name', restaurantName); - correlationIds.set('user-email', userEmail); - - log.debug('restaurant accepted order', { orderId, restaurantName, userEmail }); + log.debug('restaurant accepted order', { orderId, restaurantName, userEmail }) let data = { orderId, @@ -33,21 +31,21 @@ const handler = co.wrap(function* (event, context, cb) { Data: JSON.stringify(data), // the SDK would base64 encode this for us PartitionKey: orderId, StreamName: streamName - }; + } - yield cloudwatch.trackExecTime( - "KinesisPutRecordLatency", + await cloudwatch.trackExecTime( + 'KinesisPutRecordLatency', () => kinesis.putRecord(kinesisReq).promise() - ); + ) - log.debug(`published event into Kinesis`, { eventName: 'order_accepted' }); + log.debug(`published event into Kinesis`, { eventName: 'order_accepted' }) let response = { statusCode: 200, body: JSON.stringify({ orderId }) } - cb(null, response); -}); + return response +} -module.exports.handler = wrapper(handler); \ No newline at end of file +module.exports.handler = wrapper(handler) diff --git a/functions/create-alarms.js b/functions/create-alarms.js index 1b62500..6bc2606 100644 --- a/functions/create-alarms.js +++ b/functions/create-alarms.js @@ -1,80 +1,77 @@ -'use strict'; +const _ = require('lodash') +const AWS = require('aws-sdk') +const apigateway = new AWS.APIGateway() +const cloudwatch = new AWS.CloudWatch() +const log = require('../lib/log') -const _ = require('lodash'); -const co = require('co'); -const AWS = require('aws-sdk'); -const apigateway = new AWS.APIGateway(); -const cloudwatch = new AWS.CloudWatch(); -const log = require('../lib/log'); +const alarmActions = (process.env.alarm_actions || '').split(',') +const okAction = (process.env.ok_actions || '').split(',') -const alarmActions = (process.env.alarm_actions || '').split(','); -const okAction = (process.env.ok_actions || '').split(','); +async function enableDetailedMetrics(restApiId, stageName) { + let getResp = await apigateway.getStage({ restApiId, stageName }).promise() + log.debug('get stage settings', getResp.methodSettings) -let enableDetailedMetrics = co.wrap(function* (restApiId, stageName) { - let getResp = yield apigateway.getStage({ restApiId, stageName }).promise(); - log.debug('get stage settings', getResp.methodSettings); - - let isDetailedMetricsEnabled = _.get(getResp, 'methodSettings.*/*.metricsEnabled', false); + let isDetailedMetricsEnabled = _.get(getResp, 'methodSettings.*/*.metricsEnabled', false) if (isDetailedMetricsEnabled) { - log.debug('detailed metrics already enabled', { restApiId, stageName }); + log.debug('detailed metrics already enabled', { restApiId, stageName }) } else { let updateReq = { restApiId, stageName, patchOperations: [ { - path: "/*/*/metrics/enabled", - value: "true", - op: "replace" + path: '/*/*/metrics/enabled', + value: 'true', + op: 'replace' } ] - }; - yield apigateway.updateStage(updateReq).promise(); - log.debug('enabled detailed metrics', { restApiId, stageName }); + } + await apigateway.updateStage(updateReq).promise() + log.debug('enabled detailed metrics', { restApiId, stageName }) } -}); +} -let getRestEndpoints = co.wrap(function* (restApiId) { - let resp = yield apigateway.getResources({ restApiId }).promise(); - log.debug('got REST resources', { restApiId }); +async function getRestEndpoints(restApiId) { + let resp = await apigateway.getResources({ restApiId }).promise() + log.debug('got REST resources', { restApiId }) let resourceMethods = resp.items.map(x => { - let methods = _.keys(x.resourceMethods); - return methods.map(method => ({ resource: x.path, method })); - }); + let methods = _.keys(x.resourceMethods) + return methods.map(method => ({ resource: x.path, method })) + }) - return _.flattenDeep(resourceMethods); -}); + return _.flattenDeep(resourceMethods) +} -let getRestApiName = co.wrap(function* (restApiId) { - let resp = yield apigateway.getRestApi({ restApiId }).promise(); - log.debug('got REST api', { restApiId }); +async function getRestApiName(restApiId) { + let resp = await apigateway.getRestApi({ restApiId }).promise() + log.debug('got REST api', { restApiId }) - return resp.name; -}); + return resp.name +} -let createAlarmsForEndpoints = co.wrap(function* (restApiId, stageName) { - let apiName = yield getRestApiName(restApiId); - log.debug(`API name is ${apiName}`, { restApiId, stageName }); +async function createAlarmsForEndpoints(restApiId, stageName) { + let apiName = await getRestApiName(restApiId) + log.debug(`API name is ${apiName}`, { restApiId, stageName }) - let restEndpoints = yield getRestEndpoints(restApiId); - log.debug('got REST endpoints', { restApiId, stageName, restEndpoints }); + let restEndpoints = await getRestEndpoints(restApiId) + log.debug('got REST endpoints', { restApiId, stageName, restEndpoints }) for (let endpoint of restEndpoints) { let putReq = { AlarmName: `API [${apiName}] stage [${stageName}] ${endpoint.method} ${endpoint.resource} : p99 > 1s`, MetricName: 'Latency', Dimensions: [ - { Name: 'ApiName', Value: apiName }, + { Name: 'ApiName', Value: apiName }, { Name: 'Resource', Value: endpoint.resource }, - { Name: 'Method', Value: endpoint.method }, - { Name: 'Stage', Value: stageName } + { Name: 'Method', Value: endpoint.method }, + { Name: 'Stage', Value: stageName } ], Namespace: 'AWS/ApiGateway', - Threshold: 1000, // 1s - ComparisonOperator: 'GreaterThanThreshold', - Period: 60, // per min - EvaluationPeriods: 5, + Threshold: 1000, // 1s + ComparisonOperator: 'GreaterThanThreshold', + Period: 60, // per min + EvaluationPeriods: 5, DatapointsToAlarm: 5, // 5 consecutive mins to trigger alarm ExtendedStatistic: 'p99', ActionsEnabled: true, @@ -82,20 +79,20 @@ let createAlarmsForEndpoints = co.wrap(function* (restApiId, stageName) { AlarmDescription: `auto-generated by Lambda [${process.env.AWS_LAMBDA_FUNCTION_NAME}]`, OKActions: okAction, Unit: 'Milliseconds' - }; - yield cloudwatch.putMetricAlarm(putReq).promise(); + } + await cloudwatch.putMetricAlarm(putReq).promise() } - log.debug('auto-created latency ALARMS for REST endpoints', { restApiId, stageName, restEndpoints }); -}); + log.debug('auto-created latency ALARMS for REST endpoints', { restApiId, stageName, restEndpoints }) +} -module.exports.handler = co.wrap(function* (event, context, cb) { - let restApiId = event.detail.requestParameters.restApiId; - let stageName = event.detail.requestParameters.createDeploymentInput.stageName; +module.exports.handler = async (event, context, cb) => { + let restApiId = event.detail.requestParameters.restApiId + let stageName = event.detail.requestParameters.createDeploymentInput.stageName - yield enableDetailedMetrics(restApiId, stageName); + await enableDetailedMetrics(restApiId, stageName) - yield createAlarmsForEndpoints(restApiId, stageName); + await createAlarmsForEndpoints(restApiId, stageName) - cb(null, 'ok'); -}); \ No newline at end of file + return 'ok' +} diff --git a/functions/fulfill-order.js b/functions/fulfill-order.js index d7e2acc..ae2a04f 100644 --- a/functions/fulfill-order.js +++ b/functions/fulfill-order.js @@ -1,26 +1,24 @@ -'use strict'; +const kinesis = require('../lib/kinesis') +const log = require('../lib/log') +const correlationIds = require('../lib/correlation-ids') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const kinesis = require('../lib/kinesis'); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); +const streamName = process.env.order_events_stream -const streamName = process.env.order_events_stream; +const handler = async (event, context, cb) => { + let body = JSON.parse(event.body) + log.debug(`request body is valid JSON`, { requestBody: event.body }) -const handler = co.wrap(function* (event, context, cb) { - let body = JSON.parse(event.body); - log.debug(`request body is valid JSON`, { requestBody: event.body }); + let restaurantName = body.restaurantName + let orderId = body.orderId + let userEmail = body.userEmail - let restaurantName = body.restaurantName; - let orderId = body.orderId; - let userEmail = body.userEmail; + correlationIds.set('order-id', orderId) + correlationIds.set('restaurant-name', restaurantName) + correlationIds.set('user-email', userEmail) - correlationIds.set('order-id', orderId); - correlationIds.set('restaurant-name', restaurantName); - correlationIds.set('user-email', userEmail); - - log.debug('restaurant has fulfilled order', { orderId, restaurantName, userEmail }); + log.debug('restaurant has fulfilled order', { orderId, restaurantName, userEmail }) let data = { orderId, @@ -33,21 +31,21 @@ const handler = co.wrap(function* (event, context, cb) { Data: JSON.stringify(data), // the SDK would base64 encode this for us PartitionKey: orderId, StreamName: streamName - }; + } - yield cloudwatch.trackExecTime( - "KinesisPutRecordLatency", + await cloudwatch.trackExecTime( + 'KinesisPutRecordLatency', () => kinesis.putRecord(kinesisReq).promise() - ); + ) - log.debug(`published event into Kinesis`, { eventName: 'order_fulfilled' }); + log.debug(`published event into Kinesis`, { eventName: 'order_fulfilled' }) let response = { statusCode: 200, body: JSON.stringify({ orderId }) } - cb(null, response); -}); + return response +} -module.exports.handler = wrapper(handler); \ No newline at end of file +module.exports.handler = wrapper(handler) diff --git a/functions/get-index.js b/functions/get-index.js index 863c14c..be5c753 100644 --- a/functions/get-index.js +++ b/functions/get-index.js @@ -1,103 +1,99 @@ -'use strict'; - -const co = require("co"); -const Promise = require("bluebird"); -const fs = Promise.promisifyAll(require("fs")); -const Mustache = require('mustache'); -const http = require('../lib/http'); -const URL = require('url'); -const aws4 = require('../lib/aws4'); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const AWSXRay = require('aws-xray-sdk'); -const wrapper = require('../middleware/wrapper'); -const { ssm, secretsManager } = require('middy/middlewares'); - -const STAGE = process.env.STAGE; -const awsRegion = process.env.AWS_REGION; - -const days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']; - -var html; - -function* loadHtml() { +const Promise = require('bluebird') +const fs = Promise.promisifyAll(require('fs')) +const Mustache = require('mustache') +const http = require('../lib/http') +const URL = require('url') +const aws4 = require('../lib/aws4') +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const AWSXRay = require('aws-xray-sdk') +const wrapper = require('../middleware/wrapper') +const { ssm, secretsManager } = require('middy/middlewares') + +const STAGE = process.env.STAGE +const awsRegion = process.env.AWS_REGION + +const days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + +var html + +async function loadHtml() { if (!html) { - html = yield fs.readFileAsync('static/index.html', 'utf-8'); + html = await fs.readFileAsync('static/index.html', 'utf-8') } - - return html; + return html } -function* getRestaurants(restaurantsApiUrl) { - let url = URL.parse(restaurantsApiUrl); +function getRestaurants(restaurantsApiUrl) { + let url = URL.parse(restaurantsApiUrl) let opts = { host: url.hostname, path: url.pathname - }; + } - aws4.sign(opts); + aws4.sign(opts) let httpReq = http({ uri: restaurantsApiUrl, headers: opts.headers - }); - + }) + return new Promise((resolve, reject) => { - let f = co.wrap(function* (subsegment) { + let f = async (subsegment) => { if (subsegment) { - subsegment.addMetadata('url', restaurantsApiUrl); + subsegment.addMetadata('url', restaurantsApiUrl) } try { - let body = (yield httpReq).body; + let body = (await httpReq).body if (subsegment) { - subsegment.close(); + subsegment.close() } - resolve(body); + resolve(body) } catch (err) { if (subsegment) { - subsegment.close(err); + subsegment.close(err) } - reject(err); + reject(err) } - }); + } // the current sub/segment - let segment = AWSXRay.getSegment(); + let segment = AWSXRay.getSegment() - AWSXRay.captureAsyncFunc("getting restaurants", f, segment); - }); + AWSXRay.captureAsyncFunc('getting restaurants', f, segment) + }) } -const handler = co.wrap(function* (event, context, callback) { - yield aws4.init(); +const handler = async (event, context, callback) => { + await aws4.init() - let template = yield loadHtml(); - log.debug("loaded HTML template"); + let template = await loadHtml() + log.debug('loaded HTML template') - let restaurants = yield cloudwatch.trackExecTime( - "GetRestaurantsLatency", + let restaurants = await cloudwatch.trackExecTime( + 'GetRestaurantsLatency', () => getRestaurants(context.restaurants_api) - ); - log.debug(`loaded ${restaurants.length} restaurants`); + ) + log.debug(`loaded ${restaurants.length} restaurants`) - let dayOfWeek = days[new Date().getDay()]; + let dayOfWeek = days[new Date().getDay()] let view = { - dayOfWeek, + dayOfWeek, restaurants, awsRegion, cognitoUserPoolId: context.cognito.user_pool_id, cognitoClientId: context.cognito.client_id, searchUrl: `${context.restaurants_api}/search`, placeOrderUrl: `${context.orders_api}` - }; - let html = Mustache.render(template, view); - log.debug(`rendered HTML [${html.length} bytes]`); + } + let html = Mustache.render(template, view) + log.debug(`rendered HTML [${html.length} bytes]`) // uncomment this to cause function to err // yield http({ uri: 'https://theburningmonk.com' }); - cloudwatch.incrCount('RestaurantsReturned', restaurants.length); + cloudwatch.incrCount('RestaurantsReturned', restaurants.length) const response = { statusCode: 200, @@ -105,10 +101,10 @@ const handler = co.wrap(function* (event, context, callback) { headers: { 'content-type': 'text/html; charset=UTF-8' } - }; + } - callback(null, response); -}); + return response +} module.exports.handler = wrapper(handler) .use(ssm({ @@ -126,4 +122,4 @@ module.exports.handler = wrapper(handler) secrets: { cognito: `/bigmouth/${STAGE}/cognito` } - })); \ No newline at end of file + })) diff --git a/functions/get-restaurants.js b/functions/get-restaurants.js index deb36e3..f675acf 100644 --- a/functions/get-restaurants.js +++ b/functions/get-restaurants.js @@ -1,41 +1,38 @@ -'use strict'; - -const co = require('co'); -const AWSXRay = require('aws-xray-sdk'); -const AWS = AWSXRay.captureAWS(require('aws-sdk')); -const dynamodb = new AWS.DynamoDB.DocumentClient(); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); - -const defaultResults = process.env.defaultResults || 8; -const tableName = process.env.restaurants_table; - -function* getRestaurants(count) { - let req = { +const AWSXRay = require('aws-xray-sdk') +const AWS = AWSXRay.captureAWS(require('aws-sdk')) +const dynamodb = new AWS.DynamoDB.DocumentClient() +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') + +const defaultResults = process.env.defaultResults || 8 +const tableName = process.env.restaurants_table + +async function getRestaurants(count) { + const params = { TableName: tableName, Limit: count - }; + } - let resp = yield cloudwatch.trackExecTime( - "DynamoDBScanLatency", - () => dynamodb.scan(req).promise() - ); - return resp.Items; + let resp = await cloudwatch.trackExecTime( + 'DynamoDBScanLatency', + () => dynamodb.scan(params).promise() + ) + return resp.Items } -const handler = co.wrap(function* (event, context, cb) { - let restaurants = yield getRestaurants(defaultResults); - log.debug(`loaded ${restaurants.length} restaurants`); +const handler = async (event, context, cb) => { + let restaurants = await getRestaurants(defaultResults) + log.debug(`loaded ${restaurants.length} restaurants`) - cloudwatch.incrCount("RestaurantsReturned", restaurants.length); + cloudwatch.incrCount('RestaurantsReturned', restaurants.length) let response = { statusCode: 200, body: JSON.stringify(restaurants) } - cb(null, response); -}); + return response +} -module.exports.handler = wrapper(handler); \ No newline at end of file +module.exports.handler = wrapper(handler) diff --git a/functions/notify-restaurant.js b/functions/notify-restaurant.js index 794b4b6..d57a316 100644 --- a/functions/notify-restaurant.js +++ b/functions/notify-restaurant.js @@ -1,38 +1,35 @@ -'use strict'; +const notify = require('../lib/notify') +const retry = require('../lib/retry') +const log = require('../lib/log') +const wrapper = require('../middleware/wrapper') +const flushMetrics = require('../middleware/flush-metrics') -const co = require('co'); -const notify = require('../lib/notify'); -const retry = require('../lib/retry'); -const log = require('../lib/log'); -const wrapper = require('../middleware/wrapper'); -const flushMetrics = require('../middleware/flush-metrics'); - -const handler = co.wrap(function* (event, context, cb) { - let events = context.parsedKinesisEvents; - let orderPlaced = events.filter(r => r.eventType === 'order_placed'); - log.debug(`found ${orderPlaced.length} 'order_placed' events`); +const handler = async (event, context, cb) => { + let events = context.parsedKinesisEvents + let orderPlaced = events.filter(r => r.eventType === 'order_placed') + log.debug(`found ${orderPlaced.length} 'order_placed' events`) for (let order of orderPlaced) { - order.scopeToThis(); + order.scopeToThis() try { - yield notify.restaurantOfOrder(order); + await notify.restaurantOfOrder(order) } catch (err) { - yield retry.restaurantNotification(order); + await retry.restaurantNotification(order) - let logContext = { + const logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.warn('failed to notify restaurant of new order', logContext, err); + } + log.warn('failed to notify restaurant of new order', logContext, err) } - order.unscope(); + order.unscope() } - - cb(null, "all done"); -}); + + return 'all done' +} module.exports.handler = wrapper(handler) - .use(flushMetrics); \ No newline at end of file + .use(flushMetrics) diff --git a/functions/notify-user.js b/functions/notify-user.js index 895be71..33c839b 100644 --- a/functions/notify-user.js +++ b/functions/notify-user.js @@ -1,39 +1,36 @@ -'use strict'; +const notify = require('../lib/notify') +const retry = require('../lib/retry') +const log = require('../lib/log') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const notify = require('../lib/notify'); -const retry = require('../lib/retry'); -const log = require('../lib/log'); -const wrapper = require('../middleware/wrapper'); +const flushMetrics = require('../middleware/flush-metrics') -const flushMetrics = require('../middleware/flush-metrics'); - -const handler = co.wrap(function* (event, context, cb) { - let events = context.parsedKinesisEvents; - let orderAccepted = events.filter(r => r.eventType === 'order_accepted'); - log.debug(`found ${orderAccepted.length} 'order_accepted' events`); +const handler = async (event, context, cb) => { + let events = context.parsedKinesisEvents + let orderAccepted = events.filter(r => r.eventType === 'order_accepted') + log.debug(`found ${orderAccepted.length} 'order_accepted' events`) for (let order of orderAccepted) { - order.scopeToThis(); + order.scopeToThis() try { - yield notify.userOfOrderAccepted(order); + await notify.userOfOrderAccepted(order) } catch (err) { - yield retry.userNotification(order); + await retry.userNotification(order) let logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.warn('failed to notify user of accepted order', logContext, err); + } + log.warn('failed to notify user of accepted order', logContext, err) } - order.unscope(); + order.unscope() } - - cb(null, "all done"); -}); + + return 'all done' +} module.exports.handler = wrapper(handler) - .use(flushMetrics); \ No newline at end of file + .use(flushMetrics) diff --git a/functions/place-order.js b/functions/place-order.js index 9fe4e7d..2ec047b 100644 --- a/functions/place-order.js +++ b/functions/place-order.js @@ -1,41 +1,36 @@ -'use strict'; +const _ = require('lodash') +const kinesis = require('../lib/kinesis') +const chance = require('chance').Chance() +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const correlationIds = require('../lib/correlation-ids') +const wrapper = require('../middleware/wrapper') -const _ = require('lodash'); -const co = require('co'); -const kinesis = require('../lib/kinesis'); -const chance = require('chance').Chance(); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const correlationIds = require('../lib/correlation-ids'); -const wrapper = require('../middleware/wrapper'); - -const streamName = process.env.order_events_stream; +const streamName = process.env.order_events_stream const UNAUTHORIZED = { statusCode: 401, - body: "unauthorized" + body: 'unauthorized' } -const handler = co.wrap(function* (event, context, cb) { - let req = JSON.parse(event.body); - log.debug(`request body is valid JSON`, { requestBody: event.body }); +const handler = async (event, context, cb) => { + let req = JSON.parse(event.body) + log.debug(`request body is valid JSON`, { requestBody: event.body }) - let userEmail = _.get(event, 'requestContext.authorizer.claims.email'); + let userEmail = _.get(event, 'requestContext.authorizer.claims.email') if (!userEmail) { - cb(null, UNAUTHORIZED); - log.error('unauthorized request, user email is not provided'); - - return; + log.error('unauthorized request, user email is not provided') + return UNAUTHORIZED } - let restaurantName = req.restaurantName; - let orderId = chance.guid(); + let restaurantName = req.restaurantName + let orderId = chance.guid() - correlationIds.set('order-id', orderId); - correlationIds.set('restaurant-name', restaurantName); - correlationIds.set('user-email', userEmail); + correlationIds.set('order-id', orderId) + correlationIds.set('restaurant-name', restaurantName) + correlationIds.set('user-email', userEmail) - log.debug(`placing order...`, { orderId, restaurantName, userEmail }); + log.debug(`placing order...`, { orderId, restaurantName, userEmail }) let data = { orderId, @@ -48,21 +43,21 @@ const handler = co.wrap(function* (event, context, cb) { Data: JSON.stringify(data), // the SDK would base64 encode this for us PartitionKey: orderId, StreamName: streamName - }; + } - yield cloudwatch.trackExecTime( - "KinesisPutRecordLatency", + await cloudwatch.trackExecTime( + 'KinesisPutRecordLatency', () => kinesis.putRecord(kinesisReq).promise() - ); + ) - log.debug(`published event into Kinesis`, { eventName: 'order_placed' }); + log.debug(`published event into Kinesis`, { eventName: 'order_placed' }) let response = { statusCode: 200, body: JSON.stringify({ orderId }) } - cb(null, response); -}); + return response +} -module.exports.handler = wrapper(handler); \ No newline at end of file +module.exports.handler = wrapper(handler) diff --git a/functions/retry-notify-restaurant.js b/functions/retry-notify-restaurant.js index 5bf2446..a7e82da 100644 --- a/functions/retry-notify-restaurant.js +++ b/functions/retry-notify-restaurant.js @@ -1,35 +1,33 @@ -'use strict'; +const notify = require('../lib/notify') +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const notify = require('../lib/notify'); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); +const flushMetrics = require('../middleware/flush-metrics') -const flushMetrics = require('../middleware/flush-metrics'); - -const handler = co.wrap(function* (event, context, cb) { - let order = JSON.parse(event.Records[0].Sns.Message); - order.retried = true; +const handler = async (event, context, cb) => { + let order = JSON.parse(event.Records[0].Sns.Message) + order.retried = true let logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail, retry: true - }; + } + let error try { - yield notify.restaurantOfOrder(order); - cb(null, "all done"); + await notify.restaurantOfOrder(order) } catch (err) { - log.warn('failed to notify restaurant of new order', logContext, err); - - cb(err); + log.warn('failed to notify restaurant of new order', logContext, err) + error = err } finally { - cloudwatch.incrCount("NotifyRestaurantRetried"); + cloudwatch.incrCount('NotifyRestaurantRetried') } -}); + if (error) return error + return 'all done' +} module.exports.handler = wrapper(handler) - .use(flushMetrics); \ No newline at end of file + .use(flushMetrics) diff --git a/functions/retry-notify-user.js b/functions/retry-notify-user.js index cf78c5f..6ca7b59 100644 --- a/functions/retry-notify-user.js +++ b/functions/retry-notify-user.js @@ -1,35 +1,33 @@ -'use strict'; +const notify = require('../lib/notify') +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const notify = require('../lib/notify'); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); +const flushMetrics = require('../middleware/flush-metrics') -const flushMetrics = require('../middleware/flush-metrics'); - -const handler = co.wrap(function* (event, context, cb) { - let order = JSON.parse(event.Records[0].Sns.Message); - order.retried = true; +const handler = async (event, context, cb) => { + let order = JSON.parse(event.Records[0].Sns.Message) + order.retried = true let logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail, retry: true - }; + } + let error try { - yield notify.userOfOrderAccepted(order); - cb(null, "all done"); + await notify.userOfOrderAccepted(order) } catch (err) { - log.warn('failed to notify user of accepted order', logContext, err); - - cb(err); + log.warn('failed to notify user of accepted order', logContext, err) + error = err } finally { - cloudwatch.incrCount("NotifyUserRetried"); + cloudwatch.incrCount('NotifyUserRetried') } -}); + if (error) return error + return 'all done' +} module.exports.handler = wrapper(handler) - .use(flushMetrics); \ No newline at end of file + .use(flushMetrics) diff --git a/functions/search-restaurants.js b/functions/search-restaurants.js index 8afe570..10664b2 100644 --- a/functions/search-restaurants.js +++ b/functions/search-restaurants.js @@ -1,46 +1,43 @@ -'use strict'; +const AWSXRay = require('aws-xray-sdk') +const AWS = AWSXRay.captureAWS(require('aws-sdk')) +const dynamodb = new AWS.DynamoDB.DocumentClient() +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') +const wrapper = require('../middleware/wrapper') -const co = require('co'); -const AWSXRay = require('aws-xray-sdk'); -const AWS = AWSXRay.captureAWS(require('aws-sdk')); -const dynamodb = new AWS.DynamoDB.DocumentClient(); -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); -const wrapper = require('../middleware/wrapper'); +const defaultResults = process.env.defaultResults || 8 +const tableName = process.env.restaurants_table -const defaultResults = process.env.defaultResults || 8; -const tableName = process.env.restaurants_table; - -function* findRestaurantsByTheme(theme, count) { +async function findRestaurantsByTheme(theme, count) { let req = { TableName: tableName, Limit: count, - FilterExpression: "contains(themes, :theme)", - ExpressionAttributeValues: { ":theme": theme } - }; + FilterExpression: 'contains(themes, :theme)', + ExpressionAttributeValues: { ':theme': theme } + } - let resp = yield cloudwatch.trackExecTime( - "DynamoDBScanLatency", + let resp = await cloudwatch.trackExecTime( + 'DynamoDBScanLatency', () => dynamodb.scan(req).promise() - ); - return resp.Items; + ) + return resp.Items } -const handler = co.wrap(function* (event, context, cb) { - let req = JSON.parse(event.body); - log.debug(`request body is valid JSON`, { requestBody: event.body }); +const handler = async (event, context, cb) => { + let req = JSON.parse(event.body) + log.debug(`request body is valid JSON`, { requestBody: event.body }) - let restaurants = yield findRestaurantsByTheme(req.theme, defaultResults); - log.debug(`found ${restaurants.length} restaurants`); + let restaurants = await findRestaurantsByTheme(req.theme, defaultResults) + log.debug(`found ${restaurants.length} restaurants`) - cloudwatch.incrCount("RestaurantsFound", restaurants.length); + cloudwatch.incrCount('RestaurantsFound', restaurants.length) - let response = { + const response = { statusCode: 200, body: JSON.stringify(restaurants) } - cb(null, response); -}); + return response +} -module.exports.handler = wrapper(handler); \ No newline at end of file +module.exports.handler = wrapper(handler) diff --git a/lib/aws4.js b/lib/aws4.js index 92853e1..6042a19 100644 --- a/lib/aws4.js +++ b/lib/aws4.js @@ -1,12 +1,61 @@ -var aws4 = exports, - url = require('url'), - querystring = require('querystring'), - crypto = require('crypto'), - lru = require('./lru'), - credentialsCache = lru(1000), - awscred = require('./awscred') +const url = require('url') +const querystring = require('querystring') +const crypto = require('crypto') +const lru = require('./lru') +const credentialsCache = lru(1000) +const awscred = require('./awscred') + +/* http://docs.amazonwebservices.com/general/latest/gr/signature-version-4.html */ + +/** +Example: + let url = URL.parse(restaurantsApiUrl) + aws4.sign({ + host: url.hostname, + path: url.pathname + }) +*/ +function sign(request, credentials) { + return new RequestSigner(request, credentials).sign() +} + +var isInitialized = false + +/* +Example: + await aws4.init() +*/ +function init() { + return new Promise((resolve, reject) => { + if (isInitialized) { + return resolve() + } + + if (process.env.AWS_ACCESS_KEY_ID) { + isInitialized = true + return resolve() + } else { + console.error('initializing AWS credentials...') + + awscred.load(function(error, result) { + if (error) { + reject(error) + } else { + let cred = result.credentials + process.env.AWS_ACCESS_KEY_ID = cred.accessKeyId + process.env.AWS_SECRET_ACCESS_KEY = cred.secretAccessKey -// http://docs.amazonwebservices.com/general/latest/gr/signature-version-4.html + if (cred.sessionToken) { + process.env.AWS_SESSION_TOKEN = cred.sessionToken + } + + resolve() + isInitialized = true + } + }) + } + }) +} function hmac(key, string, encoding) { return crypto.createHmac('sha256', key).update(string, 'utf8').digest(encoding) @@ -26,11 +75,11 @@ function encodeRfc3986(urlEncodedString) { // request: { path | body, [host], [method], [headers], [service], [region] } // credentials: { accessKeyId, secretAccessKey, [sessionToken] } function RequestSigner(request, credentials) { - if (typeof request === 'string') request = url.parse(request) - var headers = request.headers = (request.headers || {}), - hostParts = this.matchHost(request.hostname || request.host || headers.Host || headers.host) + var headers = request.headers = (request.headers || {}) + + var hostParts = this.matchHost(request.hostname || request.host || headers.Host || headers.host) this.request = request this.credentials = credentials || this.defaultCredentials() @@ -41,18 +90,15 @@ function RequestSigner(request, credentials) { // SES uses a different domain from the service name if (this.service === 'email') this.service = 'ses' - if (!request.method && request.body) - request.method = 'POST' + if (!request.method && request.body) { request.method = 'POST' } if (!headers.Host && !headers.host) { headers.Host = request.hostname || request.host || this.createHost() // If a port is specified explicitly, use it as is - if (request.port) - headers.Host += ':' + request.port + if (request.port) { headers.Host += ':' + request.port } } - if (!request.hostname && !request.host) - request.hostname = headers.Host || headers.host + if (!request.hostname && !request.host) { request.hostname = headers.Host || headers.host } this.isCodeCommitGit = this.service === 'codecommit' && request.method === 'GIT' } @@ -64,8 +110,7 @@ RequestSigner.prototype.matchHost = function(host) { // ES's hostParts are sometimes the other way round, if the value that is expected // to be region equals β€˜es’ switch them back // e.g. search-cluster-name-aaaa00aaaa0aaa0aaaaaaa0aaa.us-east-1.es.amazonaws.com - if (hostParts[1] === 'es') - hostParts = hostParts.reverse() + if (hostParts[1] === 'es') { hostParts = hostParts.reverse() } return hostParts } @@ -73,62 +118,71 @@ RequestSigner.prototype.matchHost = function(host) { // http://docs.aws.amazon.com/general/latest/gr/rande.html RequestSigner.prototype.isSingleRegion = function() { // Special case for S3 and SimpleDB in us-east-1 - if (['s3', 'sdb'].indexOf(this.service) >= 0 && this.region === 'us-east-1') return true + if (['s3', 'sdb'].indexOf(this.service) >= 0 && this.region === 'us-east-1') { + return true + } return ['cloudfront', 'ls', 'route53', 'iam', 'importexport', 'sts'] .indexOf(this.service) >= 0 } RequestSigner.prototype.createHost = function() { - var region = this.isSingleRegion() ? '' : - (this.service === 's3' && this.region !== 'us-east-1' ? '-' : '.') + this.region, - service = this.service === 'ses' ? 'email' : this.service + var region = this.isSingleRegion() ? '' + : (this.service === 's3' && this.region !== 'us-east-1' ? '-' : '.') + this.region + + var service = this.service === 'ses' ? 'email' : this.service return service + region + '.amazonaws.com' } RequestSigner.prototype.prepareRequest = function() { this.parsePath() - var request = this.request, headers = request.headers, query + var request = this.request; var headers = request.headers; var query if (request.signQuery) { - this.parsedPath.query = query = this.parsedPath.query || {} - if (this.credentials.sessionToken) + if (this.credentials.sessionToken) { query['X-Amz-Security-Token'] = this.credentials.sessionToken + } - if (this.service === 's3' && !query['X-Amz-Expires']) - query['X-Amz-Expires'] = 86400 + if (this.service === 's3' && !query['X-Amz-Expires']) { query['X-Amz-Expires'] = 86400 } - if (query['X-Amz-Date']) + if (query['X-Amz-Date']) { this.datetime = query['X-Amz-Date'] - else + } else { query['X-Amz-Date'] = this.getDateTime() + } query['X-Amz-Algorithm'] = 'AWS4-HMAC-SHA256' query['X-Amz-Credential'] = this.credentials.accessKeyId + '/' + this.credentialString() query['X-Amz-SignedHeaders'] = this.signedHeaders() - } else { - if (!request.doNotModifyHeaders && !this.isCodeCommitGit) { - if (request.body && !headers['Content-Type'] && !headers['content-type']) + if (request.body && !headers['Content-Type'] && !headers['content-type']) { headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=utf-8' + } - if (request.body && !headers['Content-Length'] && !headers['content-length']) + if (request.body && !headers['Content-Length'] && !headers['content-length']) { headers['Content-Length'] = Buffer.byteLength(request.body) + } - if (this.credentials.sessionToken && !headers['X-Amz-Security-Token'] && !headers['x-amz-security-token']) + if (this.credentials.sessionToken && + !headers['X-Amz-Security-Token'] && + !headers['x-amz-security-token'] + ) { headers['X-Amz-Security-Token'] = this.credentials.sessionToken + } - if (this.service === 's3' && !headers['X-Amz-Content-Sha256'] && !headers['x-amz-content-sha256']) + if (this.service === 's3' && !headers['X-Amz-Content-Sha256'] && !headers['x-amz-content-sha256']) { headers['X-Amz-Content-Sha256'] = hash(this.request.body || '', 'hex') + } - if (headers['X-Amz-Date'] || headers['x-amz-date']) + if (headers['X-Amz-Date'] || headers['x-amz-date']) { this.datetime = headers['X-Amz-Date'] || headers['x-amz-date'] - else + } else { headers['X-Amz-Date'] = this.getDateTime() + } } delete headers.Authorization @@ -152,13 +206,16 @@ RequestSigner.prototype.sign = function() { RequestSigner.prototype.getDateTime = function() { if (!this.datetime) { - var headers = this.request.headers, - date = new Date(headers.Date || headers.date || new Date) + var headers = this.request.headers + + var date = new Date(headers.Date || headers.date || new Date()) this.datetime = date.toISOString().replace(/[:\-]|\.\d{3}/g, '') // Remove the trailing 'Z' on the timestamp string for CodeCommit git access - if (this.isCodeCommitGit) this.datetime = this.datetime.slice(0, -1) + if (this.isCodeCommitGit) { + this.datetime = this.datetime.slice(0, -1) + } } return this.datetime } @@ -176,9 +233,11 @@ RequestSigner.prototype.authHeader = function() { } RequestSigner.prototype.signature = function() { - var date = this.getDate(), - cacheKey = [this.credentials.secretAccessKey, date, this.region, this.service].join(), - kDate, kRegion, kService, kCredentials = credentialsCache.get(cacheKey) + var date = this.getDate() + + var cacheKey = [this.credentials.secretAccessKey, date, this.region, this.service].join() + + var kDate; var kRegion; var kService; var kCredentials = credentialsCache.get(cacheKey) if (!kCredentials) { kDate = hmac('AWS4' + this.credentials.secretAccessKey, date) kRegion = hmac(kDate, this.region) @@ -201,15 +260,23 @@ RequestSigner.prototype.stringToSign = function() { RequestSigner.prototype.canonicalString = function() { if (!this.parsedPath) this.prepareRequest() - var pathStr = this.parsedPath.path, - query = this.parsedPath.query, - headers = this.request.headers, - queryStr = '', - normalizePath = this.service !== 's3', - decodePath = this.service === 's3' || this.request.doNotEncodePath, - decodeSlashesInPath = this.service === 's3', - firstValOnly = this.service === 's3', - bodyHash + var pathStr = this.parsedPath.path + + var query = this.parsedPath.query + + var headers = this.request.headers + + var queryStr = '' + + var normalizePath = this.service !== 's3' + + var decodePath = this.service === 's3' || this.request.doNotEncodePath + + var decodeSlashesInPath = this.service === 's3' + + var firstValOnly = this.service === 's3' + + var bodyHash if (this.service === 's3' && this.request.signQuery) { bodyHash = 'UNSIGNED-PAYLOAD' @@ -223,13 +290,15 @@ RequestSigner.prototype.canonicalString = function() { if (query) { queryStr = encodeRfc3986(querystring.stringify(Object.keys(query).sort().reduce(function(obj, key) { if (!key) return obj - obj[key] = !Array.isArray(query[key]) ? query[key] : - (firstValOnly ? query[key][0] : query[key].slice().sort()) + obj[key] = !Array.isArray(query[key]) ? query[key] + : (firstValOnly ? query[key][0] : query[key].slice().sort()) return obj }, {}))) } if (pathStr !== '/') { - if (normalizePath) pathStr = pathStr.replace(/\/{2,}/g, '/') + if (normalizePath) { + pathStr = pathStr.replace(/\/{2,}/g, '/') + } pathStr = pathStr.split('/').reduce(function(path, piece) { if (normalizePath && piece === '..') { path.pop() @@ -239,8 +308,12 @@ RequestSigner.prototype.canonicalString = function() { } return path }, []).join('/') - if (pathStr[0] !== '/') pathStr = '/' + pathStr - if (decodeSlashesInPath) pathStr = pathStr.replace(/%2F/g, '/') + if (pathStr[0] !== '/') { + pathStr = '/' + pathStr + } + if (decodeSlashesInPath) { + pathStr = pathStr.replace(/%2F/g, '/') + } } return [ @@ -291,9 +364,11 @@ RequestSigner.prototype.defaultCredentials = function() { } RequestSigner.prototype.parsePath = function() { - var path = this.request.path || '/', - queryIx = path.indexOf('?'), - query = null + var path = this.request.path || '/' + + var queryIx = path.indexOf('?') + + var query = null if (queryIx >= 0) { query = querystring.parse(path.slice(queryIx + 1)) @@ -316,8 +391,9 @@ RequestSigner.prototype.parsePath = function() { } RequestSigner.prototype.formatPath = function() { - var path = this.parsedPath.path, - query = this.parsedPath.query + var path = this.parsedPath.path + + var query = this.parsedPath.query if (!query) return path @@ -327,41 +403,8 @@ RequestSigner.prototype.formatPath = function() { return path + '?' + encodeRfc3986(querystring.stringify(query)) } -aws4.RequestSigner = RequestSigner - -aws4.sign = function(request, credentials) { - return new RequestSigner(request, credentials).sign() +module.exports = { + RequestSigner, + sign, + init } - -var isInitialized = false; -aws4.init = function() { - return new Promise(function(resolve, reject) { - if (isInitialized) { - return resolve() - } - - if (process.env.AWS_ACCESS_KEY_ID) { - isInitialized = true; - return resolve() - } else { - console.error("initializing AWS credentials...") - - awscred.load(function(error, result) { - if (error) { - reject(error) - } else { - let cred = result.credentials; - process.env.AWS_ACCESS_KEY_ID = cred.accessKeyId; - process.env.AWS_SECRET_ACCESS_KEY = cred.secretAccessKey; - - if (cred.sessionToken) { - process.env.AWS_SESSION_TOKEN = cred.sessionToken; - } - - resolve() - isInitialized = true - } - }) - } - }) -} \ No newline at end of file diff --git a/lib/awscred.js b/lib/awscred.js index b650755..e88a7ef 100644 --- a/lib/awscred.js +++ b/lib/awscred.js @@ -1,7 +1,7 @@ -var fs = require('fs'), - path = require('path'), - http = require('http'), - env = process.env +const fs = require('fs') +const path = require('path') +const http = require('http') +const { env } = process exports.credentialsCallChain = [ loadCredentialsFromEnv, @@ -34,7 +34,7 @@ function loadCredentialsAndRegion(options, cb) { if (!cb) { cb = options; options = {} } cb = once(cb) - var out = {}, callsRemaining = 2 + var out = {}; var callsRemaining = 2 function checkDone(propName) { return function(err, data) { @@ -57,11 +57,9 @@ function loadCredentials(options, cb) { credentialsCallChain[i](options, function(err, credentials) { if (err) return cb(err) - if (credentials.accessKeyId && credentials.secretAccessKey) - return cb(null, credentials) + if (credentials.accessKeyId && credentials.secretAccessKey) { return cb(null, credentials) } - if (i >= credentialsCallChain.length - 1) - return cb(null, {}) + if (i >= credentialsCallChain.length - 1) { return cb(null, {}) } nextCall(i + 1) }) @@ -77,11 +75,9 @@ function loadRegion(options, cb) { regionCallChain[i](options, function(err, region) { if (err) return cb(err) - if (region) - return cb(null, region) + if (region) { return cb(null, region) } - if (i >= regionCallChain.length - 1) - return cb(null, 'us-east-1') + if (i >= regionCallChain.length - 1) { return cb(null, 'us-east-1') } nextCall(i + 1) }) @@ -158,17 +154,19 @@ function loadCredentialsFromEcs(options, cb) { options.host = '169.254.170.2' options.path = process.env.AWS_CONTAINER_CREDENTIALS_RELATIVE_URI - return request(options, function(err, res, data) { + return request(options, function(err, res, data) { if (err && ~TIMEOUT_CODES.indexOf(err.code)) return cb(null, {}) if (err) return cb(err) - if (res.statusCode != 200) + if (res.statusCode !== 200) { return cb(new Error('Failed to fetch IAM role: ' + res.statusCode + ' ' + data)) + } try { data = JSON.parse(data) } catch (e) { } - if (res.statusCode != 200) - return cb(new Error('Failed to fetch IAM credentials: ' + res.statusCode + ' ' + data)) + if (res.statusCode !== 200) { + return cb(new Error('Failed to fetch IAM credentials: ' + res.statusCode + ' ' + data)) + } cb(null, { accessKeyId: data.AccessKeyId, @@ -198,8 +196,9 @@ function loadCredentialsFromEc2Metadata(options, cb) { if (err && ~TIMEOUT_CODES.indexOf(err.code)) return cb(null, {}) if (err) return cb(err) - if (res.statusCode != 200) + if (res.statusCode !== 200) { return cb(new Error('Failed to fetch IAM role: ' + res.statusCode + ' ' + data)) + } options.path += data.split('\n')[0] request(options, function(err, res, data) { @@ -207,8 +206,9 @@ function loadCredentialsFromEc2Metadata(options, cb) { try { data = JSON.parse(data) } catch (e) { } - if (res.statusCode != 200 || data.Code != 'Success') + if (res.statusCode !== 200 || data.Code !== 'Success') { return cb(new Error('Failed to fetch IAM credentials: ' + res.statusCode + ' ' + data)) + } cb(null, { accessKeyId: data.AccessKeyId, @@ -221,25 +221,28 @@ function loadCredentialsFromEc2Metadata(options, cb) { } function loadProfileFromIniFile(options, defaultFilename, cb) { - var filename = options.filename || path.join(resolveHome(), '.aws', defaultFilename), - profile = options.profile || resolveProfile() + var filename = options.filename || path.join(resolveHome(), '.aws', defaultFilename) + + var profile = options.profile || resolveProfile() fs.readFile(filename, 'utf8', function(err, data) { - if (err && err.code == 'ENOENT') return cb(null, {}) + if (err && err.code === 'ENOENT') return cb(null, {}) if (err) return cb(err) cb(null, parseAwsIni(data)[profile] || {}) }) } function loadProfileFromIniFileSync(options, defaultFilename) { - var filename = options.filename || path.join(resolveHome(), '.aws', defaultFilename), - profile = options.profile || resolveProfile(), - data + var filename = options.filename || path.join(resolveHome(), '.aws', defaultFilename) + + var profile = options.profile || resolveProfile() + + var data try { data = fs.readFileSync(filename, 'utf8') } catch (err) { - if (err.code == 'ENOENT') return {} + if (err.code === 'ENOENT') return {} throw err } @@ -291,10 +294,13 @@ function resolveHome() { // Fairly strict INI parser – will only deal with alpha keys, must be within sections function parseAwsIni(ini) { - var section, - out = Object.create(null), - re = /^\[([^\]]+)\]\s*$|^([a-z_]+)\s*=\s*(.+?)\s*$/, - lines = ini.split(/\r?\n/) + var section + + var out = Object.create(null) + + var re = /^\[([^\]]+)\]\s*$|^([a-z_]+)\s*=\s*(.+?)\s*$/ + + var lines = ini.split(/\r?\n/) lines.forEach(function(line) { var match = line.match(re) @@ -336,4 +342,4 @@ function once(cb) { called = true cb.apply(this, arguments) } -} \ No newline at end of file +} diff --git a/lib/cloudwatch.js b/lib/cloudwatch.js index 52b2862..6b202b7 100644 --- a/lib/cloudwatch.js +++ b/lib/cloudwatch.js @@ -1,162 +1,168 @@ -'use strict'; - -const co = require('co'); -const AWSXRay = require('aws-xray-sdk'); -const AWS = AWSXRay.captureAWS(require('aws-sdk')); -const log = require('./log'); -const cloudwatch = new AWS.CloudWatch(); - -const namespace = 'big-mouth'; -const async = (process.env.async_metrics || 'false') === 'true'; +const AWSXRay = require('aws-xray-sdk') +const AWS = AWSXRay.captureAWS(require('aws-sdk')) +const log = require('./log') +const cloudwatch = new AWS.CloudWatch() +const { + async_metrics, // eslint-disable-line + AWS_LAMBDA_FUNCTION_NAME, + AWS_LAMBDA_FUNCTION_VERSION, + STAGE +} = process.env + +const namespace = 'big-mouth' +const async = (async_metrics || 'false') === 'true' // eslint-disable-line camelcase // the Lambda execution environment defines a number of env variables: // https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html // and the serverless framework also defines a STAGE env variable too -const dimensions = - [ - { Name: 'Function', Value: process.env.AWS_LAMBDA_FUNCTION_NAME }, - { Name: 'Version', Value: process.env.AWS_LAMBDA_FUNCTION_VERSION }, - { Name: 'Stage', Value: process.env.STAGE } - ] - .filter(dim => dim.Value); +const dimensions = [ + { Name: 'Function', Value: AWS_LAMBDA_FUNCTION_NAME }, + { Name: 'Version', Value: AWS_LAMBDA_FUNCTION_VERSION }, + { Name: 'Stage', Value: STAGE } +].filter(dim => dim.Value) -let countMetrics = {}; -let timeMetrics = {}; +let countMetrics = {} +let timeMetrics = {} function getCountMetricData(name, value) { return { - MetricName : name, - Dimensions : dimensions, - Unit : 'Count', - Value : value - }; + MetricName: name, + Dimensions: dimensions, + Unit: 'Count', + Value: value + } } function getTimeMetricData(name, statsValues) { return { - MetricName : name, - Dimensions : dimensions, - Unit : 'Milliseconds', - StatisticValues : statsValues - }; + MetricName: name, + Dimensions: dimensions, + Unit: 'Milliseconds', + StatisticValues: statsValues + } } function getCountMetricDatum() { - let keys = Object.keys(countMetrics); + let keys = Object.keys(countMetrics) if (keys.length === 0) { - return []; + return [] } - let metricDatum = keys.map(key => getCountMetricData(key, countMetrics[key])); - countMetrics = {}; // zero out the recorded count metrics - return metricDatum; + let metricDatum = keys.map(key => getCountMetricData(key, countMetrics[key])) + countMetrics = {} // zero out the recorded count metrics + return metricDatum } function getTimeMetricDatum() { - let keys = Object.keys(timeMetrics); + let keys = Object.keys(timeMetrics) if (keys.length === 0) { - return []; + return [] } - let metricDatum = keys.map(key => getTimeMetricData(key, timeMetrics[key])); - timeMetrics = {}; // zero out the recorded time metrics - return metricDatum; + let metricDatum = keys.map(key => getTimeMetricData(key, timeMetrics[key])) + timeMetrics = {} // zero out the recorded time metrics + return metricDatum } -let flush = co.wrap(function* () { - let countDatum = getCountMetricDatum(); - let timeDatum = getTimeMetricDatum(); - let allDatum = countDatum.concat(timeDatum); +async function flush() { + let countDatum = getCountMetricDatum() + let timeDatum = getTimeMetricDatum() + let allDatum = countDatum.concat(timeDatum) - if (allDatum.length == 0) { return; } + if (allDatum.length === 0) { + return + } - let metricNames = allDatum.map(x => x.MetricName).join(','); - log.debug(`flushing [${allDatum.length}] metrics to CloudWatch: ${metricNames}`); + let metricNames = allDatum.map(x => x.MetricName).join(',') + log.debug(`flushing [${allDatum.length}] metrics to CloudWatch: ${metricNames}`) - var params = { + const params = { MetricData: allDatum, Namespace: namespace - }; + } try { - yield cloudwatch.putMetricData(params).promise(); - log.debug(`flushed [${allDatum.length}] metrics to CloudWatch: ${metricNames}`); + await cloudwatch.putMetricData(params).promise() + log.debug(`flushed [${allDatum.length}] metrics to CloudWatch: ${metricNames}`) } catch (err) { - log.warn(`cloudn't flush [${allDatum.length}] CloudWatch metrics`, null, err); - } -}); + log.warn(`couldn't flush [${allDatum.length}] CloudWatch metrics`, null, err) + } +} function clear() { - countMetrics = {}; - timeMetrics = {}; + countMetrics = {} + timeMetrics = {} } function incrCount(metricName, count) { - count = count || 1; + count = count || 1 if (async) { - console.log(`MONITORING|${count}|count|${metricName}|${namespace}`); + console.log(`MONITORING|${count}|count|${metricName}|${namespace}`) } else { if (countMetrics[metricName]) { - countMetrics[metricName] += count; + countMetrics[metricName] += count } else { - countMetrics[metricName] = count; + countMetrics[metricName] = count } } } function recordTimeInMillis(metricName, ms) { if (!ms) { - return; + return } - log.debug(`new execution time for [${metricName}] : ${ms} milliseconds`); + log.debug(`new execution time for [${metricName}] : ${ms} milliseconds`) if (async) { - console.log(`MONITORING|${ms}|milliseconds|${metricName}|${namespace}`); + console.log(`MONITORING|${ms}|milliseconds|${metricName}|${namespace}`) } else { if (timeMetrics[metricName]) { - let metric = timeMetrics[metricName]; - metric.Sum += ms; - metric.Maximum = Math.max(metric.Maximum, ms); - metric.Minimum = Math.min(metric.Minimum, ms); - metric.SampleCount += 1; + let metric = timeMetrics[metricName] + metric.Sum += ms + metric.Maximum = Math.max(metric.Maximum, ms) + metric.Minimum = Math.min(metric.Minimum, ms) + metric.SampleCount += 1 } else { let statsValues = { - Maximum : ms, - Minimum : ms, - SampleCount : 1, - Sum : ms - }; - timeMetrics[metricName] = statsValues; + Maximum: ms, + Minimum: ms, + SampleCount: 1, + Sum: ms + } + timeMetrics[metricName] = statsValues } } } function trackExecTime(metricName, f) { - if (!f || typeof f !== "function") { - throw new Error('cloudWatch.trackExecTime requires a function, eg. () => 42'); + if (!f || typeof f !== 'function') { + throw new Error('cloudWatch.trackExecTime requires a function, eg. () => 42') } if (!metricName) { - throw new Error('cloudWatch.trackExecTime requires a metric name, eg. "CloudSearch-latency"'); + throw new Error('cloudWatch.trackExecTime requires a metric name, eg. "CloudSearch-latency"') } - let start = new Date().getTime(), end; - let res = f(); - + let start = new Date().getTime() + let end + let res = f() + // anything with a 'then' function can be considered a Promise... // http://stackoverflow.com/a/27746324/55074 + + // Check isPromise if (!res.hasOwnProperty('then')) { - end = new Date().getTime(); - recordTimeInMillis(metricName, end-start); - return res; + end = new Date().getTime() + recordTimeInMillis(metricName, end - start) + return res } else { return res.then(x => { - end = new Date().getTime(); - recordTimeInMillis(metricName, end-start); - return x; - }); + end = new Date().getTime() + recordTimeInMillis(metricName, end - start) + return x + }) } } @@ -166,4 +172,4 @@ module.exports = { incrCount, trackExecTime, recordTimeInMillis -}; \ No newline at end of file +} diff --git a/lib/correlation-ids.js b/lib/correlation-ids.js index be1115c..46d193e 100644 --- a/lib/correlation-ids.js +++ b/lib/correlation-ids.js @@ -1,26 +1,39 @@ -'use strict'; +/** + * Manage context keys as node Globals + */ -let clearAll = () => global.CONTEXT = undefined; +function clearAll() { + global.CONTEXT = undefined +} -let replaceAllWith = ctx => global.CONTEXT = ctx; +function replaceAllWith(ctx) { + global.CONTEXT = ctx +} -let set = (key, value) => { - if (!key.startsWith("x-correlation-")) { - key = "x-correlation-" + key; +const prefix = 'x-correlation-' + +function prefixKey(key) { + if (!key.startsWith(prefix)) { + return `${prefix}${key}` } + return key +} +function set(key, value) { + const contextKey = prefixKey(key) if (!global.CONTEXT) { - global.CONTEXT = {}; + global.CONTEXT = {} } + global.CONTEXT[contextKey] = value +} - global.CONTEXT[key] = value; -}; - -let get = () => global.CONTEXT || {}; +function get() { + return global.CONTEXT || {} +} module.exports = { - clearAll: clearAll, - replaceAllWith: replaceAllWith, - set: set, - get: get -}; \ No newline at end of file + clearAll, + replaceAllWith, + set, + get +} diff --git a/lib/http.js b/lib/http.js index 0eab77c..7a748e5 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1,87 +1,89 @@ -'use strict'; - -const correlationIds = require('./correlation-ids'); -const http = require('superagent-promise')(require('superagent'), Promise); +const correlationIds = require('./correlation-ids') +const superAgent = require('superagent') +const http = require('superagent-promise')(superAgent, Promise) function getRequest (options) { - let uri = options.uri; - let method = options.method || ''; + let uri = options.uri + let method = options.method || '' switch (method.toLowerCase()) { case '': case 'get': - return http.get(uri); + return http.get(uri) case 'head': - return http.head(uri); + return http.head(uri) case 'post': - return http.post(uri); + return http.post(uri) case 'put': - return http.put(uri); + return http.put(uri) case 'delete': - return http.del(uri); + return http.del(uri) default: - throw new Error(`unsupported method : ${method.toLowerCase()}`); + throw new Error(`unsupported method : ${method.toLowerCase()}`) } } function setHeaders (request, headers) { - let headerNames = Object.keys(headers); - headerNames.forEach(h => request = request.set(h, headers[h])); + let headerNames = Object.keys(headers) + headerNames.forEach(h => { + request = request.set(h, headers[h]) + }) - return request; + return request } function setQueryStrings (request, qs) { if (!qs) { - return request; + return request } - - return request.query(qs); + + return request.query(qs) } function setBody (request, body) { if (!body) { - return request; + return request } - return request.send(body); + return request.send(body) } -// options: { -// uri : string -// method : GET (default) | POST | PUT | HEAD -// headers : object -// qs : object -// body : object -// } -let Req = (options) => { +/** + * Request with context + * @param {object} options - request options + * @param {object} options.uri - url + * @param {object} options.method - method. default GET + * @param {object} options.qs - qs + * @param {object} options.body - body + */ +function Req(options) { if (!options) { - throw new Error('no HTTP request options is provided'); + throw new Error('no HTTP request options is provided') } if (!options.uri) { - throw new Error('no HTTP uri is specified'); + throw new Error('no HTTP uri is specified') } - const context = correlationIds.get(); + const context = correlationIds.get() // copy the provided headers last so it overrides the values from the context - let headers = Object.assign({}, context, options.headers); + let headers = Object.assign({}, context, options.headers) - let request = getRequest(options); + let request = getRequest(options) - request = setHeaders(request, headers); - request = setQueryStrings(request, options.qs); - request = setBody(request, options.body); + request = setHeaders(request, headers) + request = setQueryStrings(request, options.qs) + request = setBody(request, options.body) return request .catch(e => { if (e.response && e.response.error) { - throw e.response.error; + throw e.response.error } - - throw e; - }); -}; -module.exports = Req; \ No newline at end of file + throw e + }) +} + +module.exports = Req diff --git a/lib/kinesis.js b/lib/kinesis.js index de1ff28..3852586 100644 --- a/lib/kinesis.js +++ b/lib/kinesis.js @@ -1,44 +1,42 @@ -'use strict'; - -const _ = require('lodash'); -const AWSXRay = require('aws-xray-sdk'); -const AWS = AWSXRay.captureAWS(require('aws-sdk')); -const Kinesis = new AWS.Kinesis(); -const log = require('./log'); -const correlationIds = require('./correlation-ids'); +const AWSXRay = require('aws-xray-sdk') +const AWS = AWSXRay.captureAWS(require('aws-sdk')) +const Kinesis = new AWS.Kinesis() +const log = require('./log') +const correlationIds = require('./correlation-ids') function tryJsonParse(data) { - if (!_.isString(data)) { - return null; + // !_.isString(data) + if (typeof data !== 'string') { + return null } try { - return JSON.parse(data); + return JSON.parse(data) } catch (err) { - log.warn('only JSON string data can be modified to insert correlation IDs'); - return null; + log.warn('only JSON string data can be modified to insert correlation IDs') + return null } } function addCorrelationIds(data) { // only do with with JSON string data - const payload = tryJsonParse(data); + const payload = tryJsonParse(data) if (!payload) { - return data; + return data } - const context = correlationIds.get(); - const newData = Object.assign({ __context__: context }, payload); - return JSON.stringify(newData); + const context = correlationIds.get() + const newData = Object.assign({ __context__: context }, payload) + return JSON.stringify(newData) } function putRecord(params, cb) { - const newData = addCorrelationIds(params.Data); - params = Object.assign({}, params, { Data: newData }); + const newData = addCorrelationIds(params.Data) + params = Object.assign({}, params, { Data: newData }) - return Kinesis.putRecord(params, cb); + return Kinesis.putRecord(params, cb) }; -const client = Object.assign({}, Kinesis, { putRecord }); +const client = Object.assign({}, Kinesis, { putRecord }) -module.exports = client; \ No newline at end of file +module.exports = client diff --git a/lib/log.js b/lib/log.js index 429bb34..7b75079 100644 --- a/lib/log.js +++ b/lib/log.js @@ -1,15 +1,13 @@ -'use strict'; - -const correlationIds = require('./correlation-ids'); +const correlationIds = require('./correlation-ids') const LogLevels = { - DEBUG : 0, - INFO : 1, - WARN : 2, - ERROR : 3 -}; + DEBUG: 0, + INFO: 1, + WARN: 2, + ERROR: 3 +} -// most of these are available through the Node.js execution environment for Lambda +// Available through the Node.js execution environment for Lambda // see https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html const DEFAULT_CONTEXT = { awsRegion: process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION, @@ -17,64 +15,81 @@ const DEFAULT_CONTEXT = { functionVersion: process.env.AWS_LAMBDA_FUNCTION_VERSION, functionMemorySize: process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE, stage: process.env.ENVIRONMENT || process.env.STAGE -}; +} function getContext () { // if there's a global variable for all the current request context then use it - const context = correlationIds.get(); + const context = correlationIds.get() if (context) { // note: this is a shallow copy, which is ok as we're not going to mutate anything - return Object.assign({}, DEFAULT_CONTEXT, context); + return Object.assign({}, DEFAULT_CONTEXT, context) } - - return DEFAULT_CONTEXT; + return DEFAULT_CONTEXT } // default to debug if not specified function logLevelName() { - return process.env.log_level || 'DEBUG'; + return process.env.log_level || 'DEBUG' } -function isEnabled (level) { - return level >= LogLevels[logLevelName()]; +function isEnabled(level) { + return level >= LogLevels[logLevelName()] } -function appendError(params, err) { +function appendError(params = {}, err) { if (!err) { - return params; + return params } - return Object.assign( - {}, - params || {}, - { errorName: err.name, errorMessage: err.message, stackTrace: err.stack } - ); + return Object.assign({}, params, { + errorName: err.name, + errorMessage: err.message, + stackTrace: err.stack + }) } -function log (levelName, message, params) { +function log(levelName, message, params = {}) { if (!isEnabled(LogLevels[levelName])) { - return; + return } - let context = getContext(); - let logMsg = Object.assign({}, context, params); - logMsg.level = levelName; - logMsg.message = message; + const context = getContext() + let logMsg = Object.assign({}, context, params) + logMsg.level = levelName + logMsg.message = message - console.log(JSON.stringify(logMsg)); + console.log(JSON.stringify(logMsg)) } -module.exports.debug = (msg, params) => log('DEBUG', msg, params); -module.exports.info = (msg, params) => log('INFO', msg, params); -module.exports.warn = (msg, params, error) => log('WARN', msg, appendError(params, error)); -module.exports.error = (msg, params, error) => log('ERROR', msg, appendError(params, error)); +function debug(msg, params) { + return log('DEBUG', msg, params) +} -module.exports.enableDebug = () => { - const oldLevel = process.env.log_level; - process.env.log_level = 'DEBUG'; +function info(msg, params) { + return log('INFO', msg, params) +} +function warn(msg, params, error) { + return log('WARN', msg, appendError(params, error)) +} + +function error(msg, params, error) { + return log('ERROR', msg, appendError(params, error)) +} + +function enableDebug() { + const oldLevel = process.env.log_level + process.env.log_level = 'DEBUG' // return a function to perform the rollback return () => { - process.env.log_level = oldLevel; + process.env.log_level = oldLevel } -}; \ No newline at end of file +} + +module.exports = { + enableDebug, + debug, + info, + warn, + error +} diff --git a/lib/lru.js b/lib/lru.js index 1418c2a..fc7e023 100644 --- a/lib/lru.js +++ b/lib/lru.js @@ -1,4 +1,4 @@ -module.exports = function(size) { +module.exports = function cache(size) { return new LruCache(size) } @@ -42,14 +42,13 @@ LruCache.prototype.prune = function() { } } - function DoublyLinkedList() { this.firstNode = null this.lastNode = null } DoublyLinkedList.prototype.moveToFront = function(node) { - if (this.firstNode == node) return + if (this.firstNode === node) return this.remove(node) @@ -75,22 +74,21 @@ DoublyLinkedList.prototype.pop = function() { } DoublyLinkedList.prototype.remove = function(node) { - if (this.firstNode == node) { + if (this.firstNode === node) { this.firstNode = node.next } else if (node.prev != null) { node.prev.next = node.next } - if (this.lastNode == node) { + if (this.lastNode === node) { this.lastNode = node.prev } else if (node.next != null) { node.next.prev = node.prev } } - function DoublyLinkedNode(key, val) { this.key = key this.val = val this.prev = null this.next = null -} \ No newline at end of file +} diff --git a/lib/notify.js b/lib/notify.js index 320f5ba..950e0dc 100644 --- a/lib/notify.js +++ b/lib/notify.js @@ -1,104 +1,100 @@ -'use strict'; +const sns = require('./sns') +const kinesis = require('./kinesis') +const chance = require('chance').Chance() +const log = require('./log') +const cloudwatch = require('./cloudwatch') -const _ = require('lodash'); -const co = require('co'); -const sns = require('./sns'); -const kinesis = require('./kinesis'); -const chance = require('chance').Chance(); -const log = require('./log'); -const cloudwatch = require('./cloudwatch'); +const streamName = process.env.order_events_stream +const restaurantTopicArn = process.env.restaurant_notification_topic +const userTopicArn = process.env.user_notification_topic -const streamName = process.env.order_events_stream; -const restaurantTopicArn = process.env.restaurant_notification_topic; -const userTopicArn = process.env.user_notification_topic; - -let notifyRestaurantOfOrder = co.wrap(function* (order) { +async function notifyRestaurantOfOrder(order) { try { - if (chance.bool({likelihood: 75})) { // 75% chance of failure - throw new Error("boom"); + if (chance.bool({ likelihood: 75 })) { // 75% chance of failure + throw new Error('boom') } let snsReq = { Message: JSON.stringify(order), TopicArn: restaurantTopicArn - }; - yield cloudwatch.trackExecTime( - "SnsPublishLatency", + } + await cloudwatch.trackExecTime( + 'SnsPublishLatency', () => sns.publish(snsReq).promise() - ); + ) let logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.debug('notified restaurant of new order', logContext); + } + log.debug('notified restaurant of new order', logContext) - let data = _.clone(order); - data.eventType = 'restaurant_notified'; + let data = Object.assign({}, order) + data.eventType = 'restaurant_notified' let kinesisReq = { Data: JSON.stringify(data), // the SDK would base64 encode this for us PartitionKey: order.orderId, StreamName: streamName - }; - yield cloudwatch.trackExecTime( - "KinesisPutRecordLatency", + } + await cloudwatch.trackExecTime( + 'KinesisPutRecordLatency', () => kinesis.putRecord(kinesisReq).promise() - ); - log.debug('published event into Kinesis', { eventName: 'restaurant_notified' }); + ) + log.debug('published event into Kinesis', { eventName: 'restaurant_notified' }) - cloudwatch.incrCount('NotifyRestaurantSuccess'); + cloudwatch.incrCount('NotifyRestaurantSuccess') } catch (err) { - cloudwatch.incrCount('NotifyRestaurantFailed'); - throw err; + cloudwatch.incrCount('NotifyRestaurantFailed') + throw err } -}); +} -let notifyUserOfOrderAccepted = co.wrap(function* (order) { +async function notifyUserOfOrderAccepted(order) { try { - if (chance.bool({likelihood: 75})) { // 75% chance of failure - throw new Error("boom"); + if (chance.bool({ likelihood: 75 })) { // 75% chance of failure + throw new Error('boom') } - + let snsReq = { Message: JSON.stringify(order), TopicArn: userTopicArn - }; - yield cloudwatch.trackExecTime( - "SnsPublishLatency", + } + await cloudwatch.trackExecTime( + 'SnsPublishLatency', () => sns.publish(snsReq).promise() - ); - + ) + let logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.debug('notified user of accepted order', logContext); - - let data = _.clone(order); - data.eventType = 'user_notified'; - + } + log.debug('notified user of accepted order', logContext) + + let data = Object.assign({}, order) + data.eventType = 'user_notified' + let kinesisReq = { Data: JSON.stringify(data), // the SDK would base64 encode this for us PartitionKey: order.orderId, StreamName: streamName - }; - yield cloudwatch.trackExecTime( - "KinesisPutRecordLatency", + } + await cloudwatch.trackExecTime( + 'KinesisPutRecordLatency', () => kinesis.putRecord(kinesisReq).promise() - ); - log.debug(`published event into Kinesis`, { eventName: 'user_notified' }); - - cloudwatch.incrCount('NotifyUserSuccess'); + ) + log.debug(`published event into Kinesis`, { eventName: 'user_notified' }) + + cloudwatch.incrCount('NotifyUserSuccess') } catch (err) { - cloudwatch.incrCount('NotifyUserFailed'); - throw err; + cloudwatch.incrCount('NotifyUserFailed') + throw err } -}); +} module.exports = { restaurantOfOrder: notifyRestaurantOfOrder, userOfOrderAccepted: notifyUserOfOrderAccepted -}; \ No newline at end of file +} diff --git a/lib/retry.js b/lib/retry.js index 4894f97..eb625fd 100644 --- a/lib/retry.js +++ b/lib/retry.js @@ -1,54 +1,51 @@ -'use strict'; +const sns = require('./sns') +const log = require('./log') +const cloudwatch = require('./cloudwatch') -const co = require('co'); -const sns = require('./sns'); -const log = require('./log'); -const cloudwatch = require('./cloudwatch'); +const restaurantRetryTopicArn = process.env.restaurant_notification_retry_topic +const userRetryTopicArn = process.env.user_notification_retry_topic -const restaurantRetryTopicArn = process.env.restaurant_notification_retry_topic; -const userRetryTopicArn = process.env.user_notification_retry_topic; - -let retryRestaurantNotification = co.wrap(function* (order) { - let snsReq = { +async function retryRestaurantNotification(order) { + const snsReq = { Message: JSON.stringify(order), TopicArn: restaurantRetryTopicArn - }; - yield cloudwatch.trackExecTime( - "SnsPublishLatency", + } + await cloudwatch.trackExecTime( + 'SnsPublishLatency', () => sns.publish(snsReq).promise() - ); + ) - let logContext = { + const logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.debug('queued restaurant notification for retry', logContext); + } + log.debug('queued restaurant notification for retry', logContext) - cloudwatch.incrCount("NotifyRestaurantQueued"); -}); + cloudwatch.incrCount('NotifyRestaurantQueued') +} -let retryUserNotification = co.wrap(function* (order) { - let snsReq = { +async function retryUserNotification(order) { + const snsReq = { Message: JSON.stringify(order), TopicArn: userRetryTopicArn - }; - yield cloudwatch.trackExecTime( - "SnsPublishLatency", + } + await cloudwatch.trackExecTime( + 'SnsPublishLatency', () => sns.publish(snsReq).promise() - ); + ) - let logContext = { + const logContext = { orderId: order.orderId, restaurantName: order.restaurantName, userEmail: order.userEmail - }; - log.debug('queued user notification for retry', logContext); + } + log.debug('queued user notification for retry', logContext) - cloudwatch.incrCount("NotifyUserQueued"); -}); + cloudwatch.incrCount('NotifyUserQueued') +} module.exports = { restaurantNotification: retryRestaurantNotification, userNotification: retryUserNotification -}; \ No newline at end of file +} diff --git a/lib/sns.js b/lib/sns.js index 9b0874b..047a37b 100644 --- a/lib/sns.js +++ b/lib/sns.js @@ -1,32 +1,32 @@ -'use strict'; - -const AWSXRay = require('aws-xray-sdk'); -const AWS = AWSXRay.captureAWS(require('aws-sdk')); -const SNS = new AWS.SNS(); -const correlationIds = require('./correlation-ids'); +const AWSXRay = require('aws-xray-sdk') +const AWS = AWSXRay.captureAWS(require('aws-sdk')) +const SNS = new AWS.SNS() +const correlationIds = require('./correlation-ids') function addCorrelationIds(messageAttributes) { - let attributes = {}; - let context = correlationIds.get(); + let attributes = {} + let context = correlationIds.get() for (let key in context) { attributes[key] = { DataType: 'String', StringValue: context[key] - }; + } } // use `attribtues` as base so if the user's message attributes would override // our correlation IDs - return Object.assign(attributes, messageAttributes || {}); + return Object.assign(attributes, messageAttributes || {}) } function publish(params, cb) { - const newMessageAttributes = addCorrelationIds(params.MessageAttributes); - params = Object.assign(params, { MessageAttributes: newMessageAttributes }); + const newMessageAttributes = addCorrelationIds(params.MessageAttributes) + params = Object.assign(params, { + MessageAttributes: newMessageAttributes + }) - return SNS.publish(params, cb); -}; + return SNS.publish(params, cb) +} -const client = Object.assign({}, SNS, { publish }); +const client = Object.assign({}, SNS, { publish }) -module.exports = client; \ No newline at end of file +module.exports = client diff --git a/middleware/capture-correlation-ids.js b/middleware/capture-correlation-ids.js index e0d60d2..b40ad32 100644 --- a/middleware/capture-correlation-ids.js +++ b/middleware/capture-correlation-ids.js @@ -1,138 +1,136 @@ -'use strict'; - -const correlationIds = require('../lib/correlation-ids'); -const log = require('../lib/log'); +const correlationIds = require('../lib/correlation-ids') +const log = require('../lib/log') function captureHttp(headers, awsRequestId, sampleDebugLogRate) { if (!headers) { - log.warn(`Request ${awsRequestId} is missing headers`); - return; + log.warn(`Request ${awsRequestId} is missing headers`) + return } - let context = { awsRequestId }; + let context = { awsRequestId } for (const header in headers) { if (header.toLowerCase().startsWith('x-correlation-')) { - context[header] = headers[header]; + context[header] = headers[header] } } - + if (!context['x-correlation-id']) { - context['x-correlation-id'] = awsRequestId; + context['x-correlation-id'] = awsRequestId } // forward the original User-Agent on if (headers['User-Agent']) { - context['User-Agent'] = headers['User-Agent']; + context['User-Agent'] = headers['User-Agent'] } if (headers['Debug-Log-Enabled']) { - context['Debug-Log-Enabled'] = headers['Debug-Log-Enabled']; + context['Debug-Log-Enabled'] = headers['Debug-Log-Enabled'] } else { - context['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false'; + context['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false' } - correlationIds.replaceAllWith(context); + correlationIds.replaceAllWith(context) } function parsePayload (record) { - let json = new Buffer(record.kinesis.data, 'base64').toString('utf8'); - return JSON.parse(json); + let json = Buffer.from(record.kinesis.data, 'base64').toString('utf8') + return JSON.parse(json) } function captureKinesis(event, context, sampleDebugLogRate) { - const awsRequestId = context.awsRequestId; + const awsRequestId = context.awsRequestId const events = event .Records .map(parsePayload) .map(record => { - // the wrapped kinesis client would put the correlation IDs as part of + // the wrapped kinesis client would put the correlation IDs as part of // the payload as a special __context__ property - let recordContext = record.__context__ || {}; - recordContext.awsRequestId = awsRequestId; + let recordContext = record.__context__ || {} + recordContext.awsRequestId = awsRequestId - delete record.__context__; + delete record.__context__ if (!recordContext['x-correlation-id']) { - recordContext['x-correlation-id'] = awsRequestId; + recordContext['x-correlation-id'] = awsRequestId } if (!recordContext['Debug-Log-Enabled']) { - recordContext['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false'; + recordContext['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false' } - let debugLog = recordContext['Debug-Log-Enabled'] === 'true'; + let debugLog = recordContext['Debug-Log-Enabled'] === 'true' - let oldContext = undefined; - let debugLogRollback = undefined; + let oldContext + let debugLogRollback // lets you add more correlation IDs for just this record record.addToScope = (key, value) => { - if (!key.startsWith("x-correlation-")) { - key = "x-correlation-" + key; + if (!key.startsWith('x-correlation-')) { + key = 'x-correlation-' + key } - recordContext[key] = value; - correlationIds.set(key, value); + recordContext[key] = value + correlationIds.set(key, value) } record.scopeToThis = () => { if (!oldContext) { - oldContext = correlationIds.get(); - correlationIds.replaceAllWith(recordContext); + oldContext = correlationIds.get() + correlationIds.replaceAllWith(recordContext) } if (debugLog) { - debugLogRollback = log.enableDebug(); + debugLogRollback = log.enableDebug() } - }; + } record.unscope = () => { if (oldContext) { - correlationIds.replaceAllWith(oldContext); + correlationIds.replaceAllWith(oldContext) } if (debugLogRollback) { - debugLogRollback(); + debugLogRollback() } } - return record; - }); + return record + }) - context.parsedKinesisEvents = events; + context.parsedKinesisEvents = events - correlationIds.replaceAllWith({ awsRequestId }); + correlationIds.replaceAllWith({ awsRequestId }) } function captureSns(records, awsRequestId, sampleDebugLogRate) { - let context = { awsRequestId }; + let context = { awsRequestId } + + const snsRecord = records[0].Sns + const msgAttributes = snsRecord.MessageAttributes - const snsRecord = records[0].Sns; - const msgAttributes = snsRecord.MessageAttributes; - for (var msgAttribute in msgAttributes) { if (msgAttribute.toLowerCase().startsWith('x-correlation-')) { - context[msgAttribute] = msgAttributes[msgAttribute].Value; + context[msgAttribute] = msgAttributes[msgAttribute].Value } if (msgAttribute === 'User-Agent') { - context['User-Agent'] = msgAttributes['User-Agent'].Value; + context['User-Agent'] = msgAttributes['User-Agent'].Value } if (msgAttribute === 'Debug-Log-Enabled') { - context['Debug-Log-Enabled'] = msgAttributes['Debug-Log-Enabled'].Value; + context['Debug-Log-Enabled'] = msgAttributes['Debug-Log-Enabled'].Value } } - + if (!context['x-correlation-id']) { - context['x-correlation-id'] = awsRequestId; + context['x-correlation-id'] = awsRequestId } if (!context['Debug-Log-Enabled']) { - context['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false'; + context['Debug-Log-Enabled'] = Math.random() < sampleDebugLogRate ? 'true' : 'false' } - correlationIds.replaceAllWith(context); + correlationIds.replaceAllWith(context) } function isApiGatewayEvent(event) { @@ -141,44 +139,44 @@ function isApiGatewayEvent(event) { function isKinesisEvent(event) { if (!event.hasOwnProperty('Records')) { - return false; + return false } - + if (!Array.isArray(event.Records)) { - return false; + return false } - return event.Records[0].eventSource === 'aws:kinesis'; + return event.Records[0].eventSource === 'aws:kinesis' } function isSnsEvent(event) { if (!event.hasOwnProperty('Records')) { - return false; + return false } - + if (!Array.isArray(event.Records)) { - return false; + return false } - return event.Records[0].EventSource === 'aws:sns'; + return event.Records[0].EventSource === 'aws:sns' } module.exports = (config) => { - const sampleDebugLogRate = config.sampleDebugLogRate || 0.01; + const sampleDebugLogRate = config.sampleDebugLogRate || 0.01 return { - before: (handler, next) => { - correlationIds.clearAll(); + before: (handler, next) => { + correlationIds.clearAll() if (isApiGatewayEvent(handler.event)) { - captureHttp(handler.event.headers, handler.context.awsRequestId, sampleDebugLogRate); + captureHttp(handler.event.headers, handler.context.awsRequestId, sampleDebugLogRate) } else if (isKinesisEvent(handler.event)) { - captureKinesis(handler.event, handler.context, sampleDebugLogRate); + captureKinesis(handler.event, handler.context, sampleDebugLogRate) } else if (isSnsEvent(handler.event)) { - captureSns(handler.event.Records, handler.context.awsRequestId, sampleDebugLogRate); + captureSns(handler.event.Records, handler.context.awsRequestId, sampleDebugLogRate) } next() } - }; -}; \ No newline at end of file + } +} diff --git a/middleware/flush-metrics.js b/middleware/flush-metrics.js index fcd36c5..789c47a 100644 --- a/middleware/flush-metrics.js +++ b/middleware/flush-metrics.js @@ -1,13 +1,11 @@ -'use strict'; - -const log = require('../lib/log'); -const cloudwatch = require('../lib/cloudwatch'); +const log = require('../lib/log') +const cloudwatch = require('../lib/cloudwatch') module.exports = { after: (handler, next) => { - cloudwatch.flush().then(_ => next()); + cloudwatch.flush().then(_ => next()) }, onError: (handler, next) => { - cloudwatch.flush().then(_ => next(handler.error)); + cloudwatch.flush().then(_ => next(handler.error)) } -}; \ No newline at end of file +} diff --git a/middleware/function-shield.js b/middleware/function-shield.js index adec064..3f0b1fd 100644 --- a/middleware/function-shield.js +++ b/middleware/function-shield.js @@ -1,20 +1,18 @@ -'use strict'; - -const FuncShield = require('@puresec/function-shield'); +const FuncShield = require('@puresec/function-shield') module.exports = () => { return { before: (handler, next) => { FuncShield.configure({ policy: { - outbound_connectivity: "block", - read_write_tmp: "block", - create_child_process: "block" + outbound_connectivity: 'block', + read_write_tmp: 'block', + create_child_process: 'block' }, token: process.env.FUNCTION_SHIELD_TOKEN - }); + }) - next(); + next() } - }; -}; \ No newline at end of file + } +} diff --git a/middleware/sample-logging.js b/middleware/sample-logging.js index c8a3a6c..2f60cee 100644 --- a/middleware/sample-logging.js +++ b/middleware/sample-logging.js @@ -1,42 +1,41 @@ -'use strict'; -const correlationIds = require('../lib/correlation-ids'); -const log = require('../lib/log'); +const correlationIds = require('../lib/correlation-ids') +const log = require('../lib/log') // config should be { sampleRate: double } where sampleRate is between 0.0-1.0 module.exports = (config) => { - let rollback = undefined; + let rollback const isDebugEnabled = () => { - const context = correlationIds.get(); + const context = correlationIds.get() if (context['Debug-Log-Enabled'] === 'true') { - return true; + return true } - return config.sampleRate && Math.random() <= config.sampleRate; + return config.sampleRate && Math.random() <= config.sampleRate } return { before: (handler, next) => { if (isDebugEnabled()) { - rollback = log.enableDebug(); + rollback = log.enableDebug() } - next(); + next() }, after: (handler, next) => { if (rollback) { - rollback(); + rollback() } - next(); + next() }, onError: (handler, next) => { - let awsRequestId = handler.context.awsRequestId; - let invocationEvent = JSON.stringify(handler.event); - log.error('invocation failed', { awsRequestId, invocationEvent }, handler.error); + let awsRequestId = handler.context.awsRequestId + let invocationEvent = JSON.stringify(handler.event) + log.error('invocation failed', { awsRequestId, invocationEvent }, handler.error) - next(handler.error); + next(handler.error) } - }; -}; \ No newline at end of file + } +} diff --git a/middleware/wrapper.js b/middleware/wrapper.js index 915b59a..afaee95 100644 --- a/middleware/wrapper.js +++ b/middleware/wrapper.js @@ -1,13 +1,11 @@ -'use strict'; - -const middy = require('middy'); -const sampleLogging = require('./sample-logging'); -const captureCorrelationIds = require('./capture-correlation-ids'); -const functionShield = require('./function-shield'); +const middy = require('middy') +const sampleLogging = require('./sample-logging') +const captureCorrelationIds = require('./capture-correlation-ids') +const functionShield = require('./function-shield') module.exports = f => { return middy(f) .use(captureCorrelationIds({ sampleDebugLogRate: 0.01 })) .use(sampleLogging({ sampleRate: 0.01 })) - .use(functionShield()); -}; \ No newline at end of file + .use(functionShield()) +} diff --git a/seed-restaurants.js b/seed-restaurants.js index 2c3976a..b8180d7 100644 --- a/seed-restaurants.js +++ b/seed-restaurants.js @@ -1,62 +1,62 @@ -'use strict'; +const AWS = require('aws-sdk') +AWS.config.region = 'us-east-1' +const dynamodb = new AWS.DynamoDB.DocumentClient() -const co = require('co'); -const AWS = require('aws-sdk'); -AWS.config.region = 'us-east-1'; -const dynamodb = new AWS.DynamoDB.DocumentClient(); +const restaurants = [ + { + name: 'Fangtasia', + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/fangtasia.png', + themes: ['true blood'] + }, + { + name: "Shoney's", + image: "https://d2qt42rcwzspd6.cloudfront.net/manning/shoney's.png", + themes: ['cartoon', 'rick and morty'] + }, + { + name: "Freddy's BBQ Joint", + image: "https://d2qt42rcwzspd6.cloudfront.net/manning/freddy's+bbq+joint.png", + themes: ['netflix', 'house of cards'] + }, + { + name: 'Pizza Planet', + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/pizza+planet.png', + themes: ['netflix', 'toy story'] + }, + { + name: 'Leaky Cauldron', + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/leaky+cauldron.png', + themes: ['movie', 'harry potter'] + }, + { + name: "Lil' Bits", + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/lil+bits.png', + themes: ['cartoon', 'rick and morty'] + }, + { + name: 'Fancy Eats', + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/fancy+eats.png', + themes: ['cartoon', 'rick and morty'] + }, + { + name: 'Don Cuco', + image: 'https://d2qt42rcwzspd6.cloudfront.net/manning/don%20cuco.png', + themes: ['cartoon', 'rick and morty'] + }, +] -let restaurants = [ - { - name: "Fangtasia", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/fangtasia.png", - themes: ["true blood"] - }, - { - name: "Shoney's", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/shoney's.png", - themes: ["cartoon", "rick and morty"] - }, - { - name: "Freddy's BBQ Joint", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/freddy's+bbq+joint.png", - themes: ["netflix", "house of cards"] - }, - { - name: "Pizza Planet", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/pizza+planet.png", - themes: ["netflix", "toy story"] - }, - { - name: "Leaky Cauldron", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/leaky+cauldron.png", - themes: ["movie", "harry potter"] - }, - { - name: "Lil' Bits", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/lil+bits.png", - themes: ["cartoon", "rick and morty"] - }, - { - name: "Fancy Eats", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/fancy+eats.png", - themes: ["cartoon", "rick and morty"] - }, - { - name: "Don Cuco", - image: "https://d2qt42rcwzspd6.cloudfront.net/manning/don%20cuco.png", - themes: ["cartoon", "rick and morty"] - }, -]; - -let putReqs = restaurants.map(x => ({ +const putReqs = restaurants.map(x => ({ PutRequest: { Item: x } -})); +})) -let req = { +const req = { RequestItems: { 'restaurants': putReqs } -}; -dynamodb.batchWrite(req).promise().then(() => console.log("all done")); \ No newline at end of file +} + +dynamodb.batchWrite(req).promise().then(() => { + console.log('all done') +})