1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
| package main
import (
"ac_file"
"context"
"do_redis"
"do_time"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var (
cfgCommon []byte
redisDb redis.UniversalClient
ctx = context.Background()
)
func init() {
cfgCommon = ac_file.Get_serv_cfg_local_and_redis(cfgCommon) //从本机和redis按照策略更新和获取配置文件
redisDb = do_redis.GetRedisDb_Client(cfgCommon)
}
func main() {
// 发布消息到指定 Stream
_, err := redisDb.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream", // Stream 名称
Values: map[string]interface{}{
"key1": "value1",
"key2": "value2",
},
}).Result()
if err != nil {
fmt.Println("Failed to publish message:", err)
return
}
// 创建消费者组
groupName := "mygroup"
// _, err := redisDb.XGroupCreateMkStream(ctx, "mystream", groupName, "$").Result()
//_, err = redisDb.XGroupCreate(ctx, "mystream", groupName, "$").Result() //$ 是从末尾开始接受
_, err = redisDb.XGroupCreate(ctx, "mystream", groupName, "0").Result() //0 是从头开始
if err != nil {
fmt.Println("Failed to create consumer group:", err)
}
time.Sleep(2 * time.Second)
// 创建一个新的消息处理协程
go sub_get()
// 在主协程中等待退出信号
select {}
}
func sub_get() {
for {
pubsub := redisDb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "mygroup",
Consumer: "test",
Streams: []string{"mystream", ">"},
//Streams: []string{"mystream", "0-0"},
Count: 2,
Block: 0,
})
messages, err := pubsub.Result()
if err != nil {
fmt.Println("Failed to read from stream:", err)
continue
}
for _, msg := range messages[0].Messages {
fmt.Printf("Consumer %s received message from Stream %s with ID: %s, data: %v\n",
"test", messages[0].Stream, msg.ID, msg.Values)
// 确认已处理消息
_, err := redisDb.XAck(ctx, "mystream", "mygroup", msg.ID).Result()
if err != nil {
fmt.Println("Failed to acknowledge message:", err)
}
/*
else {
// 从 messages 切片中移除已处理的消息
messages[0].Messages = messages[0].Messages[1:]
}
*/
}
}
}
|