Skip to content

Commit

Permalink
chore: add more metrics to hub stream reader (#2302)
Browse files Browse the repository at this point in the history
We need some more instrumentation to understand how much of a time to
read delay is accounted for by the hub stream -> redis step vs the redis
-> postgres step.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR adds more metrics to `HubSubscriber`.

### Detailed summary
- Added `shardKey` property to `EventStreamHubSubscriber`
- Added metrics for batch size and process time per event and per batch

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini committed Sep 11, 2024
1 parent f3752b2 commit 4e897e9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-dogs-whisper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

chore: add more metrics to HubSubscriber
14 changes: 14 additions & 0 deletions packages/shuttle/src/shuttle/hubSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { sleep } from "../utils";
import { RedisClient } from "./redis";
import { HubClient } from "./hub";
import { ProcessResult } from "./index";
import { statsd } from "../statsd";

interface HubEventsEmitter {
onError: (error: Error, stopped: boolean) => void;
Expand Down Expand Up @@ -183,6 +184,7 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
private redis: RedisClient;
public readonly streamKey: string;
public readonly redisKey: string;
private readonly shardKey: string;
private eventsToAdd: [HubEvent, Buffer][];
public eventBatchSize = 100;
private eventBatchLastFlushedAt = 0;
Expand Down Expand Up @@ -212,6 +214,7 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
this.eventsToAdd = [];
this.beforeProcess = options?.beforeProcess;
this.afterProcess = options?.afterProcess;
this.shardKey = shardKey;
}

public override async getLastEventId(): Promise<number | undefined> {
Expand All @@ -234,6 +237,7 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
this.eventBatchBytes >= this.maxBatchBytesBeforeForceFlush ||
Date.now() - this.eventBatchLastFlushedAt > this.maxTimeBetweenBatchFlushes
) {
const startTime = Date.now();
// Empties the current batch
const eventBatch = this.eventsToAdd.splice(0, this.eventsToAdd.length);
const events = eventBatch.map(([evt, _evtBytes]) => evt);
Expand Down Expand Up @@ -261,6 +265,16 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
if (this.afterProcess) {
await this.afterProcess.call(this, eventToWriteBatch, eventBytesToWriteBatch);
}

const processTime = Date.now() - startTime;

statsd.gauge("hub.event.subscriber.last_batch_size", events.length, { source: this.shardKey });

statsd.timing("hub.event.subscriber.process_time.per_event", processTime / events.length, {
source: this.shardKey,
});

statsd.timing("hub.event.subscriber.process_time.per_batch", processTime, { source: this.shardKey });
}

return true;
Expand Down

0 comments on commit 4e897e9

Please sign in to comment.