# 0012: Event driven tinkerbell #17

Merged: 5 commits, Sep 21, 2020

**0012/README.md** (+233 lines)

---
id: 0012
title: Event driven design for Tinkerbell
status: ideation
authors: Gaurav Gahlot <gauravgahlot0107@gmail.com>, Gianluca Arbezzano <gianarb92@gmail.com>
---

**Contributor:** Some of my comments are marked as resolved. I would request a reply on them.

**Contributor (Author):** That was unintentional, actually. I was marking as resolved the ones we had responded to and accidentally marked your new comments as well. 😄

**Contributor:** np, I understand.

## Summary

In the current model, the whole Tinkerbell stack works on a _request-response_ model.
Instead of waiting for explicit requests or responses, we want the components to watch for certain events and act accordingly.
This introduces extensibility and allows users to plug-and-play with tink.

## Goals and non-Goals

Goals:

- Move Tinkerbell in the event-driven design direction.
- Start small and have a pluggable base ready.
- Be able to handle the `phone-home` with the new event-driven system.

Non-Goals:

- Disrupt the current working environment.
- Update workers to receive actions via the event-driven model.

## Content

Not everything can be a Tink responsibility.
Events are a scalable way to build an extensible system.
This allows different components to tap into the event streams and leverage the extension points.
**Contributor:** How would this event stream be different from a pub-sub message model?

**@gianarb (Sep 15, 2020):** Pub-sub is fire and forget. These events are persisted and can be replayed (until a TTL is reached). Anyway, pub-sub is a pattern, while a stream is a communication technique, I presume. So I can't really answer.

**Contributor:** Okay, to understand better, I was trying to draw an analogy between an event stream, as part of the event-driven pattern, and the pub-sub model, and also its difference from the latter.

Reading this file and the comment in the link below, I think what you are trying to implement is the [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html) pattern and not the event-driven pattern?

https://github.com/tinkerbell/proposals/pull/17/files/b71317a6ddca4227ec1b0394d99afcfcb7cd3083#r488554642

For example, [tinkerbell/portal](https://github.com/tinkerbell/portal/) can watch for workflow events and present them on the UI.
Events are good for troubleshooting purposes because they help build context.
Feature requests we have received that can be implemented using events include phone-home, on-timeout, and so on.

### Current Model

In the current model, the tink-server is loaded with tons of responsibilities:
managing workflow state, handing over actions to workers, logging events, and others.
We would like to delegate these responsibilities to smaller, dedicated services/components.
**Contributor:** Need some more explanation on components. Where will they live in the tink architecture?
Secondly, how will these components send/receive the events from the gRPC? Will it be redirected by tink-server?

**Contributor:**

> need some more explanation on components. Where will they live in tink architecture?

It is not part of this proposal. The separation of concerns won't change here. We are just creating a mechanism that enables third parties to implement their own logic outside of Tinkerbell.

There is no redirection; as you can see in the system graph, events are stored in Postgres and there are a couple of new gRPC requests served by the gRPC server.

**Contributor:** Okay, I would request some details about "delegate these responsibilities to respective smaller services/components", maybe with an example.

**Contributor (Author):** @gianarb and I are working to add more examples and implementation details so that it's clear what we intend to do here.

Instead of waiting for an explicit request, these components will watch for certain events and act accordingly.

We have a client for each resource, and each client exposes a set of (CRUD) commands:

```
// client for Template resource
type TemplateClient interface {
	CreateTemplate(ctx context.Context, in *WorkflowTemplate, opts ...grpc.CallOption) (*CreateResponse, error)
	GetTemplate(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*WorkflowTemplate, error)
	DeleteTemplate(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Empty, error)
	ListTemplates(ctx context.Context, in *Empty, opts ...grpc.CallOption) (Template_ListTemplatesClient, error)
	UpdateTemplate(ctx context.Context, in *WorkflowTemplate, opts ...grpc.CallOption) (*Empty, error)
}

// client for Hardware resource
type HardwareServiceClient interface {
	Push(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*Empty, error)
	ByMAC(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Hardware, error)
	ByIP(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Hardware, error)
	ByID(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Hardware, error)
	All(ctx context.Context, in *Empty, opts ...grpc.CallOption) (HardwareService_AllClient, error)
	Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (HardwareService_WatchClient, error)
	Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*Empty, error)
}

// client for Workflow resource
type WorkflowSvcClient interface {
	CreateWorkflow(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
	GetWorkflow(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Workflow, error)
	DeleteWorkflow(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*Empty, error)
	ListWorkflows(ctx context.Context, in *Empty, opts ...grpc.CallOption) (WorkflowSvc_ListWorkflowsClient, error)
	GetWorkflowContext(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*WorkflowContext, error)
	ShowWorkflowEvents(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (WorkflowSvc_ShowWorkflowEventsClient, error)
	GetWorkflowContextList(ctx context.Context, in *WorkflowContextRequest, opts ...grpc.CallOption) (*WorkflowContextList, error)
	GetWorkflowContexts(ctx context.Context, in *WorkflowContextRequest, opts ...grpc.CallOption) (WorkflowSvc_GetWorkflowContextsClient, error)
	GetWorkflowActions(ctx context.Context, in *WorkflowActionsRequest, opts ...grpc.CallOption) (*WorkflowActionList, error)
	ReportActionStatus(ctx context.Context, in *WorkflowActionStatus, opts ...grpc.CallOption) (*Empty, error)
	GetWorkflowData(ctx context.Context, in *GetWorkflowDataRequest, opts ...grpc.CallOption) (*GetWorkflowDataResponse, error)
	GetWorkflowMetadata(ctx context.Context, in *GetWorkflowDataRequest, opts ...grpc.CallOption) (*GetWorkflowDataResponse, error)
	GetWorkflowDataVersion(ctx context.Context, in *GetWorkflowDataRequest, opts ...grpc.CallOption) (*GetWorkflowDataResponse, error)
	UpdateWorkflowData(ctx context.Context, in *UpdateWorkflowDataRequest, opts ...grpc.CallOption) (*Empty, error)
}
```

### The New Model

The new model is inspired by Kubernetes.
The resource clients should have a `Watch` function that streams events for that particular resource.

```
Watch(ctx context.Context, in EventWatchRequest)
```
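
As a rough sketch of what this could look like on a generated Go client (the request and stream types here are assumptions, not a final API):

```
// Hypothetical sketch: a Watch primitive added to a resource client.
type WorkflowSvcClient interface {
	// ... existing CRUD methods ...

	// Watch opens a server-side stream of events for workflow resources.
	Watch(ctx context.Context, in *EventWatchRequest, opts ...grpc.CallOption) (WorkflowSvc_WatchClient, error)
}
```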

## Implementation Details

### PostgreSQL Notifications
**Contributor:** This doesn't really make sense at this point in time. Since PG NOTIFY isn't persisted, there's no replay ability, which means that needs to be handled at a higher layer. So what exactly are we gaining from using NOTIFY/LISTEN? If we really wanted to do it in the DB, something more like auditing would be needed.

**Contributor:** Yes, I think this won't be the real implementation; yesterday evening I was thinking about the possibility of implementing it differently.

We will persist events directly in a DB table from tink-server, where they happen, with an INSERT. And I think we will use listeners only on the event table, to get the new rows and stream them out.

In the meantime we will look at the auditing feature as well. My direction at this stage is to be 70% focused on a good gRPC layer, and the rest is "make it work". When it works we can figure out how to make it better. But probably the v1 will be as I wrote: at some point in the code, a new event will be stored in the event table.

**Contributor:** OK, makes sense. The nice thing about the trigger-based approach is that the DB takes care of populating the events table itself, and we don't have to worry about sprinkling code in everywhere we want an event. I'd really consider doing it that way first, actually.

**Contributor:** @Cbkhare I think this should answer your question. The events need to be persisted into a table. NOTIFYs flow from there; new watchers would SELECT to go back in time and then LISTEN for new updates. Some care would be needed to make sure we don't race and drop some events between the two.


Tinkerbell uses PostgreSQL as the data store.
Postgres provides [notifications](https://www.postgresql.org/docs/10/sql-notify.html) that can be used to watch changes in a table as they occur.
Here, changes refer to the Postgres events INSERT, UPDATE, and DELETE.

NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database.
A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).

We have added workflow notifications as an example:

- [workflow-notifications.sql](workflow-notifications.sql): SQL script to set up notifications for the workflow table
- [listener.go](listener.go): a listener watching for the notifications
- [event.log](event.log): log of INSERT, UPDATE, and DELETE notifications

## The API

### Data Model

ResourceType - a resource that an event can be associated with

- Template
- Hardware
- Workflow

EventType - an event type in tinkerbell space (a non-exhaustive list)

- CREATED
- UPDATED
- DELETED
- WORKFLOW_STARTED
- WORKFLOW_INPROGRESS
- WORKFLOW_FAILED
- WORKFLOW_TIMEOUT

Event - an event in tinkerbell space; it has the following structure:

```
{
  "id": "uuid",
  "resourceID": "uuid",
  "resourceType": "workflow",
  "eventType": "created",
  "time": "",
  "data": {}
}
```

- `id`: a unique identifier (UUID) for the event that occurred in tinkerbell space
- `resourceID`: a unique identifier (UUID) for the resource the event is associated with.
For example, `resourceID` will be set to `workflowID` for a workflow resource and to `templateID` for a template resource.
- `resourceType`: the type of the resource (template, hardware, workflow) for which the event was generated
- `eventType`: the event verb, describing the action on the resource that generated the event
- `time`: the event timestamp
- `data`: the primary event payload, represented as `interface{}`.
For a workflow-created event, the payload (data) can be the complete workflow structure.
For an on-timeout event, the data can be the action that needs to be executed next.
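
A minimal Go sketch of this structure (field names mirror the JSON above; the concrete Go types are assumptions):

```
// Event is a sketch of the structure described above.
type Event struct {
	ID           string      `json:"id"`           // event UUID
	ResourceID   string      `json:"resourceID"`   // UUID of the associated resource
	ResourceType string      `json:"resourceType"` // template, hardware, workflow
	EventType    string      `json:"eventType"`    // created, updated, deleted, ...
	Time         time.Time   `json:"time"`         // event timestamp
	Data         interface{} `json:"data"`         // primary event payload
}
```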

### EventClient

A new `EventClient` should be developed with the primitives required for events, at least a `Watch`.
At the beginning, only a `Watch` function is required, but we think a natural evolution will be to serve a function that other components can use to fire events.
Or maybe we can do both `Watch` and `Create` straight away.
All the resource clients will have a `Watch` function that streams events for that particular resource.
**Contributor:** After streaming the events, what does this `Watch` function (or the method which triggered this `Watch`) intend to do?

**@gianarb (Sep 15, 2020):** It returns something that waits and runs your code when an event gets fired.

**Contributor:** I assume this would be an async implementation instead of polling. I would request a brief idea of how this wait-and-run-on-event model would be implemented, e.g. how are you planning to keep a thread/process open/asleep/async until an event is received? Would it be similar to RMI, or leverage some Go library (a guess)?

**Contributor (Author):** Watch is going to be as simple as an infinite for loop running in a goroutine. All the events, as they occur, will be stored in the events table and streamed over gRPC, and it will be up to the consumer how they want to consume the stream.
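
As a hedged sketch of the loop described in this comment (every name below is an assumption, not the actual implementation):

```
// Sketch: a goroutine that tails the events table and pushes new rows
// onto a gRPC stream. fetchEventsSince and stream.Send are assumed names.
go func() {
	last := time.Now().Add(-lookback) // how far back the consumer asked to go
	for {
		events, err := fetchEventsSince(ctx, db, last)
		if err != nil {
			return
		}
		for _, e := range events {
			if err := stream.Send(&e); err != nil {
				return // consumer disconnected
			}
			last = e.Time
		}
		time.Sleep(time.Second) // or block on LISTEN/NOTIFY instead of polling
	}
}()
```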


```
Watch(ctx context.Context, in EventWatchRequest)
```

Where the `EventWatchRequest` can take the following structure:

```
type EventWatchRequest struct {
	EventName    string // the name of the event to filter by
	ResourceID   string // the resource ID (ResourceType is required when this is set)
	ResourceType string // workflow, template, action, hardware
}
```

So when we create a `HardwareClient`, for instance, it would return a watcher for the `Hardware` resource type.
All the clients, like `HardwareClient`, will use `EventClient` under the hood, as sketched below.
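
A possible shape for that wiring (hypothetical names; only the `EventWatchRequest` fields come from this proposal):

```
// Sketch: a resource client delegating to EventClient with a fixed
// ResourceType filter. HardwareClient and its events field are assumed.
func (c *HardwareClient) Watch(ctx context.Context, eventName string) (EventService_WatchClient, error) {
	return c.events.Watch(ctx, &EventWatchRequest{
		EventName:    eventName,
		ResourceType: "hardware", // fixed by the resource client
	})
}
```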

There is no logic to keep track of which events are fired or sent to a consumer.
Every consumer, when it connects to a stream of events, will specify how old the returned events should be (by default 5m).
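
For illustration, a consumer connecting with a lookback window might look like the sketch below (the `Lookback` field and the `process` helper are hypothetical, not part of the `EventWatchRequest` structure above):

```
// Sketch: replay everything from the last 5 minutes, then keep streaming.
// Lookback is an assumed field name; 0 would mean "new events only".
stream, err := events.Watch(ctx, &EventWatchRequest{
	ResourceType: "workflow",
	Lookback:     5 * time.Minute,
})
if err != nil {
	return err
}
for {
	e, err := stream.Recv() // blocks until the next event arrives
	if err != nil {
		return err // stream broken; reconnect with a fresh Lookback
	}
	process(e)
}
```
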
**Contributor:** I wonder if this is going to be a lot of work on the clients that would make them more complex.

**@gianarb (Sep 20, 2020):** Can you be a bit more precise? It is just a parameter; if they don't want to get old events they can ask for 0s. I think by default we should avoid the possibility of a client losing events during reconnection (for example, during a consumer's deploy).

In order to make this possible, all the events will be stored in the Events table.
The Events table will (roughly) have the following structure:

```
tinkerbell=# \d events
                        Table "public.events"
    Column     |           Type           | Collation | Nullable | Default
---------------+--------------------------+-----------+----------+---------
 id            | uuid                     |           | not null |
 resource_id   | uuid                     |           | not null |
 resource_type | integer                  |           | not null |
 event_type    | integer                  |           | not null |
 created_at    | timestamp with time zone |           |          |
 data          | jsonb                    |           |          |
Indexes:
    "events_pkey" PRIMARY KEY, btree (id)
```

The tink-server will only be responsible for generating the events.
It will not contain any business logic that needs to be executed as an event occurs.
Instead, it is the consumer who will have to implement the business logic, in an idempotent way.
**Contributor:** How are we handling the scenario where an event was triggered but the consumer didn't receive it?
In that case, neither is the consumer aware that the event was triggered, nor is tink aware whether the event was received.
There is also a possibility of re-delivery.

**Contributor (Author):** Adding delivery/retry logic would be overkill at this point. Every consumer, when it connects to a stream of events, will specify how old the returned events should be (by default 5m). That way they will receive the events missed during the connection breakdown.

**@gianarb (Sep 16, 2020):**

> how are we handling the scenario where an event was triggered but consumer didn't received it ?

We do not have a catalog of consumers. Consumers do not register themselves (consumers are normal clients, like the tink-cli); if a consumer is not attached to a stream, or if it didn't ask for old enough events, for the tink-server that consumer does not exist. When a consumer gets implemented, it knows that tink-server stores events for some amount of time; it is the responsibility of whoever implements the consumer to figure out how to be reliable.

The tink-server's responsibility is to store and stream events reliably and quickly to whoever asks for them.

**@Cbkhare (Sep 16, 2020):** I think my comment was deleted. Let me add it again.

An event is streamed to the consumer when it occurs; the consumer listening on the stream is unaware of the occurrence of an event until it receives it.

> The tink-server responsibility is to store and stream events reliably and fastly to whoever asks for them

How are we ensuring that events are streamed reliably? What happens when an event is lost during the stream, maybe due to a network issue or a service restart? With a REST call, by contrast, we know the status of the execution from the status code and can resend or take corrective action.

Maybe this should help: https://sourcegraph.com/github.com/grpc/grpc-go/-/blob/stream.go#L1331.

**Contributor:** I think this is answered in #17

Forcing the consumer to implement its logic in a way that can be run repeatedly simplifies the logic in Tink, but it also enforces reliability on the client side.
If a client fails halfway, it gets the same events again and repeats the logic.

```
informer := client.WorkflowClient.Watch(&request.WatchRequest{
	EventType: "WORKFLOW_TIMEOUT",
}, func(e Event) {
	// Do your best. You can notify via Slack. Or start a different workflow.
})
informer.Run(ctx)
```

The `Watch` gRPC function needs to support filtering by a `ResourceType` and/or a `ResourceID` and/or an `EventType`, as illustrated below.
When `Watch` is called by a resource client, for example `HardwareClient`, the `ResourceType` filter is fixed.
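
A couple of illustrative filters (field names follow the `EventWatchRequest` sketch above; `workflowID` is a placeholder):

```
// Sketch: all events for one specific workflow instance.
byResource := &EventWatchRequest{ResourceType: "workflow", ResourceID: workflowID}

// Sketch: every timeout event, across all workflows.
byEvent := &EventWatchRequest{EventName: "WORKFLOW_TIMEOUT"}
```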

## System context diagram

![system-architecture](architecture.png)

## References

- https://gianarb.it/blog/kubernetes-shared-informer
- https://www.infoq.com/podcasts/kubernetes-event-driven-architecture/
- [Martin Fowler’s talk](https://www.youtube.com/watch?v=STKCRSUsyP0)

## Alternatives

The alternatives are:

- We can tie Tinkerbell to a streaming platform or a queuing system like Kafka or RabbitMQ
- We can build a queue in Tinkerbell itself

**0012/architecture.png** (binary file added)

**0012/event.log** (+49 lines)

{"level":"info","msg":"Received data from channel [workflow_changed] :","time":"2020-09-15T16:53:29+05:30"}
{
"table": "workflow",
"action": "INSERT",
"data": {
"id": "69c8aa8f-956e-44af-8b76-5ef44f52a469",
"template": "4f012614-0186-4e22-b7cf-dc6ac2f0bc1a",
"devices": {
"device_1": "08:00:27:00:00:01"
},
"created_at": "2020-09-15T11:23:28.524472+00:00",
"updated_at": "2020-09-15T11:23:28.524472+00:00",
"deleted_at": null
}
}

{"level":"info","msg":"Received data from channel [workflow_changed] :","time":"2020-09-15T16:53:32+05:30"}
{
"table": "workflow",
"action": "UPDATE",
"data": {
"id": "69c8aa8f-956e-44af-8b76-5ef44f52a469",
"template": "7284e61c-4974-4178-92ce-28fbe7c4df26",
"devices": {
"device_1": "08:00:27:00:00:01"
},
"created_at": "2020-09-15T11:23:28.524472+00:00",
"updated_at": "2020-09-15T11:23:32.214997+00:00",
"deleted_at": null
}
}

{"level":"info","msg":"Received data from channel [workflow_changed] :","time":"2020-09-15T16:53:35+05:30"}
{
"table": "workflow",
"action": "DELETE",
"data": {
"id": "69c8aa8f-956e-44af-8b76-5ef44f52a469",
"template": "7284e61c-4974-4178-92ce-28fbe7c4df26",
"devices": {
"device_1": "08:00:27:00:00:01"
},
"created_at": "2020-09-15T11:23:28.524472+00:00",
"updated_at": "2020-09-15T11:23:32.214997+00:00",
"deleted_at": null
}
}

{"level":"info","msg":"Received no events for 90 seconds, checking connection","time":"2020-09-15T16:53:45+05:30"}

**0012/listener.go** (+65 lines)

```
package listener

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	"github.com/lib/pq"
	log "github.com/sirupsen/logrus"
)

const conninfo = "dbname=tinkerbell user=tinkerbell password=tinkerbell sslmode=disable"

// StartListener creates a new dedicated connection for LISTEN/NOTIFY
// and starts listening for events.
func StartListener() {
	listener := pq.NewListener(conninfo, 10*time.Second, 15*time.Second, errorHandler)
	if err := listener.Listen("workflow_changed"); err != nil {
		log.Error(err)
		return
	}

	log.Info("starting listener")
	for {
		waitForNotification(listener)
	}
}

// errorHandler logs connection errors reported by the listener.
func errorHandler(ev pq.ListenerEventType, err error) {
	if err != nil {
		fmt.Println(err.Error())
	}
}

// waitForNotification blocks until a notification arrives or the timeout
// elapses, in which case the connection is pinged to keep it alive.
func waitForNotification(l *pq.Listener) {
	select {
	case n := <-l.Notify:
		log.Info("Received data from channel [", n.Channel, "] :")

		// Pretty-print the notification payload.
		var prettyJSON bytes.Buffer
		if err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t"); err != nil {
			log.Error(err)
			return
		}
		fmt.Println(prettyJSON.String())
	case <-time.After(10 * time.Second):
		log.Info("Received no events for 10 seconds, checking connection")
		go func() {
			l.Ping()
		}()
	}
}
```

**0012/workflow-notifications.sql** (+39 lines)

```
SET ROLE tinkerbell;

CREATE TABLE IF NOT EXISTS workflow (
	  id         UUID UNIQUE NOT NULL
	, template   UUID NOT NULL
	, devices    JSONB NOT NULL
	, created_at TIMESTAMPTZ
	, updated_at TIMESTAMPTZ
	, deleted_at TIMESTAMPTZ
);

CREATE OR REPLACE FUNCTION notify_workflow_changes()
RETURNS trigger AS $$
DECLARE
	data json;
	notification json;
BEGIN
	-- Serialize the affected row: OLD for deletes, NEW otherwise.
	IF (TG_OP = 'DELETE') THEN
		data = row_to_json(OLD);
	ELSE
		data = row_to_json(NEW);
	END IF;

	notification = json_build_object(
		'table', TG_TABLE_NAME,
		'action', TG_OP,
		'data', data
	);

	-- Publish on the workflow_changed channel; listeners get the JSON payload.
	PERFORM pg_notify('workflow_changed', notification::text);
	RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER workflow_changed
AFTER INSERT OR UPDATE OR DELETE
ON workflow
FOR EACH ROW
EXECUTE PROCEDURE notify_workflow_changes();
```