Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboard algolia destination to cdk 2.0 #1474

Merged
merged 20 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7c3b474
feat: onboard algolia to cdk 2.0
saikumarrs Oct 4, 2022
c28234f
Merge remote-tracking branch 'origin/master' into feat.onboard-algoli…
saikumarrs Oct 11, 2022
ca2765f
fix: use the dependent modules from v0 destination
saikumarrs Oct 11, 2022
c55c097
feat: add router workflow
saikumarrs Oct 11, 2022
48cba61
feat: error handling added to router transform handler
saikumarrs Oct 11, 2022
69019f5
Merge branch 'master' into feat.onboard-algolia-to-cdk
saikumarrs Oct 11, 2022
4929584
Merge remote-tracking branch 'origin/master' into feat.onboard-algoli…
saikumarrs Oct 26, 2022
5513866
refactor: remove redundant code
saikumarrs Oct 26, 2022
d4919c0
refactor: router transform workflow
saikumarrs Oct 26, 2022
0b5ebd5
Merge branch 'master' into feat.onboard-algolia-to-cdk
koladilip Oct 26, 2022
f251b0e
Fix jsonata single element array issue
koladilip Oct 27, 2022
51aaccd
Fix versionedRouter.js
koladilip Oct 27, 2022
d164774
Merge branch 'master' into feat.onboard-algolia-to-cdk
saikumarrs Oct 27, 2022
257b3da
Fix versionedRouter to handle when RT dest is supported
koladilip Oct 27, 2022
957bf73
Merge branch 'feat.onboard-algolia-to-cdk' of github.com:rudderlabs/r…
koladilip Oct 27, 2022
2b02531
Add live compare test versionedRouter test files
koladilip Oct 27, 2022
75f0749
Fix PR comments
koladilip Oct 27, 2022
bb2e771
Upgrade rudder-workflow-engine
koladilip Oct 27, 2022
3bbcca5
Refactor handleCDKV2 to processCdkV2Workflow
koladilip Oct 28, 2022
101e14d
Merge branch 'master' into feat.onboard-algolia-to-cdk
koladilip Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions __tests__/algolia-cdk.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const fs = require("fs");
const path = require("path");
const { TRANSFORMER_METRIC } = require("../v0/util/constant");
const { getWorkflowEngine } = require("../cdk/v2/handler");

const integration = "algolia";
const name = "Algolia";

const procWorkflowEnginePromise = getWorkflowEngine(integration, TRANSFORMER_METRIC.ERROR_AT.PROC);

const inputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_input.json`)
);
const outputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_output.json`)
);
const inputData = JSON.parse(inputDataFile);
const expectedData = JSON.parse(outputDataFile);

describe(`${name} Tests`, () => {
describe("Processor Tests", () => {
inputData.forEach((input, index) => {
it(`${name} - payload: ${index}`, async () => {
const expected = expectedData[index];
try {
procWorkflowEngine = await procWorkflowEnginePromise;
const result = await procWorkflowEngine.execute(input);
expect(result.output).toEqual(expected);
} catch (error) {
// console.log(error);
expect(error.message).toEqual(expected.message);
}
});
});
});

const inputRouterDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_input.json`)
);
const outputRouterDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_output.json`)
);
const inputRouterData = JSON.parse(inputRouterDataFile);
const expectedRouterData = JSON.parse(outputRouterDataFile);

const rtWorkflowEnginePromise = getWorkflowEngine(integration, TRANSFORMER_METRIC.ERROR_AT.RT);
describe("Router Tests", () => {
inputRouterData.forEach((input, index) => {
it(`${name} - payload: ${index}`, async () => {
const expected = expectedRouterData[index];
try {
rtWorkflowEngine = await rtWorkflowEnginePromise;
const result = await rtWorkflowEngine.execute(input);
expect(result.output).toEqual(expected);
} catch (error) {
// console.log(error);
expect(error.message).toEqual(expected.message);
}
});
});
});
});
47 changes: 21 additions & 26 deletions __tests__/algolia.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,28 @@ inputData.forEach((input, index) => {
});
});

const batchInputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_batch_input.json`)
const routerInputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_input.json`)
);
const batchOutputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_batch_output.json`)
const routerInputData = JSON.parse(routerInputDataFile);
const routerOutputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_output.json`)
);
const routerOutputData = JSON.parse(routerOutputDataFile);

// const batchInputData = JSON.parse(batchInputDataFile);
// const batchExpectedData = JSON.parse(batchOutputDataFile);

// batchInputData.forEach((input, index) => {
// test(`${name} Batching ${index}`, () => {
// const output = transformer.batch(input);
// //console.log(output);
// expect(Array.isArray(output)).toEqual(true);
// expect(output.length).toEqual(batchExpectedData[index].length);
// output.forEach((input, indexInner) => {
// expect(output[indexInner]).toEqual(batchExpectedData[index][indexInner]);
// })
// });
// });

// Batching using routerTransform
test('Batching', async () => {
const batchInputData = JSON.parse(batchInputDataFile);
const batchExpectedData = JSON.parse(batchOutputDataFile);
const output = await transformer.processRouterDest(batchInputData);
expect(Array.isArray(output)).toEqual(true);
expect(output).toEqual(batchExpectedData);
describe('Router Tests', () => {
routerInputData.forEach((input, index) => {
it(`${name} Tests: payload - ${index}`, async () => {
let output, expected;
try {
output = await transformer.processRouterDest(input);
expected = routerOutputData[index]
} catch (error) {
output = error.message;
// console.log(output);
expected = routerOutputData[index].message;
}
expect(output).toEqual(expected);
});
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[
[
{
"message": {
"type": "track",
Expand Down Expand Up @@ -251,5 +252,8 @@
"IsProcessorEnabled": true
}
}
]
],
[],
{}
]

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[
[
{
"batchedRequest": {
"version": "1",
Expand Down Expand Up @@ -97,5 +98,14 @@
"IsProcessorEnabled": true
}
}
]
],
{
"message": "Invalid event array",
"statusCode": 400
},
{
"message": "Invalid event array",
"statusCode": 400
}
]

19 changes: 19 additions & 0 deletions cdk/v2/destinations/algolia/procWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
bindings:
- name: EventType
path: ../../../../constants

steps:
- name: validateInput
template: |
(
$assert($exists(message.type), "message Type is not present. Aborting message.");
$assert($exists(destination.Config.apiKey), "Invalid Api Key");
$assert($exists(destination.Config.applicationId), "Invalid Application Id");
$assert(message.type in [$EventType.TRACK],
"message type " & message.type & " not supported")
)
- name: trackEventWorkflow
condition: message.type = $EventType.TRACK
externalWorkflow:
path: ./trackEventWorkflow.yaml

68 changes: 68 additions & 0 deletions cdk/v2/destinations/algolia/rtWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
bindings:
- path: ../../../../v0/destinations/algolia/config

steps:
- name: validateInput
template: |
(
$assert($type($) = "array" and $count($) > 0, "Invalid event array")
)

- name: prepareContext
template:
$setContext("batchMode", true)

- name: transform
externalWorkflow:
path: ./procWorkflow.yaml
loopOverInput: true

- name: successfulEvents
template: |
$outputs.transform#$i.output.{
"output": $,
/* $$ refers to input (root of the document) */
"destination": $$[$i].destination,
"metadata": $$[$i].metadata
}
- name: failedEvents
template: |
$outputs.transform#$i.error.{
"metadata": $toArray($$[$i].metadata),
"batched": false,
"statusCode": status,
"error": message
}
- name: batchSuccessfulEvents
description: Batches the successfulEvents
template: |
(
$batches := $chunk($outputs.successfulEvents, $MAX_BATCH_SIZE);
$batches.{
"batchedRequest": {
"body": {
"JSON": {"events": output},
"JSON_ARRAY": {},
"XML": {},
"FORM": {}
},
"version": "1",
"type": "REST",
"method": "POST",
"endpoint": $ENDPOINT,
"headers": $[0].destination.Config.{
"X-Algolia-Application-Id": applicationId,
"X-Algolia-API-Key": apiKey
},
"params": {},
"files": {}
},
"metadata": metadata,
"batched": true,
"statusCode": 200,
"destination": $[0].destination
} ~> $toArray;
)
- name: finalPayload
template: |
$outputs.($append(batchSuccessfulEvents, failedEvents))
86 changes: 86 additions & 0 deletions cdk/v2/destinations/algolia/trackEventWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
bindings:
- path: ../../../../v0/destinations/algolia/util
- path: ../../../../v0/destinations/algolia/config
- path: ../../../../v0/util

steps:
- name: validateInput
template: |
(
$assert($exists(message.event), "event is required for track call");
)

- name: preparePayload
template: |
(
$eventTypeMap := $eventTypeMapping(destination.Config);
$event := message.event.$trim().$lowercase();
$eventType := $getValueFromMessage(message, "properties.eventType") ? $getValueFromMessage(message, "properties.eventType") : $lookup($eventTypeMap, $event);
$assert($exists($eventType), "eventType is mandatory for track call");
$setContext("payload", $constructPayload(message, $trackMapping));
$newPayload := $merge([$context.payload, { "eventName": $event, "eventType": $eventType }]);
$setContext("payload", $newPayload);
$setContext("payload", $genericpayloadValidator($context.payload));
)

- name: populateProductsData
condition: $context.payload.eventName in ["product list viewed", "order completed"] and $exists(message.properties.products)
steps:
- name: populateForClickEvent
condition: $context.payload.eventType = "click"
template: |
(
$validProducts := message.properties.products[$exists(objectId) and $exists(position)]#$i[$i<20];
$objectAndPositionIds := {
"objectIDs": $validProducts.objectId,
"positions": $validProducts.position
};
$objLen := $count($validProducts.objectId);
$posLen := $count($validProducts.position);
$newPayload := $merge([$context.payload, $objectAndPositionIds]);
$setContext("payload", $clickPayloadValidator($newPayload))
)
- name: populateForOtherEvents
condition: $context.payload.eventType != "click"
template: |
(
$objectIDs := {
"objectIDs": message.properties.products.objectId#$i[$i<20]
};
$setContext("payload", $merge([$context.payload, $objectIDs]));
)

- name: validateDestPayload
template: |
(
$assert($context.payload.($not(filters and objectIDs)), "event can't have both objectIds and filters at the same time.");
$assert($context.payload.(filters or objectIDs), "Either filters or objectIds is required.");
)

- name: prepareResponsePayloadForBatch
condition: $exists($context.batchMode)
template: |
$context.payload
onComplete: return

- name: prepareResponsePayload
condition: $not($exists($context.batchMode))
template: |
{
"body": {
"JSON": {"events": [$context.payload]},
"JSON_ARRAY": {},
"XML": {},
"FORM": {}
},
"version": "1",
"type": "REST",
"method": "POST",
"endpoint": $ENDPOINT,
"headers": {
"X-Algolia-Application-Id": destination.Config.applicationId,
"X-Algolia-API-Key": destination.Config.apiKey
},
"params": {},
"files": {}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
bindings:
- name: EventType
path: ../../../constants
path: ../../../../constants
- name: SourceKeys
path: ../../../v0/util/data/GenericFieldMapping.json
path: ../../../../v0/util/data/GenericFieldMapping.json
exportAll: true
- path: ../../../v0/destinations/pinterest_tag/utils
- path: ../../../../v0/destinations/pinterest_tag/utils
- name: ENDPOINT
path: ../../../v0/destinations/pinterest_tag/config
path: ../../../../v0/destinations/pinterest_tag/config
steps:
- name: validateInput
template: |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
bindings:
- path: ../../../v0/destinations/pinterest_tag/config
- path: ../../../../v0/destinations/pinterest_tag/config
steps:
- name: prepareContext
template: $setContext("batchMode", true)
Expand Down
2 changes: 1 addition & 1 deletion cdk/v2/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async function getWorkflowPath(
function getRootPathForDestination(destName) {
// TODO: Resolve the CDK v2 destination directory
// path from the root directory
return path.join(CDK_V2_ROOT_DIR, destName);
return path.join(CDK_V2_ROOT_DIR, "destinations", destName);
}

async function getPlatformBindingsPaths() {
Expand Down
5 changes: 2 additions & 3 deletions v0/destinations/algolia/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ const process = event => {
};

const processRouterDest = async inputs => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, "ALGOLIA");
if (errorRespEvents.length > 0) {
return errorRespEvents;
if (!Array.isArray(inputs) || inputs.length === 0) {
throw new CustomError("Invalid event array", 400);
}

const inputChunks = returnArrayOfSubarrays(inputs, MAX_BATCH_SIZE);
Expand Down
Loading