提到事务必谈 ACID 特性, 基于悲观锁的实现会有读写冲突问题,性能很低,为了解决这个问题,主流数据库大多采用版本控制 mvcc[1] 技术,比如 oracle, mysql, postgresql 等等。读可以不加锁,只需要读历史版本即可 (写写还是冲突). 根据事务能看到不同版本的数据,还产生了隔离级别的问题,比如 mysql 默认的 repeatable-read, oracle 默认的 read-commited. 本文暂时只讲 mvcc, 隔离实现放到下文。
mvcc 不同数据库实现也不同,mysql 原地更新数据,将多版本保存到 undo, 而 postgresql 直接插入不同版本数据,过期的数据由 vacuum 来删除。etcd 的实现类似 pg, 本次分享看一下 etcd 的实现原理。
Revision
可以先阅读我的文章 etcd 中让人头大的 version, revision, createRevision, modRevision[2] 来了解下几个版本的概念。
type revision struct {
 // main is the main revision of a set of changes that happen atomically.
 main int64
 // sub is the sub revision of a change in a set of changes that happen
 // atomically. Each change has different increasing sub revision in that
 // set.
 sub int64
}
main 是版本 id, 逻辑时间戳全局递增。sub 表示当前事务内操作 changes 的顺序 id, 从 0 开始递增。
静态存储
etcd 的 mvcc 数据存储分两部分:内存保存所有 key 对应的版本信息,用于快速范围查询与点查,而磁盘存储所有不同版本的真实数据。

kvindex btree
内存数据由 btree 来维护,从图上可以看到,key 是用户真实的 key, value 是对应所有的版本信息。
type keyIndex struct {
 key         []byte
 modified    revision // the main rev of the last modification
 generations []generation
}
// generation contains multiple revisions of a key.
type generation struct {
 ver     int64
 created revision // when the generation is created (put in first revision).
 revs    []revision
}
keyIndex 保存 key 的所有版本信息,每删除一次都会生成一个 generation, 每个 generation 保存了这个生命周期内从创建到删除中间的所有版本号。

磁盘 boltdb
磁盘负责存储所有数据,key 是 revision, value 是 mvccpb.KeyValue, 存储引擎是 boltdb
type KeyValue struct {
 // key is the key in bytes. An empty key is not allowed.
 Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
 // create_revision is the revision of last creation on this key.
 CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
 // mod_revision is the revision of last modification on this key.
 ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
 // version is the version of the key. A deletion resets
 // the version to zero and any modification of the key
 // increases its version.
 Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
 // value is the value held by the key, in bytes.
 Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
 // lease is the ID of the lease that attached to key.
 // When the attached lease expires, the key will be deleted.
 // If lease is 0, then no lease is attached to the key.
 Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
}
mvccpb.KeyValue 存储本次操作的 key, value, 还有相关的所有版本信息。
Range 查找
每次数据操作,都会在 etcdserver 层开启一个事务 txn, Range 操作是 Read 读事务,然后调用 txn 的 Range 方法,直接看 mvcc 目录下 kvstore_txn.go 文件的实现。
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
 return tr.rangeKeys(key, end, tr.Rev(), ro)
}
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
 rev := ro.Rev
 if rev > curRev {
  return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
 }
 if rev <= 0 {
  rev = curRev
 }
 if rev   return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
 }
  
 revpairs := tr.s.kvindex.Revisions(key, end, rev)
  ......
 kvs := make([]mvccpb.KeyValue, limit)
 revBytes := newRevBytes()
 for i, revpair := range revpairs[:len(kvs)] {
  revToBytes(revpair, revBytes)
  _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
    ......
  if err := kvs[i].Unmarshal(vs[0]); err != nil {
    ......
  }
 }
 tr.trace.Step("range keys from bolt db")
 return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
省略部份无关代码,直接看主干部份
- 检查所查找的 rev 版本是否有效,超过当前版本不行,被 compact 删除的也不行
 - 根据指定版本去 kvindex 即内存 btree 中查找,所有符合 rev 版本从 key 到 end 的版本信息
 - 遍历所有版本,
UnsafeRange去底层磁盘 boltdb 中获取真实 key/value 
Put 更新数据
etcdserver 层同样要开启事务,只不过是写事务。然后实现直接看 mvcc 目录下 kvstore_txn.go
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
 rev := tw.beginRev + 1
 c := rev
 oldLease := lease.NoLease
 // if the key exists before, use its previous created and
 // get its previous leaseID
 _, created, ver, err := tw.s.kvindex.Get(key, rev)
 if err == nil {
  c = created.main
  oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
 }
 tw.trace.Step("get key's previous created_revision and leaseID")
 ibytes := newRevBytes()
 idxRev := revision{main: rev, sub: int64(len(tw.changes))}
 revToBytes(idxRev, ibytes)
 ver = ver + 1
 kv := mvccpb.KeyValue{
  Key:            key,
  Value:          value,
  CreateRevision: c,
  ModRevision:    rev,
  Version:        ver,
  Lease:          int64(leaseID),
 }
 d, err := kv.Marshal()
  ......
 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 tw.s.kvindex.Put(key, idxRev)
 tw.changes = append(tw.changes, kv)
 tw.trace.Step("store kv pair into bolt db")
  ......
}
省去不太相关的 lease 操作,只看主干逻辑
- 根据当前版本 key, rev 查找内存 kvindex, 看看是否有当前 key 的版本记录。主要是获取这个 key 当前的 
createdRevision与Version - 生成 
mvccpb.KeyValue信息,主要是确定这次操作的 ModRevision UnsafeSeqPut操作写磁盘 boltdb, key 是Revision, value 是mvccpb.KeyValue序列化后的数据- 最后更新 kvindex btree
 
Delete 删除
同样需要开启写事务,直接看源码
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
 if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
  return n, tw.beginRev + 1
 }
 return 0, tw.beginRev
}
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
 rrev := tw.beginRev
 if len(tw.changes) > 0 {
  rrev++
 }
 keys, _ := tw.s.kvindex.Range(key, end, rrev)
 if len(keys) == 0 {
  return 0
 }
 for _, key := range keys {
  tw.delete(key)
 }
 return int64(len(keys))
}
同样需要先查找内存 kvindex, 找到所有符合的待删除版本,然后调用 delete 去删
func (tw *storeTxnWrite) delete(key []byte) {
 ibytes := newRevBytes()
 idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
 revToBytes(idxRev, ibytes)
 if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
  ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
 } else {
  // TODO: remove this in v3.5
  ibytes = appendMarkTombstone(nil, ibytes)
 }
 kv := mvccpb.KeyValue{Key: key}
 d, err := kv.Marshal()
 if err != nil {
  if tw.storeTxnRead.s.lg != nil {
   tw.storeTxnRead.s.lg.Fatal(
    "failed to marshal mvccpb.KeyValue",
    zap.Error(err),
   )
  } else {
   plog.Fatalf("cannot marshal event: %v", err)
  }
 }
 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 err = tw.s.kvindex.Tombstone(key, idxRev)
 ......
}
- 生成 ibytes, 然后追加一个 
appendMarkTombstone标记,表示这个 revision 是 delete,并且生成一个只含有 key 的 mvccpb.KeyValue UnsafeSeqPut删除磁盘 boltdb, 注意这里底层只是标记删除,磁盘空间并不释放Tombstone结束当前生命周期,生成一个新的空 generation, 更新 kvindex
数据过期
与 pg 比较像,过期与删除数据都是惰性删除的。etcd 可以配置只保留固定时间的数据,所以会周期性的 Compact. 同样分为两部分,内存 btree 数据如果发现当前 generation 为空,并且最大 Revision 己过期,那就从 btree 中删除。
磁盘数据由 boltdb 维护,只会标记为删除,磁盘空间可以回收利用,但是不会自动释放,只有调用 Defrag 才会重建磁盘文件。
另外说到存储引擎 boltdb, 这个东西性能一般,除了 etcd 没有什么知名项目在用。读事务是并发,但是写事务是串行的,所以内部会将尽可能多的写入 batch 一起操作,异步提交。
小结
这次分享就这些,以后面还会分享更多关于 etcd 的内容,如果感兴趣,可以关注并转发(:
参考资料
[1]什么是 mvcc: https://en.wikipedia.org/wiki/Multiversion_concurrency_control,
[2]etcd 中让人头大的 version, revision, createRevision, modRevision: https://mp.weixin.qq.com/s/TFcSEBBMnb0wJ_A3R4Jfqw,