实现可靠传输
1. 结合代码和 LOG 文件分析针对每个项目举例说明解决效果。
RDT1.0
对应 Log 日志:Log 1.0.txt,接收文件 recvData 1.0.txt
RDT1.0 版本是在可靠信道上进行可靠的数据传输,因此没有过多的内容需要说明,发送方 Log 日志如下:
接收方 Log 日志如下:
从发送方和接收方的发送数据报的数量我们就可以看出信道是没有出任何错的,双方也正常完成了全部内容传输
RDT2.0
对应 Log 日志:Log 2.0.txt,接收文件 recvData 2.0.txt
RDT2.0 版本是在可能出现位错的信道上进行传输,只需要在 1.0 的基础上做出如下几点更改即可:
① 添加校验和 Checksum 的计算,代码如下:
package com.ouc.tcp.test;import com.ouc.tcp.message.TCP_HEADER;
import com.ouc.tcp.message.TCP_PACKET;public class CheckSum {/*计算TCP报文段校验和:只需校验TCP首部中的seq、ack和sum,以及TCP数据字段*/public static short computeChkSum(TCP_PACKET tcpPack) {//计算校验和int checkSum = 0;TCP_HEADER header = tcpPack.getTcpH();int[] data = tcpPack.getTcpS().getData();int length = data.length;int[] header_info = new int[3];header_info[0] = header.getTh_ack(); //seqheader_info[1] = header.getTh_seq(); //ack//header_info[2] = header.getTh_sum(); //sum//这里不代入sum进行计算是为了少更改Receiver的已有代码int maxValue = 0xffff;int modulus = 65536;for(int i = 0; i < 2; i++) {if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = checkSum + header_info[i];}for(int i = 0; i < length; i++) {if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = checkSum + data[i];}if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = ~checkSum;//System.out.println("checksum=" + checkSum);return (short) checkSum;}
}
这里我的校验和的计算方式是仿照了 UDP 的校验和计算方式,但是老师提供的代码中的 Receiver 类中的校验和的判断是这么写的:
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum())
如果严格按照 UDP 计算校验和的方法,上述 if 语句的左边就会计算出来 0,而 if 语句的右边给出的值应该不是 0,那么这个 if 语句就不成立了,而应该改为:
if(CheckSum.computeChkSum(recvPack) == 0)
为了少更改 Receiver 类中的已有代码,这里我在计算校验和的时候没有把 sum 代入进来,只计算了 seq,ack,以及 TCP 数据字段
② 在 Sender 的 recv 函数中加入对于 ACK 包的 ack 字段的检测:如果检测到 NACK,重发,代码如下:
if(recvPack.getTcpH().getTh_ack() == -1) { //2.0版本检测NACKudt_send(tcpPack);return;
}
③ 调整 Receiver 的代码,在检测到 corrupt 之后返回 NACK,代码如下:
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {//校验通过,这里代码省略了
} else {//校验未通过System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);tcpH.setTh_ack(-1);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
}
运行程序,得到发送方 Log 日志如下:
由于 2.0 版本的假设,我们可以知道只有发送方会出现错误,接收方不会出现错误,因此发送方的 eFlag 设置成 1,接受方的 eFlag 设置成 0
在上图中,我们可以看到,发送方共犯了 13 个错误,因此有 13 个包需要重发,共计 1013 个包,数字是对的
在发送方的日志中我们也可以实际地看到这种犯错误并重发来修正的过程,下面以 Log 日志中的两处作为例子:
同时,我们可以去接收方查看一下接收方对应处的日志,来检查接收方的 ACK/NACK 机制是否正常运行了:
可以看到,我们的接收方在 6001 的正常 ack 之前,以及 24601 的正常 ack 之前,都先给发送方回了一个 NACK 包,因此我们可以得出发送方与接收方都在正常工作的结论。
RDT2.1
对应 Log 日志:Log 2.1.txt,接收文件:recvData 2.1.txt,控制台日志:consoleLog 2.1.txt
①RDT2.1 是在 RDT2.0 的基础上解决 ack/nack 包会出错的问题,我们在发送方的 recv()函数的代码中做如下更改:
if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本检测corruptSystem.out.println("corrupt");udt_send(tcpPack); return;
}
② 将 Receiver 中的 rdt_recv()函数修改如下:
int seqInPack = recvPack.getTcpH().getTh_seq();
System.out.println("seqInPack = " + seqInPack);
//2.0版本:检查校验码,生成ACK
//2.1版本,加入对seqInPack的判断(使用序号判断来代替书中0和1两个状态)
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack == sequence) {//校验通过,并且是我期待的包//代码省略
} else if(seqInPack == sequence){//2.0版本 NAKSystem.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);tcpH.setTh_ack(-1);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回复ACK报文段System.out.println("ack包序号为" + ackPack.getTcpH().getTh_seq());reply(ackPack);
} else {//2.0版本 重复System.out.println("重复");//seqInPack != sequence,说明该数据报我已经接收过了tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回复ACK报文段System.out.println("ack包序号为" + ackPack.getTcpH().getTh_seq());reply(ackPack);
}
由于 2.1 版本的假设,发送方和接收方都有可能出现错误,因此双方的 eFlag 都应该改成 1,运行程序,得到发送方日志如下:
接收方的日志如下:
我们可以从这个 Log 的数据中看出来:发送方犯了 13 个错误,因此这 13 个错误都需要重传;接收方犯了 16 个错误,对于这 16 个错误的 ack 包,发送方不知道接收方是否 ack 了,因此也需要重传,所以发送方共计发送了 1000+13+16=1029 个数据包
发送方错误举例(上图为发送方日志,下图为接收方日志):
可以看到,发送方犯了错,于是接收方回了 NACK,发送方进行重传,这个重传的包被正常 ack
接收方错误举例(上图为发送方日志,下图为接收方日志):
可以看到,发送方没有犯错,但是包也没有正常 ack,原因是接收方的 ack 出现了错误,因此发送方重传了该包,并正常地收到 ack 了
由此,我们可以得出发送方与接收方都在正常工作的结论
RDT2.2
对应 Log 日志:Log 2.2.txt,对应接收文件:recvData 2.2.txt,对应控制台日志:consoleLog 2.2.txt
RDT2.2 版本与 RDT2.1 版本的功能是相同的,唯一区别只是不再使用 ack/nack 的确认方式,而是统一使用 ack,如果接收方检测到包的 corrupt,那么返回一个过期的 ack 即可,这里我还是使用序号的方式来进行检测,即:
如果接收方接收到了一个正常的包,就正常返回这个包的序号作为 ack
如果接收方接收到了一个 corrupt 的包,或者一个过期的包,就返回上一个包的序号作为 ack
(该算法的合理性论证如下:正常的发送与接收就不说明了;说明一下接收方收到一个过期的包的情况:由于现在是停止等待协议,因此如果接收方接收到了一个过期的包,它只可能是上一个包,因此我们应该返回上一个包的序号作为 ack 来告诉发送方我们正常接收了这个包,虽然接收方实际上是不需要这个包的)
对代码更改如下:
① 更改 Sender 的 recv()函数的最开始对包的检测部分如下:
//注:前面的代码中要将2.0版本的检测NACK隐去if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本检测corruptSystem.out.println("corrupt");udt_send(tcpPack); return;
}
if(recvPack.getTcpH().getTh_seq() < sequence) { //2.2版本,无NAKSystem.out.println("ack报文编号" + recvPack.getTcpH().getTh_seq() + "已重复收到");System.out.println("想要的报文编号是" + sequence);//该ack报文我已经收到过了udt_send(tcpPack);return;
}//注:后面的代码中有 接收到一个正常包之后更新sequence的值的功能
② 更改 Receiver 的 rdt_recv()函数中的校验和不正确但包编号是对的的情况的代码:
else if(seqInPack == sequence){//2.0版本 NAK
// System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));
// System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());
// System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);
// tcpH.setTh_ack(-1);
// ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
// tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//2.2版本 无NAK,改用序号不足的ack来充当NAKSystem.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);//回复ACK报文段System.out.println("ack包序号为" + ackPack.getTcpH().getTh_seq());reply(ackPack);}
运行代码结果可见 Log 2.2.txt,由于功能上与 RDT2.1 完全一致,这里不再赘述
RDT3.0
对应 Log 日志:Log 3.0 -1.txt、Log 3.0 -2.txt,接收文件:recvData 3.0 -1.txt、recvData 3.0 -2.txt
注:后缀带 1 的是发送方会错会丢包,接收方只会错;后缀带 2 的是发送方与接收方都是会错会丢包
RDT3.0 的最大进步是可以处理包的 Loss 了,从 2.2 上到 3.0 版本只需要更改发送方代码即可,发送方的状态机如下:
我们对照上图来修改代码(其实是 2.2 上到 3.0 非常简单,所以我现在已经记不全 3.0 都改了什么了)
① 首先在 Sender 类中加入一个私有变量 UDT_Timer:
private UDT_Timer timer; //3.0版本,计时器
② 在发送方的 rdt_Send()函数中加入如下代码:
//用于3.0版本:设置计时器和超时重传任务timer = new UDT_Timer();UDT_RetransTask reTrans = new UDT_RetransTask(client, tcpPack);//每隔3秒执行重传,直到收到ACKtimer.schedule(reTrans, 3000, 3000);
③ 在发送方的 waitACK()函数中加入如下代码:
④ 这里我是严格按照状态机来写的,因此我去除了发送方收到 corrupt 的 ack 包以及序号不对的 ack 包之后的重发,相当于是不管发生什么,都等到超时事件被触发的时候才重发
将发送方和接收方的 eFlag 都调整成 4,运行代码(每运行一次 3.0 版本都要经历一次漫长的等待,太太太太太慢了)
以下日志分析我采用发送方和接收方都会错会丢包的日志 2 来进行分析:
发送方日志如下:
接收方日志如下:
从整体来看,可以得到 1015=1000+12+3,1006=1000+6 的正确结论,接下来我们再从细节上看一下我们的系统是否在正常工作:
- 发送方 Wrong:
- 发送方 Loss:
- 接收方 Wrong(上为发送方日志,下为接收方日志):
- 接收方 Loss(上为发送方日志,下为接收方日志):
综上,我们可以看出我们的 RDT3.0 正常运行了(但是太太太太太慢了),不过令人高兴的是,这是我们最后一次使用停止等待协议了,接下来我们就全面迈进流水线协议时代了
选择响应协议
对应 Log 日志:Log SR.txt,接收文件:recvData SR.txt
选择响应协议是一个变化比较大的版本,工作量也非常多,在我的 GitHub 记录中,这也是第一次推了两个子版本的协议(第一个版本我的发送方采用的是选择响应协议,接收方采用的是 Go-Back-N 协议,其结果就是……跑一次需要大概 10 分钟 QAQ;第二个版本是双方采用选择响应协议,效率一下子就上去了)
主要工作如下:
① 构建所有窗口的父类:Window 类(窗口大小设的 15):
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.message.TCP_PACKET;import java.util.TimerTask;public class Window {public Client client;public int size = 15;public TCP_PACKET[] packets = new TCP_PACKET[size];public volatile int base = 0;public volatile int nextseqnum = 0;public volatile int end = size - 1;public volatile int sequence = 1;public boolean[] isAck = new boolean[size];public Window(Client client) {this.client = client;}public boolean isFull() {return nextseqnum == end;}
}
注:为什么要加个 volatile 呢?这是痛苦地 debug 并且各种百度了一天之后的成果(心痛),不加 volatile 会出现各种各样的奇奇怪怪的问题
② 构建接收窗口:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.message.TCP_PACKET;import java.util.Vector;public class SR_ReceiveWindow extends Window {public SR_ReceiveWindow(Client client) {super(client);}public Vector<TCP_PACKET> recvPacket(TCP_PACKET packet) {Vector<TCP_PACKET> vector = new Vector<>();int seq = packet.getTcpH().getTh_seq();int index = seq % size;System.out.println("ReceiveWindow信息如下:");System.out.print("seq = " + seq);System.out.print("index = " + index);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);if(index >= 0) {isAck[index] = true;packets[index] = packet;
// client.send(packet);if(seq == base) { //收到的包是窗口的第一个包int i;for(i = base; i <= end && isAck[i % size]; i++) {vector.addElement(packets[i % size]);isAck[i % size] = false;packets[i % size] = null;}base = i; //移动窗口位置end = base + size - 1;}}return vector;}
}
③ 构建发送窗口:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.util.TimerTask;public class SR_SendWindow extends Window{public UDT_Timer[] timers = new UDT_Timer[size];public SR_SendWindow(Client client) {super(client);}public void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化这个包的相关数据int index = nextseqnum % size;packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();UDT_RetransTask task = new UDT_RetransTask(client, packet);timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)4);client.send(packet);}public void recvPacket(TCP_PACKET packet) {int ack = packet.getTcpH().getTh_ack(); //System.out.println("接收到了ack包,ack号为" + ack);if(ack >= base && ack <= base + size) {int index = ack % size;if(timers[index] != null)timers[index].cancel();isAck[index] = true;System.out.print("index = " + index);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);if(ack == base) {//收到的包是窗口的第一个包,将窗口下沿向前推到一个unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i % size]; i++) {packets[i % size] = null;isAck[i % size] = false;if(timers[i % size] != null) {timers[i % size].cancel();timers[i % size] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}}}
④ 将 Sender 中的工作更改成为交给 SendWindow 来做
public void rdt_send(int dataIndex, int[] appData) {//生成TCP数据报(设置序号和数据字段/校验和),注意打包的顺序tcpH = new TCP_HEADER();tcpS = new TCP_SEGMENT();tcpH.setTh_seq(dataIndex);//包序号设置为字节流号:tcpS.setData(appData);tcpH.setTh_sum((short)0); //需要初始化校验和以进行计算tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));tcpPack.setTcpH(tcpH);while(window.isFull());TCP_PACKET packet = new TCP_PACKET(tcpH, tcpS, destinAddr);try {window.sendPacket(packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}
}
public void recv(TCP_PACKET recvPack) {if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本检测corrupt并作出处理System.out.println("corrupt");//udt_send(tcpPack); //GBN版本 corrupt不需处理return;}window.recvPacket(recvPack); //使用窗口来处理ackSystem.out.println("Receive ACK Number: "+ recvPack.getTcpH().getTh_ack());ackQueue.add(recvPack.getTcpH().getTh_ack());System.out.println();
}
⑤ 将 Receiver 中的回复 ack 包以外的工作交给 ReceiverWindow 来完成
public void rdt_recv(TCP_PACKET recvPack) {int seqInPack = recvPack.getTcpH().getTh_seq();//2.0版本:检查校验码,生成ACK//2.1版本,加入对seqInPack的判断(代替书中0和1两个状态)//if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack == sequence) {System.out.println("seqInPack = " + seqInPack);if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack >= window.base && seqInPack < window.base + window.size) {//是我期望的序号 && 校验通过//生成ACK报文段(设置确认号)tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回复ACK报文段try {Vector<TCP_PACKET> vector = window.recvPacket(recvPack.clone());if(vector != null && vector.size() > 0) {for (int i = 0; i < vector.size(); i++) {dataQueue.add(vector.get(i).getTcpS().getData());}//交付数据(每20组数据交付一次)//if(dataQueue.size() >= 20) //SR版本修改交付情况deliver_data();}} catch (CloneNotSupportedException e) {e.printStackTrace();}reply(ackPack);System.out.println("ack包序号为" + ackPack.getTcpH().getTh_seq());}else if(seqInPack < window.base && seqInPack > window.base - window.size) {//收到了一个序号小于我的包//SR版本:收到了一个窗口以外的包System.out.println("该包在窗口以外");tcpH.setTh_ack(seqInPack);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回复ACK报文段reply(ackPack);}else {//GBN版本//reply(ackPack);//SR版本:do nothing}}
⑥ 注:在实现 GBN-SR 版本升级到 SR 版本的过程中,我把我的系统的包的序号体系修改了一下,由原来的 1,101,201,301,401……改成了 0,1,2,3,4……,修改之后大幅降低了思考难度与编码难度(不然在维护窗口的时候要时刻想清楚要不要把包的序号整除一个 100)
运行代码,对日志进行分析:
由上面这一段发送方日志我们可以看出来我们现在确实是流水线协议,而不是停止等待协议(19 号的重发与 19 的第一次发并不挨着)
以下两张图片,第一张是发送方日志,第二张是接收方日志
由这一段发送方日志我们可以看出来发送方窗口的大小限制了发送方的窗口继续往前推进(窗口满了,所以新包不能再发送,只能等着旧包超时重传)
同时,我们也可以看出我们的重传是谁超时重传谁,而不是像 GBN 版本一样整个窗口全都重传
我们还可以从这里的接收方日志中看出虽然 184 号包出现了问题,但是没有影响接收方对 185 186 187 等后续的包的接收,这也说明了我们的 SR 版本的正确性
拥塞控制 Taho
对应 Log 日志:Log Taho2.txt,接收文件:recvData Taho2.txt,控制台日志:consoleLog Taho2.txt
注:Log Taho.txt、recvData Taho.txt、consoleLog Taho.txt 所对应的 Taho 版本存在潜在的整型溢出问题,因此不是 Taho 的最终版本
以下内容按照 Taho Fixed 版本进行描述:
Taho 版本的有限状态机(来自《计算机网络教程:自顶向下方法》)
Taho 版本要解决的一个最重大的问题就是要改变发送方窗口的大小,接收方不用做什么改变
① 我们对发送方的窗口做出如下改变:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.util.HashMap;
import java.util.TimerTask;public class Taho_SendWindow extends SR_SendWindow{private int ssthresh;private int wrongAckNum;private int status; //status=0代表慢启动,status=1代表拥塞避免private HashMap<Integer, Integer> hashMap = new HashMap<>();public Taho_SendWindow(Client client) {super(client);size = 1;ssthresh = Integer.MAX_VALUE;wrongAckNum = 0;}@Overridepublic void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化这个包的相关数据int index = packet.getTcpH().getTh_seq();packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();hashMap.put(nextseqnum, index);
// UDT_RetransTask task = new UDT_RetransTask(client, packet);Taho_RetransmitTask task = null;try {task = new Taho_RetransmitTask(client, packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)7);client.send(packet);}@Overridepublic void recvPacket(TCP_PACKET packet) {int ack = packet.getTcpH().getTh_ack();System.out.println("\nTaho_SenderWindow\n接收到了ack包,ack号为" + ack);if (ack >= base) {System.out.print("size: " + size);if (size < ssthresh) {if(size * 2 <= 0) {//处理整型溢出现象size = Integer.MAX_VALUE/2;} else {size = Math.min(Integer.MAX_VALUE/2, size * 2);}} else {if(size + 1 <= 0) {//处理整型溢出现象size = Integer.MAX_VALUE/2;} else {size = Math.min(Integer.MAX_VALUE/2, size + 1);}}System.out.println(" --> " + size);}if(ack >= base) {int index = ack;if(timers[index] != null) {timers[index].cancel();timers[index] = null;}isAck[index] = true;if(ack == base) {//收到的包是窗口的第一个包,将窗口下沿向前推到一个unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i]; i++) {packets[i] = null;isAck[i] = false;if(timers[i] != null) {timers[i].cancel();timers[i] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}System.out.print("index = " + ack);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);}class Taho_RetransmitTask extends RetransmitTask {int number;TCP_PACKET packet;public Taho_RetransmitTask(Client client, TCP_PACKET packet) {super(client, packet);number = packet.getTcpH().getTh_seq();this.packet = packet;}@Overridepublic void run() {System.out.println("执行重传,size已置成1");ssthresh = Math.max(size / 2, 1);size = 1;super.run();if(timers[number] != null) {timers[number].cancel();timers[number] = null;}timers[number] = new UDT_Timer();Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);timers[number].schedule(task, 3000, 3000);}}
}
注:该类继承自上一个 SR 版本的发送窗口
注 2:这一版本进入拥塞避免的条件只有超时这一条件
② 修改 Window 类的 isFull 方法,使其可以同时应用于旧版本和 Taho 及以上版本
public boolean isFull() {return nextseqnum >= end;
}
运行程序,观察发送方日志:
可以看到虽然 210 号出了问题,但是一直没有重传(窗口没满,并且计时器没到)
210 号重发的时候已经是 432 号发完了,这时会引起一次超时重传,因此窗口大小会骤降为 1
对应的命令行日志如下:
这里 size 变成了 1,因此窗口会被判定成满的,于是新的包发不了,只能等待旧包重发,于是就有了以下的现象:
由于窗口太小,因此只能等到把前面的未 ack 的包全都重发了并且 ack 了,才有可能发新的包
类似的例子还有这里:
504 的重发导致窗口缩减成尺寸为 1,因此只能等到 520 的重发完成才能继续往前推进
同时,这两部分的日志联合起来,我们也可以得知在这 200 个包的发送过程中,我们的窗口又再次慢慢变大了
拥塞控制 Reno
对应 Log 日志:Log Reno Fixed.txt,接收文件:recvData Reno Fixed.txt,窗口大小变化日志:windowSize Reno Fixed.txt
Reno 版本的有限状态机(来自《计算机网络教程:自顶向下方法》)
从 Taho 版本上到 Reno 版本吗,我做了这么几件事情:
① 在发送方加入了冗余 ack 的判断,当收到冗余 ack 的次数达到 3 次的时候,执行快速重传
② 加入了快速恢复阶段
③ 将 3 次冗余 ack 也变成了切换状态的条件之一
④ 将窗口尺寸变化改成了 Reno 版本的形式(/2 + 3)
1 月 6 日更新:
⑤ 在 Fixed 版本中修正了慢启动的 bug
这一个版本更改过多(这一个版本也是让我在 Git 上面上传了多个子版本的一个版本,工作量着实不小),代码如下:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.TimerTask;public class Reno_SendWindow extends SR_SendWindow{private int ssthresh;private int wrongAckNum = 0;private int status; //status=0代表慢启动,status=1代表拥塞避免, status=2代表快速恢复private int tempAdd = 1;private int count = 0;private HashMap<Integer, Integer> hashMap = new HashMap<>();public Reno_SendWindow(Client client) {super(client);size = 1;ssthresh = Integer.MAX_VALUE;wrongAckNum = 0;status = 0;}@Overridepublic void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化这个包的相关数据int index = packet.getTcpH().getTh_seq();packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();hashMap.put(nextseqnum, index);
// UDT_RetransTask task = new UDT_RetransTask(client, packet);Taho_RetransmitTask task = null;try {task = new Taho_RetransmitTask(client, packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)7);client.send(packet);}@Overridepublic void recvPacket(TCP_PACKET packet) {System.out.println("size : " + size);int ack = packet.getTcpH().getTh_ack();System.out.println("\nReno_SenderWindow\n接收到了ack包,ack号为" + ack);if(status == 0) {size++;if(size >= ssthresh) {status = 1;}} else if(status == 1) {count++;if(count >= size) {count = 0;size++;}}if(ack > base) {if(status == 2) {size++;System.out.println("快速恢复状态,一个重复的ACK到达");} else {wrongAckNum++;if(wrongAckNum >= 3) {if(status == 0 || status == 1) {ssthresh = size / 2;size = ssthresh + 3;status = 2;System.out.println("慢启动/拥塞避免状态执行快速重传,窗口大小已置为" + size + ",已进入快速恢复状态");}wrongAckNum = 0;if(timers[base] != null) {timers[base].cancel();timers[base] = new UDT_Timer();try {Taho_RetransmitTask task = new Taho_RetransmitTask(client, packets[base].clone());timers[base].schedule(task, 3000, 3000);} catch (CloneNotSupportedException e) {e.printStackTrace();}}try {client.send(packets[base].clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}}}}else if (ack >= base) {if(status == 2 && !isAck[ack]) {//快速恢复状态,一个新的ACK到达size = ssthresh;status = 1;count = 0;System.out.println("快速恢复状态,一个新的ACK到达,进入拥塞避免状态");}}if(ack >= base) {int index = ack;if(timers[index] != null) {timers[index].cancel();timers[index] = null;}isAck[index] = true;if(ack == base) {//收到的包是窗口的第一个包,将窗口下沿向前推到一个unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i]; i++) {packets[i] = null;isAck[i] = false;if(timers[i] != null) {timers[i].cancel();timers[i] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}System.out.println("size : " + size);System.out.print("index = " + ack);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);File fw = new File("windowSize.txt");BufferedWriter writer;try {writer = new BufferedWriter(new FileWriter(fw, true));writer.write("ack = " + ack + " size = " + size + " ssthresh = " + ssthresh + "\n");writer.flush();writer.close();} catch (IOException e) {}}class Taho_RetransmitTask extends RetransmitTask {int number;TCP_PACKET packet;public Taho_RetransmitTask(Client client, TCP_PACKET packet) {super(client, packet);number = packet.getTcpH().getTh_seq();this.packet = packet;}@Overridepublic void run() {
// if(number > base + size) {
// System.out.println("number = " + number);
// System.out.println("base + size = " + (base+size));
// //超出部分不做处理
// if(timers[number] != null) {
// timers[number].cancel();
// timers[number] = null;
// }
// timers[number] = new UDT_Timer();
// Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);
// timers[number].schedule(task, 3000, 3000);
// return;
// }if(status == 0) {ssthresh = Math.max(size / 2, 1);size = 1;System.out.println("慢启动状态超时, size已置成1, ssthresh = " + ssthresh);} else if(status == 2) {ssthresh = Math.max(size / 2, 1);size = 1;status = 0;System.out.println("快速恢复状态超时, size已置成1, ssthresh = " + ssthresh);} else if(status == 1) {ssthresh = Math.max(size / 2, 1);size = 1;status = 0;System.out.println("拥塞避免状态超时,size已置成1, ssthresh = " + ssthresh);}super.run();if(timers[number] != null) {timers[number].cancel();timers[number] = null;}timers[number] = new UDT_Timer();Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);timers[number].schedule(task, 3000, 3000);}}
}
将发送方和接收方的 eFlag 改成 7,运行代码,分析日志文件中的错误、延迟、丢失三种情况:
案例 1:
发送方的包延迟了,于是在 3 次冗余 ack 之后,发送方进行了快速重传
可以看到,这里从拥塞避免状态进入了快速恢复状态,然后立刻就接收到了一个新的 ack,因此进入拥塞避免状态,size 变成和 ssthresh 相同,因此 size=ssthresh=3(我的窗口大小日志是在收了这个包以后才输出的,因此日志中显示不出进入快速恢复状态那一瞬间的窗口大小)
由于此时处于拥塞避免状态,因此一个发送轮次结束后,窗口尺寸才会 +1,我们可以看到在 3 个 3 之后变成 4,4 个 4 之后变成 5,5 个 5 之后变成 6,可以看到我们的窗口变化是正确的,这个案例也可以作为加法增大的正确性的证明。
案例 2:
发送方的包丢失了,于是在 3 次冗余 ack 之后,发送方进行了快速重传
案例 3:
发送方的包出错了,于是在 3 次冗余 ack 之后,发送方进行了快速重传
案例 4:
初始窗口尺寸为 1,在收到第一个 ack 包之后窗口尺寸就会变为 2,之后是变为 3,4,5……上图是对于慢启动的正确性的证明(ssthresh 的初始值我设置的 Integer.Max)
注:快速重传机制基本上保证了根本不会超时(笑),只要不是接收方的所有包都 delay 了,基本上就不会出现发送方重传的问题(毕竟我这是在选择响应协议的基础上做的),纵览整个日志,也确实没看到有超时重发的例子……
2. 说明在实验过程中采用迭代开发的优点或问题。
这次实验让我对迭代式开发有了非常深刻的体会,我觉得迭代开发优缺点都相对比较明显
我认为迭代开发主要有以下优点:
① 每一个迭代版本的目标非常明确,这与连续开发是不同的,我清楚我做到什么地步,要实现什么样的效果就算是完成了这样的一个迭代版本,也相当于是对于自己的项目进度有一个比较明确的进度条(有一个进度条能让我对自己的项目有一个更好的把控)。软件工程中也学到过,直接估计一个项目的总工作量是很难的,但是如果我们采用迭代开发的话,目标就相对明确,工作量也就随之相对明确了
② 完成每一个迭代版本我可以向 GitHub 上推一个版本,这样我在做下一个迭代版本的开发时,一旦出现一个非常严重的问题,我可以直接回退回上一个大的迭代版本,重新来过;如果不采用迭代开发,就只能凭借推 Git 的时候提交的简短的 summary 和 description 来勉强记忆这个 Git commit 已经完成到什么程度了,这样一旦需要回滚,需要把代码整个过一遍来确定我做了哪些内容没做哪些内容(这些内容很难在提交 commit 的时候精确描述)
③ 迭代式开发的焦点与重点非常明确,不至于出现开发大型项目的时候容易出现的项目太大下不去手的问题
④ 针对于这个项目而言,这样的迭代式开发能够让我真切地体会到每个版本的优缺点(3.0 版本和 GBN 版本让我印象非常深刻),并且在实验结束后的现在,可以说我对于每一个版本都非常非常熟悉了,如果直接开发最后的版本,那么这些中间过程我是不能了解到的,自然也不会对整个 tcp 版本的发展历史有所了解有所掌握
我认为迭代开发主要有以下缺点:
① 这个项目中从要求上来说共分为 1.0 2.0 2.1 2.2 3.0 GBN/SR 拥塞控制这么几个大的迭代版本,但是实际上我在做的时候大的迭代版本数远远不止于此:
我实际在做的时候迭代版本是以下的:
Initial Commit-> RDT 1.0 -> RDT 2.0 -> RDT 2.1 -> RDT 2.2 -> RDT SR-GBN(发送方 SR,接收方 GBN) -> RDT SR -> RDT Taho -> RDT Taho Fixed -> RDT FR(快速重传) -> RDT Reno
由于我在开发的时候没有把之前的代码删掉,而是把他们注释掉了,并且我在编程的时候会写明这个代码是哪一个版本进行添加/修改的,因此我可以比较明确地看到我哪个版本做了什么(除非有多个版本连续修改同一块代码),但是现在全部写完了再回头看,其实最开始的代码(或者可以说 3.0 版本之前的代码),没剩多少了……我觉得在某种意义上来说,这也算是增添了比较多的工作量
② 这个项目相当于是老师为我们规定了迭代的版本,如果是其他的项目(如软件工程项目),由开发者自行规定迭代版本,很可能出现迭代版本安排设置不合理的情况,从而极大地影响开发效率,我猜测:如果迭代版本安排过小,就失去了它的意义;如果迭代版本安排过大,就与不采用迭代开发没有本质区别了。因此,迭代开发会受到制定迭代计划的好坏的影响
3. 总结完成大作业过程中已经解决的主要问题和自己采取的相应解决方法
① recvData 输出不完整(SR-GBN 版本升到 SR 版本过程中出现)
这个 recvData 文件我特意存储了下来,可以看到其他的文件都是 694KB,只有这个文件是 680KB,我百思不得其解==(多线程的程序很难直接调试)
最后把接收方的代码分成了非常多的小的功能模块,然后每一个功能模块都用 System.out.println 来进行输出日志,来进行详细地查看,仍然没有找到问题所在
输出日志仍然没有解决我的问题,于是我在纸上手动执行了一次代码,大概执行了两趟,我就发现了问题所在,问题出在一段我从来没有修改过,甚至可以说从来没有注意过的代码上:
//交付数据(每20组数据交付一次)
if(dataQueue.size() >= 20)deliver_data();
这一个简单的 if 语句与我的 SR 版本写的缓冲区的交付数据不搭配,因此就会造成整个数据的最后一小段还在 dataQueue 中放着,不够 20,因此没有交付,所以接收文件中少一段
心得体会:多线程真的非常难以 debug,并且用输出法来检查问题也非常麻烦(命令行的日志过多,找我自己的日志也很麻烦),有的时候手动执行以下代码是不错的选择;再有就是,细致地了解自己的代码,对他们要有完全的掌控,不然不定什么时候就会出现错误
② 对于后续迭代版本出现的窗口有点难以下手(3.0 版本升到 SR 版本过程中出现)
我先仔细整理了一下后续版本要实现的功能或方法,以及需要的成员变量,然后打了一个 UML 图的草稿,类似下图:
把继承关系理顺了之后,把函数名以及相关的注释都标在了上面,接下来将每一个函数需要做什么明确地记录在纸上,然后每一个迭代版本按照自己的草稿逐步填入进去就可以了
③ 最开始的序号系统是 0,101,201……给我的思考和编码都带来了极大的麻烦(3.0 版本升到 GBN 版本过程中出现)
在构建窗口的时候,必须时刻小心,这里是不是需要把序号除 100,那里是不是不应该把序号除 100,带来了很多根本毫无意义的思考与提防,但是一直没狠下心来把所有的代码中的序号修改一遍,于是就只能在已有的基础上继续啰里啰嗦地往后写,然后越写越难写,越写越难写(就好比在一个三层的危房上再建个同样是危房的第四层),最后下定决心把所有的编号都改成了 0,1,2,3……才算彻底摆脱这一苦恼
心得体会:长痛不如短痛……有些基础性的问题就应该尽早全力解决,如果我在 3.0 版本之前就把这个问题改掉了,我的 SR 版本也不会卡这么多天都上不去
4. 对于实验系统提出问题或建议
① 如果实验系统使用最新版的 Java 也能跑起来。感觉会更方便一点(不过现在这个实验系统 Java8 是可以运行的,好评!)
② 实验系统中命令行的日志过多(最早的部分日志会爆出范围),建议将命令行的日志直接写到文件中;或者对外提供一个写自定义日志的接口?感觉会更方便一点