Skip to content

Commit

Permalink
Revert "✨ Updated email analytics job to prioritize open events (#20800
Browse files Browse the repository at this point in the history
…)"

This reverts commit 4267ff9.
  • Loading branch information
9larsons committed Aug 27, 2024
1 parent d736b0f commit 51c8444
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 487 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,19 @@ class EmailAnalyticsServiceWrapper {
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest opened events started');
async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
return totalEvents;
}

async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest non-opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
const totalEvents = await this.service.fetchLatest({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}

async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing events started');
logging.info('[EmailAnalytics] Fetch missing started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
Expand All @@ -94,7 +83,7 @@ class EmailAnalyticsServiceWrapper {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled events started');
logging.info('[EmailAnalytics] Fetch scheduled started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
Expand All @@ -111,31 +100,13 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;

// NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally
try {
// Prioritize opens since they are the most important (only data directly displayed to users)
await this.fetchLatestOpenedEvents({maxEvents: Infinity});

// Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send)
// we want to make sure we don't spend too much time collecting delivery data.
const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000});
if (c1 > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}
const c2 = await this.fetchMissing({maxEvents: 20000});
if ((c1 + c2) > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});

// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});

this.fetching = false;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
Expand Down
27 changes: 11 additions & 16 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,30 @@ module.exports = {
return emailCount && emailCount.count > 0;
},

/**
* Retrieves the timestamp of the last seen event for the specified email analytics events.
* @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']).
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
*/
async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) {
async getLastSeenEventTimestamp() {
const startDate = new Date();

// separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxOpenedAt} = events.includes('opened') ? await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() : null;
let {maxDeliveredAt} = events.includes('delivered') ? await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() : null;
let {maxFailedAt} = events.includes('failed') ? await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() : null;

if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}
// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);
}

if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}

if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}

const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

return lastSeenEventTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('EmailEventStorage', function () {
before(async function () {
// Stub queries before boot
const queries = require('../../../../core/server/services/email-analytics/lib/queries');
sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () {
sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () {
// This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events
return new Date(2000, 0, 1);
});
Expand Down Expand Up @@ -78,7 +78,7 @@ describe('EmailEventStorage', function () {

// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -125,7 +125,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('opened_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -250,7 +250,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -346,7 +346,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -439,7 +439,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -529,7 +529,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -645,7 +645,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -747,7 +747,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -849,7 +849,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -951,7 +951,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);
});

Expand All @@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 0);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatestOpenedEvents();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatestOpenedEvents();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatestOpenedEvents();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];

await emailAnalytics.fetchLatestOpenedEvents();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down Expand Up @@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () {
timestamp: Math.round(timestamp.getTime() / 1000)
}];

await emailAnalytics.fetchLatestOpenedEvents();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();

const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const MailgunClient = require('@tryghost/mailgun-client');

const DEFAULT_EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const PAGE_LIMIT = 300;
const DEFAULT_TAGS = ['bulk-email'];

Expand All @@ -26,12 +26,11 @@ class EmailAnalyticsProviderMailgun {
* @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
* @param {Date} [options.begin]
* @param {Date} [options.end]
* @param {String[]} [options.events]
*/
fetchLatest(batchHandler, options) {
const mailgunOptions = {
limit: PAGE_LIMIT,
event: options?.events ? options.events.join(' OR ') : DEFAULT_EVENT_FILTER,
event: EVENT_FILTER,
tags: this.tags.join(' AND '),
begin: options.begin ? options.begin.getTime() / 1000 : undefined,
end: options.end ? options.end.getTime() / 1000 : undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,28 +155,5 @@ describe('EmailAnalyticsProviderMailgun', function () {
tags: 'bulk-email AND custom-tag'
}, batchHandler, {maxEvents: undefined});
});

it('uses provided events when supplied', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});

const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);

await mailgunProvider.fetchLatest(batchHandler, {events: ['delivered'], begin: LATEST_TIMESTAMP});

sinon.assert.calledWithExactly(mailgunFetchEventsStub, {
...MAILGUN_OPTIONS,
event: 'delivered',
tags: 'bulk-email'
}, batchHandler, {maxEvents: undefined});
});
});
});
Loading

0 comments on commit 51c8444

Please sign in to comment.