From cf4d219bcda020b9631f1906161dfa55eed02773 Mon Sep 17 00:00:00 2001 From: kastov Date: Tue, 8 Jul 2025 05:32:03 +0300 Subject: [PATCH 1/4] refactor: update RedisChannelConfig to use ConnectionOptions --- src/channel/redis.channel-config.ts | 32 +++++++++++++++-------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/channel/redis.channel-config.ts b/src/channel/redis.channel-config.ts index eb86a09..1e33a4e 100644 --- a/src/channel/redis.channel-config.ts +++ b/src/channel/redis.channel-config.ts @@ -1,25 +1,27 @@ import { ChannelConfig } from '@nestjstools/messaging'; +import { ConnectionOptions } from 'bullmq'; export class RedisChannelConfig extends ChannelConfig { - public readonly connection: Connection; + public readonly connection: ConnectionOptions; public readonly queue: string; constructor({ - name, - connection, - queue, - enableConsumer, - avoidErrorsForNotExistedHandlers, - middlewares, - normalizer, - }: RedisChannelConfig) { - super(name, avoidErrorsForNotExistedHandlers, middlewares, enableConsumer, normalizer) + name, + connection, + queue, + enableConsumer, + avoidErrorsForNotExistedHandlers, + middlewares, + normalizer, + }: RedisChannelConfig) { + super( + name, + avoidErrorsForNotExistedHandlers, + middlewares, + enableConsumer, + normalizer, + ); this.connection = connection; this.queue = queue; } } - -interface Connection { - host: string; - port: number; -} From 7977d6d9727235723b198432a489e713c0b9cee6 Mon Sep 17 00:00:00 2001 From: kastov Date: Sat, 12 Jul 2025 07:16:03 +0300 Subject: [PATCH 2/4] refactor: rename connection property to connectionOptions and enhance ConnectionOptions interface in RedisChannelConfig --- src/channel/redis.channel-config.ts | 28 +++++++++++++++++--- src/channel/redis.channel.ts | 5 +++- src/consumer/redis-messaging.consumer.ts | 19 ++++++++++--- src/message-bus/redis-message-bus-factory.ts | 11 ++++---- src/message-bus/redis-message.bus.ts | 5 +--- 5 files changed, 50 insertions(+), 18 deletions(-) diff --git a/src/channel/redis.channel-config.ts b/src/channel/redis.channel-config.ts index 1e33a4e..da0ac3c 100644 --- a/src/channel/redis.channel-config.ts +++ b/src/channel/redis.channel-config.ts @@ -1,13 +1,12 @@ import { ChannelConfig } from '@nestjstools/messaging'; -import { ConnectionOptions } from 'bullmq'; export class RedisChannelConfig extends ChannelConfig { - public readonly connection: ConnectionOptions; + public readonly connectionOptions: ConnectionOptions; public readonly queue: string; constructor({ name, - connection, + connectionOptions, queue, enableConsumer, avoidErrorsForNotExistedHandlers, @@ -21,7 +20,28 @@ export class RedisChannelConfig extends ChannelConfig { enableConsumer, normalizer, ); - this.connection = connection; + this.connectionOptions = connectionOptions; this.queue = queue; } } + +interface ConnectionOptions { + redis: { + host: string; + port: number; + /** + * If set, client will send AUTH command with the value of this option when connected. + */ + password?: string; + /** + * Database index to use. + * + * @default 0 + */ + db?: number; + }; + /** + * Prefix for all queue keys. + */ + prefix?: string; +} diff --git a/src/channel/redis.channel.ts b/src/channel/redis.channel.ts index c224257..b6aee1b 100644 --- a/src/channel/redis.channel.ts +++ b/src/channel/redis.channel.ts @@ -8,6 +8,9 @@ export class RedisChannel extends Channel { constructor(config: RedisChannelConfig) { super(config); - this.queue = new Queue(config.queue, { connection: config.connection }); + this.queue = new Queue(config.queue, { + connection: config.connectionOptions.redis, + prefix: config.connectionOptions.prefix, + }); } } diff --git a/src/consumer/redis-messaging.consumer.ts b/src/consumer/redis-messaging.consumer.ts index 7b65f59..80cc96f 100644 --- a/src/consumer/redis-messaging.consumer.ts +++ b/src/consumer/redis-messaging.consumer.ts @@ -8,11 +8,16 @@ import { Worker } from 'bullmq'; @Injectable() @MessageConsumer(RedisChannel) -export class RedisMessagingConsumer implements IMessagingConsumer, OnApplicationShutdown { +export class RedisMessagingConsumer + implements IMessagingConsumer, OnApplicationShutdown +{ private channel?: RedisChannel = undefined; private worker?: Worker = undefined; - async consume(dispatcher: ConsumerMessageDispatcher, channel: RedisChannel): Promise { + async consume( + dispatcher: ConsumerMessageDispatcher, + channel: RedisChannel, + ): Promise { this.channel = channel; this.worker = new Worker( @@ -20,13 +25,19 @@ export class RedisMessagingConsumer implements IMessagingConsumer, async (job) => { dispatcher.dispatch(new ConsumerMessage(job.data, job.name)); }, - { connection: channel.config.connection } + { + connection: channel.config.connectionOptions.redis, + prefix: channel.config.connectionOptions.prefix, + }, ); return Promise.resolve(); } - onError(errored: ConsumerDispatchedMessageError, channel: RedisChannel): Promise { + onError( + errored: ConsumerDispatchedMessageError, + channel: RedisChannel, + ): Promise { return Promise.resolve(); } diff --git a/src/message-bus/redis-message-bus-factory.ts b/src/message-bus/redis-message-bus-factory.ts index 98d90bd..9a65d50 100644 --- a/src/message-bus/redis-message-bus-factory.ts +++ b/src/message-bus/redis-message-bus-factory.ts @@ -1,14 +1,15 @@ import { Injectable } from '@nestjs/common'; import { RedisMessageBus } from './redis-message.bus'; import { RedisChannel } from '../channel/redis.channel'; -import {IMessageBusFactory} from "@nestjstools/messaging"; -import {MessageBusFactory} from "@nestjstools/messaging"; -import {IMessageBus} from "@nestjstools/messaging"; +import { IMessageBusFactory } from '@nestjstools/messaging'; +import { MessageBusFactory } from '@nestjstools/messaging'; +import { IMessageBus } from '@nestjstools/messaging'; @Injectable() @MessageBusFactory(RedisChannel) -export class RedisMessageBusFactory implements IMessageBusFactory { - +export class RedisMessageBusFactory + implements IMessageBusFactory +{ create(channel: RedisChannel): IMessageBus { return new RedisMessageBus(channel); } diff --git a/src/message-bus/redis-message.bus.ts b/src/message-bus/redis-message.bus.ts index b3e20da..1988e6c 100644 --- a/src/message-bus/redis-message.bus.ts +++ b/src/message-bus/redis-message.bus.ts @@ -5,10 +5,7 @@ import { RedisChannel } from '../channel/redis.channel'; @Injectable() export class RedisMessageBus implements IMessageBus { - constructor( - private readonly redisChannel: RedisChannel, - ) { - } + constructor(private readonly redisChannel: RedisChannel) {} async dispatch(message: RoutingMessage): Promise { this.redisChannel.queue.add(message.messageRoutingKey, message.message); From 636c5b4c8519c655c0de1c7133e4be7cc1bb29b0 Mon Sep 17 00:00:00 2001 From: kastov Date: Sat, 12 Jul 2025 07:18:13 +0300 Subject: [PATCH 3/4] refactor: remove unused comments --- src/channel/redis.channel-config.ts | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/channel/redis.channel-config.ts b/src/channel/redis.channel-config.ts index da0ac3c..106a793 100644 --- a/src/channel/redis.channel-config.ts +++ b/src/channel/redis.channel-config.ts @@ -29,19 +29,8 @@ interface ConnectionOptions { redis: { host: string; port: number; - /** - * If set, client will send AUTH command with the value of this option when connected. - */ password?: string; - /** - * Database index to use. - * - * @default 0 - */ db?: number; }; - /** - * Prefix for all queue keys. - */ prefix?: string; } From d7a91cc7f5f2bf043d6d982fd08ced07e2121f11 Mon Sep 17 00:00:00 2001 From: kastov Date: Mon, 14 Jul 2025 03:51:21 +0300 Subject: [PATCH 4/4] refactor: update RedisChannelConfig to use a simplified Connection interface and adjust related components --- src/channel/redis.channel-config.ts | 25 ++++++++++++------------ src/channel/redis.channel-factory.ts | 6 ++++-- src/channel/redis.channel.ts | 9 +++++++-- src/consumer/redis-messaging.consumer.ts | 9 +++++++-- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/channel/redis.channel-config.ts b/src/channel/redis.channel-config.ts index 106a793..d5d080e 100644 --- a/src/channel/redis.channel-config.ts +++ b/src/channel/redis.channel-config.ts @@ -1,12 +1,11 @@ import { ChannelConfig } from '@nestjstools/messaging'; - export class RedisChannelConfig extends ChannelConfig { - public readonly connectionOptions: ConnectionOptions; + public readonly connection: Connection; public readonly queue: string; constructor({ name, - connectionOptions, + connection, queue, enableConsumer, avoidErrorsForNotExistedHandlers, @@ -20,17 +19,19 @@ export class RedisChannelConfig extends ChannelConfig { enableConsumer, normalizer, ); - this.connectionOptions = connectionOptions; + this.connection = connection; this.queue = queue; } } -interface ConnectionOptions { - redis: { - host: string; - port: number; - password?: string; - db?: number; - }; - prefix?: string; +interface Connection { + host: string; + port: number; + password?: string; + db?: number; + /** + * This prefix is not used as RedisOptions keyPrefix, it is used as prefix for BullMQ + * Read more: https://github.com/taskforcesh/bullmq/issues/1219#issuecomment-1113903785 + */ + keyPrefix?: string; } diff --git a/src/channel/redis.channel-factory.ts b/src/channel/redis.channel-factory.ts index fd3fb4c..48e4088 100644 --- a/src/channel/redis.channel-factory.ts +++ b/src/channel/redis.channel-factory.ts @@ -1,11 +1,13 @@ import { RedisChannel } from './redis.channel'; -import {Injectable} from "@nestjs/common"; +import { Injectable } from '@nestjs/common'; import { ChannelFactory, IChannelFactory } from '@nestjstools/messaging'; import { RedisChannelConfig } from './redis.channel-config'; @Injectable() @ChannelFactory(RedisChannelConfig) -export class RedisChannelFactory implements IChannelFactory { +export class RedisChannelFactory + implements IChannelFactory +{ create(channelConfig: RedisChannelConfig): RedisChannel { return new RedisChannel(channelConfig); } diff --git a/src/channel/redis.channel.ts b/src/channel/redis.channel.ts index b6aee1b..72271de 100644 --- a/src/channel/redis.channel.ts +++ b/src/channel/redis.channel.ts @@ -9,8 +9,13 @@ export class RedisChannel extends Channel { constructor(config: RedisChannelConfig) { super(config); this.queue = new Queue(config.queue, { - connection: config.connectionOptions.redis, - prefix: config.connectionOptions.prefix, + connection: { + host: config.connection.host, + port: config.connection.port, + password: config.connection.password, + db: config.connection.db, + }, + prefix: config.connection.keyPrefix, }); } } diff --git a/src/consumer/redis-messaging.consumer.ts b/src/consumer/redis-messaging.consumer.ts index 80cc96f..5bfdfbc 100644 --- a/src/consumer/redis-messaging.consumer.ts +++ b/src/consumer/redis-messaging.consumer.ts @@ -26,8 +26,13 @@ export class RedisMessagingConsumer dispatcher.dispatch(new ConsumerMessage(job.data, job.name)); }, { - connection: channel.config.connectionOptions.redis, - prefix: channel.config.connectionOptions.prefix, + connection: { + host: channel.config.connection.host, + port: channel.config.connection.port, + password: channel.config.connection.password, + db: channel.config.connection.db, + }, + prefix: channel.config.connection.keyPrefix, }, );