實現可靠傳輸
1. 結合代碼和 LOG 文件分析針對每個項目舉例說明解決效果。
RDT1.0
對應 Log 日志:Log 1.0.txt,接收文件 recvData 1.0.txt
RDT1.0 版本是在可靠信道上進行可靠的數據傳輸,因此沒有過多的內容需要說明,發送方 Log 日志如下:
接收方 Log 日志如下:
從發送方和接收方的發送數據報的數量我們就可以看出信道是沒有出任何錯的,雙方也正常完成了全部內容傳輸
RDT2.0
對應 Log 日志:Log 2.0.txt,接收文件 recvData 2.0.txt
RDT2.0 版本是在可能出現位錯的信道上進行傳輸,只需要在 1.0 的基礎上做出如下幾點更改即可:
① 添加校驗和 Checksum 的計算,代碼如下:
package com.ouc.tcp.test;import com.ouc.tcp.message.TCP_HEADER;
import com.ouc.tcp.message.TCP_PACKET;public class CheckSum {/*計算TCP報文段校驗和:只需校驗TCP首部中的seq、ack和sum,以及TCP數據字段*/public static short computeChkSum(TCP_PACKET tcpPack) {//計算校驗和int checkSum = 0;TCP_HEADER header = tcpPack.getTcpH();int[] data = tcpPack.getTcpS().getData();int length = data.length;int[] header_info = new int[3];header_info[0] = header.getTh_ack(); //seqheader_info[1] = header.getTh_seq(); //ack//header_info[2] = header.getTh_sum(); //sum//這里不代入sum進行計算是為了少更改Receiver的已有代碼int maxValue = 0xffff;int modulus = 65536;for(int i = 0; i < 2; i++) {if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = checkSum + header_info[i];}for(int i = 0; i < length; i++) {if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = checkSum + data[i];}if(checkSum > maxValue) {checkSum = checkSum % modulus + checkSum / modulus;}checkSum = ~checkSum;//System.out.println("checksum=" + checkSum);return (short) checkSum;}
}
這里我的校驗和的計算方式是仿照了 UDP 的校驗和計算方式,但是老師提供的代碼中的 Receiver 類中的校驗和的判斷是這么寫的:
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum())
如果嚴格按照 UDP 計算校驗和的方法,上述 if 語句的左邊就會計算出來 0,而 if 語句的右邊給出的值應該不是 0,那么這個 if 語句就不成立了,而應該改為:
if(CheckSum.computeChkSum(recvPack) == 0)
為了少更改 Receiver 類中的已有代碼,這里我在計算校驗和的時候沒有把 sum 代入進來,只計算了 seq,ack,以及 TCP 數據字段
② 在 Sender 的 recv 函數中加入對于 ACK 包的 ack 字段的檢測:如果檢測到 NACK,重發,代碼如下:
if(recvPack.getTcpH().getTh_ack() == -1) { //2.0版本檢測NACKudt_send(tcpPack);return;
}
③ 調整 Receiver 的代碼,在檢測到 corrupt 之后返回 NACK,代碼如下:
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum()) {//校驗通過,這里代碼省略了
} else {//校驗未通過System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);tcpH.setTh_ack(-1);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));
}
運行程序,得到發送方 Log 日志如下:
由于 2.0 版本的假設,我們可以知道只有發送方會出現錯誤,接收方不會出現錯誤,因此發送方的 eFlag 設置成 1,接受方的 eFlag 設置成 0
在上圖中,我們可以看到,發送方共犯了 13 個錯誤,因此有 13 個包需要重發,共計 1013 個包,數字是對的
在發送方的日志中我們也可以實際地看到這種犯錯誤并重發來修正的過程,下面以 Log 日志中的兩處作為例子:
同時,我們可以去接收方查看一下接收方對應處的日志,來檢查接收方的 ACK/NACK 機制是否正常運行了:
可以看到,我們的接收方在 6001 的正常 ack 之前,以及 24601 的正常 ack 之前,都先給發送方回了一個 NACK 包,因此我們可以得出發送方與接收方都在正常工作的結論。
RDT2.1
對應 Log 日志:Log 2.1.txt,接收文件:recvData 2.1.txt,控制臺日志:consoleLog 2.1.txt
①RDT2.1 是在 RDT2.0 的基礎上解決 ack/nack 包會出錯的問題,我們在發送方的 recv()函數的代碼中做如下更改:
if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本檢測corruptSystem.out.println("corrupt");udt_send(tcpPack); return;
}
② 將 Receiver 中的 rdt_recv()函數修改如下:
int seqInPack = recvPack.getTcpH().getTh_seq();
System.out.println("seqInPack = " + seqInPack);
//2.0版本:檢查校驗碼,生成ACK
//2.1版本,加入對seqInPack的判斷(使用序號判斷來代替書中0和1兩個狀態)
if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack == sequence) {//校驗通過,并且是我期待的包//代碼省略
} else if(seqInPack == sequence){//2.0版本 NAKSystem.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);tcpH.setTh_ack(-1);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回復ACK報文段System.out.println("ack包序號為" + ackPack.getTcpH().getTh_seq());reply(ackPack);
} else {//2.0版本 重復System.out.println("重復");//seqInPack != sequence,說明該數據報我已經接收過了tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回復ACK報文段System.out.println("ack包序號為" + ackPack.getTcpH().getTh_seq());reply(ackPack);
}
由于 2.1 版本的假設,發送方和接收方都有可能出現錯誤,因此雙方的 eFlag 都應該改成 1,運行程序,得到發送方日志如下:
接收方的日志如下:
我們可以從這個 Log 的數據中看出來:發送方犯了 13 個錯誤,因此這 13 個錯誤都需要重傳;接收方犯了 16 個錯誤,對于這 16 個錯誤的 ack 包,發送方不知道接收方是否 ack 了,因此也需要重傳,所以發送方共計發送了 1000+13+16=1029 個數據包
發送方錯誤舉例(上圖為發送方日志,下圖為接收方日志):
可以看到,發送方犯了錯,于是接收方回了 NACK,發送方進行重傳,這個重傳的包被正常 ack
接收方錯誤舉例(上圖為發送方日志,下圖為接收方日志):
可以看到,發送方沒有犯錯,但是包也沒有正常 ack,原因是接收方的 ack 出現了錯誤,因此發送方重傳了該包,并正常地收到 ack 了
由此,我們可以得出發送方與接收方都在正常工作的結論
RDT2.2
對應 Log 日志:Log 2.2.txt,對應接收文件:recvData 2.2.txt,對應控制臺日志:consoleLog 2.2.txt
RDT2.2 版本與 RDT2.1 版本的功能是相同的,唯一區別只是不再使用 ack/nack 的確認方式,而是統一使用 ack,如果接收方檢測到包的 corrupt,那么返回一個過期的 ack 即可,這里我還是使用序號的方式來進行檢測,即:
如果接收方接收到了一個正常的包,就正常返回這個包的序號作為 ack
如果接收方接收到了一個 corrupt 的包,或者一個過期的包,就返回上一個包的序號作為 ack
(該算法的合理性論證如下:正常的發送與接收就不說明了;說明一下接收方收到一個過期的包的情況:由于現在是停止等待協議,因此如果接收方接收到了一個過期的包,它只可能是上一個包,因此我們應該返回上一個包的序號作為 ack 來告訴發送方我們正常接收了這個包,雖然接收方實際上是不需要這個包的)
對代碼更改如下:
① 更改 Sender 的 recv()函數的最開始對包的檢測部分如下:
//注:前面的代碼中要將2.0版本的檢測NACK隱去if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本檢測corruptSystem.out.println("corrupt");udt_send(tcpPack); return;
}
if(recvPack.getTcpH().getTh_seq() < sequence) { //2.2版本,無NAKSystem.out.println("ack報文編號" + recvPack.getTcpH().getTh_seq() + "已重復收到");System.out.println("想要的報文編號是" + sequence);//該ack報文我已經收到過了udt_send(tcpPack);return;
}//注:后面的代碼中有 接收到一個正常包之后更新sequence的值的功能
② 更改 Receiver 的 rdt_recv()函數中的校驗和不正確但包編號是對的的情況的代碼:
else if(seqInPack == sequence){//2.0版本 NAK
// System.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));
// System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());
// System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);
// tcpH.setTh_ack(-1);
// ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());
// tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//2.2版本 無NAK,改用序號不足的ack來充當NAKSystem.out.println("Recieve Computed: "+CheckSum.computeChkSum(recvPack));System.out.println("Recieved Packet"+recvPack.getTcpH().getTh_sum());System.out.println("Problem: Packet Number: "+recvPack.getTcpH().getTh_seq()+" + InnerSeq: "+sequence);//回復ACK報文段System.out.println("ack包序號為" + ackPack.getTcpH().getTh_seq());reply(ackPack);}
運行代碼結果可見 Log 2.2.txt,由于功能上與 RDT2.1 完全一致,這里不再贅述
RDT3.0
對應 Log 日志:Log 3.0 -1.txt、Log 3.0 -2.txt,接收文件:recvData 3.0 -1.txt、recvData 3.0 -2.txt
注:后綴帶 1 的是發送方會錯會丟包,接收方只會錯;后綴帶 2 的是發送方與接收方都是會錯會丟包
RDT3.0 的最大進步是可以處理包的 Loss 了,從 2.2 上到 3.0 版本只需要更改發送方代碼即可,發送方的狀態機如下:
我們對照上圖來修改代碼(其實是 2.2 上到 3.0 非常簡單,所以我現在已經記不全 3.0 都改了什么了)
① 首先在 Sender 類中加入一個私有變量 UDT_Timer:
private UDT_Timer timer; //3.0版本,計時器
② 在發送方的 rdt_Send()函數中加入如下代碼:
//用于3.0版本:設置計時器和超時重傳任務timer = new UDT_Timer();UDT_RetransTask reTrans = new UDT_RetransTask(client, tcpPack);//每隔3秒執行重傳,直到收到ACKtimer.schedule(reTrans, 3000, 3000);
③ 在發送方的 waitACK()函數中加入如下代碼:
④ 這里我是嚴格按照狀態機來寫的,因此我去除了發送方收到 corrupt 的 ack 包以及序號不對的 ack 包之后的重發,相當于是不管發生什么,都等到超時事件被觸發的時候才重發
將發送方和接收方的 eFlag 都調整成 4,運行代碼(每運行一次 3.0 版本都要經歷一次漫長的等待,太太太太太慢了)
以下日志分析我采用發送方和接收方都會錯會丟包的日志 2 來進行分析:
發送方日志如下:
接收方日志如下:
從整體來看,可以得到 1015=1000+12+3,1006=1000+6 的正確結論,接下來我們再從細節上看一下我們的系統是否在正常工作:
- 發送方 Wrong:
- 發送方 Loss:
- 接收方 Wrong(上為發送方日志,下為接收方日志):
- 接收方 Loss(上為發送方日志,下為接收方日志):
綜上,我們可以看出我們的 RDT3.0 正常運行了(但是太太太太太慢了),不過令人高興的是,這是我們最后一次使用停止等待協議了,接下來我們就全面邁進流水線協議時代了
選擇響應協議
對應 Log 日志:Log SR.txt,接收文件:recvData SR.txt
選擇響應協議是一個變化比較大的版本,工作量也非常多,在我的 GitHub 記錄中,這也是第一次推了兩個子版本的協議(第一個版本我的發送方采用的是選擇響應協議,接收方采用的是 Go-Back-N 協議,其結果就是……跑一次需要大概 10 分鐘 QAQ;第二個版本是雙方采用選擇響應協議,效率一下子就上去了)
主要工作如下:
① 構建所有窗口的父類:Window 類(窗口大小設的 15):
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.message.TCP_PACKET;import java.util.TimerTask;public class Window {public Client client;public int size = 15;public TCP_PACKET[] packets = new TCP_PACKET[size];public volatile int base = 0;public volatile int nextseqnum = 0;public volatile int end = size - 1;public volatile int sequence = 1;public boolean[] isAck = new boolean[size];public Window(Client client) {this.client = client;}public boolean isFull() {return nextseqnum == end;}
}
注:為什么要加個 volatile 呢?這是痛苦地 debug 并且各種百度了一天之后的成果(心痛),不加 volatile 會出現各種各樣的奇奇怪怪的問題
② 構建接收窗口:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.message.TCP_PACKET;import java.util.Vector;public class SR_ReceiveWindow extends Window {public SR_ReceiveWindow(Client client) {super(client);}public Vector<TCP_PACKET> recvPacket(TCP_PACKET packet) {Vector<TCP_PACKET> vector = new Vector<>();int seq = packet.getTcpH().getTh_seq();int index = seq % size;System.out.println("ReceiveWindow信息如下:");System.out.print("seq = " + seq);System.out.print("index = " + index);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);if(index >= 0) {isAck[index] = true;packets[index] = packet;
// client.send(packet);if(seq == base) { //收到的包是窗口的第一個包int i;for(i = base; i <= end && isAck[i % size]; i++) {vector.addElement(packets[i % size]);isAck[i % size] = false;packets[i % size] = null;}base = i; //移動窗口位置end = base + size - 1;}}return vector;}
}
③ 構建發送窗口:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.util.TimerTask;public class SR_SendWindow extends Window{public UDT_Timer[] timers = new UDT_Timer[size];public SR_SendWindow(Client client) {super(client);}public void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化這個包的相關數據int index = nextseqnum % size;packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();UDT_RetransTask task = new UDT_RetransTask(client, packet);timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)4);client.send(packet);}public void recvPacket(TCP_PACKET packet) {int ack = packet.getTcpH().getTh_ack(); //System.out.println("接收到了ack包,ack號為" + ack);if(ack >= base && ack <= base + size) {int index = ack % size;if(timers[index] != null)timers[index].cancel();isAck[index] = true;System.out.print("index = " + index);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);if(ack == base) {//收到的包是窗口的第一個包,將窗口下沿向前推到一個unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i % size]; i++) {packets[i % size] = null;isAck[i % size] = false;if(timers[i % size] != null) {timers[i % size].cancel();timers[i % size] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}}}
④ 將 Sender 中的工作更改成為交給 SendWindow 來做
public void rdt_send(int dataIndex, int[] appData) {//生成TCP數據報(設置序號和數據字段/校驗和),注意打包的順序tcpH = new TCP_HEADER();tcpS = new TCP_SEGMENT();tcpH.setTh_seq(dataIndex);//包序號設置為字節流號:tcpS.setData(appData);tcpH.setTh_sum((short)0); //需要初始化校驗和以進行計算tcpPack = new TCP_PACKET(tcpH, tcpS, destinAddr);tcpH.setTh_sum(CheckSum.computeChkSum(tcpPack));tcpPack.setTcpH(tcpH);while(window.isFull());TCP_PACKET packet = new TCP_PACKET(tcpH, tcpS, destinAddr);try {window.sendPacket(packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}
}
public void recv(TCP_PACKET recvPack) {if(CheckSum.computeChkSum(recvPack) != recvPack.getTcpH().getTh_sum()) { //2.1版本檢測corrupt并作出處理System.out.println("corrupt");//udt_send(tcpPack); //GBN版本 corrupt不需處理return;}window.recvPacket(recvPack); //使用窗口來處理ackSystem.out.println("Receive ACK Number: "+ recvPack.getTcpH().getTh_ack());ackQueue.add(recvPack.getTcpH().getTh_ack());System.out.println();
}
⑤ 將 Receiver 中的回復 ack 包以外的工作交給 ReceiverWindow 來完成
public void rdt_recv(TCP_PACKET recvPack) {int seqInPack = recvPack.getTcpH().getTh_seq();//2.0版本:檢查校驗碼,生成ACK//2.1版本,加入對seqInPack的判斷(代替書中0和1兩個狀態)//if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack == sequence) {System.out.println("seqInPack = " + seqInPack);if(CheckSum.computeChkSum(recvPack) == recvPack.getTcpH().getTh_sum() && seqInPack >= window.base && seqInPack < window.base + window.size) {//是我期望的序號 && 校驗通過//生成ACK報文段(設置確認號)tcpH.setTh_ack(recvPack.getTcpH().getTh_seq());ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回復ACK報文段try {Vector<TCP_PACKET> vector = window.recvPacket(recvPack.clone());if(vector != null && vector.size() > 0) {for (int i = 0; i < vector.size(); i++) {dataQueue.add(vector.get(i).getTcpS().getData());}//交付數據(每20組數據交付一次)//if(dataQueue.size() >= 20) //SR版本修改交付情況deliver_data();}} catch (CloneNotSupportedException e) {e.printStackTrace();}reply(ackPack);System.out.println("ack包序號為" + ackPack.getTcpH().getTh_seq());}else if(seqInPack < window.base && seqInPack > window.base - window.size) {//收到了一個序號小于我的包//SR版本:收到了一個窗口以外的包System.out.println("該包在窗口以外");tcpH.setTh_ack(seqInPack);ackPack = new TCP_PACKET(tcpH, tcpS, recvPack.getSourceAddr());tcpH.setTh_sum(CheckSum.computeChkSum(ackPack));//回復ACK報文段reply(ackPack);}else {//GBN版本//reply(ackPack);//SR版本:do nothing}}
⑥ 注:在實現 GBN-SR 版本升級到 SR 版本的過程中,我把我的系統的包的序號體系修改了一下,由原來的 1,101,201,301,401……改成了 0,1,2,3,4……,修改之后大幅降低了思考難度與編碼難度(不然在維護窗口的時候要時刻想清楚要不要把包的序號整除一個 100)
運行代碼,對日志進行分析:
由上面這一段發送方日志我們可以看出來我們現在確實是流水線協議,而不是停止等待協議(19 號的重發與 19 的第一次發并不挨著)
以下兩張圖片,第一張是發送方日志,第二張是接收方日志
由這一段發送方日志我們可以看出來發送方窗口的大小限制了發送方的窗口繼續往前推進(窗口滿了,所以新包不能再發送,只能等著舊包超時重傳)
同時,我們也可以看出我們的重傳是誰超時重傳誰,而不是像 GBN 版本一樣整個窗口全都重傳
我們還可以從這里的接收方日志中看出雖然 184 號包出現了問題,但是沒有影響接收方對 185 186 187 等后續的包的接收,這也說明了我們的 SR 版本的正確性
擁塞控制 Taho
對應 Log 日志:Log Taho2.txt,接收文件:recvData Taho2.txt,控制臺日志:consoleLog Taho2.txt
注:Log Taho.txt、recvData Taho.txt、consoleLog Taho.txt 所對應的 Taho 版本存在潛在的整型溢出問題,因此不是 Taho 的最終版本
以下內容按照 Taho Fixed 版本進行描述:
Taho 版本的有限狀態機(來自《計算機網絡教程:自頂向下方法》)
Taho 版本要解決的一個最重大的問題就是要改變發送方窗口的大小,接收方不用做什么改變
① 我們對發送方的窗口做出如下改變:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.util.HashMap;
import java.util.TimerTask;public class Taho_SendWindow extends SR_SendWindow{private int ssthresh;private int wrongAckNum;private int status; //status=0代表慢啟動,status=1代表擁塞避免private HashMap<Integer, Integer> hashMap = new HashMap<>();public Taho_SendWindow(Client client) {super(client);size = 1;ssthresh = Integer.MAX_VALUE;wrongAckNum = 0;}@Overridepublic void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化這個包的相關數據int index = packet.getTcpH().getTh_seq();packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();hashMap.put(nextseqnum, index);
// UDT_RetransTask task = new UDT_RetransTask(client, packet);Taho_RetransmitTask task = null;try {task = new Taho_RetransmitTask(client, packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)7);client.send(packet);}@Overridepublic void recvPacket(TCP_PACKET packet) {int ack = packet.getTcpH().getTh_ack();System.out.println("\nTaho_SenderWindow\n接收到了ack包,ack號為" + ack);if (ack >= base) {System.out.print("size: " + size);if (size < ssthresh) {if(size * 2 <= 0) {//處理整型溢出現象size = Integer.MAX_VALUE/2;} else {size = Math.min(Integer.MAX_VALUE/2, size * 2);}} else {if(size + 1 <= 0) {//處理整型溢出現象size = Integer.MAX_VALUE/2;} else {size = Math.min(Integer.MAX_VALUE/2, size + 1);}}System.out.println(" --> " + size);}if(ack >= base) {int index = ack;if(timers[index] != null) {timers[index].cancel();timers[index] = null;}isAck[index] = true;if(ack == base) {//收到的包是窗口的第一個包,將窗口下沿向前推到一個unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i]; i++) {packets[i] = null;isAck[i] = false;if(timers[i] != null) {timers[i].cancel();timers[i] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}System.out.print("index = " + ack);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);}class Taho_RetransmitTask extends RetransmitTask {int number;TCP_PACKET packet;public Taho_RetransmitTask(Client client, TCP_PACKET packet) {super(client, packet);number = packet.getTcpH().getTh_seq();this.packet = packet;}@Overridepublic void run() {System.out.println("執行重傳,size已置成1");ssthresh = Math.max(size / 2, 1);size = 1;super.run();if(timers[number] != null) {timers[number].cancel();timers[number] = null;}timers[number] = new UDT_Timer();Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);timers[number].schedule(task, 3000, 3000);}}
}
注:該類繼承自上一個 SR 版本的發送窗口
注 2:這一版本進入擁塞避免的條件只有超時這一條件
② 修改 Window 類的 isFull 方法,使其可以同時應用于舊版本和 Taho 及以上版本
public boolean isFull() {return nextseqnum >= end;
}
運行程序,觀察發送方日志:
可以看到雖然 210 號出了問題,但是一直沒有重傳(窗口沒滿,并且計時器沒到)
210 號重發的時候已經是 432 號發完了,這時會引起一次超時重傳,因此窗口大小會驟降為 1
對應的命令行日志如下:
這里 size 變成了 1,因此窗口會被判定成滿的,于是新的包發不了,只能等待舊包重發,于是就有了以下的現象:
由于窗口太小,因此只能等到把前面的未 ack 的包全都重發了并且 ack 了,才有可能發新的包
類似的例子還有這里:
504 的重發導致窗口縮減成尺寸為 1,因此只能等到 520 的重發完成才能繼續往前推進
同時,這兩部分的日志聯合起來,我們也可以得知在這 200 個包的發送過程中,我們的窗口又再次慢慢變大了
擁塞控制 Reno
對應 Log 日志:Log Reno Fixed.txt,接收文件:recvData Reno Fixed.txt,窗口大小變化日志:windowSize Reno Fixed.txt
Reno 版本的有限狀態機(來自《計算機網絡教程:自頂向下方法》)
從 Taho 版本上到 Reno 版本嗎,我做了這么幾件事情:
① 在發送方加入了冗余 ack 的判斷,當收到冗余 ack 的次數達到 3 次的時候,執行快速重傳
② 加入了快速恢復階段
③ 將 3 次冗余 ack 也變成了切換狀態的條件之一
④ 將窗口尺寸變化改成了 Reno 版本的形式(/2 + 3)
1 月 6 日更新:
⑤ 在 Fixed 版本中修正了慢啟動的 bug
這一個版本更改過多(這一個版本也是讓我在 Git 上面上傳了多個子版本的一個版本,工作量著實不小),代碼如下:
package com.ouc.tcp.test;import com.ouc.tcp.client.Client;
import com.ouc.tcp.client.UDT_RetransTask;
import com.ouc.tcp.client.UDT_Timer;
import com.ouc.tcp.message.TCP_PACKET;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.TimerTask;public class Reno_SendWindow extends SR_SendWindow{private int ssthresh;private int wrongAckNum = 0;private int status; //status=0代表慢啟動,status=1代表擁塞避免, status=2代表快速恢復private int tempAdd = 1;private int count = 0;private HashMap<Integer, Integer> hashMap = new HashMap<>();public Reno_SendWindow(Client client) {super(client);size = 1;ssthresh = Integer.MAX_VALUE;wrongAckNum = 0;status = 0;}@Overridepublic void sendPacket(TCP_PACKET packet) {System.out.println(packet.getTcpH().getTh_seq());//在窗口中初始化這個包的相關數據int index = packet.getTcpH().getTh_seq();packets[index] = packet;isAck[index] = false;timers[index] = new UDT_Timer();hashMap.put(nextseqnum, index);
// UDT_RetransTask task = new UDT_RetransTask(client, packet);Taho_RetransmitTask task = null;try {task = new Taho_RetransmitTask(client, packet.clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}timers[index].schedule(task, 3000, 3000);nextseqnum++;packet.getTcpH().setTh_eflag((byte)7);client.send(packet);}@Overridepublic void recvPacket(TCP_PACKET packet) {System.out.println("size : " + size);int ack = packet.getTcpH().getTh_ack();System.out.println("\nReno_SenderWindow\n接收到了ack包,ack號為" + ack);if(status == 0) {size++;if(size >= ssthresh) {status = 1;}} else if(status == 1) {count++;if(count >= size) {count = 0;size++;}}if(ack > base) {if(status == 2) {size++;System.out.println("快速恢復狀態,一個重復的ACK到達");} else {wrongAckNum++;if(wrongAckNum >= 3) {if(status == 0 || status == 1) {ssthresh = size / 2;size = ssthresh + 3;status = 2;System.out.println("慢啟動/擁塞避免狀態執行快速重傳,窗口大小已置為" + size + ",已進入快速恢復狀態");}wrongAckNum = 0;if(timers[base] != null) {timers[base].cancel();timers[base] = new UDT_Timer();try {Taho_RetransmitTask task = new Taho_RetransmitTask(client, packets[base].clone());timers[base].schedule(task, 3000, 3000);} catch (CloneNotSupportedException e) {e.printStackTrace();}}try {client.send(packets[base].clone());} catch (CloneNotSupportedException e) {e.printStackTrace();}}}}else if (ack >= base) {if(status == 2 && !isAck[ack]) {//快速恢復狀態,一個新的ACK到達size = ssthresh;status = 1;count = 0;System.out.println("快速恢復狀態,一個新的ACK到達,進入擁塞避免狀態");}}if(ack >= base) {int index = ack;if(timers[index] != null) {timers[index].cancel();timers[index] = null;}isAck[index] = true;if(ack == base) {//收到的包是窗口的第一個包,將窗口下沿向前推到一個unAckd seq#int i;for(i = base; i <= nextseqnum && isAck[i]; i++) {packets[i] = null;isAck[i] = false;if(timers[i] != null) {timers[i].cancel();timers[i] = null;}}base = Math.min(i, nextseqnum);System.out.println("base2 = " + base);end = base + size - 1;}}System.out.println("size : " + size);System.out.print("index = " + ack);System.out.print(" base = " + base);System.out.print(" nextseqnum = " + nextseqnum);System.out.println(" end = " + end);File fw = new File("windowSize.txt");BufferedWriter writer;try {writer = new BufferedWriter(new FileWriter(fw, true));writer.write("ack = " + ack + " size = " + size + " ssthresh = " + ssthresh + "\n");writer.flush();writer.close();} catch (IOException e) {}}class Taho_RetransmitTask extends RetransmitTask {int number;TCP_PACKET packet;public Taho_RetransmitTask(Client client, TCP_PACKET packet) {super(client, packet);number = packet.getTcpH().getTh_seq();this.packet = packet;}@Overridepublic void run() {
// if(number > base + size) {
// System.out.println("number = " + number);
// System.out.println("base + size = " + (base+size));
// //超出部分不做處理
// if(timers[number] != null) {
// timers[number].cancel();
// timers[number] = null;
// }
// timers[number] = new UDT_Timer();
// Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);
// timers[number].schedule(task, 3000, 3000);
// return;
// }if(status == 0) {ssthresh = Math.max(size / 2, 1);size = 1;System.out.println("慢啟動狀態超時, size已置成1, ssthresh = " + ssthresh);} else if(status == 2) {ssthresh = Math.max(size / 2, 1);size = 1;status = 0;System.out.println("快速恢復狀態超時, size已置成1, ssthresh = " + ssthresh);} else if(status == 1) {ssthresh = Math.max(size / 2, 1);size = 1;status = 0;System.out.println("擁塞避免狀態超時,size已置成1, ssthresh = " + ssthresh);}super.run();if(timers[number] != null) {timers[number].cancel();timers[number] = null;}timers[number] = new UDT_Timer();Taho_RetransmitTask task = new Taho_RetransmitTask(client, packet);timers[number].schedule(task, 3000, 3000);}}
}
將發送方和接收方的 eFlag 改成 7,運行代碼,分析日志文件中的錯誤、延遲、丟失三種情況:
案例 1:
發送方的包延遲了,于是在 3 次冗余 ack 之后,發送方進行了快速重傳
可以看到,這里從擁塞避免狀態進入了快速恢復狀態,然后立刻就接收到了一個新的 ack,因此進入擁塞避免狀態,size 變成和 ssthresh 相同,因此 size=ssthresh=3(我的窗口大小日志是在收了這個包以后才輸出的,因此日志中顯示不出進入快速恢復狀態那一瞬間的窗口大小)
由于此時處于擁塞避免狀態,因此一個發送輪次結束后,窗口尺寸才會 +1,我們可以看到在 3 個 3 之后變成 4,4 個 4 之后變成 5,5 個 5 之后變成 6,可以看到我們的窗口變化是正確的,這個案例也可以作為加法增大的正確性的證明。
案例 2:
發送方的包丟失了,于是在 3 次冗余 ack 之后,發送方進行了快速重傳
案例 3:
發送方的包出錯了,于是在 3 次冗余 ack 之后,發送方進行了快速重傳
案例 4:
初始窗口尺寸為 1,在收到第一個 ack 包之后窗口尺寸就會變為 2,之后是變為 3,4,5……上圖是對于慢啟動的正確性的證明(ssthresh 的初始值我設置的 Integer.Max)
注:快速重傳機制基本上保證了根本不會超時(笑),只要不是接收方的所有包都 delay 了,基本上就不會出現發送方重傳的問題(畢竟我這是在選擇響應協議的基礎上做的),縱覽整個日志,也確實沒看到有超時重發的例子……
2. 說明在實驗過程中采用迭代開發的優點或問題。
這次實驗讓我對迭代式開發有了非常深刻的體會,我覺得迭代開發優缺點都相對比較明顯
我認為迭代開發主要有以下優點:
① 每一個迭代版本的目標非常明確,這與連續開發是不同的,我清楚我做到什么地步,要實現什么樣的效果就算是完成了這樣的一個迭代版本,也相當于是對于自己的項目進度有一個比較明確的進度條(有一個進度條能讓我對自己的項目有一個更好的把控)。軟件工程中也學到過,直接估計一個項目的總工作量是很難的,但是如果我們采用迭代開發的話,目標就相對明確,工作量也就隨之相對明確了
② 完成每一個迭代版本我可以向 GitHub 上推一個版本,這樣我在做下一個迭代版本的開發時,一旦出現一個非常嚴重的問題,我可以直接回退回上一個大的迭代版本,重新來過;如果不采用迭代開發,就只能憑借推 Git 的時候提交的簡短的 summary 和 description 來勉強記憶這個 Git commit 已經完成到什么程度了,這樣一旦需要回滾,需要把代碼整個過一遍來確定我做了哪些內容沒做哪些內容(這些內容很難在提交 commit 的時候精確描述)
③ 迭代式開發的焦點與重點非常明確,不至于出現開發大型項目的時候容易出現的項目太大下不去手的問題
④ 針對于這個項目而言,這樣的迭代式開發能夠讓我真切地體會到每個版本的優缺點(3.0 版本和 GBN 版本讓我印象非常深刻),并且在實驗結束后的現在,可以說我對于每一個版本都非常非常熟悉了,如果直接開發最后的版本,那么這些中間過程我是不能了解到的,自然也不會對整個 tcp 版本的發展歷史有所了解有所掌握
我認為迭代開發主要有以下缺點:
① 這個項目中從要求上來說共分為 1.0 2.0 2.1 2.2 3.0 GBN/SR 擁塞控制這么幾個大的迭代版本,但是實際上我在做的時候大的迭代版本數遠遠不止于此:
我實際在做的時候迭代版本是以下的:
Initial Commit-> RDT 1.0 -> RDT 2.0 -> RDT 2.1 -> RDT 2.2 -> RDT SR-GBN(發送方 SR,接收方 GBN) -> RDT SR -> RDT Taho -> RDT Taho Fixed -> RDT FR(快速重傳) -> RDT Reno
由于我在開發的時候沒有把之前的代碼刪掉,而是把他們注釋掉了,并且我在編程的時候會寫明這個代碼是哪一個版本進行添加/修改的,因此我可以比較明確地看到我哪個版本做了什么(除非有多個版本連續修改同一塊代碼),但是現在全部寫完了再回頭看,其實最開始的代碼(或者可以說 3.0 版本之前的代碼),沒剩多少了……我覺得在某種意義上來說,這也算是增添了比較多的工作量
② 這個項目相當于是老師為我們規定了迭代的版本,如果是其他的項目(如軟件工程項目),由開發者自行規定迭代版本,很可能出現迭代版本安排設置不合理的情況,從而極大地影響開發效率,我猜測:如果迭代版本安排過小,就失去了它的意義;如果迭代版本安排過大,就與不采用迭代開發沒有本質區別了。因此,迭代開發會受到制定迭代計劃的好壞的影響
3. 總結完成大作業過程中已經解決的主要問題和自己采取的相應解決方法
① recvData 輸出不完整(SR-GBN 版本升到 SR 版本過程中出現)
這個 recvData 文件我特意存儲了下來,可以看到其他的文件都是 694KB,只有這個文件是 680KB,我百思不得其解==(多線程的程序很難直接調試)
最后把接收方的代碼分成了非常多的小的功能模塊,然后每一個功能模塊都用 System.out.println 來進行輸出日志,來進行詳細地查看,仍然沒有找到問題所在
輸出日志仍然沒有解決我的問題,于是我在紙上手動執行了一次代碼,大概執行了兩趟,我就發現了問題所在,問題出在一段我從來沒有修改過,甚至可以說從來沒有注意過的代碼上:
//交付數據(每20組數據交付一次)
if(dataQueue.size() >= 20)deliver_data();
這一個簡單的 if 語句與我的 SR 版本寫的緩沖區的交付數據不搭配,因此就會造成整個數據的最后一小段還在 dataQueue 中放著,不夠 20,因此沒有交付,所以接收文件中少一段
心得體會:多線程真的非常難以 debug,并且用輸出法來檢查問題也非常麻煩(命令行的日志過多,找我自己的日志也很麻煩),有的時候手動執行以下代碼是不錯的選擇;再有就是,細致地了解自己的代碼,對他們要有完全的掌控,不然不定什么時候就會出現錯誤
② 對于后續迭代版本出現的窗口有點難以下手(3.0 版本升到 SR 版本過程中出現)
我先仔細整理了一下后續版本要實現的功能或方法,以及需要的成員變量,然后打了一個 UML 圖的草稿,類似下圖:
把繼承關系理順了之后,把函數名以及相關的注釋都標在了上面,接下來將每一個函數需要做什么明確地記錄在紙上,然后每一個迭代版本按照自己的草稿逐步填入進去就可以了
③ 最開始的序號系統是 0,101,201……給我的思考和編碼都帶來了極大的麻煩(3.0 版本升到 GBN 版本過程中出現)
在構建窗口的時候,必須時刻小心,這里是不是需要把序號除 100,那里是不是不應該把序號除 100,帶來了很多根本毫無意義的思考與提防,但是一直沒狠下心來把所有的代碼中的序號修改一遍,于是就只能在已有的基礎上繼續啰里啰嗦地往后寫,然后越寫越難寫,越寫越難寫(就好比在一個三層的危房上再建個同樣是危房的第四層),最后下定決心把所有的編號都改成了 0,1,2,3……才算徹底擺脫這一苦惱
心得體會:長痛不如短痛……有些基礎性的問題就應該盡早全力解決,如果我在 3.0 版本之前就把這個問題改掉了,我的 SR 版本也不會卡這么多天都上不去
4. 對于實驗系統提出問題或建議
① 如果實驗系統使用最新版的 Java 也能跑起來。感覺會更方便一點(不過現在這個實驗系統 Java8 是可以運行的,好評!)
② 實驗系統中命令行的日志過多(最早的部分日志會爆出范圍),建議將命令行的日志直接寫到文件中;或者對外提供一個寫自定義日志的接口?感覺會更方便一點