redis集群一键导入到json以及 从json导出
mq
实现 从redis单节点或者整个集群导出导入数据到json文件。
redis-dump 也可以有同样的功能的,他已经5年多不更新了 而且 只支持有限的数据类型,更不支持 Cluster。
没啥技术含量,花了30分钟写完。
也可以导入到mysql,文末有参考(未测试,这部分代码由chatGPT完成)
#特性和基本算法:
支持集群和单节点 和主从模式,因为我一直没用过哨兵集群,所以未测试,理论上也支持。
支持10种redis数据类型,详情看代码
因为主要是为了 cluster模式,从节点可能太多,所以数据直接从主节点导出。
如果是单机模式/主从/哨兵模式,会直接导出。如果是 cluster 模式,会先查询出主节点,然后依次连接主节点导出(不连接从节点 是因为从节点太多,再去每一个节点选一个从节点太复杂也没必要)
导出 : 先判断是不是集群,如果不是,直接扫描所有key 依次导出。如果是集群 找到主节点,依次扫描所有key 导出。
导入:直接按照类型导入即可,集群会自己分片
#代码
#一些代码里面没有的说明
do_redis 只是连接redis集群去了,GetRedisDb_Client 的 代码参考:
go
redis_addr_arr := strings.Split(redis_addr, ",")//redis_addr 是集群的每一个节点的ip:端口。如果是单节点 就一个即可
if len(redis_addr_arr) > 1 { //连接到集群
redisClient := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: redis_addr_arr,
Password: redis_psw,
})
}else{
redisClient := redis.NewClient(&redis.Options{ //连接到单点
Addr: redis_addr,
Password: redis_psw,
DB: 0,
})
}
redis_Clusterpsw ="12332434"
#完整代码
go
package main
import (
"ac_file"
"coder"
"context"
"do_redis"
"do_time"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"github.com/redis/go-redis/v9"
)
var (
cfgCommon []byte
redisDb redis.UniversalClient
redisDb_back redis.UniversalClient
ctx = context.Background()
nowTimestamp int64
)
func init() {
cfgCommon = ac_file.Get_serv_cfg_local_and_redis(cfgCommon) //从本机和redis按照策略更新和获取配置文件
redisDb = do_redis.GetRedisDb_Client(cfgCommon)
redisDb_back = do_redis.GetRedisBackDb_Client(cfgCommon)
go do_time.NowTimestamp(&nowTimestamp) //秒 时间戳定时器 避免程序多次调用
}
func main() {
Save_toJson("redis_db_date.json", redisDb)
//load_from_json("redis_db_date.json", redisDb)
load_from_json("redis_db_date.json", redisDb_back)
//fmt.Println("操作完成")
}
func load_from_json(filename string, db redis.UniversalClient) {
data, err := os.ReadFile(filename)
if err != nil {
fmt.Println("Error reading file:", err)
return
}
restore_redis_db(data, db)
}
func Save_toJson(filename string, db redis.UniversalClient) {
var redis_db_date []byte
// 判断客户端是否连接到 Redis 集群
if _, ok := db.(*redis.ClusterClient); ok {
// 获取集群节点信息
stateCmd := db.ClusterNodes(context.Background())
state, err := stateCmd.Result()
if err != nil {
log.Println(err)
}
redis_Clusterpsw := coder.JsonGetStr(cfgCommon, "redis", "db", "password") //单独配置一下密码
// 解析主节点信息
lines := strings.Split(state, "\n")
for _, line := range lines {
fields := strings.Fields(line)
if len(fields) > 2 && (fields[2] == "master" || fields[2] == "myself,master") {
nodeAddr := fields[1]
add := strings.Split(nodeAddr, "@")
masterClient := redis.NewClient(&redis.Options{
Addr: add[0],
Password: redis_Clusterpsw,
})
redis_db_dateThis := back_redis_db(masterClient)
var m1 = make(map[string]interface{})
var m2 = make(map[string]interface{})
if len(redis_db_date) > 0 {
err := json.Unmarshal(redis_db_date, &m1)
if err != nil {
log.Println("redis_db_date", err)
}
}
if len(redis_db_dateThis) > 0 {
err := json.Unmarshal(redis_db_dateThis, &m2)
if err != nil {
log.Println("redis_db_dateThis", add[0], err)
}
}
if len(m2) > 0 { // 合并两个映射
for k, v := range m2 {
m1[k] = v
}
}
mergedData, err := json.Marshal(m1) // 将合并后的映射重新编码为 JSON 字符串
if err != nil {
log.Println("mergedData", err)
} else {
redis_db_date = mergedData
}
masterClient.Close()
}
}
} else {
redis_db_date = back_redis_db(db)
}
var tmp_data interface{} // 创建一个空接口用于解析 JSON 数据
errJson := json.Unmarshal(redis_db_date, &tmp_data)
if errJson != nil {
fmt.Println("redis_db_date 解析 JSON 失败:", errJson)
} else {
formattedJSON, err := json.MarshalIndent(tmp_data, "", " ") // 格式化 JSON 数据
if err != nil {
fmt.Println("redis_db_date 格式化 JSON 失败:", err)
} else {
redis_db_date = formattedJSON
}
}
err := os.WriteFile(filename, redis_db_date, 0644)
if err != nil {
fmt.Println("Error writing file:", err)
return
}
}
func restore_redis_db(jsonData []byte, db redis.UniversalClient) {
// 解析导出的 JSON 数据
var importedData map[string]interface{}
err := json.Unmarshal(jsonData, &importedData)
if err != nil {
fmt.Println("Error unmarshalling JSON data:", err)
return
}
// 遍历导入的数据并根据类型将其存储到 Redis
for key, value := range importedData {
dataMap, ok := value.(map[string]interface{})
if !ok {
fmt.Printf("Invalid data format for key %s\n", key)
continue
}
dataType, ok := dataMap["type"].(string)
if !ok {
fmt.Printf("Invalid data type for key %s\n", key)
continue
}
dataValue, ok := dataMap["value"]
if !ok {
fmt.Printf("Invalid data value for key %s\n", key)
continue
}
switch dataType {
case "string":
err := db.Set(ctx, key, dataValue.(string), 0).Err()
if err != nil {
fmt.Printf("Error setting value for key %s: %v\n", key, err)
}
case "list":
err := db.RPush(ctx, key, dataValue.([]interface{})...).Err()
if err != nil {
fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
}
case "hash":
hashValues, ok := dataValue.(map[string]interface{})
if !ok {
fmt.Printf("Invalid hash values for key %s\n", key)
continue
}
err := db.HMSet(ctx, key, hashValues).Err()
if err != nil {
fmt.Printf("Error setting hash values for key %s: %v\n", key, err)
}
case "set":
err := db.SAdd(ctx, key, dataValue.([]interface{})...).Err()
if err != nil {
fmt.Printf("Error adding values to set key %s: %v\n", key, err)
}
case "zset":
zsetValues, ok := dataValue.([]map[string]interface{})
if !ok {
fmt.Printf("Invalid sorted set values for key %s\n", key)
continue
}
var zsetEntries []redis.Z
for _, zsetValue := range zsetValues {
scoreStr, ok := zsetValue["score"].(string)
if !ok {
fmt.Printf("Invalid sorted set entry score for key %s\n", key)
continue
}
score, err := strconv.ParseFloat(scoreStr, 64)
if err != nil {
fmt.Printf("Error parsing score for key %s: %v\n", key, err)
continue
}
member, ok := zsetValue["value"].(string)
if !ok {
fmt.Printf("Invalid sorted set entry member for key %s\n", key)
continue
}
zsetEntries = append(zsetEntries, redis.Z{
Score: score,
Member: member,
})
}
err := db.ZAdd(ctx, key, zsetEntries...).Err()
if err != nil {
fmt.Printf("Error adding values to sorted set key %s: %v\n", key, err)
}
case "bitmaps":
bitmapsValues, ok := dataValue.(int64)
if !ok {
fmt.Printf("Invalid bitmaps values for key %s\n", key)
continue
}
err := db.SetBit(ctx, key, 0, int(bitmapsValues)).Err()
if err != nil {
fmt.Printf("Error setting bitmaps values for key %s: %v\n", key, err)
}
case "hyperloglogs":
hyperloglogsValues, ok := dataValue.([]interface{})
if !ok {
fmt.Printf("Invalid hyperloglogs values for key %s\n", key)
continue
}
err := db.PFAdd(ctx, key, hyperloglogsValues...).Err()
if err != nil {
fmt.Printf("Error setting hyperloglogs values for key %s: %v\n", key, err)
}
case "geospatial":
geospatialValues, ok := dataValue.(map[string]interface{})
if !ok {
fmt.Printf("Invalid geospatial values for key %s\n", key)
continue
}
var geoLocation []*redis.GeoLocation
for member, coordinates := range geospatialValues {
coordinateMap, ok := coordinates.(map[string]interface{})
if !ok {
fmt.Printf("Invalid geospatial coordinate for key %s\n", key)
continue
}
longitudeStr, ok := coordinateMap["longitude"].(string)
if !ok {
fmt.Printf("Invalid geospatial longitude for key %s\n", key)
continue
}
latitudeStr, ok := coordinateMap["latitude"].(string)
if !ok {
fmt.Printf("Invalid geospatial latitude for key %s\n", key)
continue
}
longitude, err := strconv.ParseFloat(longitudeStr, 64)
if err != nil {
fmt.Printf("Error parsing longitude for key %s: %v\n", key, err)
continue
}
latitude, err := strconv.ParseFloat(latitudeStr, 64)
if err != nil {
fmt.Printf("Error parsing latitude for key %s: %v\n", key, err)
continue
}
geoLocation = append(geoLocation, &redis.GeoLocation{
Name: member,
Longitude: longitude,
Latitude: latitude,
})
}
err := db.GeoAdd(ctx, key, geoLocation...).Err()
if err != nil {
fmt.Printf("Error adding values to geospatial key %s: %v\n", key, err)
}
case "bitfield":
bitfieldValues, ok := dataValue.(string)
if !ok {
fmt.Printf("Invalid bitfield values for key %s\n", key)
continue
}
err := db.BitField(ctx, key, bitfieldValues).Err()
if err != nil {
fmt.Printf("Error setting bitfield values for key %s: %v\n", key, err)
}
case "stream":
streamEntries, ok := dataValue.([]interface{})
if !ok {
fmt.Printf("Invalid stream entries for key %s\n", key)
continue
}
db.Del(ctx, key) // 这里需要根据情况判断是否应该先删掉 对应的key 不然原来key内stream有更大的id大的数据的情况下无法写入更小id的数据
for _, entry := range streamEntries {
entryMap, ok := entry.(map[string]interface{})
if !ok {
fmt.Printf("Invalid stream entry for key %s\n", key)
continue
}
streamID, ok := entryMap["ID"].(string)
if !ok {
fmt.Printf("Invalid stream ID for key %s\n", key)
continue
}
streamValues, ok := entryMap["Values"].(map[string]interface{})
if !ok {
fmt.Printf("Invalid values for stream entry with ID %s\n", streamID)
continue
}
err := db.XAdd(ctx, &redis.XAddArgs{
Stream: key,
ID: streamID,
Values: streamValues,
}).Err()
if err != nil {
fmt.Printf("Error adding entry to stream key %s: %v\n", key, err)
}
}
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
}
func back_redis_db(db redis.UniversalClient) (jsonData []byte) {
// 获取所有键
keys := db.Keys(ctx, "*").Val()
// 创建一个空的 map 用于存储结果
result := make(map[string]interface{})
// 遍历键并获取对应的值和类型
for _, key := range keys {
dataType := db.Type(ctx, key).Val()
// 根据不同的数据类型获取值
switch dataType {
case "string":
value := db.Get(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "list":
values := db.LRange(ctx, key, 0, -1).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "hash":
values := db.HGetAll(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "set":
values := db.SMembers(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "zset":
values := db.ZRange(ctx, key, 0, -1).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "bitmaps":
value := db.GetBit(ctx, key, 0).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "hyperloglogs":
value := db.PFCount(ctx, key).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "geospatial":
values := db.GeoRadius(ctx, key, 0, 0, &redis.GeoRadiusQuery{Unit: "km"}).Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": values,
}
case "bitfield":
value := db.BitField(ctx, key, "GET", "u4", "0").Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": value,
}
case "stream":
streamValues := db.XRange(ctx, key, "-", "+").Val()
result[key] = map[string]interface{}{
"type": dataType,
"value": streamValues,
}
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
// 将结果转换为 JSON 格式
jsonData, err := json.MarshalIndent(result, "", " ")
if err != nil {
fmt.Println("Error marshalling data to JSON:", err)
return
}
return
}
#导入导出到mysql
未经测试,也实在没必要
数据库结构
sql
CREATE TABLE mytable (
id INT AUTO_INCREMENT PRIMARY KEY,
`key` VARCHAR(255) NOT NULL,
`type` VARCHAR(50) NOT NULL,
value VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
代码部分
go
func back_redis_db() {
// 查询MySQL中的数据
rows, err := mysqlDb.Query("SELECT key, type, value FROM mytable")
if err != nil {
fmt.Println("Error querying MySQL:", err)
return
}
defer rows.Close()
// 遍历查询结果并根据类型将数据存储到Redis
for rows.Next() {
var key, dataType string
var value sql.NullString
err := rows.Scan(&key, &dataType, &value)
if err != nil {
fmt.Println("Error scanning MySQL rows:", err)
continue
}
if !value.Valid {
fmt.Printf("Invalid value for key %s\n", key)
continue
}
switch dataType {
case "string":
err := redisDb.Set(ctx, key, value.String, 0).Err()
if err != nil {
fmt.Printf("Error setting value for key %s: %v\n", key, err)
}
case "list":
// 查询MySQL中的列表值并将其存储到Redis
listValues, err := mysqlDb.Query("SELECT value FROM mytable WHERE key = ? AND type = ?", key, dataType)
if err != nil {
fmt.Printf("Error querying list values for key %s: %v\n", key, err)
continue
}
defer listValues.Close()
var values []interface{}
for listValues.Next() {
var v string
err := listValues.Scan(&v)
if err != nil {
fmt.Printf("Error scanning list values for key %s: %v\n", key, err)
continue
}
values = append(values, v)
}
err = redisDb.RPush(ctx, key, values...).Err()
if err != nil {
fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
}
// 其他数据类型的处理类似...
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
err = rows.Err()
if err != nil {
fmt.Println("Error iterating MySQL rows:", err)
}
}
func restore_mysql_db(jsonData []byte) {
// 解析导出的 JSON 数据
var importedData map[string]interface{}
err := json.Unmarshal(jsonData, &importedData)
if err != nil {
fmt.Println("Error unmarshalling JSON data:", err)
return
}
// 遍历导入的数据并根据类型将其存储到MySQL
for key, value := range importedData {
dataMap, ok := value.(map[string]interface{})
if !ok {
fmt.Printf("Invalid data format for key %s\n", key)
continue
}
dataType, ok := dataMap["type"].(string)
if !ok {
fmt.Printf("Invalid data type for key %s\n", key)
continue
}
dataValue, ok := dataMap["value"]
if !ok {
fmt.Printf("Invalid data value for key %s\n", key)
continue
}
switch dataType {
case "string":
_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, dataValue.(string))
if err != nil {
fmt.Printf("Error inserting value for key %s: %v\n", key, err)
}
case "list":
values := dataValue.([]interface{})
for _, v := range values {
_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, v.(string))
if err != nil {
fmt.Printf("Error inserting value for key %s: %v\n", key, err)
}
}
// 其他数据类型的处理类似...
default:
fmt.Printf("Unsupported data type for key %s\n", key)
}
}
}