fix: log data and old data on fail to enqueue (#2183)
cabljac committed Sep 18, 2024
1 parent 3b22f83 commit 77d05e5
Showing 7 changed files with 113 additions and 53 deletions.
4 changes: 4 additions & 0 deletions firestore-bigquery-export/CHANGELOG.md
@@ -1,3 +1,7 @@
+## Version 0.1.55
+
+feat - log failed queued tasks
+
 ## Version 0.1.54
 
 fixed - bump changetracker and fix more vulnerabilities
2 changes: 2 additions & 0 deletions firestore-bigquery-export/README.md
@@ -126,6 +126,8 @@ To install an extension, your project must be on the [Blaze (pay as you go) plan
 
 * Collection path: What is the path of the collection that you would like to export? You may use `{wildcard}` notation to match a subcollection of all documents in a collection (for example: `chatrooms/{chatid}/posts`). Parent Firestore Document IDs from `{wildcards}` can be returned in `path_params` as a JSON formatted string.
 
+* Enable logging failed exports: If enabled, the extension will log exports that failed to enqueue to the Firebase console, to prevent data loss.
+
 * Enable Wildcard Column field with Parent Firestore Document IDs: If enabled, creates a column containing a JSON object of all wildcard ids from a document's path.
 
 * Dataset ID: What ID would you like to use for your BigQuery dataset? This extension will create the dataset, if it doesn't already exist.
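For a sense of what gets logged: when enqueueing fails with this option enabled, the log entry carries the whole event, including the serialized document data, so the row can be recovered by hand. Below is a sketch of the payload's shape, inferred from the `event` object built in functions/src/index.ts later in this commit — illustrative only, not a documented schema.

// Inferred shape of the logged event — field names from the index.ts diff
// below; types are approximations, not a published contract.
interface FailedExportEvent {
  timestamp: string; // Cloud Firestore commit timestamp of the write
  operation: unknown; // a ChangeType value (CREATE / UPDATE / DELETE)
  documentName: string; // full resource name of the changed document
  documentId: string;
  pathParams: Record<string, string> | null; // wildcard ids, when enabled
  eventId: string;
  data: unknown; // serialized new document data
  oldData: unknown; // serialized previous data, unless excluded
}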
15 changes: 14 additions & 1 deletion firestore-bigquery-export/extension.yaml
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 name: firestore-bigquery-export
-version: 0.1.54
+version: 0.1.55
 specVersion: v1beta
 
 displayName: Stream Firestore to BigQuery
@@ -206,6 +206,19 @@ params:
     default: posts
     required: true
 
+  - param: LOG_FAILED_EXPORTS
+    label: Enable logging failed exports
+    description: >-
+      If enabled, the extension will log exports that failed to enqueue to the
+      Firebase console, to prevent data loss.
+    type: select
+    options:
+      - label: Yes
+        value: yes
+      - label: No
+        value: no
+    required: true
+
   - param: WILDCARD_IDS
     label: Enable Wildcard Column field with Parent Firestore Document IDs
     description: >-
@@ -20,6 +20,7 @@ Object {
   "instanceId": undefined,
   "kmsKeyName": "test",
   "location": "us-central1",
+  "logFailedExportData": false,
   "maxDispatchesPerSecond": 10,
   "tableId": "my_table",
   "timePartitioning": null,
@@ -74,4 +75,4 @@ Object {
 ",
   "validationRegex": "^[a-zA-Z0-9_]+$",
 }
-`;
+`;
1 change: 1 addition & 0 deletions firestore-bigquery-export/functions/src/config.ts
@@ -32,6 +32,7 @@ export function clustering(clusters: string | undefined) {
 }
 
 export default {
+  logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes",
   bqProjectId: process.env.BIGQUERY_PROJECT_ID,
   databaseId: "(default)",
   collectionPath: process.env.COLLECTION_PATH,
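The select param reaches the function as the literal string "yes" or "no" in the LOG_FAILED_EXPORTS environment variable, and the line above converts it to a boolean. A quick check of that mapping as a hypothetical Jest test (the functions package already uses Jest; the relative path to config is an assumption):

// Hypothetical Jest test for the new flag mapping in config.ts.
test("LOG_FAILED_EXPORTS maps to a boolean", () => {
  process.env.LOG_FAILED_EXPORTS = "yes";
  // Reset the module registry so config.ts re-reads process.env on require.
  jest.resetModules();
  expect(require("../src/config").default.logFailedExportData).toBe(true);
});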
121 changes: 72 additions & 49 deletions firestore-bigquery-export/functions/src/index.ts
@@ -32,25 +32,27 @@ import * as logs from "./logs";
 import * as events from "./events";
 import { getChangeType, getDocumentId, resolveWildcardIds } from "./util";
 
+const eventTrackerConfig = {
+  tableId: config.tableId,
+  datasetId: config.datasetId,
+  datasetLocation: config.datasetLocation,
+  backupTableId: config.backupCollectionId,
+  transformFunction: config.transformFunction,
+  timePartitioning: config.timePartitioning,
+  timePartitioningField: config.timePartitioningField,
+  timePartitioningFieldType: config.timePartitioningFieldType,
+  timePartitioningFirestoreField: config.timePartitioningFirestoreField,
+  databaseId: config.databaseId,
+  clustering: config.clustering,
+  wildcardIds: config.wildcardIds,
+  bqProjectId: config.bqProjectId,
+  useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
+  skipInit: true,
+  kmsKeyName: config.kmsKeyName,
+};
+
 const eventTracker: FirestoreEventHistoryTracker =
-  new FirestoreBigQueryEventHistoryTracker({
-    tableId: config.tableId,
-    datasetId: config.datasetId,
-    datasetLocation: config.datasetLocation,
-    backupTableId: config.backupCollectionId,
-    transformFunction: config.transformFunction,
-    timePartitioning: config.timePartitioning,
-    timePartitioningField: config.timePartitioningField,
-    timePartitioningFieldType: config.timePartitioningFieldType,
-    timePartitioningFirestoreField: config.timePartitioningFirestoreField,
-    databaseId: config.databaseId,
-    clustering: config.clustering,
-    wildcardIds: config.wildcardIds,
-    bqProjectId: config.bqProjectId,
-    useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
-    skipInit: true,
-    kmsKeyName: config.kmsKeyName,
-  });
+  new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);
 
 logs.init();
 
@@ -97,60 +99,81 @@ export const fsexportbigquery = functions
   .document(config.collectionPath)
   .onWrite(async (change, context) => {
     logs.start();
-    try {
-      const changeType = getChangeType(change);
-      const documentId = getDocumentId(change);
-
-      const isCreated = changeType === ChangeType.CREATE;
-      const isDeleted = changeType === ChangeType.DELETE;
-
-      const data = isDeleted ? undefined : change.after.data();
-      const oldData =
-        isCreated || config.excludeOldData ? undefined : change.before.data();
+    const changeType = getChangeType(change);
+    const documentId = getDocumentId(change);
+
+    const isCreated = changeType === ChangeType.CREATE;
+    const isDeleted = changeType === ChangeType.DELETE;
+
+    const data = isDeleted ? undefined : change.after?.data();
+    const oldData =
+      isCreated || config.excludeOldData ? undefined : change.before?.data();
+
+    /**
+     * Serialize early before queueing in cloud task
+     * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
+     */
+    let serializedData: any;
+    let serializedOldData: any;
+
+    try {
+      serializedData = eventTracker.serializeData(data);
+      serializedOldData = eventTracker.serializeData(oldData);
+    } catch (err) {
+      logs.error(false, "Failed to serialize data", err, null, null);
+      throw err;
+    }
 
+    try {
       await events.recordStartEvent({
         documentId,
         changeType,
-        before: {
-          data: change.before.data(),
-        },
-        after: {
-          data: change.after.data(),
-        },
+        before: { data: change.before.data() },
+        after: { data: change.after.data() },
         context: context.resource,
       });
+    } catch (err) {
+      logs.error(false, "Failed to record start event", err, null, null);
+      throw err;
+    }
 
+    try {
       const queue = getFunctions().taskQueue(
         `locations/${config.location}/functions/syncBigQuery`,
         config.instanceId
       );
 
-      /**
-       * enqueue data cannot currently handle documentdata
-       * Serialize early before queueing in clopud task
-       * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
-       */
-      const seializedData = eventTracker.serializeData(data);
-      const serializedOldData = eventTracker.serializeData(oldData);
-
       await queue.enqueue({
         context,
         changeType,
         documentId,
-        data: seializedData,
+        data: serializedData,
         oldData: serializedOldData,
       });
     } catch (err) {
-      await events.recordErrorEvent(err as Error);
-      logs.error(err);
-      const eventAgeMs = Date.now() - Date.parse(context.timestamp);
-      const eventMaxAgeMs = 10000;
-
-      if (eventAgeMs > eventMaxAgeMs) {
-        return;
-      }
-
-      throw err;
+      const event = {
+        timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision.
+        operation: changeType,
+        documentName: context.resource.name,
+        documentId: documentId,
+        pathParams: config.wildcardIds ? context.params : null,
+        eventId: context.eventId,
+        data: serializedData,
+        oldData: serializedOldData,
+      };
+
+      await events.recordErrorEvent(err as Error);
+      // Only log the error once here
+      if (!err.logged) {
+        logs.error(
+          config.logFailedExportData,
+          "Failed to enqueue task to syncBigQuery",
+          err,
+          event,
+          eventTrackerConfig
        );
+      }
+
+      return;
     }
 
     logs.complete();
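Read linearly, the new handler logic looks roughly like the following — a sketch assembled from the added lines above, with the body pulled out into a hypothetical handleWrite helper (in the extension it lives inline in the onWrite callback, and the err.logged dedupe guard is omitted here). eventTracker and eventTrackerConfig are the module-level values defined at the top of index.ts; the imports mirror the file's own.

import * as functions from "firebase-functions";
import { getFunctions } from "firebase-admin/functions";
import { ChangeType } from "@firebaseextensions/firestore-bigquery-change-tracker";
import config from "./config";
import * as logs from "./logs";
import * as events from "./events";
import { getChangeType, getDocumentId } from "./util";

async function handleWrite(
  change: functions.Change<functions.firestore.DocumentSnapshot>,
  context: functions.EventContext
): Promise<void> {
  const changeType = getChangeType(change);
  const documentId = getDocumentId(change);
  const isCreated = changeType === ChangeType.CREATE;
  const isDeleted = changeType === ChangeType.DELETE;

  const data = isDeleted ? undefined : change.after?.data();
  const oldData =
    isCreated || config.excludeOldData ? undefined : change.before?.data();

  // Step 1 — serialize up front. Cloud Tasks payloads are capped at 1 MB,
  // and serializing here surfaces failures before anything is enqueued.
  let serializedData: any;
  let serializedOldData: any;
  try {
    serializedData = eventTracker.serializeData(data);
    serializedOldData = eventTracker.serializeData(oldData);
  } catch (err) {
    logs.error(false, "Failed to serialize data", err as Error, null, null);
    throw err;
  }

  // Step 2 — record the start event; failures are logged without a payload.
  try {
    await events.recordStartEvent({
      documentId,
      changeType,
      before: { data: change.before.data() },
      after: { data: change.after.data() },
      context: context.resource,
    });
  } catch (err) {
    logs.error(false, "Failed to record start event", err as Error, null, null);
    throw err;
  }

  // Step 3 — enqueue. On failure, build the full event (including the
  // serialized data and oldData) and hand it to logs.error, gated on the
  // new LOG_FAILED_EXPORTS param, so the write can be recovered from logs.
  try {
    const queue = getFunctions().taskQueue(
      `locations/${config.location}/functions/syncBigQuery`,
      config.instanceId
    );
    await queue.enqueue({
      context,
      changeType,
      documentId,
      data: serializedData,
      oldData: serializedOldData,
    });
  } catch (err) {
    const event = {
      timestamp: context.timestamp,
      operation: changeType,
      documentName: context.resource.name,
      documentId,
      pathParams: config.wildcardIds ? context.params : null,
      eventId: context.eventId,
      data: serializedData,
      oldData: serializedOldData,
    };
    await events.recordErrorEvent(err as Error);
    logs.error(
      config.logFailedExportData,
      "Failed to enqueue task to syncBigQuery",
      err as Error,
      event,
      eventTrackerConfig
    );
    return; // swallow the error so the already-logged event is not retried
  }

  logs.complete();
}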
20 changes: 18 additions & 2 deletions firestore-bigquery-export/functions/src/logs.ts
@@ -149,8 +149,24 @@ export const dataTypeInvalid = (
   );
 };
 
-export const error = (err: Error) => {
-  logger.error("Error when mirroring data to BigQuery", err);
+export const error = (
+  includeEvent: boolean,
+  message: string,
+  err: Error,
+  event: any,
+  eventTrackerConfig: any
+) => {
+  if (includeEvent) {
+    logger.error(`Error when mirroring data to BigQuery: ${message}`, {
+      error: err,
+      event,
+      eventTrackerConfig,
+    });
+  } else {
+    logger.error(`Error when mirroring data to BigQuery: ${message}`, {
+      error: err,
+    });
+  }
 };
 
 export const init = () => {
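Illustrative call sites, mirroring the usage added in functions/src/index.ts above: the boolean first argument — wired to the new LOG_FAILED_EXPORTS param — controls whether the event payload and tracker config are attached to the log entry.

// Payload attached: the event (with data/oldData) lands in the console logs.
logs.error(
  config.logFailedExportData,
  "Failed to enqueue task to syncBigQuery",
  err,
  event,
  eventTrackerConfig
);

// Payload withheld: only the error itself is logged, as for
// serialization and start-event failures.
logs.error(false, "Failed to serialize data", err, null, null);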
