redis集群一键导入到json以及 从json导出

实现 从redis单节点或者整个集群导出导入数据到json文件。

redis-dump 也可以有同样的功能的,他已经5年多不更新了 而且 只支持有限的数据类型,更不支持 Cluster。

没啥技术含量,花了30分钟写完。

也可以导入到mysql,文末有参考(未测试,这部分代码由chatGPT完成)

特性和基本算法:

支持集群和单节点 和主从模式,因为我一直没用过哨兵集群,所以未测试,理论上也支持。
支持10种redis数据类型,详情看代码
因为主要是为了 cluster模式,从节点可能太多,所以数据直接从主节点导出。 如果是单机模式/主从/哨兵模式,会直接导出。如果是 cluster 模式,会先查询出主节点,然后依次连接主节点导出(不连接从节点 是因为从节点太多,再去每一个节点选一个从节点太复杂也没必要)
导出 : 先判断是不是集群,如果不是,直接扫描所有key 依次导出。如果是集群 找到主节点,依次扫描所有key 导出。
导入:直接按照类型导入即可,集群会自己分片

代码

一些代码里面没有的说明

do_redis 只是连接redis集群去了,GetRedisDb_Client 的 代码参考:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
redis_addr_arr := strings.Split(redis_addr, ",")//redis_addr 是集群的每一个节点的ip:端口。如果是单节点 就一个即可
if len(redis_addr_arr) > 1 { //连接到集群
		redisClient := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:    redis_addr_arr,
			Password: redis_psw,
		})
}else{
	redisClient := redis.NewClient(&redis.Options{ //连接到单点
		Addr:     redis_addr,
		Password: redis_psw,
		DB:       0,
	})
}
redis_Clusterpsw ="12332434"

完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
package main

import (
	"ac_file"
	"coder"
	"context"
	"do_redis"
	"do_time"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"

	"github.com/redis/go-redis/v9"
)

var (
	cfgCommon    []byte
	redisDb      redis.UniversalClient
	redisDb_back redis.UniversalClient
	ctx          = context.Background()
	nowTimestamp int64
)

func init() {
	cfgCommon = ac_file.Get_serv_cfg_local_and_redis(cfgCommon) //从本机和redis按照策略更新和获取配置文件
	redisDb = do_redis.GetRedisDb_Client(cfgCommon)
	redisDb_back = do_redis.GetRedisBackDb_Client(cfgCommon)
	go do_time.NowTimestamp(&nowTimestamp) //秒 时间戳定时器 避免程序多次调用

}
func main() {
	Save_toJson("redis_db_date.json", redisDb)
	//load_from_json("redis_db_date.json", redisDb)
	load_from_json("redis_db_date.json", redisDb_back)

	//fmt.Println("操作完成")

}

func load_from_json(filename string, db redis.UniversalClient) {

	data, err := os.ReadFile(filename)
	if err != nil {
		fmt.Println("Error reading file:", err)
		return
	}
	restore_redis_db(data, db)
}

func Save_toJson(filename string, db redis.UniversalClient) {
	var redis_db_date []byte

	// 判断客户端是否连接到 Redis 集群

	if _, ok := db.(*redis.ClusterClient); ok {
		// 获取集群节点信息
		stateCmd := db.ClusterNodes(context.Background())
		state, err := stateCmd.Result()
		if err != nil {
			log.Println(err)
		}
		redis_Clusterpsw := coder.JsonGetStr(cfgCommon, "redis", "db", "password") //单独配置一下密码
		// 解析主节点信息
		lines := strings.Split(state, "\n")
		for _, line := range lines {
			fields := strings.Fields(line)
			if len(fields) > 2 && (fields[2] == "master" || fields[2] == "myself,master") {
				nodeAddr := fields[1]
				add := strings.Split(nodeAddr, "@")
				masterClient := redis.NewClient(&redis.Options{
					Addr:     add[0],
					Password: redis_Clusterpsw,
				})

				redis_db_dateThis := back_redis_db(masterClient)
				var m1 = make(map[string]interface{})
				var m2 = make(map[string]interface{})
				if len(redis_db_date) > 0 {
					err := json.Unmarshal(redis_db_date, &m1)
					if err != nil {
						log.Println("redis_db_date", err)
					}
				}
				if len(redis_db_dateThis) > 0 {
					err := json.Unmarshal(redis_db_dateThis, &m2)
					if err != nil {
						log.Println("redis_db_dateThis", add[0], err)
					}
				}
				if len(m2) > 0 { // 合并两个映射
					for k, v := range m2 {
						m1[k] = v
					}
				}

				mergedData, err := json.Marshal(m1) // 将合并后的映射重新编码为 JSON 字符串

				if err != nil {
					log.Println("mergedData", err)
				} else {
					redis_db_date = mergedData
				}
				masterClient.Close()

			}
		}

	} else {
		redis_db_date = back_redis_db(db)
	}

	var tmp_data interface{} // 创建一个空接口用于解析 JSON 数据
	errJson := json.Unmarshal(redis_db_date, &tmp_data)
	if errJson != nil {
		fmt.Println("redis_db_date 解析 JSON 失败:", errJson)
	} else {
		formattedJSON, err := json.MarshalIndent(tmp_data, "", "  ") // 格式化 JSON 数据
		if err != nil {
			fmt.Println("redis_db_date  格式化 JSON 失败:", err)
		} else {
			redis_db_date = formattedJSON
		}
	}

	err := os.WriteFile(filename, redis_db_date, 0644)
	if err != nil {
		fmt.Println("Error writing file:", err)
		return
	}
}

func restore_redis_db(jsonData []byte, db redis.UniversalClient) {
	// 解析导出的 JSON 数据
	var importedData map[string]interface{}
	err := json.Unmarshal(jsonData, &importedData)
	if err != nil {
		fmt.Println("Error unmarshalling JSON data:", err)
		return
	}

	// 遍历导入的数据并根据类型将其存储到 Redis
	for key, value := range importedData {
		dataMap, ok := value.(map[string]interface{})
		if !ok {
			fmt.Printf("Invalid data format for key %s\n", key)
			continue
		}

		dataType, ok := dataMap["type"].(string)
		if !ok {
			fmt.Printf("Invalid data type for key %s\n", key)
			continue
		}

		dataValue, ok := dataMap["value"]
		if !ok {
			fmt.Printf("Invalid data value for key %s\n", key)
			continue
		}

		switch dataType {
		case "string":
			err := db.Set(ctx, key, dataValue.(string), 0).Err()
			if err != nil {
				fmt.Printf("Error setting value for key %s: %v\n", key, err)
			}
		case "list":
			err := db.RPush(ctx, key, dataValue.([]interface{})...).Err()
			if err != nil {
				fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
			}
		case "hash":
			hashValues, ok := dataValue.(map[string]interface{})
			if !ok {
				fmt.Printf("Invalid hash values for key %s\n", key)
				continue
			}

			err := db.HMSet(ctx, key, hashValues).Err()
			if err != nil {
				fmt.Printf("Error setting hash values for key %s: %v\n", key, err)
			}
		case "set":
			err := db.SAdd(ctx, key, dataValue.([]interface{})...).Err()
			if err != nil {
				fmt.Printf("Error adding values to set key %s: %v\n", key, err)
			}
		case "zset":
			zsetValues, ok := dataValue.([]map[string]interface{})
			if !ok {
				fmt.Printf("Invalid sorted set values for key %s\n", key)
				continue
			}

			var zsetEntries []redis.Z
			for _, zsetValue := range zsetValues {
				scoreStr, ok := zsetValue["score"].(string)
				if !ok {
					fmt.Printf("Invalid sorted set entry score for key %s\n", key)
					continue
				}

				score, err := strconv.ParseFloat(scoreStr, 64)
				if err != nil {
					fmt.Printf("Error parsing score for key %s: %v\n", key, err)
					continue
				}

				member, ok := zsetValue["value"].(string)
				if !ok {
					fmt.Printf("Invalid sorted set entry member for key %s\n", key)
					continue
				}

				zsetEntries = append(zsetEntries, redis.Z{
					Score:  score,
					Member: member,
				})
			}

			err := db.ZAdd(ctx, key, zsetEntries...).Err()
			if err != nil {
				fmt.Printf("Error adding values to sorted set key %s: %v\n", key, err)
			}

		case "bitmaps":
			bitmapsValues, ok := dataValue.(int64)
			if !ok {
				fmt.Printf("Invalid bitmaps values for key %s\n", key)
				continue
			}

			err := db.SetBit(ctx, key, 0, int(bitmapsValues)).Err()
			if err != nil {
				fmt.Printf("Error setting bitmaps values for key %s: %v\n", key, err)
			}
		case "hyperloglogs":
			hyperloglogsValues, ok := dataValue.([]interface{})
			if !ok {
				fmt.Printf("Invalid hyperloglogs values for key %s\n", key)
				continue
			}

			err := db.PFAdd(ctx, key, hyperloglogsValues...).Err()
			if err != nil {
				fmt.Printf("Error setting hyperloglogs values for key %s: %v\n", key, err)
			}
		case "geospatial":
			geospatialValues, ok := dataValue.(map[string]interface{})
			if !ok {
				fmt.Printf("Invalid geospatial values for key %s\n", key)
				continue
			}

			var geoLocation []*redis.GeoLocation
			for member, coordinates := range geospatialValues {
				coordinateMap, ok := coordinates.(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid geospatial coordinate for key %s\n", key)
					continue
				}

				longitudeStr, ok := coordinateMap["longitude"].(string)
				if !ok {
					fmt.Printf("Invalid geospatial longitude for key %s\n", key)
					continue
				}

				latitudeStr, ok := coordinateMap["latitude"].(string)
				if !ok {
					fmt.Printf("Invalid geospatial latitude for key %s\n", key)
					continue
				}

				longitude, err := strconv.ParseFloat(longitudeStr, 64)
				if err != nil {
					fmt.Printf("Error parsing longitude for key %s: %v\n", key, err)
					continue
				}

				latitude, err := strconv.ParseFloat(latitudeStr, 64)
				if err != nil {
					fmt.Printf("Error parsing latitude for key %s: %v\n", key, err)
					continue
				}

				geoLocation = append(geoLocation, &redis.GeoLocation{
					Name:      member,
					Longitude: longitude,
					Latitude:  latitude,
				})
			}

			err := db.GeoAdd(ctx, key, geoLocation...).Err()
			if err != nil {
				fmt.Printf("Error adding values to geospatial key %s: %v\n", key, err)
			}
		case "bitfield":
			bitfieldValues, ok := dataValue.(string)
			if !ok {
				fmt.Printf("Invalid bitfield values for key %s\n", key)
				continue
			}

			err := db.BitField(ctx, key, bitfieldValues).Err()
			if err != nil {
				fmt.Printf("Error setting bitfield values for key %s: %v\n", key, err)
			}
		case "stream":
			streamEntries, ok := dataValue.([]interface{})
			if !ok {
				fmt.Printf("Invalid stream entries for key %s\n", key)
				continue
			}

			db.Del(ctx, key) // 这里需要根据情况判断是否应该先删掉  对应的key  不然原来key内stream有更大的id大的数据的情况下无法写入更小id的数据
			for _, entry := range streamEntries {
				entryMap, ok := entry.(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid stream entry for key %s\n", key)
					continue
				}

				streamID, ok := entryMap["ID"].(string)
				if !ok {
					fmt.Printf("Invalid stream ID for key %s\n", key)
					continue
				}

				streamValues, ok := entryMap["Values"].(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid values for stream entry with ID %s\n", streamID)
					continue
				}

				err := db.XAdd(ctx, &redis.XAddArgs{
					Stream: key,
					ID:     streamID,
					Values: streamValues,
				}).Err()
				if err != nil {
					fmt.Printf("Error adding entry to stream key %s: %v\n", key, err)
				}
			}

		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
}

func back_redis_db(db redis.UniversalClient) (jsonData []byte) {

	// 获取所有键
	keys := db.Keys(ctx, "*").Val()
	// 创建一个空的 map 用于存储结果
	result := make(map[string]interface{})

	// 遍历键并获取对应的值和类型
	for _, key := range keys {
		dataType := db.Type(ctx, key).Val()

		// 根据不同的数据类型获取值
		switch dataType {
		case "string":
			value := db.Get(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "list":
			values := db.LRange(ctx, key, 0, -1).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "hash":
			values := db.HGetAll(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "set":
			values := db.SMembers(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "zset":
			values := db.ZRange(ctx, key, 0, -1).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}

		case "bitmaps":
			value := db.GetBit(ctx, key, 0).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "hyperloglogs":
			value := db.PFCount(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "geospatial":
			values := db.GeoRadius(ctx, key, 0, 0, &redis.GeoRadiusQuery{Unit: "km"}).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "bitfield":
			value := db.BitField(ctx, key, "GET", "u4", "0").Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "stream":
			streamValues := db.XRange(ctx, key, "-", "+").Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": streamValues,
			}

		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}

	// 将结果转换为 JSON 格式
	jsonData, err := json.MarshalIndent(result, "", "  ")
	if err != nil {
		fmt.Println("Error marshalling data to JSON:", err)
		return
	}

	return
}

导入导出到mysql

未经测试,也实在没必要

数据库结构

1
2
3
4
5
6
7
CREATE TABLE mytable (
    id INT AUTO_INCREMENT PRIMARY KEY,
    `key` VARCHAR(255) NOT NULL,
    `type` VARCHAR(50) NOT NULL,
    value VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

代码部分

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
func back_redis_db() {
	// 查询MySQL中的数据
	rows, err := mysqlDb.Query("SELECT key, type, value FROM mytable")
	if err != nil {
		fmt.Println("Error querying MySQL:", err)
		return
	}
	defer rows.Close()
	
	// 遍历查询结果并根据类型将数据存储到Redis
	for rows.Next() {
		var key, dataType string
		var value sql.NullString
		err := rows.Scan(&key, &dataType, &value)
		if err != nil {
			fmt.Println("Error scanning MySQL rows:", err)
			continue
		}
		
		if !value.Valid {
			fmt.Printf("Invalid value for key %s\n", key)
			continue
		}
		
		switch dataType {
		case "string":
			err := redisDb.Set(ctx, key, value.String, 0).Err()
			if err != nil {
				fmt.Printf("Error setting value for key %s: %v\n", key, err)
			}
		case "list":
			// 查询MySQL中的列表值并将其存储到Redis
			listValues, err := mysqlDb.Query("SELECT value FROM mytable WHERE key = ? AND type = ?", key, dataType)
			if err != nil {
				fmt.Printf("Error querying list values for key %s: %v\n", key, err)
				continue
			}
			defer listValues.Close()

			var values []interface{}
			for listValues.Next() {
				var v string
				err := listValues.Scan(&v)
				if err != nil {
					fmt.Printf("Error scanning list values for key %s: %v\n", key, err)
					continue
				}
				values = append(values, v)
			}

			err = redisDb.RPush(ctx, key, values...).Err()
			if err != nil {
				fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
			}
		// 其他数据类型的处理类似...
		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
	
	err = rows.Err()
	if err != nil {
		fmt.Println("Error iterating MySQL rows:", err)
	}
}

func restore_mysql_db(jsonData []byte) {
	// 解析导出的 JSON 数据
	var importedData map[string]interface{}
	err := json.Unmarshal(jsonData, &importedData)
	if err != nil {
		fmt.Println("Error unmarshalling JSON data:", err)
		return
	}
	
	// 遍历导入的数据并根据类型将其存储到MySQL
	for key, value := range importedData {
		dataMap, ok := value.(map[string]interface{})
		if !ok {
			fmt.Printf("Invalid data format for key %s\n", key)
			continue
		}
		
		dataType, ok := dataMap["type"].(string)
		if !ok {
			fmt.Printf("Invalid data type for key %s\n", key)
			continue
		}
		
		dataValue, ok := dataMap["value"]
		if !ok {
			fmt.Printf("Invalid data value for key %s\n", key)
			continue
		}
		
		switch dataType {
		case "string":
			_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, dataValue.(string))
			if err != nil {
				fmt.Printf("Error inserting value for key %s: %v\n", key, err)
			}
		case "list":
			values := dataValue.([]interface{})
			for _, v := range values {
				_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, v.(string))
				if err != nil {
					fmt.Printf("Error inserting value for key %s: %v\n", key, err)
				}
			}
		// 其他数据类型的处理类似...
		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
}
Licensed under CC BY-NC-SA 4.0
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计