redis集群一键导入到json以及 从json导出

mq

实现 从redis单节点或者整个集群导出导入数据到json文件。

redis-dump 也可以有同样的功能的,他已经5年多不更新了 而且 只支持有限的数据类型,更不支持 Cluster。

没啥技术含量,花了30分钟写完。

也可以导入到mysql,文末有参考(未测试,这部分代码由chatGPT完成)

#特性和基本算法:

支持集群和单节点 和主从模式,因为我一直没用过哨兵集群,所以未测试,理论上也支持。
支持10种redis数据类型,详情看代码
因为主要是为了 cluster模式,从节点可能太多,所以数据直接从主节点导出。
如果是单机模式/主从/哨兵模式,会直接导出。如果是 cluster 模式,会先查询出主节点,然后依次连接主节点导出(不连接从节点 是因为从节点太多,再去每一个节点选一个从节点太复杂也没必要)
导出 : 先判断是不是集群,如果不是,直接扫描所有key 依次导出。如果是集群 找到主节点,依次扫描所有key 导出。
导入:直接按照类型导入即可,集群会自己分片

#代码

#一些代码里面没有的说明

do_redis 只是连接redis集群去了,GetRedisDb_Client 的 代码参考:

go
redis_addr_arr := strings.Split(redis_addr, ",")//redis_addr 是集群的每一个节点的ip:端口。如果是单节点 就一个即可
if len(redis_addr_arr) > 1 { //连接到集群
		redisClient := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:    redis_addr_arr,
			Password: redis_psw,
		})
}else{
	redisClient := redis.NewClient(&redis.Options{ //连接到单点
		Addr:     redis_addr,
		Password: redis_psw,
		DB:       0,
	})
}
redis_Clusterpsw ="12332434"

#完整代码

go
package main

import (
	"ac_file"
	"coder"
	"context"
	"do_redis"
	"do_time"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"

	"github.com/redis/go-redis/v9"
)

var (
	cfgCommon    []byte
	redisDb      redis.UniversalClient
	redisDb_back redis.UniversalClient
	ctx          = context.Background()
	nowTimestamp int64
)

func init() {
	cfgCommon = ac_file.Get_serv_cfg_local_and_redis(cfgCommon) //从本机和redis按照策略更新和获取配置文件
	redisDb = do_redis.GetRedisDb_Client(cfgCommon)
	redisDb_back = do_redis.GetRedisBackDb_Client(cfgCommon)
	go do_time.NowTimestamp(&nowTimestamp) //秒 时间戳定时器 避免程序多次调用

}
func main() {
	Save_toJson("redis_db_date.json", redisDb)
	//load_from_json("redis_db_date.json", redisDb)
	load_from_json("redis_db_date.json", redisDb_back)

	//fmt.Println("操作完成")

}

func load_from_json(filename string, db redis.UniversalClient) {

	data, err := os.ReadFile(filename)
	if err != nil {
		fmt.Println("Error reading file:", err)
		return
	}
	restore_redis_db(data, db)
}

func Save_toJson(filename string, db redis.UniversalClient) {
	var redis_db_date []byte

	// 判断客户端是否连接到 Redis 集群

	if _, ok := db.(*redis.ClusterClient); ok {
		// 获取集群节点信息
		stateCmd := db.ClusterNodes(context.Background())
		state, err := stateCmd.Result()
		if err != nil {
			log.Println(err)
		}
		redis_Clusterpsw := coder.JsonGetStr(cfgCommon, "redis", "db", "password") //单独配置一下密码
		// 解析主节点信息
		lines := strings.Split(state, "\n")
		for _, line := range lines {
			fields := strings.Fields(line)
			if len(fields) > 2 && (fields[2] == "master" || fields[2] == "myself,master") {
				nodeAddr := fields[1]
				add := strings.Split(nodeAddr, "@")
				masterClient := redis.NewClient(&redis.Options{
					Addr:     add[0],
					Password: redis_Clusterpsw,
				})

				redis_db_dateThis := back_redis_db(masterClient)
				var m1 = make(map[string]interface{})
				var m2 = make(map[string]interface{})
				if len(redis_db_date) > 0 {
					err := json.Unmarshal(redis_db_date, &m1)
					if err != nil {
						log.Println("redis_db_date", err)
					}
				}
				if len(redis_db_dateThis) > 0 {
					err := json.Unmarshal(redis_db_dateThis, &m2)
					if err != nil {
						log.Println("redis_db_dateThis", add[0], err)
					}
				}
				if len(m2) > 0 { // 合并两个映射
					for k, v := range m2 {
						m1[k] = v
					}
				}

				mergedData, err := json.Marshal(m1) // 将合并后的映射重新编码为 JSON 字符串

				if err != nil {
					log.Println("mergedData", err)
				} else {
					redis_db_date = mergedData
				}
				masterClient.Close()

			}
		}

	} else {
		redis_db_date = back_redis_db(db)
	}

	var tmp_data interface{} // 创建一个空接口用于解析 JSON 数据
	errJson := json.Unmarshal(redis_db_date, &tmp_data)
	if errJson != nil {
		fmt.Println("redis_db_date 解析 JSON 失败:", errJson)
	} else {
		formattedJSON, err := json.MarshalIndent(tmp_data, "", "  ") // 格式化 JSON 数据
		if err != nil {
			fmt.Println("redis_db_date  格式化 JSON 失败:", err)
		} else {
			redis_db_date = formattedJSON
		}
	}

	err := os.WriteFile(filename, redis_db_date, 0644)
	if err != nil {
		fmt.Println("Error writing file:", err)
		return
	}
}

func restore_redis_db(jsonData []byte, db redis.UniversalClient) {
	// 解析导出的 JSON 数据
	var importedData map[string]interface{}
	err := json.Unmarshal(jsonData, &importedData)
	if err != nil {
		fmt.Println("Error unmarshalling JSON data:", err)
		return
	}

	// 遍历导入的数据并根据类型将其存储到 Redis
	for key, value := range importedData {
		dataMap, ok := value.(map[string]interface{})
		if !ok {
			fmt.Printf("Invalid data format for key %s\n", key)
			continue
		}

		dataType, ok := dataMap["type"].(string)
		if !ok {
			fmt.Printf("Invalid data type for key %s\n", key)
			continue
		}

		dataValue, ok := dataMap["value"]
		if !ok {
			fmt.Printf("Invalid data value for key %s\n", key)
			continue
		}

		switch dataType {
		case "string":
			err := db.Set(ctx, key, dataValue.(string), 0).Err()
			if err != nil {
				fmt.Printf("Error setting value for key %s: %v\n", key, err)
			}
		case "list":
			err := db.RPush(ctx, key, dataValue.([]interface{})...).Err()
			if err != nil {
				fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
			}
		case "hash":
			hashValues, ok := dataValue.(map[string]interface{})
			if !ok {
				fmt.Printf("Invalid hash values for key %s\n", key)
				continue
			}

			err := db.HMSet(ctx, key, hashValues).Err()
			if err != nil {
				fmt.Printf("Error setting hash values for key %s: %v\n", key, err)
			}
		case "set":
			err := db.SAdd(ctx, key, dataValue.([]interface{})...).Err()
			if err != nil {
				fmt.Printf("Error adding values to set key %s: %v\n", key, err)
			}
		case "zset":
			zsetValues, ok := dataValue.([]map[string]interface{})
			if !ok {
				fmt.Printf("Invalid sorted set values for key %s\n", key)
				continue
			}

			var zsetEntries []redis.Z
			for _, zsetValue := range zsetValues {
				scoreStr, ok := zsetValue["score"].(string)
				if !ok {
					fmt.Printf("Invalid sorted set entry score for key %s\n", key)
					continue
				}

				score, err := strconv.ParseFloat(scoreStr, 64)
				if err != nil {
					fmt.Printf("Error parsing score for key %s: %v\n", key, err)
					continue
				}

				member, ok := zsetValue["value"].(string)
				if !ok {
					fmt.Printf("Invalid sorted set entry member for key %s\n", key)
					continue
				}

				zsetEntries = append(zsetEntries, redis.Z{
					Score:  score,
					Member: member,
				})
			}

			err := db.ZAdd(ctx, key, zsetEntries...).Err()
			if err != nil {
				fmt.Printf("Error adding values to sorted set key %s: %v\n", key, err)
			}

		case "bitmaps":
			bitmapsValues, ok := dataValue.(int64)
			if !ok {
				fmt.Printf("Invalid bitmaps values for key %s\n", key)
				continue
			}

			err := db.SetBit(ctx, key, 0, int(bitmapsValues)).Err()
			if err != nil {
				fmt.Printf("Error setting bitmaps values for key %s: %v\n", key, err)
			}
		case "hyperloglogs":
			hyperloglogsValues, ok := dataValue.([]interface{})
			if !ok {
				fmt.Printf("Invalid hyperloglogs values for key %s\n", key)
				continue
			}

			err := db.PFAdd(ctx, key, hyperloglogsValues...).Err()
			if err != nil {
				fmt.Printf("Error setting hyperloglogs values for key %s: %v\n", key, err)
			}
		case "geospatial":
			geospatialValues, ok := dataValue.(map[string]interface{})
			if !ok {
				fmt.Printf("Invalid geospatial values for key %s\n", key)
				continue
			}

			var geoLocation []*redis.GeoLocation
			for member, coordinates := range geospatialValues {
				coordinateMap, ok := coordinates.(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid geospatial coordinate for key %s\n", key)
					continue
				}

				longitudeStr, ok := coordinateMap["longitude"].(string)
				if !ok {
					fmt.Printf("Invalid geospatial longitude for key %s\n", key)
					continue
				}

				latitudeStr, ok := coordinateMap["latitude"].(string)
				if !ok {
					fmt.Printf("Invalid geospatial latitude for key %s\n", key)
					continue
				}

				longitude, err := strconv.ParseFloat(longitudeStr, 64)
				if err != nil {
					fmt.Printf("Error parsing longitude for key %s: %v\n", key, err)
					continue
				}

				latitude, err := strconv.ParseFloat(latitudeStr, 64)
				if err != nil {
					fmt.Printf("Error parsing latitude for key %s: %v\n", key, err)
					continue
				}

				geoLocation = append(geoLocation, &redis.GeoLocation{
					Name:      member,
					Longitude: longitude,
					Latitude:  latitude,
				})
			}

			err := db.GeoAdd(ctx, key, geoLocation...).Err()
			if err != nil {
				fmt.Printf("Error adding values to geospatial key %s: %v\n", key, err)
			}
		case "bitfield":
			bitfieldValues, ok := dataValue.(string)
			if !ok {
				fmt.Printf("Invalid bitfield values for key %s\n", key)
				continue
			}

			err := db.BitField(ctx, key, bitfieldValues).Err()
			if err != nil {
				fmt.Printf("Error setting bitfield values for key %s: %v\n", key, err)
			}
		case "stream":
			streamEntries, ok := dataValue.([]interface{})
			if !ok {
				fmt.Printf("Invalid stream entries for key %s\n", key)
				continue
			}

			db.Del(ctx, key) // 这里需要根据情况判断是否应该先删掉  对应的key  不然原来key内stream有更大的id大的数据的情况下无法写入更小id的数据
			for _, entry := range streamEntries {
				entryMap, ok := entry.(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid stream entry for key %s\n", key)
					continue
				}

				streamID, ok := entryMap["ID"].(string)
				if !ok {
					fmt.Printf("Invalid stream ID for key %s\n", key)
					continue
				}

				streamValues, ok := entryMap["Values"].(map[string]interface{})
				if !ok {
					fmt.Printf("Invalid values for stream entry with ID %s\n", streamID)
					continue
				}

				err := db.XAdd(ctx, &redis.XAddArgs{
					Stream: key,
					ID:     streamID,
					Values: streamValues,
				}).Err()
				if err != nil {
					fmt.Printf("Error adding entry to stream key %s: %v\n", key, err)
				}
			}

		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
}

func back_redis_db(db redis.UniversalClient) (jsonData []byte) {

	// 获取所有键
	keys := db.Keys(ctx, "*").Val()
	// 创建一个空的 map 用于存储结果
	result := make(map[string]interface{})

	// 遍历键并获取对应的值和类型
	for _, key := range keys {
		dataType := db.Type(ctx, key).Val()

		// 根据不同的数据类型获取值
		switch dataType {
		case "string":
			value := db.Get(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "list":
			values := db.LRange(ctx, key, 0, -1).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "hash":
			values := db.HGetAll(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "set":
			values := db.SMembers(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "zset":
			values := db.ZRange(ctx, key, 0, -1).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}

		case "bitmaps":
			value := db.GetBit(ctx, key, 0).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "hyperloglogs":
			value := db.PFCount(ctx, key).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "geospatial":
			values := db.GeoRadius(ctx, key, 0, 0, &redis.GeoRadiusQuery{Unit: "km"}).Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": values,
			}
		case "bitfield":
			value := db.BitField(ctx, key, "GET", "u4", "0").Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": value,
			}
		case "stream":
			streamValues := db.XRange(ctx, key, "-", "+").Val()
			result[key] = map[string]interface{}{
				"type":  dataType,
				"value": streamValues,
			}

		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}

	// 将结果转换为 JSON 格式
	jsonData, err := json.MarshalIndent(result, "", "  ")
	if err != nil {
		fmt.Println("Error marshalling data to JSON:", err)
		return
	}

	return
}

#导入导出到mysql

未经测试,也实在没必要

数据库结构

sql
CREATE TABLE mytable (
    id INT AUTO_INCREMENT PRIMARY KEY,
    `key` VARCHAR(255) NOT NULL,
    `type` VARCHAR(50) NOT NULL,
    value VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

代码部分

go
func back_redis_db() {
	// 查询MySQL中的数据
	rows, err := mysqlDb.Query("SELECT key, type, value FROM mytable")
	if err != nil {
		fmt.Println("Error querying MySQL:", err)
		return
	}
	defer rows.Close()
	
	// 遍历查询结果并根据类型将数据存储到Redis
	for rows.Next() {
		var key, dataType string
		var value sql.NullString
		err := rows.Scan(&key, &dataType, &value)
		if err != nil {
			fmt.Println("Error scanning MySQL rows:", err)
			continue
		}
		
		if !value.Valid {
			fmt.Printf("Invalid value for key %s\n", key)
			continue
		}
		
		switch dataType {
		case "string":
			err := redisDb.Set(ctx, key, value.String, 0).Err()
			if err != nil {
				fmt.Printf("Error setting value for key %s: %v\n", key, err)
			}
		case "list":
			// 查询MySQL中的列表值并将其存储到Redis
			listValues, err := mysqlDb.Query("SELECT value FROM mytable WHERE key = ? AND type = ?", key, dataType)
			if err != nil {
				fmt.Printf("Error querying list values for key %s: %v\n", key, err)
				continue
			}
			defer listValues.Close()

			var values []interface{}
			for listValues.Next() {
				var v string
				err := listValues.Scan(&v)
				if err != nil {
					fmt.Printf("Error scanning list values for key %s: %v\n", key, err)
					continue
				}
				values = append(values, v)
			}

			err = redisDb.RPush(ctx, key, values...).Err()
			if err != nil {
				fmt.Printf("Error pushing values to list key %s: %v\n", key, err)
			}
		// 其他数据类型的处理类似...
		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
	
	err = rows.Err()
	if err != nil {
		fmt.Println("Error iterating MySQL rows:", err)
	}
}

func restore_mysql_db(jsonData []byte) {
	// 解析导出的 JSON 数据
	var importedData map[string]interface{}
	err := json.Unmarshal(jsonData, &importedData)
	if err != nil {
		fmt.Println("Error unmarshalling JSON data:", err)
		return
	}
	
	// 遍历导入的数据并根据类型将其存储到MySQL
	for key, value := range importedData {
		dataMap, ok := value.(map[string]interface{})
		if !ok {
			fmt.Printf("Invalid data format for key %s\n", key)
			continue
		}
		
		dataType, ok := dataMap["type"].(string)
		if !ok {
			fmt.Printf("Invalid data type for key %s\n", key)
			continue
		}
		
		dataValue, ok := dataMap["value"]
		if !ok {
			fmt.Printf("Invalid data value for key %s\n", key)
			continue
		}
		
		switch dataType {
		case "string":
			_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, dataValue.(string))
			if err != nil {
				fmt.Printf("Error inserting value for key %s: %v\n", key, err)
			}
		case "list":
			values := dataValue.([]interface{})
			for _, v := range values {
				_, err = mysqlDb.Exec("INSERT INTO mytable (key, type, value) VALUES (?, ?, ?)", key, dataType, v.(string))
				if err != nil {
					fmt.Printf("Error inserting value for key %s: %v\n", key, err)
				}
			}
		// 其他数据类型的处理类似...
		default:
			fmt.Printf("Unsupported data type for key %s\n", key)
		}
	}
}

评论