台州网站建设方案服务代理ip 海外
台州网站建设方案服务,代理ip 海外,软件系统商城定制开发,公众号开发者密码前言
redis的核心是数据的快速存储#xff0c;下面就来分析一下godis的底层存储是如何实现#xff0c;先分析单机服务。
此文采用抓大放小原则#xff0c;先大的流程方向#xff0c;再抓细节。
流程图 源码分析
现在以客户端连接#xff0c;并发起set key val命令为例…前言
redis的核心是数据的快速存储下面就来分析一下godis的底层存储是如何实现先分析单机服务。
此文采用抓大放小原则先大的流程方向再抓细节。
流程图 源码分析
现在以客户端连接并发起set key val命令为例子 在单机部署的时候服务启动会创建一个处理实例并创建一个单机的db
// redis/server.go
// 创建一个处理实例
// MakeHandler creates a Handler instance
func MakeHandler() *Handler {// redis的一个存储引擎var db database.DB// 创建是集群还是单例if config.Properties.ClusterEnable {db cluster.MakeCluster()} else {db database2.NewStandaloneServer()}return Handler{db: db,}
}有客户端连接会生成一个异步方法处理每个客户端一旦有客户端的消息都会进入Handle方法。
// redis/server/server.go
// 处理接收到客户端的命令
// Handle receives and executes redis commands
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {if h.closing.Get() {// closing handler refuse new connection_ conn.Close()return}client : connection.NewConn(conn)// 存储一个客户端h.activeConn.Store(client, struct{}{})// 获取字符串ch : parser.ParseStream(conn)// 接收客户端数据for payload : range ch {// 遍历消息体// ......... 经过各种校验// 获取到客户端信息r, ok : payload.Data.(*protocol.MultiBulkReply)if !ok {logger.Error(require multi bulk protocol)continue}// 执行结果result : h.db.Exec(client, r.Args)// 结果回复if result ! nil {_, _ client.Write(result.ToBytes())} else {_, _ client.Write(unknownErrReplyBytes)}}
}客户端的各种命令进行判断set是属于正常的数据操作命令直接通过判断获取数据库并在当前数据库中执行
// database/server.go
func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {defer func() {if err : recover(); err ! nil {logger.Warn(fmt.Sprintf(error occurs: %v\n%s, err, string(debug.Stack())))result protocol.UnknownErrReply{}}}()cmdName : strings.ToLower(string(cmdLine[0]))// pingif cmdName ping {return Ping(c, cmdLine[1:])}// authenticateif cmdName auth {return Auth(c, cmdLine[1:])}// ........// 各种各样的判断暂时不管// 获取当前的数据索引// normal commandsdbIndex : c.GetDBIndex()// 获取当前数据库selectedDB, errReply : server.selectDB(dbIndex)if errReply ! nil {return errReply}// 以当前数据库执行命令return selectedDB.Exec(c, cmdLine)
}命令名称解析出来后从cmdTable获取对应的执行方法如prepare、executor
// Exec executes command within one database
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {// transaction control commands and other commands which cannot execute within transactioncmdName : strings.ToLower(string(cmdLine[0]))// ...return db.execNormalCommand(cmdLine)
}func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {// 获取到正常的执行命令cmdName : strings.ToLower(string(cmdLine[0]))// 获取到commondcmd, ok : cmdTable[cmdName]if !ok {return protocol.MakeErrReply(ERR unknown command cmdName )}if !validateArity(cmd.arity, cmdLine) {return protocol.MakeArgNumErrReply(cmdName)}prepare : cmd.preparewrite, read : prepare(cmdLine[1:])db.addVersion(write...)// 数据库上锁db.RWLocks(write, read)// 命令执行完后解锁defer db.RWUnLocks(write, read)// 执行命令方法fun : cmd.executorreturn fun(db, cmdLine[1:])
}set命令对应的方法从代码可以发现其实数据是存储在定义的map结构的集合中自此命令已经执行完毕返回执行结果。
func execSet(db *DB, args [][]byte) redis.Reply {// 提取keykey : string(args[0])// 提取valvalue : args[1]// 提取策略policy : upsertPolicy// 提取过期时间ttl : unlimitedTTL// parse options// 如何参数大于2个说明有其他参数需要做其他处理// .....entity : database.DataEntity{Data: value,}var result int// 更新策略switch policy {case upsertPolicy:// 默认策略db.PutEntity(key, entity)result 1case insertPolicy:result db.PutIfAbsent(key, entity)case updatePolicy:result db.PutIfExists(key, entity)}if result 0 {if ttl ! unlimitedTTL {expireTime : time.Now().Add(time.Duration(ttl) * time.Millisecond)// 设置过期时间db.Expire(key, expireTime)db.addAof(CmdLine{[]byte(SET),args[0],args[1],})db.addAof(aof.MakeExpireCmd(key, expireTime).Args)} else {db.Persist(key) // override ttldb.addAof(utils.ToCmdLine3(set, args...))}}if result 0 {return protocol.OkReply{}}return protocol.NullBulkReply{}
}// database.go
// 将数据放入DB
// PutEntity a DataEntity into DB
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {// 当前数据库的数据字段ret : db.data.PutWithLock(key, entity)// db.insertCallback may be set as nil, during if and actually callback// so introduce a local variable cbif cb : db.insertCallback; ret 0 cb ! nil {cb(db.index, key, entity)}return ret
}// datastruct/dict/concurrent.go
// ConcurrentDict is thread safe map using sharding lock
// 这里可以看出数据其实就是存在map集合里面
type ConcurrentDict struct {table []*shardcount int32shardCount int
}type shard struct {m map[string]interface{}mutex sync.RWMutex
}// datastruct/dict/concurrent.go
func (dict *ConcurrentDict) PutWithLock(key string, val interface{}) (result int) {if dict nil {panic(dict is nil)}hashCode : fnv32(key)index : dict.spread(hashCode)s : dict.getShard(index)// 将数据放入map中if _, ok : s.m[key]; ok {s.m[key] valreturn 0}dict.addCount()// 存储kv结构数据完成s.m[key] valreturn 1
}
其实还有一个问题就是cmdTable怎么来的为什么fun(db, cmdLine[1:])就完成了
在router.go这个代码中是生成一个新的cmdTable的map集合registerCommand这个函数是将各种命令塞入cmdTable里面。每个数据结构如string等都有定义的方法。
main启动前都会调用init()这个是golang特殊的函数顺序按照文件的顺序执行。
这里就是在服务启动前将所有命令注册到cmdTable集合。
// database/router.go
// 命令集
var cmdTable make(map[string]*command)
// ....
// 注册命令将命令存放在cmdTable集合里面
// registerCommand registers a normal command, which only read or modify a limited number of keys
func registerCommand(name string, executor ExecFunc, prepare PreFunc, rollback UndoFunc, arity int, flags int) *command {name strings.ToLower(name)cmd : command{name: name,executor: executor,prepare: prepare,undo: rollback,arity: arity,flags: flags,}cmdTable[name] cmdreturn cmd
}//// database/string.gofunc execSet(db *DB, args [][]byte) redis.Reply {
//....
}// execSetNX sets string if not exists
func execSetNX(db *DB, args [][]byte) redis.Reply {// .....
}// execSetEX sets string and its ttl
func execSetEX(db *DB, args [][]byte) redis.Reply {// ...
}func init() {// 调用注册命令函数注册方法如Set则是执行execSet方法registerCommand(Set, execSet, writeFirstKey, rollbackFirstKey, -3, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)registerCommand(SetNx, execSetNX, writeFirstKey, rollbackFirstKey, 3, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM, redisFlagFast}, 1, 1, 1)registerCommand(SetEX, execSetEX, writeFirstKey, rollbackFirstKey, 4, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)// .....
}
ExecFunc是规范方法每个命令对应的执行都按照规范定义。
// database/router.gotype command struct {// 命令名称name string// 执行方法executor ExecFunc// prepare returns related keys commandprepare PreFunc// undo generates undo-log before command actually executed, in case the command needs to be rolled backundo UndoFunc// arity means allowed number of cmdArgs, arity 0 means len(args) -arity.// for example: the arity of get is 2, mget is -2arity intflags intextra *commandExtra
}// // database/database.go
// 执行方法接口
// ExecFunc is interface for command executor
// args dont include cmd line
type ExecFunc func(db *DB, args [][]byte) redis.Reply
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/87820.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!