Skip to content

Commit

Permalink
Pulse fixed versions from group pairing (#17)
Browse files Browse the repository at this point in the history
* implements changes from afharo:poc/pulse/notifications

* Comments out actual es call to search 'pulse-poc-raw*' as it's throwing an error
  • Loading branch information
TinaHeiligers authored Jan 14, 2020
1 parent dabb3e8 commit e333d7f
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 210 deletions.
11 changes: 9 additions & 2 deletions src/core/public/pulse/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ export interface PulseServiceStart {}

const channelNames = ['default', 'notifications', 'errors'];

const logger = {
...console,
// eslint-disable-next-line no-console
fatal: (...args: any[]) => console.error(...args),
get: () => logger,
};

export class PulseService {
private retriableErrors = 0;
private readonly channels: Map<string, PulseChannel>;
Expand All @@ -45,7 +52,7 @@ export class PulseService {
channelNames.map((id): [string, PulseChannel] => {
const instructions$ = new Subject<PulseInstruction>();
this.instructions.set(id, instructions$);
const channel = new PulseChannel({ id, instructions$ });
const channel = new PulseChannel({ id, instructions$, logger });
return [channel.id, channel];
})
);
Expand Down Expand Up @@ -120,7 +127,7 @@ export class PulseService {

const responseBody: InstructionsResponse = await response.json();

responseBody.channels.forEach(channel => {
responseBody.channels.forEach((channel: PulseChannel) => {
const instructions$ = this.instructions.get(channel.id);
if (!instructions$) {
throw new Error(
Expand Down
28 changes: 22 additions & 6 deletions src/core/server/pulse/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,43 @@
*/

import { Subject } from 'rxjs';
// import { IClusterClient } from '../elasticsearch';
import { PulseCollectorConstructor } from './types';
import { IClusterClient } from '../elasticsearch';
import { SavedObjectsServiceSetup } from '../saved_objects';
import { Logger } from '../logging';

export interface PulseInstruction {
owner: string;
id: string;
value: unknown;
}

interface ChannelConfig {
export interface ChannelConfig {
id: string;
instructions$: Subject<PulseInstruction>;
logger: Logger;
}
export interface ChannelSetupContext {
elasticsearch: IClusterClient;
savedObjects: SavedObjectsServiceSetup;
}

export class PulseChannel {
public readonly getRecords: () => Promise<Record<string, any>>;
export class PulseChannel<Payload = any, Rec = Payload> {
private readonly collector: any;

constructor(private readonly config: ChannelConfig) {
this.collector = require(`${__dirname}/collectors/${this.id}`);
this.getRecords = this.collector.getRecords;
const Collector: PulseCollectorContructor = require(`${__dirname}/collectors/${this.id}`)
.Collector;
this.collector = new Collector(this.config.logger);
}

public async setup(setupContext: ChannelSetupContexxt) {
return this.collector.setup(setupContext);
}

public async getRecords() {
return this.collector.getRecords();
}
public get id() {
return this.config.id;
}
Expand Down
21 changes: 12 additions & 9 deletions src/core/server/pulse/collectors/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
* under the License.
*/

export async function getRecords() {
return [];
}
// import { IClusterClient } from '../../elasticsearch';

// export async function getRecords(elasticsearch: IClusterClient) {
// const pingResult = await elasticsearch.callAsInternalUser('ping');
import { PulseCollector } from '../types';
export class Collector extends PulseCollector<unknown, { ping_received: boolean }> {
public async putRecord() {}
public async getRecords() {
if (this.elasticsearch) {
const pingResult = await this.elasticsearch.callAsInternalUser('ping');

// return [{ ping_received: pingResult }];
// }
return [{ ping_received: pingResult }];
}
return [];
// throw Error(`Default collector not initialised with an "elasticsearch" client!`);
}
}
52 changes: 46 additions & 6 deletions src/core/server/pulse/collectors/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,56 @@
// be stored as an individual document in the errors channel index
// by the service

import { PulseCollector, CollectorSetupContext } from '../types';

export interface Payload {
errorId: string;
}

const payloads: Payload[] = [];
export class Collector extends PulseCollector<Payload> {
private payloads: Payload[] = [];
private readonly indexName = '.pulse-errors';

export async function putRecord(payload: Payload) {
payloads.push(payload);
}
public async setup(deps: CollectorSetupContext) {
await super.setup(deps);
const exists = await this.elasticsearch!.callAsInternalUser('indices.exists', {
index: this.indexName,
});
if (!exists) {
await this.elasticsearch!.callAsInternalUser('indices.create', {
index: this.indexName,
body: {
settings: {
number_of_shards: 1,
},
mappings: {
properties: {
errorId: {
type: 'keyword',
},
},
},
},
});
}
}
public async putRecord(payload: Payload) {
this.payloads.push(payload);
if (this.elasticsearch) {
await this.elasticsearch.callAsInternalUser('create', {
index: this.indexName,
body: payload,
});
}
}

export async function getRecords() {
return payloads;
public async getRecords() {
if (this.elasticsearch) {
const results = await this.elasticsearch.callAsInternalUser('search', {
index: this.indexName,
});
// TODO: Set results as sent and return them
}
return this.payloads.splice(0, this.payloads.length);
}
}
9 changes: 7 additions & 2 deletions src/core/server/pulse/collectors/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
* under the License.
*/

export async function getRecords() {
return [];
import { PulseCollector } from '../types';

export class Collector extends PulseCollector {
public async putRecord() {}
public async getRecords() {
return [];
}
}
106 changes: 18 additions & 88 deletions src/core/server/pulse/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ import { Subject } from 'rxjs';
import fetch from 'node-fetch';
import { CoreContext } from '../core_context';
import { Logger } from '../logging';
import { ElasticsearchServiceSetup } from '../elasticsearch';
import { ElasticsearchServiceSetup, IClusterClient } from '../elasticsearch';
import { PulseChannel, PulseInstruction } from './channel';
import { sendPulse, Fetcher } from './send_pulse';
import { SavedObjectsServiceSetup } from '../saved_objects';

export interface InternalPulseService {
getChannel: (id: string) => PulseChannel;
}

export interface PulseSetupDeps {
elasticsearch: ElasticsearchServiceSetup;
savedObjects: SavedObjectsServiceSetup;
}

export type PulseServiceSetup = InternalPulseService;
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface PulseServiceStart {}

interface ChannelResponse {
id: string;
Expand All @@ -62,16 +62,15 @@ export class PulseService {
private readonly log: Logger;
private readonly channels: Map<string, PulseChannel>;
private readonly instructions$: Map<string, Subject<any>> = new Map();
// private readonly subscriptions: Set<NodeJS.Timer> = new Set();
// private elasticsearch?: IClusterClient;
private elasticsearch?: IClusterClient;

constructor(coreContext: CoreContext) {
this.log = coreContext.logger.get('pulse-service');
this.channels = new Map(
channelNames.map((id): [string, PulseChannel] => {
const instructions$ = new Subject<PulseInstruction>();
this.instructions$.set(id, instructions$);
const channel = new PulseChannel({ id, instructions$ });
const channel = new PulseChannel({ id, instructions$, logger: this.log });
return [channel.id, channel];
})
);
Expand All @@ -80,18 +79,23 @@ export class PulseService {
public async setup(deps: PulseSetupDeps): Promise<InternalPulseService> {
this.log.debug('Setting up pulse service');

this.elasticsearch = deps.elasticsearch.createClient('pulse-service');
this.channels.forEach(channel =>
channel.setup({
elasticsearch: this.elasticsearch!,
savedObjects: deps.savedObjects,
})
);
// poll for instructions every second for this deployment
setInterval(() => {
// eslint-disable-next-line no-console
this.loadInstructions().catch(err => console.error(err.stack));
}, 1000);
this.loadInstructions().catch(err => this.log.error(err.stack));
}, 10000);

this.log.debug('Will attempt first telemetry collection in 5 seconds...');

// eslint-disable-next-line no-console
console.log('Will attempt first telemetry collection in 5 seconds...');
setTimeout(() => {
setInterval(() => {
// eslint-disable-next-line no-console
this.sendTelemetry().catch(err => console.error(err.stack));
this.sendTelemetry().catch(err => this.log.error(err.stack));
}, 5000);
}, 5000);

Expand Down Expand Up @@ -145,8 +149,7 @@ export class PulseService {
private handleRetriableError() {
this.retriableErrors++;
if (this.retriableErrors === 1) {
// eslint-disable-next-line no-console
console.warn(
this.log.warn(
'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...'
);
} else if (this.retriableErrors > 120) {
Expand All @@ -172,76 +175,3 @@ export class PulseService {
return await sendPulse(this.channels, fetcher);
}
}

// public async start() {
// this.log.info('Starting service');
// if (!this.elasticsearch) {
// throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`);
// }
// const elasticsearch = this.elasticsearch;

// // poll for instructions every second for this deployment
// const loadInstructionSubcription = setInterval(() => {
// this.loadInstructions().catch(err => this.log.error(err.stack));
// }, 1000);
// this.subscriptions.add(loadInstructionSubcription);

// this.log.debug('Will attempt first telemetry collection in 5 seconds...');
// const sendTelemetrySubcription = setInterval(() => {
// this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack));
// }, 5000);
// this.subscriptions.add(sendTelemetrySubcription);
// }

// public async stop() {
// this.subscriptions.forEach(subscription => {
// clearInterval(subscription);
// this.subscriptions.delete(subscription);
// });
// }

// private retriableErrors = 0;

// private async sendTelemetry(elasticsearch: IClusterClient) {
// this.log.debug('Sending telemetry');
// const url = 'http://localhost:5601/api/pulse_poc/intake/123';

// const channels = [];
// for (const channel of this.channels.values()) {
// const records = await channel.getRecords(elasticsearch);
// this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`);
// channels.push({
// records,
// channel_id: channel.id,
// });
// }

// let response: any;
// try {
// response = await fetch(url, {
// method: 'post',
// headers: {
// 'content-type': 'application/json',
// 'kbn-xsrf': 'true',
// },
// body: JSON.stringify({
// channels,
// }),
// });
// } catch (err) {
// if (!err.message.includes('ECONNREFUSED')) {
// throw err;
// }
// // the instructions polling should handle logging for this case, yay for POCs
// return;
// }
// if (response.status === 503) {
// // the instructions polling should handle logging for this case, yay for POCs
// return;
// }

// if (response.status !== 200) {
// const responseBody = await response.text();
// throw new Error(`${response.status}: ${responseBody}`);
// }
// }
44 changes: 44 additions & 0 deletions src/core/server/pulse/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { IClusterClient } from '../elasticsearch';
import { SavedObjectsServiceSetup, ISavedObjectsRepository } from '../saved_objects';
import { Logger } from '../logging';

export type PulseCollectorConstructor = new (logger: Logger) => PulseCollector;

export interface CollectorSetupContext {
elasticsearch: IClusterClient;
savedObjects: SavedObjectsServiceSetup;
}

export abstract class PulseCollector<Payload = unknown, PulseRecord = Payload> {
protected savedObjects?: ISavedObjectsRepository;
protected elasticsearch?: IClusterClient;

constructor(protected readonly logger: Logger) {}

public abstract async putRecord(payload: Payload): Promise<void>;
public abstract async getRecords(): Promise<PulseRecord[]>;

public async setup(setupContext: CollectorSetupContext) {
this.savedObjects = setupContext.savedObjects.createInternalRepository();
this.elasticsearch = setupContext.elasticsearch;
}
}
Loading

0 comments on commit e333d7f

Please sign in to comment.