Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Updated email analytics job to prioritize open events #20800

Merged
merged 7 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,30 @@ class EmailAnalyticsServiceWrapper {
});
}

async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');
async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatest({maxEvents});
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
return totalEvents;
}

async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest non-opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}

async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing started');
logging.info('[EmailAnalytics] Fetch missing events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
Expand All @@ -83,7 +94,7 @@ class EmailAnalyticsServiceWrapper {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled started');
logging.info('[EmailAnalytics] Fetch scheduled events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
Expand All @@ -100,13 +111,31 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;

// NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally
try {
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});
// Prioritize opens since they are the most important (only data directly displayed to users)
await this.fetchLatestOpenedEvents({maxEvents: Infinity});

// Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send)
// we want to make sure we don't spend too much time collecting delivery data.
const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000});
if (c1 > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}
const c2 = await this.fetchMissing({maxEvents: 20000});
if ((c1 + c2) > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}

// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});

this.fetching = false;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
Expand Down
27 changes: 16 additions & 11 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,35 @@ module.exports = {
return emailCount && emailCount.count > 0;
},

async getLastSeenEventTimestamp() {
/**
* Retrieves the timestamp of the last seen event for the specified email analytics events.
* @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']).
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
*/
async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) {
const startDate = new Date();

// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxOpenedAt} = events.includes('opened') ? await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() : null;
let {maxDeliveredAt} = events.includes('delivered') ? await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() : null;
let {maxFailedAt} = events.includes('failed') ? await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() : null;
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);
maxOpenedAt = new Date(maxOpenedAt);
}

if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
maxDeliveredAt = new Date(maxDeliveredAt);
}

if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}

const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

return lastSeenEventTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('EmailEventStorage', function () {
before(async function () {
// Stub queries before boot
const queries = require('../../../../core/server/services/email-analytics/lib/queries');
sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () {
sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () {
// This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events
return new Date(2000, 0, 1);
});
Expand Down Expand Up @@ -78,7 +78,7 @@ describe('EmailEventStorage', function () {

// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -125,7 +125,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('opened_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -250,7 +250,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -346,7 +346,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -439,7 +439,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -529,7 +529,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -645,7 +645,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -747,7 +747,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -849,7 +849,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -951,7 +951,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 1);
});

Expand All @@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatest();
const result = await emailAnalytics.fetchLatestOpenedEvents();
assert.equal(result, 0);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatest();
await emailAnalytics.fetchLatestOpenedEvents();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatest();
await emailAnalytics.fetchLatestOpenedEvents();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatest();
await emailAnalytics.fetchLatestOpenedEvents();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatest();
await emailAnalytics.fetchLatestOpenedEvents();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () {
timestamp: Math.round(timestamp.getTime() / 1000)
}];

await emailAnalytics.fetchLatest();
await emailAnalytics.fetchLatestOpenedEvents();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const MailgunClient = require('@tryghost/mailgun-client');

const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const DEFAULT_EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const PAGE_LIMIT = 300;
const DEFAULT_TAGS = ['bulk-email'];

Expand All @@ -26,11 +26,12 @@ class EmailAnalyticsProviderMailgun {
* @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
* @param {Date} [options.begin]
* @param {Date} [options.end]
* @param {String[]} [options.events]
*/
fetchLatest(batchHandler, options) {
const mailgunOptions = {
limit: PAGE_LIMIT,
event: EVENT_FILTER,
event: options?.events ? options.events.join(' OR ') : DEFAULT_EVENT_FILTER,
tags: this.tags.join(' AND '),
begin: options.begin ? options.begin.getTime() / 1000 : undefined,
end: options.end ? options.end.getTime() / 1000 : undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,28 @@ describe('EmailAnalyticsProviderMailgun', function () {
tags: 'bulk-email AND custom-tag'
}, batchHandler, {maxEvents: undefined});
});

it('uses provided events when supplied', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});

const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);

await mailgunProvider.fetchLatest(batchHandler, {events: ['delivered'], begin: LATEST_TIMESTAMP});

sinon.assert.calledWithExactly(mailgunFetchEventsStub, {
...MAILGUN_OPTIONS,
event: 'delivered',
tags: 'bulk-email'
}, batchHandler, {maxEvents: undefined});
});
});
});
Loading
Loading