Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BQStream): add batch support #1377

Merged
merged 19 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions features.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"LAMBDA": true,
"WOOTRIC": true,
"GOOGLE_CLOUD_FUNCTION": true,
"BQSTREAM": true,
"CLICKUP": true,
"FRESHMARKETER": true,
"FRESHSALES": true,
Expand Down
7 changes: 7 additions & 0 deletions src/v0/destinations/bqstream/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Ref - https://cloud.google.com/bigquery/quotas#streaming_inserts
// Number of rows placed in a single batched request; transform.js uses this
// value as the chunk size when splitting router events into batches.
// NOTE(review): 500 presumably follows the per-request row guidance in the
// quota page referenced above — confirm if that guidance changes.
const MAX_ROWS_PER_REQUEST = 500;

// Shared constants for the BQStream destination.
module.exports = {
MAX_ROWS_PER_REQUEST,
// Destination name handed to the router-transform validation/error helpers.
DESTINATION: "BQSTREAM"
};
100 changes: 98 additions & 2 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
/* eslint-disable no-console */
const _ = require('lodash');
const { EventType } = require('../../../constants');
const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');

const getInsertIdColValue = (properties, insertIdCol) => {
Expand All @@ -13,7 +21,7 @@ const getInsertIdColValue = (properties, insertIdCol) => {
return null;
};

const process = async (event) => {
const process = (event) => {
const { message } = event;
const { properties, type } = message;
// EventType validation
Expand All @@ -33,6 +41,7 @@ const process = async (event) => {
if (propInsertId) {
props.insertId = propInsertId;
}

return {
datasetId,
tableId,
Expand All @@ -41,4 +50,91 @@ const process = async (event) => {
};
};

module.exports = { process };
/**
 * Groups already-transformed events into batched success responses.
 *
 * The input is split into consecutive chunks of at most MAX_ROWS_PER_REQUEST
 * events. Each chunk becomes one batched request whose `properties` array
 * holds the `message.properties` of every event in the chunk, in order.
 *
 * Note: `datasetId`, `tableId`, `projectId` and `destination` are taken from
 * the FIRST event of each chunk only; the remaining events in the chunk are
 * not checked against them.
 *
 * @param {Array<{message: object, metadata: object, destination: object}>} eventsChunk
 *   Transformed events to batch.
 * @returns {Array} One entry per chunk, as produced by getSuccessRespEvents.
 */
const batchEvents = (eventsChunk) =>
  _.chunk(eventsChunk, MAX_ROWS_PER_REQUEST).map((rows) => {
    // Start from the default batch envelope and attach the request to it.
    const envelope = defaultBatchRequestConfig();

    // The first event of the chunk supplies the routing fields.
    const head = rows[0];
    const { message: headMessage, destination } = head;

    const rowProperties = rows.map((row) => row.message.properties);
    const metadata = rows.map((row) => row.metadata);

    envelope.batchedRequest = {
      datasetId: headMessage.datasetId,
      tableId: headMessage.tableId,
      projectId: headMessage.projectId,
      properties: rowProperties,
    };

    // Final `true` argument: presumably flags the response as batched —
    // confirm against getSuccessRespEvents.
    return getSuccessRespEvents(envelope.batchedRequest, metadata, destination, true);
  });

/**
 * Router-transform entry point: transforms each input event (unless it is
 * already transformed) and batches the successful ones.
 *
 * Flow:
 *  1. If checkInvalidRtTfEvents reports any errors for the input, those are
 *     returned as-is and nothing else is processed.
 *  2. Events whose `message.statusCode` is truthy are treated as already
 *     transformed and forwarded unchanged; all others go through `process`.
 *  3. Any exception raised while handling a single event is converted with
 *     handleRtTfSingleEventError and collected, without aborting the rest.
 *  4. Successful events are batched via batchEvents.
 *
 * The function is fully synchronous: `process` is invoked without `await`,
 * so no Promise is needed to drive the loop.
 *
 * @param {Array<{message: object, metadata: object, destination: object}>} inputs
 *   Router events to transform.
 * @returns {Array} Batched success responses followed by per-event error
 *   responses (or only the validation errors from step 1).
 */
const processRouterDest = (inputs) => {
  const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
  if (errorRespEvents.length > 0) {
    return errorRespEvents;
  }

  const eventsChunk = []; // successfully transformed events awaiting batching
  const errorRespList = [];

  inputs.forEach((event) => {
    try {
      if (event.message.statusCode) {
        // already transformed event
        eventsChunk.push(event);
        return;
      }

      // not transformed yet: normalise the result to an array so that a
      // transformer returning several payloads is handled uniformly
      const response = process(event);
      const responses = Array.isArray(response) ? response : [response];
      responses.forEach((res) => {
        eventsChunk.push({
          message: res,
          metadata: event.metadata,
          destination: event.destination,
        });
      });
    } catch (error) {
      // isolate the failure to this event; the rest are still batched
      errorRespList.push(handleRtTfSingleEventError(event, error, DESTINATION));
    }
  });

  const batchedResponseList = eventsChunk.length > 0 ? batchEvents(eventsChunk) : [];
  return [...batchedResponseList, ...errorRespList];
};

module.exports = { process, processRouterDest };
40 changes: 28 additions & 12 deletions test/__tests__/bqstream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,44 @@ const version = "v0";

const transformer = require(`../../src/${version}/destinations/${integration}/transform`);

//for router test
const inputRouterDataFile = fs.readFileSync(
// Processor
const inputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_input.json`)
);
const outputRouterDataFile = fs.readFileSync(
const outputDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_output.json`)
);
const inputProcData = JSON.parse(inputRouterDataFile);
const expectedProcData = JSON.parse(outputRouterDataFile);
const inputData = JSON.parse(inputDataFile);
const expectedData = JSON.parse(outputDataFile);

// Router Test files
const inputRouterDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_input.json`)
);
const outputRouterDataFile = fs.readFileSync(
path.resolve(__dirname, `./data/${integration}_router_output.json`)
);
const inputRouterData = JSON.parse(inputRouterDataFile);
const expectedRouterData = JSON.parse(outputRouterDataFile);

describe(`${name} Tests`, () => {
describe("Router Tests", () => {
it("Payload", async () => {
inputProcData.forEach(async (input, ind) => {
describe("Processor", () => {
inputData.forEach(async (input, index) => {
it(`Payload - ${index}`, async () => {
try {
const routerOutput = await transformer.process(input);
expect(routerOutput).toEqual(expectedProcData[ind]);
const output = await transformer.process(input);
expect(output).toEqual(expectedData[index]);
} catch (error) {
console.error(error);
expect(routerOutput).toEqual(error);
expect(error.message).toEqual(expectedData[index].error);
}
});
});
});

describe("Router Tests", () => {
it("Payload", async () => {
const routerOutput = await transformer.processRouterDest(inputRouterData);
expect(routerOutput).toEqual(expectedRouterData);
});
});
});
7 changes: 6 additions & 1 deletion test/__tests__/data/bqstream_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
{
"datasetId": "gc_dataset",
"projectId": "gc-project-id",
"properties": { "count": 25, "productId": 6, "productName": "Product-5", "insertId": "6" },
"properties": {
"count": 25,
"productId": 6,
"productName": "Product-5",
"insertId": "6"
},
"tableId": "gc_table"
}
]
160 changes: 160 additions & 0 deletions test/__tests__/data/bqstream_router_input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
[
{
"message": {
"type": "track",
"event": "insert product",
"sentAt": "2021-09-08T11:10:45.466Z",
"userId": "user12345",
"channel": "web",
"context": {
"os": {
"name": "",
"version": ""
},
"app": {
"name": "RudderLabs JavaScript SDK",
"build": "1.0.0",
"version": "1.1.18",
"namespace": "com.rudderlabs.javascript"
},
"page": {
"url": "http://127.0.0.1:5500/index.html",
"path": "/index.html",
"title": "Document",
"search": "",
"tab_url": "http://127.0.0.1:5500/index.html",
"referrer": "$direct",
"initial_referrer": "$direct",
"referring_domain": "",
"initial_referring_domain": ""
},
"locale": "en-GB",
"screen": {
"width": 1536,
"height": 960,
"density": 2,
"innerWidth": 1536,
"innerHeight": 776
},
"traits": {},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.1.18"
},
"campaign": {},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36"
},
"rudderId": "fa2994a5-2a81-45fd-9919-fcf5596ad380",
"messageId": "e2d1a383-d9a2-4e03-a9dc-131d153c4d95",
"timestamp": "2021-11-15T14:06:42.497+05:30",
"properties": {
"count": 10,
"productId": 10,
"productName": "Product-10"
},
"receivedAt": "2021-11-15T14:06:42.497+05:30",
"request_ip": "[::1]",
"anonymousId": "d8b2ed61-7fa5-4ef8-bd92-6a506157c0cf",
"integrations": {
"All": true
},
"originalTimestamp": "2021-09-08T11:10:45.466Z"
},
"metadata": {
"jobId": 1
},
"destination": {
"Config": {
"rudderAccountId": "1z8LpaSAuFR9TPWL6fECZfjmRa-",
"projectId": "gc-project-id",
"datasetId": "gc_dataset",
"tableId": "gc_table",
"insertId": "productId",
"eventDelivery": true,
"eventDeliveryTS": 1636965406397
},
"Enabled": true,
"ID": "1WXjIHpu7ETXgjfiGPW3kCUgZFR",
"Name": "bqstream test"
}
},
{
"message": {
"type": "track",
"event": "insert product",
"sentAt": "2021-09-08T11:10:45.466Z",
"userId": "user12345",
"channel": "web",
"context": {
"os": {
"name": "",
"version": ""
},
"app": {
"name": "RudderLabs JavaScript SDK",
"build": "1.0.0",
"version": "1.1.18",
"namespace": "com.rudderlabs.javascript"
},
"page": {
"url": "http://127.0.0.1:5500/index.html",
"path": "/index.html",
"title": "Document",
"search": "",
"tab_url": "http://127.0.0.1:5500/index.html",
"referrer": "$direct",
"initial_referrer": "$direct",
"referring_domain": "",
"initial_referring_domain": ""
},
"locale": "en-GB",
"screen": {
"width": 1536,
"height": 960,
"density": 2,
"innerWidth": 1536,
"innerHeight": 776
},
"traits": {},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.1.18"
},
"campaign": {},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36"
},
"rudderId": "fa2994a5-2a81-45fd-9919-fcf5596ad380",
"messageId": "e2d1a383-d9a2-4e03-a9dc-131d153c4d95",
"timestamp": "2021-11-15T14:06:42.497+05:30",
"properties": {
"count": 20,
"productId": 20,
"productName": "Product-20"
},
"receivedAt": "2021-11-15T14:06:42.497+05:30",
"request_ip": "[::1]",
"anonymousId": "d8b2ed61-7fa5-4ef8-bd92-6a506157c0cf",
"integrations": {
"All": true
},
"originalTimestamp": "2021-09-08T11:10:45.466Z"
},
"metadata": {
"jobId": 2
},
"destination": {
"Config": {
"rudderAccountId": "1z8LpaSAuFR9TPWL6fECZfjmRa-",
"projectId": "gc-project-id",
"datasetId": "gc_dataset",
"tableId": "gc_table",
"insertId": "productId",
"eventDelivery": true,
"eventDeliveryTS": 1636965406397
},
"Enabled": true,
"ID": "1WXjIHpu7ETXgjfiGPW3kCUgZFR",
"Name": "bqstream test"
}
}
]
Loading