MIT_65840测试网络环境的搭建与实现
由于 MIT_65840 实际上实现的大部分是分布式算法相关的内容, 或者更加关注 Client 和 Server 的操作, 因此透明化了 Client 和 Server 之间的通信, 分布式系统中 Client 和 Server 通常使用 RPC 的方式通信. 本实验中使用一些代码模拟了这一过程, 从而让我们更加关注算法实现的细节. 但是这背后的网络的实现也很值得学习与研究, 也方便后续在实现算法的时候中途进行调试的步骤.
序列化与反序列化
数据传输的基础是是网络之间数据传输时的序列化与反序列化, 这部分主要在代码中的 labgob 中, 这个封装主要是基于 Go 标准库 encoding/gob. 但是 Go 标准库中的 encoding/gob 有一些陷阱, 特别是在 MIT 6.5840 这样的分布式系统实验中:
问题 1: Encode 阶段小写字段无法序列化
type BadStruct struct {PublicField int // ✅ 大写开头, 可以序列化privateField int // ❌ 小写开头, 无法序列化, 但不报错!
}
Go 的 RPC 和 gob 只能传输大写开头的字段(exported fields), 如果不小心用了小写字段, 会导致神秘的错误或崩溃, labgob 会检测并警告小写字段.
问题2: 解码阶段解码到非默认值
在 Raft 或者其他使用 RPC 的算法中, 实现的过程中, 有些实现可能会将数据解码到一个已存在的变量, 此时如果解码目标变量已经包含非默认值, 而解码的数据是默认值, gob 不会覆盖目标变量的值.
我们可以使用下面这个例子来说明:
package mainimport ("bytes""encoding/gob""fmt"
)type Example struct {Field1 intField2 string
}func main() {// 1. 创建一个默认值的结构体original := Example{Field1: 0, // 默认值Field2: "", // 默认值}// 2. 将默认值的结构体编码为 gob 数据var buffer bytes.Bufferencoder := gob.NewEncoder(&buffer)err := encoder.Encode(original)if err != nil {fmt.Println("Encoding error:", err)return}// 3. 创建一个目标变量, 并赋予非默认值target := Example{Field1: 42, // 非默认值Field2: "hello", // 非默认值}// 4. 解码到目标变量decoder := gob.NewDecoder(&buffer)err = decoder.Decode(&target)if err != nil {fmt.Println("Decoding error:", err)return}// 5. 打印解码后的目标变量fmt.Printf("Decoded target: %+v\n", target)
}
- 编码阶段:original 是一个默认值的结构体(Field1=0, Field2=""). 它被编码为 gob 数据.
- 解码阶段:target 是一个已经包含非默认值的结构体(Field1=42, Field2="hello"). 当 gob 解码器尝试将 original 的数据解码到 target 时:Field1 和 Field2 的值是默认值(0 和 ""). gob 不会覆盖 target 中的非默认值.
- 结果:target 的值保持不变(Field1=42, Field2="hello"), 而不是被解码数据覆盖.
RPC 基础实现
在 labrpc 中定义了 MIT 65840 的网络部分, 实际主要包含三个部分, 客户端, 服务端, 以及客户端与服务端之间的网络通信.
客户端
在 src/labrpc/labrpc.go 使用下面的结构体代表一个客户端终端节点的抽象, 如下:
// ClientEnd is an abstraction of a single client end-point. multiple ClientEnds may talk to the same server.
// each ClientEnd has a unique name (usually a string or int).
type ClientEnd struct {endname interface{} // this end-point's namech chan reqMsg // copy of Network.endChdone chan struct{} // closed when Network is cleaned up
}
单个 Client 在进行远程调用或者 Call() 的时候, 函数如下:
// ClientEnd sends an RPC, wait for the reply.
// the return value indicates success; false means that
// no reply was received from the server.
func (e *ClientEnd) Call(svcMeth string, args interface{}, reply interface{}) bool {req := reqMsg{}req.endname = e.endnamereq.svcMeth = svcMethreq.argsType = reflect.TypeOf(args)req.replyCh = make(chan replyMsg)// args 是远程调用的 Function 调用的输入参数, 需要序列化通过客户端发送到服务端qb := new(bytes.Buffer)qe := labgob.NewEncoder(qb)if err := qe.Encode(args); err != nil {panic(err)}req.args = qb.Bytes()// send the request.// 当实例化一个 ClientEnd 的时候, e.ch 是网络层的 rn.endCh Channel 的一个拷贝select {case e.ch <- req:// the request has been sent.case <-e.done:// entire Network has been destroyed.return false}// wait for the reply.rep := <-req.replyChif rep.ok {rb := bytes.NewBuffer(rep.reply)rd := labgob.NewDecoder(rb)if err := rd.Decode(reply); err != nil {log.Fatalf("ClientEnd.Call(): decode reply: %v\n", err)}return true} else {return false}
}