Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ improvements #451

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ REDIS_URI=redis://redis:6379
REDIS_PREFIX_KEY=evdocker

RABBITMQ_ENABLED=false
RABBITMQ_GLOBAL_EVENT_QUEUE=false
RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672

WEBSOCKET_ENABLED=false
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ENV REDIS_URI=redis://redis:6379
ENV REDIS_PREFIX_KEY=evolution

ENV RABBITMQ_ENABLED=false
ENV RABBITMQ_GLOBAL_EVENT_QUEUE=false
ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672

ENV WEBSOCKET_ENABLED=false
Expand Down
2 changes: 2 additions & 0 deletions src/config/env.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export type Redis = {

export type Rabbitmq = {
ENABLED: boolean;
GLOBAL_EVENT_QUEUE: boolean;
URI: string;
};

Expand Down Expand Up @@ -282,6 +283,7 @@ export class ConfigService {
},
RABBITMQ: {
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
GLOBAL_EVENT_QUEUE: process.env?.RABBITMQ_GLOBAL_EVENT_QUEUE === 'true',
URI: process.env.RABBITMQ_URI || '',
},
SQS: {
Expand Down
1 change: 1 addition & 0 deletions src/dev-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ REDIS:

RABBITMQ:
ENABLED: false
GLOBAL_EVENT_QUEUE: false
URI: "amqp://guest:guest@localhost:5672"

SQS:
Expand Down
66 changes: 61 additions & 5 deletions src/libs/amqp.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as amqp from 'amqplib/callback_api';

import { configService, Rabbitmq } from '../config/env.config';
import { configService, HttpServer, Rabbitmq } from '../config/env.config';
import { Logger } from '../config/logger.config';

const logger = new Logger('AMQP');
Expand All @@ -9,8 +9,8 @@ let amqpChannel: amqp.Channel | null = null;

export const initAMQP = () => {
return new Promise<void>((resolve, reject) => {
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
amqp.connect(uri, (error, connection) => {
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
amqp.connect(rabbitConfig.URI, (error, connection) => {
if (error) {
reject(error);
return;
Expand Down Expand Up @@ -45,6 +45,7 @@ export const getAMQP = (): amqp.Channel | null => {

export const initQueues = (instanceName: string, events: string[]) => {
if (!instanceName || !events || !events.length) return;
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');

const queues = events.map((event) => {
return `${event.replace(/_/g, '.').toLowerCase()}`;
Expand All @@ -60,7 +61,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
assert: true,
});

const queueName = `${instanceName}.${event}`;
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;

amqp.assertQueue(queueName, {
durable: true,
Expand All @@ -76,6 +77,7 @@ export const initQueues = (instanceName: string, events: string[]) => {

export const removeQueues = (instanceName: string, events: string[]) => {
if (!events || !events.length) return;
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');

const channel = getAMQP();

Expand All @@ -94,10 +96,64 @@ export const removeQueues = (instanceName: string, events: string[]) => {
assert: true,
});

const queueName = `${instanceName}.${event}`;
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;

amqp.deleteQueue(queueName);
});

channel.deleteExchange(exchangeName);
};

interface SendEventData {
instanceName: string;
wuid: string;
event: string;
apiKey?: string;
data: any;
}

export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
const exchangeName = instanceName ?? 'evolution_exchange';

amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
assert: true,
});

const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;

amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: { 'x-queue-type': 'quorum' },
});

amqpChannel.bindQueue(queueName, exchangeName, event);

const serverUrl = configService.get<HttpServer>('SERVER').URL;
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
const now = localISOTime;

const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: now,
sender: wuid,
};

if (apiKey) {
message['apikey'] = apiKey;
}

logger.log({
queueName,
exchangeName,
event,
});
amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
};
41 changes: 7 additions & 34 deletions src/whatsapp/services/whatsapp.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
import { Logger } from '../../config/logger.config';
import { ROOT_DIR } from '../../config/path.config';
import { NotFoundException } from '../../exceptions';
import { getAMQP, removeQueues } from '../../libs/amqp.server';
import { getAMQP, removeQueues, sendEventData } from '../../libs/amqp.server';
import { getIO } from '../../libs/socket.server';
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
import { ChamaaiRaw, IntegrationRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models';
Expand Down Expand Up @@ -685,40 +685,13 @@ export class WAStartupService {

if (amqp) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = this.instanceName ?? 'evolution_exchange';

amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
assert: true,
});

const queueName = `${this.instanceName}.${event}`;

amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});

amqp.bindQueue(queueName, exchangeName, event);

const message = {
event,
instance: this.instance.name,
sendEventData({
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};

if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}

amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
event,
instanceName: this.instanceName,
wuid: this.wuid,
apiKey: expose && instanceApikey ? instanceApikey : undefined,
});

if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
Expand Down