1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
| package main
import (
"ac_file"
"coder"
"context"
"do_redis"
"do_time"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"github.com/redis/go-redis/v9"
)
var (
cfgCommon []byte
redisDb redis.UniversalClient
redisDb_back redis.UniversalClient
ctx = context.Background()
nowTimestamp int64
)
func init() {
cfgCommon = ac_file.Get_serv_cfg_local_and_redis(cfgCommon) //从本机和redis按照策略更新和获取配置文件
redisDb = do_redis.GetRedisDb_Client(cfgCommon)
redisDb_back = do_redis.GetRedisBackDb_Client(cfgCommon)
go do_time.NowTimestamp(&nowTimestamp) //秒 时间戳定时器 避免程序多次调用
}
func main() {
Save_toJson("redis_db_date.json", redisDb)
//load_from_json("redis_db_date.json", redisDb)
load_from_json("redis_db_date.json", redisDb_back)
//fmt.Println("操作完成")
}
func load_from_json(filename string, db redis.UniversalClient) {
data, err := os.ReadFile(filename)
if err != nil {
fmt.Println("Error reading file:", err)
return
}
restore_redis_db(data, db)
}
func Save_toJson(filename string, db redis.UniversalClient) {
var redis_db_date []byte
// 判断客户端是否连接到 Redis 集群
if _, ok := db.(*redis.ClusterClient); ok {
// 获取集群节点信息
stateCmd := db.ClusterNodes(context.Background())
state, err := stateCmd.Result()
if err != nil {
log.Println(err)
}
redis_Clusterpsw := coder.JsonGetStr(cfgCommon, "redis", "db", "password") //单独配置一下密码
// 解析主节点信息
lines := strings.Split(state, "\n")
for _, line := range lines {
fields := strings.Fields(line)
if len(fields) > 2 && (fields[2] == "master" || fields[2] == "myself,master") {
nodeAddr := fields[1]
add := strings.Split(nodeAddr, "@")
masterClient := redis.NewClient(&redis.Options{
Addr: add[0],
Password: redis_Clusterpsw,
})
redis_db_dateThis := back_redis_db(masterClient)
var m1 = make(map[string]interface{})
var m2 = make(map[string]interface{})
if len(redis_db_date) > 0 {
err := json.Unmarshal(redis_db_date, &m1)
if err != nil {
log.Println("redis_db_date", err)
}
}
if len(redis_db_dateThis) > 0 {
err := json.Unmarshal(redis_db_dateThis, &m2)
if err != nil {
log.Println("redis_db_dateThis", add[0], err)
}
}
if len(m2) > 0 { // 合并两个映射
for k, v := range m2 {
m1[k] = v
}
}
mergedData, err := json.Marshal(m1) // 将合并后的映射重新编码为 JSON 字符串
if err != nil {
log.Println("mergedData", err)
} else {
redis_db_date = mergedData
}
masterClient.Close()
}
}
} else {
redis_db_date = back_redis_db(db)
}
var tmp_data interface{} // 创建一个空接口用于解析 JSON 数据
errJson := json.Unmarshal(redis_db_date, &tmp_data)
if errJson != nil {
fmt.Println("redis_db_date 解析 JSON 失败:", errJson)
} else {
formattedJSON, err := json.MarshalIndent(tmp_data, "", " ") // 格式化 JSON 数据
if err != nil {
fmt.Println("redis_db_date 格式化 JSON 失败:", err)
} else {
redis_db_date = formattedJSON
}
}
err := os.WriteFile(filename, redis_db_date, 0644)
if err != nil {
fmt.Println("Error writing file:", err)
return
}
}
func restore_redis_db(jsonData []byte, db redis.UniversalClient) {
// 解析导出的 JSON 数据
var importedData map[string]interface{}
err := json.Unmarshal(jsonData, &importedData)
if err != nil {
fmt.Println("Error unmarshalling JSON data:", err)
return
}
// 遍历导入的数据并根据类型将其存储到 Redis
for key, value := range importedData {
dataMap, ok := value.(map[string]interface{})
if !ok {
fmt.Printf("Invalid data format for key %s\n", key)
continue
}
dataType, ok := dataMap["type"].(string)
if !ok {
fmt.Printf("Invalid data type for key %s\n", key)
continue
}
dataValue, ok := dataMap["value"]
if !ok {
fmt.Printf("Invalid data value for key %s\n", key)
continue
}
switch dataType {
case "string":
err := db.Set(ctx, key, dataValue.(string), 0).Err()
if err != nil {
fmt.Printf("Error setting value for key %s: %v\n", key, err)
}
case "list":
err := db.RPush(ctx, key, dataValue.([]interface{})...).Err()
if err != nil {
fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
}
case "hash":
hashValues, ok := dataValue.(map[string]interface{})
if !ok {
fmt.Printf("Invalid hash values for key %s\n", key)
continue
}
err := db.HMSet(ctx, key, hashValues).Err()
if err != nil {
fmt.Printf("Error setting hash values for key %s: %v\n", key, err)
}
case "set":
err := db.SAdd(ctx, key, dataValue.([]interface{})...).Err()
if err != nil {
fmt.Printf("Error adding values to set key %s: %v\n", key, err)
}
case "zset":
zsetValues, ok := dataValue.([]map[string]interface{})
if !ok {
fmt.Printf("Invalid sorted set values for key %s\n", key)
continue
}
var zsetEntries []redis.Z
for _, zsetValue := range zsetValues {
scoreStr, ok := zsetValue["score"].(string)
if !ok {
fmt.Printf("Invalid sorted set entry score for key %s\n", key)
continue
}
score, err := strconv.ParseFloat(scoreStr, 64)
if err != nil {
fmt.Printf("Error parsing score for key %s: %v\n", key, err)
continue
}
member, ok := zsetValue["value"].(string)
if !ok {
fmt.Printf("Invalid sorted set entry member for key %s\n", key)
continue
}
zsetEntries = append(zsetEntries, redis.Z{
Score: score,
Member: member,
})
}
err := db.ZAdd(ctx, key, zsetEntries...).Err()
if err != nil {
fmt.Printf("Error adding values to sorted set key %s: %v\n", key, err)
}
case "bitmaps":
bitmapsValues, ok := dataValue.(int64)
if !ok {
fmt.Printf("Invalid bitmaps values for key %s\n", key)
continue
}
err := db.SetBit(ctx, key, 0, int(bitmapsValues)).Err()
if err != nil {
fmt.Printf("Error setting bitmaps values for key %s: %v\n", key, err)
}
case "hyperloglogs":
hyperloglogsValues, ok := dataValue.([]interface{})
if !ok {
fmt.Printf("Invalid hyperloglogs values for key %s\n", key)
continue
}
err := db.PFAdd(ctx, key, hyperloglogsValues...).Err()
if err != nil {
fmt.Printf("Error setting hyperloglogs values for key %s: %v\n", key, err)
}
case "geospatial":
geospatialValues, ok := dataValue.(map[string]interface{})
if !ok {
fmt.Printf("Invalid geospatial values for key %s\n", key)
continue
}
var geoLocation []*redis.GeoLocation
for member, coordinates := range geospatialValues {
coordinateMap, ok := coordinates.(map[string]interface{})
if !ok {
fmt.Printf("Invalid geospatial coordinate for key %s\n", key)
continue
}
longitudeStr, ok := coordinateMap["longitude"].(string)
if !ok {
fmt.Printf("Invalid geospatial longitude for key %s\n", key)
continue
}
latitudeStr, ok := coordinateMap["latitude"].(string)
if !ok {
fmt.Printf("Invalid geospatial latitude for key %s\n", key)
continue
}
longitude, err := strconv.ParseFloat(longitudeStr, 64)
if err != nil {
fmt.Printf("Error parsing longitude for key %s: %v\n", key, err)
continue
}
latitude, err := strconv.ParseFloat(latitudeStr, 64)
if err != nil {
fmt.Printf("Error parsing latitude for key %s: %v\n", key, err)
continue
}
geoLocation = append(geoLocation, &redis.GeoLocation{
Name: member,
Longitude: longitude,
Latitude: latitude,
})
}
err := db.GeoAdd(ctx, key, geoLocation...).Err()
if err != nil {
fmt.Printf("Error adding values to geospatial key %s: %v\n", key, err)
}
case "bitfield":
bitfieldValues, ok := dataValue.(string)
if !ok {
fmt.Printf("Invalid bitfield values for key %s\n", key)
continue
}
err := db.BitField(ctx, key, bitfieldValues).Err()
if err != nil {
fmt.Printf("Error setting bitfield values for key %s: %v\n", key, err)
}
case "stream":
streamEntries, ok := dataValue.([]interface{})
if !ok {
fmt.Printf("Invalid stream entries for key %s\n", key)
continue
}
db.Del(ctx, key) // 这里需要根据情况判断是否应该先删掉 对应的key 不然原来key内stream有更大的id大的数据的情况下无法写入更小id的数据
for _, entry := range streamEntries {
entryMap, ok := entry.(map[string]interface{})
if !ok {
fmt.Printf("Invalid stream entry for key %s\n", key)
continue
}
streamID, ok := entryMap["ID"].(string)
if !ok {
fmt.Printf("Invalid stream ID for key %s\n", key)
continue
}
streamValues, ok := entryMap["Values"].(map[string]interface{})
if !ok {
fmt.Printf("Invalid values for stream entry with ID %s\n", streamID)
continue
}
err := db.XAdd(ctx, &redis.XAddArgs{
Stream: key,
ID: streamID,
Values: streamValues,
}).Err()
if err != nil {
fmt.Printf("Error adding entry to stream key %s: %v\n", key, err)
}
}
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
}
func back_redis_db(db redis.UniversalClient) (jsonData []byte) {
// 获取所有键
keys := db.Keys(ctx, "*").Val()
// 创建一个空的 map 用于存储结果
result := make(map[string]interface{})
// 遍历键并获取对应的值和类型
for _, key := range keys {
dataType := db.Type(ctx, key).Val()
// 根据不同的数据类型获取值
switch dataType {
case "string":
value := db.Get(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "list":
values := db.LRange(ctx, key, 0, -1).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "hash":
values := db.HGetAll(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "set":
values := db.SMembers(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "zset":
values := db.ZRange(ctx, key, 0, -1).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "bitmaps":
value := db.GetBit(ctx, key, 0).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "hyperloglogs":
value := db.PFCount(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "geospatial":
values := db.GeoRadius(ctx, key, 0, 0, &redis.GeoRadiusQuery{Unit: "km"}).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "bitfield":
value := db.BitField(ctx, key, "GET", "u4", "0").Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "stream":
streamValues := db.XRange(ctx, key, "-", "+").Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": streamValues,
}
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
// 将结果转换为 JSON 格式
jsonData, err := json.MarshalIndent(result, "", " ")
if err != nil {
fmt.Println("Error marshalling data to JSON:", err)
return
}
return
}
|