Skip to content

Commit a0d9e92

Browse files
sincereflyXudong Liu
andauthored
feat: 支持 Redis Stream 消息队列 (#9)
* feat: 支持 Redis Stream 消息队列 * 优化代码取消注释及错误返回 * fix: 修复部分问题 * fix: 修复 Redis 队列不支持多Consumer 问题并优化 Pending 消息取值流程 * fix: 从外部传入 redis 实例,支持集群模式 * fix: 依赖 redis 从 v9 降级到 v8 * fix: 随机生成消费者名称 * fix: 传入 Redis Options 初始化, 使用 v9 库, 支持传入 global_idle, 私有函数名 * fix: 去掉了 redisService 接口, 支持部分 Redis Options 参数 * fix: 增加了连接池大小参数 --------- Co-authored-by: Xudong Liu <xudong.liu@ewp-group.com>
1 parent fab4276 commit a0d9e92

File tree

10 files changed

+616
-15
lines changed

10 files changed

+616
-15
lines changed

cloud/common.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,21 @@ import (
1313
type Provider string
1414

1515
const (
16-
AWSProvider Provider = "aws"
17-
TencentCloudProvider Provider = "tencentcloud"
16+
AWSProvider Provider = "aws"
17+
TencentCloudProvider Provider = "tencentcloud"
18+
StandaloneRedisProvider Provider = "standalone_redis"
19+
ClusterRedisProvider Provider = "cluster_redis"
1820
)
1921

2022
var (
21-
ErrUnsupportedCloudProvider = fmt.Errorf("unsupported provider, only support %s and %s", AWSProvider, TencentCloudProvider)
22-
ErrProviderNotTencentCloud = errors.New("provider is not tencentcloud")
23-
ErrProviderNotAWS = errors.New("provider is not aws")
24-
ErrEmptySecretID = errors.New("secret_id is empty")
25-
ErrEmptySecretKey = errors.New("secret_key is empty")
26-
ErrEmptyRegion = errors.New("region is empty")
23+
ErrUnsupportedCloudProvider = fmt.Errorf("unsupported provider, only support %s, %s and %s", AWSProvider, TencentCloudProvider, StandaloneRedisProvider)
24+
ErrProviderNotTencentCloud = errors.New("provider is not tencentcloud")
25+
ErrProviderNotAWS = errors.New("provider is not aws")
26+
ErrProviderNotStandaloneRedis = errors.New("provider is not standalone redis")
27+
ErrProviderNotClusterRedis = errors.New("provider is not cluster redis")
28+
ErrEmptySecretID = errors.New("secret_id is empty")
29+
ErrEmptySecretKey = errors.New("secret_key is empty")
30+
ErrEmptyRegion = errors.New("region is empty")
2731
)
2832

2933
type Option interface {
@@ -35,6 +39,8 @@ type Option interface {
3539
GetAssumeRegion() string
3640
CheckAWS() error
3741
CheckTencentCloud() error
42+
CheckStandaloneRedis() error
43+
CheckClusterRedis() error
3844
}
3945

4046
type CommonOption struct {
@@ -104,6 +110,14 @@ func (option CommonOption) CheckTencentCloud() error {
104110
return option.check()
105111
}
106112

113+
func (option CommonOption) CheckStandaloneRedis() error {
114+
return ErrProviderNotStandaloneRedis
115+
}
116+
117+
func (option CommonOption) CheckClusterRedis() error {
118+
return ErrProviderNotClusterRedis
119+
}
120+
107121
// AwsNewSession
108122
func AwsNewSession(option Option) (*session.Session, *aws.Config, error) {
109123
var creds *credentials.Credentials

cloud/examples/queue/queue.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,36 @@ import (
99
"github.com/byte-power/gorich/cloud/queue"
1010
)
1111

12+
// Configure Addr/Addrs, Password to run standalone/cluster redis example.
1213
// Configure token, url, topic_name and subscription_name to run tencentcloud example.
1314
// Configure secret_id, secret_key, region, and queue_name to run this example.
1415
func main() {
16+
17+
// Redis 单节点
18+
//optionForBaseRedis := queue.StandaloneRedisQueueOption{
19+
// Addr: "localhost:6379",
20+
// Password: "",
21+
// ConsumerGroup: "save_task_consumer_group",
22+
// Idle: 10,
23+
//}
24+
25+
// Redis 集群
26+
optionForBaseRedis := queue.ClusterRedisQueueOption{
27+
Addrs: []string{
28+
"localhost:7000",
29+
"localhost:7001",
30+
"localhost:7002",
31+
"localhost:7003",
32+
"localhost:7004",
33+
"localhost:7005",
34+
},
35+
Password: "",
36+
ConsumerGroup: "save_task_consumer_group",
37+
Idle: 10,
38+
}
39+
40+
queue_examples("test_queue_name", optionForBaseRedis)
41+
1542
optionForTencentCloud := queue.TencentCloudQueueOption{
1643
Token: "access_jwt_token_xxx",
1744
URL: "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080",

0 commit comments

Comments
 (0)