diff --git a/src/channel/redis.channel-config.ts b/src/channel/redis.channel-config.ts index eb86a09..d5d080e 100644 --- a/src/channel/redis.channel-config.ts +++ b/src/channel/redis.channel-config.ts @@ -1,19 +1,24 @@ import { ChannelConfig } from '@nestjstools/messaging'; - export class RedisChannelConfig extends ChannelConfig { public readonly connection: Connection; public readonly queue: string; constructor({ - name, - connection, - queue, - enableConsumer, - avoidErrorsForNotExistedHandlers, - middlewares, - normalizer, - }: RedisChannelConfig) { - super(name, avoidErrorsForNotExistedHandlers, middlewares, enableConsumer, normalizer) + name, + connection, + queue, + enableConsumer, + avoidErrorsForNotExistedHandlers, + middlewares, + normalizer, + }: RedisChannelConfig) { + super( + name, + avoidErrorsForNotExistedHandlers, + middlewares, + enableConsumer, + normalizer, + ); this.connection = connection; this.queue = queue; } @@ -22,4 +27,11 @@ export class RedisChannelConfig extends ChannelConfig { interface Connection { host: string; port: number; + password?: string; + db?: number; + /** + * This prefix is not used as RedisOptions keyPrefix, it is used as prefix for BullMQ + * Read more: https://github.com/taskforcesh/bullmq/issues/1219#issuecomment-1113903785 + */ + keyPrefix?: string; } diff --git a/src/channel/redis.channel-factory.ts b/src/channel/redis.channel-factory.ts index fd3fb4c..48e4088 100644 --- a/src/channel/redis.channel-factory.ts +++ b/src/channel/redis.channel-factory.ts @@ -1,11 +1,13 @@ import { RedisChannel } from './redis.channel'; -import {Injectable} from "@nestjs/common"; +import { Injectable } from '@nestjs/common'; import { ChannelFactory, IChannelFactory } from '@nestjstools/messaging'; import { RedisChannelConfig } from './redis.channel-config'; @Injectable() @ChannelFactory(RedisChannelConfig) -export class RedisChannelFactory implements IChannelFactory { +export class RedisChannelFactory + implements IChannelFactory +{ create(channelConfig: RedisChannelConfig): RedisChannel { return new RedisChannel(channelConfig); } diff --git a/src/channel/redis.channel.ts b/src/channel/redis.channel.ts index c224257..72271de 100644 --- a/src/channel/redis.channel.ts +++ b/src/channel/redis.channel.ts @@ -8,6 +8,14 @@ export class RedisChannel extends Channel { constructor(config: RedisChannelConfig) { super(config); - this.queue = new Queue(config.queue, { connection: config.connection }); + this.queue = new Queue(config.queue, { + connection: { + host: config.connection.host, + port: config.connection.port, + password: config.connection.password, + db: config.connection.db, + }, + prefix: config.connection.keyPrefix, + }); } } diff --git a/src/consumer/redis-messaging.consumer.ts b/src/consumer/redis-messaging.consumer.ts index 7b65f59..5bfdfbc 100644 --- a/src/consumer/redis-messaging.consumer.ts +++ b/src/consumer/redis-messaging.consumer.ts @@ -8,11 +8,16 @@ import { Worker } from 'bullmq'; @Injectable() @MessageConsumer(RedisChannel) -export class RedisMessagingConsumer implements IMessagingConsumer, OnApplicationShutdown { +export class RedisMessagingConsumer + implements IMessagingConsumer, OnApplicationShutdown +{ private channel?: RedisChannel = undefined; private worker?: Worker = undefined; - async consume(dispatcher: ConsumerMessageDispatcher, channel: RedisChannel): Promise { + async consume( + dispatcher: ConsumerMessageDispatcher, + channel: RedisChannel, + ): Promise { this.channel = channel; this.worker = new Worker( @@ -20,13 +25,24 @@ export class RedisMessagingConsumer implements IMessagingConsumer, async (job) => { dispatcher.dispatch(new ConsumerMessage(job.data, job.name)); }, - { connection: channel.config.connection } + { + connection: { + host: channel.config.connection.host, + port: channel.config.connection.port, + password: channel.config.connection.password, + db: channel.config.connection.db, + }, + prefix: channel.config.connection.keyPrefix, + }, ); return Promise.resolve(); } - onError(errored: ConsumerDispatchedMessageError, channel: RedisChannel): Promise { + onError( + errored: ConsumerDispatchedMessageError, + channel: RedisChannel, + ): Promise { return Promise.resolve(); } diff --git a/src/message-bus/redis-message-bus-factory.ts b/src/message-bus/redis-message-bus-factory.ts index 98d90bd..9a65d50 100644 --- a/src/message-bus/redis-message-bus-factory.ts +++ b/src/message-bus/redis-message-bus-factory.ts @@ -1,14 +1,15 @@ import { Injectable } from '@nestjs/common'; import { RedisMessageBus } from './redis-message.bus'; import { RedisChannel } from '../channel/redis.channel'; -import {IMessageBusFactory} from "@nestjstools/messaging"; -import {MessageBusFactory} from "@nestjstools/messaging"; -import {IMessageBus} from "@nestjstools/messaging"; +import { IMessageBusFactory } from '@nestjstools/messaging'; +import { MessageBusFactory } from '@nestjstools/messaging'; +import { IMessageBus } from '@nestjstools/messaging'; @Injectable() @MessageBusFactory(RedisChannel) -export class RedisMessageBusFactory implements IMessageBusFactory { - +export class RedisMessageBusFactory + implements IMessageBusFactory +{ create(channel: RedisChannel): IMessageBus { return new RedisMessageBus(channel); } diff --git a/src/message-bus/redis-message.bus.ts b/src/message-bus/redis-message.bus.ts index b3e20da..1988e6c 100644 --- a/src/message-bus/redis-message.bus.ts +++ b/src/message-bus/redis-message.bus.ts @@ -5,10 +5,7 @@ import { RedisChannel } from '../channel/redis.channel'; @Injectable() export class RedisMessageBus implements IMessageBus { - constructor( - private readonly redisChannel: RedisChannel, - ) { - } + constructor(private readonly redisChannel: RedisChannel) {} async dispatch(message: RoutingMessage): Promise { this.redisChannel.queue.add(message.messageRoutingKey, message.message);