|
| 1 | +/** |
| 2 | + * @typedef {Object} SegmentTrackEvent |
| 3 | + * @see {@link https://segment.com/docs/connections/spec/track/} for full track event spec |
| 4 | + * @see {@link https://segment.com/docs/connections/spec/common/} for full common fields spec |
| 5 | + * @property {String} event Track event name |
| 6 | + * @property {String} messageId Original API message identifier |
| 7 | + * @property {String} userId Tracked user id that we want to modify and re-track |
| 8 | + * @property {String} timestamp Timestamp of event in ISO 8601 format |
| 9 | + * @property {Object} properties Track event arbitrary properties, we want to add 3 own |
| 10 | + */ |
| 11 | + |
| 12 | +/** |
| 13 | + * @typedef {Object} FunctionSettings |
| 14 | + * @property {String} apiKey Write key of source we forward new user identifier and enriched track event to |
| 15 | + */ |
| 16 | + |
| 17 | +/** |
| 18 | + * Call Segment V1 API, using HTTP API Segment Source by api (write) key |
| 19 | + * @see {@link https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/} for source doc |
| 20 | + * @internal |
| 21 | + * @param {String} apiKey |
| 22 | + * @param {String<'alias'|'track'>} route |
| 23 | + * @param {Object} body |
| 24 | + * @return {Promise<void>} |
| 25 | + */ |
| 26 | +async function callSegmentAPI(apiKey, route, body) { |
| 27 | + let response; |
| 28 | + |
| 29 | + try { |
| 30 | + response = await fetch(`https://api.segment.io/v1/${route}`, { |
| 31 | + method: 'POST', |
| 32 | + headers: { |
| 33 | + Authorization: `Basic ${btoa(`${apiKey}:`)}`, |
| 34 | + 'Content-Type': 'application/json', |
| 35 | + }, |
| 36 | + body: JSON.stringify(body), |
| 37 | + }); |
| 38 | + } catch (error) { |
| 39 | + // Retry on connection error |
| 40 | + throw new RetryError(error.message); |
| 41 | + } |
| 42 | + |
| 43 | + if (response.status >= 500 || response.status === 429) { |
| 44 | + // Retry on 5xx (server errors) and 429s (rate limits) |
| 45 | + throw new RetryError(`Failed with ${response.status}`); |
| 46 | + } |
| 47 | +} |
| 48 | + |
| 49 | +/** |
| 50 | + * Handle track event |
| 51 | + * @param {SegmentTrackEvent} event |
| 52 | + * @param {FunctionSettings} settings |
| 53 | + * @return {Promise<void>} |
| 54 | + */ |
| 55 | +async function onTrack(event, settings) { |
| 56 | + const { apiKey } = settings; |
| 57 | + |
| 58 | + if (!apiKey) { |
| 59 | + // Settings not configured properly |
| 60 | + throw new ValidationError('Forward source write key is required'); |
| 61 | + } |
| 62 | + |
| 63 | + // Build anticipated user id, as MD5 hash of previous user id |
| 64 | + const anticipatedId = crypto.createHash('md5').update(event.userId).digest('hex'); |
| 65 | + |
| 66 | + // Update user identity from track event with anticipated one |
| 67 | + await callSegmentAPI(apiKey, 'alias', { |
| 68 | + previousId: event.userId, |
| 69 | + userId: anticipatedId, |
| 70 | + timestamp: event.timestamp, |
| 71 | + }); |
| 72 | + |
| 73 | + // Split previous event user id into 3 slugs that will enrich new event |
| 74 | + const [firstUserIdSlug, secondUserIdSlug, thirdUserIdSlug] = event.userId.split('-'); |
| 75 | + |
| 76 | + // Clone previous event into new, with anticipated user id and enriched properties |
| 77 | + const newEvent = { |
| 78 | + ...event, |
| 79 | + userId: anticipatedId, |
| 80 | + properties: { |
| 81 | + ...event.properties, |
| 82 | + firstUserIdSlug, |
| 83 | + secondUserIdSlug, |
| 84 | + thirdUserIdSlug, |
| 85 | + }, |
| 86 | + }; |
| 87 | + |
| 88 | + // Delete previous event's messageId from clone, so it can be reposted |
| 89 | + delete newEvent.messageId; |
| 90 | + |
| 91 | + // Repost event with enriched properties for the new anticipated identity |
| 92 | + await callSegmentAPI(apiKey, 'track', newEvent); |
| 93 | +} |
0 commit comments