Golang 使用 Redis Stream 实现消息的发布与订阅

这是一个简单的实现，但完整演示了消息的发布、订阅以及 ack（确认）操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main

import (
	"ac_file"
	"context"
	"do_redis"
	"do_time"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// Package-level state shared by init, main and sub_get.
var (
	// cfgCommon holds the raw configuration bytes; it is assigned in init
	// from ac_file.Get_serv_cfg_local_and_redis.
	cfgCommon    []byte
	// redisDb is the Redis client used for every stream command in this
	// file; it is assigned in init from do_redis.GetRedisDb_Client.
	redisDb      redis.UniversalClient
	// ctx is the background context passed to every Redis call below.
	ctx          = context.Background()
)

// init populates the package-level configuration and Redis client before
// main runs.
func init() {
	// Per the original author's note, this helper refreshes and fetches the
	// configuration from the local machine and Redis according to some
	// policy; its exact behavior is defined outside this file.
	loaded := ac_file.Get_serv_cfg_local_and_redis(cfgCommon)
	cfgCommon = loaded

	// Build the shared Redis client from the configuration just loaded.
	redisDb = do_redis.GetRedisDb_Client(loaded)
}
func main() {

	// 发布消息到指定 Stream
	_, err := redisDb.XAdd(ctx, &redis.XAddArgs{
		Stream: "mystream", // Stream 名称
		Values: map[string]interface{}{
			"key1": "value1",
			"key2": "value2",
		},
	}).Result()
	if err != nil {
		fmt.Println("Failed to publish message:", err)
		return
	}

	// 创建消费者组
	groupName := "mygroup"
	//	_, err := redisDb.XGroupCreateMkStream(ctx, "mystream", groupName, "$").Result()
	//_, err = redisDb.XGroupCreate(ctx, "mystream", groupName, "$").Result() //$ 是从末尾开始接受
	_, err = redisDb.XGroupCreate(ctx, "mystream", groupName, "0").Result() //0 是从头开始
	if err != nil {
		fmt.Println("Failed to create consumer group:", err)
	}
	time.Sleep(2 * time.Second)

	// 创建一个新的消息处理协程
	go sub_get()

	// 在主协程中等待退出信号
	select {}
}

// sub_get consumes entries from the "mystream" stream as consumer "test" of
// the "mygroup" consumer group. Every received entry is printed and then
// acknowledged with XACK. The function loops forever and never returns.
//
// A failed read is reported and retried after a short pause, so that a
// persistent failure (for example an unreachable server) does not turn the
// loop into a busy spin.
func sub_get() {
	const (
		streamName   = "mystream"
		groupName    = "mygroup"
		consumerName = "test"
		// retryDelay is the pause between attempts after a failed read.
		retryDelay = time.Second
	)

	for {
		streams, err := redisDb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			// ">" asks for entries that were never delivered to any
			// consumer of this group.
			Streams: []string{streamName, ">"},
			Count:   2,
			// Block 0 is sent as BLOCK 0, i.e. wait until data arrives.
			Block: 0,
		}).Result()
		if err != nil {
			fmt.Println("Failed to read from stream:", err)
			time.Sleep(retryDelay)
			continue
		}

		// Range over the reply instead of indexing element 0, so an empty
		// reply cannot panic and every returned stream is handled.
		for _, stream := range streams {
			for _, msg := range stream.Messages {
				fmt.Printf("Consumer %s received message from Stream %s with ID: %s, data: %v\n",
					consumerName, stream.Stream, msg.ID, msg.Values)

				// Acknowledge the entry so it leaves the group's pending list.
				if _, err := redisDb.XAck(ctx, stream.Stream, groupName, msg.ID).Result(); err != nil {
					fmt.Println("Failed to acknowledge message:", err)
				}
			}
		}
	}
}
Licensed under CC BY-NC-SA 4.0
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计