Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Plugin Capabilities implementation #384

Merged
merged 25 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7c1fbc2
WIP: capabilities implementation
neilkakkar May 17, 2021
7804942
Merge branch 'master' of https://github.com/PostHog/plugin-server int…
neilkakkar May 17, 2021
bf03525
add tests for plugin capabilities
neilkakkar May 17, 2021
95670b5
address comments, update tests
neilkakkar May 18, 2021
a04a197
fix tests
neilkakkar May 18, 2021
7cf3b44
clean up
neilkakkar May 18, 2021
e5bd9d1
merge master
neilkakkar May 18, 2021
063a7e5
fix tests relevant to new types
neilkakkar May 18, 2021
8dcb16c
update capabilities type definition
neilkakkar May 18, 2021
1697073
update tests, only set capabilities if theyve changed
neilkakkar May 18, 2021
b2fd679
fix remaining tests
neilkakkar May 18, 2021
378f5bc
address comments
neilkakkar May 19, 2021
227aa19
merge master
neilkakkar May 19, 2021
a6837d5
more typing fixes
neilkakkar May 19, 2021
56e0bee
address comments
neilkakkar May 20, 2021
902db6e
merge master
neilkakkar May 20, 2021
f9fc935
Merge branch 'master' into capabilities
neilkakkar May 20, 2021
c72b680
Merge branch 'master' of https://github.com/PostHog/plugin-server int…
neilkakkar May 21, 2021
c1d4d95
address comments new tests
neilkakkar May 24, 2021
8d7f160
prettify
neilkakkar May 24, 2021
caf3cde
fix failing test
neilkakkar May 24, 2021
56fda74
Merge branch 'master' into capabilities
neilkakkar May 25, 2021
81bc013
Merge branch 'master' into capabilities
mariusandra May 25, 2021
a4ee3ab
merge master
neilkakkar May 25, 2021
f560a8f
few more server->hub
neilkakkar May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ Q: Where is teamID populated? At event creation time? (in posthog/posthog? row.p

TODO

#### How VM Extensions Work

TODO

### End Notes

<a name="f1">1</a>: What are tasks? - TASKS_PER_WORKER - a Piscina setting (https://github.com/piscinajs/piscina#constructor-new-piscinaoptions) -> concurrentTasksPerWorker
Expand Down
20 changes: 13 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ export interface Plugin {
from_web?: boolean
created_at: string
updated_at: string
capabilities?: Record<string, any>
capabilities?: Capabilities
}

export interface Capabilities {
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
jobs?: string[]
scheduled_tasks?: string[]
methods?: string[]
}

export interface PluginConfig {
Expand Down Expand Up @@ -255,13 +261,13 @@ export type WorkerMethods = {
export interface PluginConfigVMReponse {
vm: VM
methods: {
setupPlugin: () => Promise<void>
teardownPlugin: () => Promise<void>
onEvent: (event: PluginEvent) => Promise<void>
onSnapshot: (event: PluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent>
setupPlugin: (() => Promise<void>) | undefined
teardownPlugin: (() => Promise<void>) | undefined
onEvent: ((event: PluginEvent) => Promise<void>) | undefined
onSnapshot: ((event: PluginEvent) => Promise<void>) | undefined
processEvent: ((event: PluginEvent) => Promise<PluginEvent>) | undefined
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
// DEPRECATED
processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
processEventBatch: ((batch: PluginEvent[]) => Promise<PluginEvent[]>) | undefined
}
tasks: Record<PluginTaskType, Record<string, PluginTask>>
}
Expand Down
20 changes: 20 additions & 0 deletions src/utils/db/sql.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Capabilities,
Plugin,
PluginAttachmentDB,
PluginConfig,
Expand Down Expand Up @@ -51,6 +52,25 @@ export async function getPluginConfigRows(server: PluginsServer): Promise<Plugin
return rows
}

export async function setPluginCapabilities(
server: PluginsServer,
pluginConfig: PluginConfig,
capabilities: Capabilities
): Promise<void> {
await server.db.postgresQuery(
'UPDATE posthog_plugin SET capabilities = ($1) WHERE id = $2',
[capabilities, pluginConfig.plugin_id],
'setPluginCapabilities'
)
await server.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.System,
PluginLogEntryType.Info,
`Set plugin capabilities (instance ID ${server.instanceId}).`,
server.instanceId
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if the capabilities haven't changed, there's no need to update them nor log that we set them. That would be needless logspam :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't believe these logs are valuable user-side at all. Plugin loaded/unloaded events can be important because of plugin setup/teardown methods – while capabilities are just an internal optimization of ours.

}

export async function setError(
server: PluginsServer,
pluginError: PluginError | null,
Expand Down
60 changes: 58 additions & 2 deletions src/worker/plugins/loadPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import * as fs from 'fs'
import * as path from 'path'

import { PluginConfig, PluginJsonConfig, PluginsServer } from '../../types'
import { Capabilities, PluginConfig, PluginJsonConfig, PluginsServer } from '../../types'
import { processError } from '../../utils/db/error'
import { setPluginCapabilities } from '../../utils/db/sql'
import { getFileFromArchive, pluginDigest } from '../../utils/utils'

export async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Promise<boolean> {
export async function loadPlugin(
server: PluginsServer,
pluginConfig: PluginConfig,
prevConfig?: PluginConfig
): Promise<boolean> {
const { plugin } = pluginConfig

if (!plugin) {
Expand Down Expand Up @@ -53,6 +58,7 @@ export async function loadPlugin(server: PluginsServer, pluginConfig: PluginConf
indexJs,
`local ${pluginDigest(plugin)} from "${pluginPath}"!`
)
await inferPluginCapabilities(server, pluginConfig, prevConfig)
return true
} else if (plugin.archive) {
let config: PluginJsonConfig = {}
Expand All @@ -72,13 +78,15 @@ export async function loadPlugin(server: PluginsServer, pluginConfig: PluginConf

if (indexJs) {
void pluginConfig.vm?.initialize!(server, pluginConfig, indexJs, pluginDigest(plugin))
await inferPluginCapabilities(server, pluginConfig, prevConfig)
return true
} else {
pluginConfig.vm?.failInitialization!()
await processError(server, pluginConfig, `Could not load index.js for ${pluginDigest(plugin)}!`)
}
} else if (plugin.plugin_type === 'source' && plugin.source) {
void pluginConfig.vm?.initialize!(server, pluginConfig, plugin.source, pluginDigest(plugin))
await inferPluginCapabilities(server, pluginConfig, prevConfig)
return true
} else {
pluginConfig.vm?.failInitialization!()
Expand All @@ -94,3 +102,51 @@ export async function loadPlugin(server: PluginsServer, pluginConfig: PluginConf
}
return false
}

async function inferPluginCapabilities(server: PluginsServer, pluginConfig: PluginConfig, prevConfig?: PluginConfig) {
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
// infer on load implies there's no lazy loading, but all workers get
// these properties loaded
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could lazy load them. We won't do anything with the loaded capabilities for the moment, and eventually set them in a separate "install" step anyway. No need to block for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I could unblock them, but that still doesn't mean the VMs are lazy loaded right? since inferPluginCapabilities() will load the VM on startup, before the worker gets any event.

Maybe I should just remove this comment, just realised it's confusing w.r.t what is being lazy loaded.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The funny code here is this:

void pluginConfig.vm?.initialize!(server, pluginConfig, plugin.source, pluginDigest(plugin))
await inferPluginCapabilities(server, pluginConfig, prevConfig)

The first function is run in the background, the second one we await for. The second function awaits for a promise (resolveInternalVm) that gets set when the first function that's running in the background finishes.

In essence, the plugin is not lazy anymore, but we block until the VM setup has completed.

To get around it, we could just void the inferPluginCapabilities call and let them run in the background, however that also feels slightly hacky.

A possibly better idea is to put this inferPluginCapabilities inside the LazyVM.initialize function. There you probably won't need prevConfig either. Just check if the current pluginConfig.plugin.capabilities matches reality or not. If not, update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that.. makes a lot of sense. Ha, would've been nice if I made this leap myself when thinking about the implementation.

Come to think of it, I don't need the prevConfig in the current setup either, for similar reasons - I already have a pluginConfig.plugin.capabilities to check.

const vm = await pluginConfig.vm?.resolveInternalVm
const capabilities: Capabilities = { scheduled_tasks: [], jobs: [], methods: [] }
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved

const tasks = vm?.tasks
const methods = vm?.methods

if (methods) {
for (const [key, value] of Object.entries(methods)) {
if (value !== undefined) {
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
capabilities.methods!.push(key)
mariusandra marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

if (tasks?.schedule) {
for (const [key, value] of Object.entries(tasks.schedule)) {
if (value) {
capabilities.scheduled_tasks!.push(key)
}
}
}

if (tasks?.job) {
for (const [key, value] of Object.entries(tasks.job)) {
if (value) {
capabilities.jobs!.push(key)
}
}
}

const prevCapabilities = prevConfig?.plugin?.capabilities
if (
prevCapabilities &&
capabilities.jobs?.sort().toString() == prevCapabilities.jobs?.sort().toString() &&
capabilities.scheduled_tasks?.sort().toString() == prevCapabilities.scheduled_tasks?.sort().toString() &&
capabilities.methods?.sort().toString() == prevCapabilities.methods?.sort().toString()
) {
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
// pass - no change in capabilities
} else {
await setPluginCapabilities(server, pluginConfig, capabilities)
}

pluginConfig.plugin!.capabilities = capabilities
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here again the ! could cause subtle issues. After all, we're not sure this function will always be called with pluginConfigs that contain a plugin. It might be the case when we follow the current trail that leads to this function, but it might not be the case in the future.

Often the solution is to use a guard, such as if (!pluginConfig.plugin) { throw new Error('how the hell did you get here?') }, or just return, depends on the case. Guards are best placed at the top of a function as well.

}
2 changes: 1 addition & 1 deletion src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent)

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id != teamId) {
if (returnedEvent?.team_id != teamId) {
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
returnedEvent = null // don't try to ingest events with modified teamIDs
throw new Error('Illegal Operation: Plugin tried to change teamID')
}
Expand Down
2 changes: 1 addition & 1 deletion src/worker/plugins/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export async function setupPlugins(server: PluginsServer): Promise<void> {
pluginConfig.vm = prevConfig.vm
} else {
pluginConfig.vm = new LazyPluginVM()
pluginVMLoadPromises.push(loadPlugin(server, pluginConfig))
pluginVMLoadPromises.push(loadPlugin(server, pluginConfig, prevConfig))

if (prevConfig) {
void teardownPlugins(server, prevConfig)
Expand Down
10 changes: 9 additions & 1 deletion tests/helpers/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const plugin60: Plugin = {
is_preinstalled: false,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
capabilities: {},
capabilities: {}, // inferred on setup
}

export const pluginAttachment1: PluginAttachmentDB = {
Expand Down Expand Up @@ -139,3 +139,11 @@ export function mockPluginTempFolder(indexJs: string, pluginJson?: string): [Plu
},
]
}

export const mockPluginSourceCode = (indexJs: string): Plugin => ({
...plugin60,
archive: null,
plugin_type: 'source',
url: undefined,
source: indexJs,
})
21 changes: 14 additions & 7 deletions tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,15 @@ async function insertRow(db: Pool, table: string, object: Record<string, any>):
const params = Object.keys(object)
.map((_, i) => `\$${i + 1}`)
.join(',')
const values = Object.values(object).map((value) => {
if (Array.isArray(value) && value.length > 0) {
return JSON.stringify(value)
}
return value
})

try {
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, Object.values(object))
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, values)
} catch (error) {
console.error(`Error on table ${table} when inserting object:\n`, object, '\n', error)
throw error
Expand Down Expand Up @@ -122,11 +129,11 @@ export async function createUserTeamAndOrganization(
organization_id: organizationId,
app_urls: [],
name: 'TEST PROJECT',
event_names: JSON.stringify([]),
event_names_with_usage: JSON.stringify([]),
event_properties: JSON.stringify([]),
event_properties_with_usage: JSON.stringify([]),
event_properties_numerical: JSON.stringify([]),
event_names: [],
event_names_with_usage: [],
event_properties: [],
event_properties_with_usage: [],
event_properties_numerical: [],
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
anonymize_ips: false,
Expand All @@ -140,7 +147,7 @@ export async function createUserTeamAndOrganization(
api_token: `THIS IS NOT A TOKEN FOR TEAM ${teamId}`,
test_account_filters: [],
timezone: 'UTC',
data_attributes: JSON.stringify(['data-attr']),
data_attributes: ['data-attr'],
})
}

Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/sqlMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ export const getPluginAttachmentRows = (s.getPluginAttachmentRows as unknown) as
export const getPluginConfigRows = (s.getPluginConfigRows as unknown) as jest.MockedFunction<
UnPromisify<typeof s.getPluginConfigRows>
>
export const setPluginCapabilities = (s.setPluginCapabilities as unknown) as jest.MockedFunction<
UnPromisify<typeof s.setPluginCapabilities>
>
export const setError = (s.setError as unknown) as jest.MockedFunction<UnPromisify<typeof s.setError>>
export const disablePlugin = (s.disablePlugin as unknown) as jest.MockedFunction<UnPromisify<void>>
Loading