想学做宝宝食谱上什么网站做软装找产品上哪个网站
news/
2025/10/5 11:20:47/
文章来源:
想学做宝宝食谱上什么网站,做软装找产品上哪个网站,网站建设谈业务要知道什么,苏州一建建筑集团有限公司概念ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.RTO:Retransmission TimeOutFEC:Forward Error Correctionkcp简介kcp是一个基于udp实现快速、可靠、向前纠错的的协议#xff0c;能以比TCP浪费10%-20%的带宽的代价#xff0c;换…概念ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.RTO:Retransmission TimeOutFEC:Forward Error Correctionkcp简介kcp是一个基于udp实现快速、可靠、向前纠错的的协议能以比TCP浪费10%-20%的带宽的代价换取平均延迟降低30%-40%且最大延迟降低三倍的传输效果。纯算法实现并不负责底层协议如UDP的收发。查看官方文档kcpkcp-go是用go实现了kcp协议的一个库其实kcp类似tcp协议的实现也很多参考tcp协议的实现滑动窗口快速重传选择性重传慢启动等。kcp和tcp一样也分客户端和监听端。 ----- -----| Client | | Server |----- -----|------ kcp data ------| |----- kcp data -------|
kcp协议layer model----------------------
| Session |
----------------------
| KCP(ARQ) |
----------------------
| FEC(OPTIONAL) |
----------------------
| CRYPTO(OPTIONAL)|
----------------------
| UDP(Packet) |
----------------------
KCP headerKCP Header Format 4 1 1 2 (Byte)
------------------------
| conv |cmd|frg| wnd |
------------------------
| ts | sn |
------------------------
| una | len |
------------------------
| |DATA
| |
------------------------
代码结构src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go 加解密实现
├── crypt_test.go
├── donate.png
├── fec.go 向前纠错实现
├── frame.png
├── kcp-go.png
├── kcp.go kcp协议实现
├── kcp_test.go
├── sess.go 会话管理实现
├── sess_test.go
├── snmp.go 数据统计实现
├── updater.go 任务调度实现
├── xor.go xor封装
└── xor_test.go
着重研究两个文件kcp.go和sess.gokcp浅析kcp是基于udp实现的所有udp的实现这里不做介绍kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据再加各种处理机制为了重传拥塞控制纠错等。下面介绍kcp客户端和服务端整体实现的流程只是大概介绍一下函数流不做详细解析详细解析看后面数据流的解析。kcp client整体函数流和tcp一样kcp要连接服务端需要先拨号但是和tcp有个很大的不同是即使服务端没有启动客户端一样可以拨号成功因为实际上这里的拨号没有发送任何信息而tcp在这里需要三次握手。DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)V
net.DialUDP(udp, nil, udpaddr)V
NewConn()V
newUDPSession() {初始化UDPSession}V
NewKCP() {初始化kcp}V
updater.addSession(sess) {管理session会话任务管理根据用户设置的internal参数间隔来轮流唤醒任务}V
go sess.readLoop()V
go s.receiver(chPacket)V
s.kcpInput(data)V
s.fecDecoder.decodeBytes(data)V
s.kcp.Input(data, true, s.ackNoDelay)V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}V
notifyReadEvent()客户端大体的流程如上面所示先Dial建立udp连接将这个连接封装成一个会话然后启动一个go程接收udp的消息。kcp server整体函数流ListenWithOptions() V
net.ListenUDP()V
ServerConn()V
newFECDecoder()V
go l.monitor() {从chPacket接收udp数据写入kcp}V
go l.receiver(chPacket) {从upd接收数据并入队列}V
newUDPSession()V
updater.addSession(sess) {管理session会话任务管理根据用户设置的internal参数间隔来轮流唤醒任务}V
s.kcpInput(data)V
s.fecDecoder.decodeBytes(data)V
s.kcp.Input(data, true, s.ackNoDelay)V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}V
notifyReadEvent()
服务端的大体流程如上图所示先Listen启动udp监听接着用一个go程监控udp的数据包负责将不同session的数据写入不同的udp连接然后解析封装将数据交给上层。kcp 数据流详细解析不管是kcp的客户端还是服务端他们都有io行为就是读与写我们只分析一个就好了因为它们读写的实现是一样的这里分析客户端的读与写。kcp client 发送消息s.Write(b []byte) V
s.kcp.WaitSnd() {}V
s.kcp.Send(b) {将数据根据mss分段并存在kcp.snd_queue}V
s.kcp.flush(false) [flush data to output] {if writeDelaytrue {flush}else{每隔interval时间flush一次}
}V
kcp.output(buffer, size) V
s.output(buf)V
s.conn.WriteTo(ext, s.remote)V
s.conn..Conn.WriteTo(buf)
读写都是在sess.go文件中实现的Write方法// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {for {...// api flow controlif s.kcp.WaitSnd() int(s.kcp.snd_wnd) {n len(b)for {if len(b) int(s.kcp.mss) {s.kcp.Send(b)break} else {s.kcp.Send(b[:s.kcp.mss])b b[s.kcp.mss:]}}if !s.writeDelay {s.kcp.flush(false)}s.mu.Unlock()atomic.AddUint64(DefaultSnmp.BytesSent, uint64(n))return n, nil}...// wait for write event or timeoutselect {case -s.chWriteEvent:case -c:case -s.die:}if timeout ! nil {timeout.Stop()}}
}
假设发送一个hello消息Write方法会先判断发送窗口是否已满满的话该函数阻塞不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段当然这里的发送的hello长度太短只分了一个段并把它们插入发送的队列里。func (kcp *KCP) Send(buffer []byte) int {...for i : 0; i count; i {var size intif len(buffer) int(kcp.mss) {size int(kcp.mss)} else {size len(buffer)}seg : kcp.newSegment(size)copy(seg.data, buffer[:size])if kcp.stream 0 { // message modeseg.frg uint8(count - i - 1)} else { // stream modeseg.frg 0}kcp.snd_queue append(kcp.snd_queue, seg)buffer buffer[size:]}return 0
}
接着判断参数writeDelay如果参数设置为false则立马发送消息否则需要任务调度后才会触发发送发送消息是由flush函数实现的。// flush pending data
func (kcp *KCP) flush(ackOnly bool) {var seg Segmentseg.conv kcp.convseg.cmd IKCP_CMD_ACKseg.wnd kcp.wnd_unused()seg.una kcp.rcv_nxtbuffer : kcp.buffer// flush acknowledgesptr : bufferfor i, ack : range kcp.acklist {size : len(buffer) - len(ptr)if sizeIKCP_OVERHEAD int(kcp.mtu) {kcp.output(buffer, size)ptr buffer}// filter jitters caused by bufferbloatif ack.sn kcp.rcv_nxt || len(kcp.acklist)-1 i {seg.sn, seg.ts ack.sn, ack.tsptr seg.encode(ptr)}}kcp.acklist kcp.acklist[0:0]if ackOnly { // flash remain ack segmentssize : len(buffer) - len(ptr)if size 0 {kcp.output(buffer, size)}return}// probe window size (if remote window size equals zero)if kcp.rmt_wnd 0 {current : currentMs()if kcp.probe_wait 0 {kcp.probe_wait IKCP_PROBE_INITkcp.ts_probe current kcp.probe_wait} else {if _itimediff(current, kcp.ts_probe) 0 {if kcp.probe_wait IKCP_PROBE_INIT {kcp.probe_wait IKCP_PROBE_INIT}kcp.probe_wait kcp.probe_wait / 2if kcp.probe_wait IKCP_PROBE_LIMIT {kcp.probe_wait IKCP_PROBE_LIMIT}kcp.ts_probe current kcp.probe_waitkcp.probe | IKCP_ASK_SEND}}} else {kcp.ts_probe 0kcp.probe_wait 0}// flush window probing commandsif (kcp.probe IKCP_ASK_SEND) ! 0 {seg.cmd IKCP_CMD_WASKsize : len(buffer) - len(ptr)if sizeIKCP_OVERHEAD int(kcp.mtu) {kcp.output(buffer, size)ptr buffer}ptr seg.encode(ptr)}// flush window probing commandsif (kcp.probe IKCP_ASK_TELL) ! 0 {seg.cmd IKCP_CMD_WINSsize : len(buffer) - len(ptr)if sizeIKCP_OVERHEAD int(kcp.mtu) {kcp.output(buffer, size)ptr buffer}ptr seg.encode(ptr)}kcp.probe 0// calculate window sizecwnd : _imin_(kcp.snd_wnd, kcp.rmt_wnd)if kcp.nocwnd 0 {cwnd _imin_(kcp.cwnd, cwnd)}// sliding window, controlled by snd_nxt sna_unacwndnewSegsCount : 0for k : range kcp.snd_queue {if _itimediff(kcp.snd_nxt, kcp.snd_unacwnd) 0 {break}newseg : kcp.snd_queue[k]newseg.conv kcp.convnewseg.cmd IKCP_CMD_PUSHnewseg.sn kcp.snd_nxtkcp.snd_buf append(kcp.snd_buf, newseg)kcp.snd_nxtnewSegsCountkcp.snd_queue[k].data nil}if newSegsCount 0 {kcp.snd_queue kcp.remove_front(kcp.snd_queue, newSegsCount)}// calculate resentresent : uint32(kcp.fastresend)if kcp.fastresend 0 {resent 0xffffffff}// check for retransmissionscurrent : currentMs()var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64for k : range kcp.snd_buf {segment : kcp.snd_buf[k]needsend : falseif segment.xmit 0 { // initial transmitneedsend truesegment.rto kcp.rx_rtosegment.resendts current segment.rto} else if _itimediff(current, segment.resendts) 0 { // RTOneedsend trueif kcp.nodelay 0 {segment.rto kcp.rx_rto} else {segment.rto kcp.rx_rto / 2}segment.resendts current segment.rtolostlostSegs} else if segment.fastack resent { // fast retransmitneedsend truesegment.fastack 0segment.rto kcp.rx_rtosegment.resendts current segment.rtochangefastRetransSegs} else if segment.fastack 0 newSegsCount 0 { // early retransmitneedsend truesegment.fastack 0segment.rto kcp.rx_rtosegment.resendts current segment.rtochangeearlyRetransSegs}if needsend {segment.xmitsegment.ts currentsegment.wnd seg.wndsegment.una seg.unasize : len(buffer) - len(ptr)need : IKCP_OVERHEAD len(segment.data)if sizeneed int(kcp.mtu) {kcp.output(buffer, size)current currentMs() // time update for a blocking callptr buffer}ptr segment.encode(ptr)copy(ptr, segment.data)ptr ptr[len(segment.data):]if segment.xmit kcp.dead_link {kcp.state 0xFFFFFFFF}}}// flash remain segmentssize : len(buffer) - len(ptr)if size 0 {kcp.output(buffer, size)}// counter updatessum : lostSegsif lostSegs 0 {atomic.AddUint64(DefaultSnmp.LostSegs, lostSegs)}if fastRetransSegs 0 {atomic.AddUint64(DefaultSnmp.FastRetransSegs, fastRetransSegs)sum fastRetransSegs}if earlyRetransSegs 0 {atomic.AddUint64(DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)sum earlyRetransSegs}if sum 0 {atomic.AddUint64(DefaultSnmp.RetransSegs, sum)}// update ssthresh// rate halving, https://tools.ietf.org/html/rfc6937if change 0 {inflight : kcp.snd_nxt - kcp.snd_unakcp.ssthresh inflight / 2if kcp.ssthresh IKCP_THRESH_MIN {kcp.ssthresh IKCP_THRESH_MIN}kcp.cwnd kcp.ssthresh resentkcp.incr kcp.cwnd * kcp.mss}// congestion control, https://tools.ietf.org/html/rfc5681if lost 0 {kcp.ssthresh cwnd / 2if kcp.ssthresh IKCP_THRESH_MIN {kcp.ssthresh IKCP_THRESH_MIN}kcp.cwnd 1kcp.incr kcp.mss}if kcp.cwnd 1 {kcp.cwnd 1kcp.incr kcp.mss}
}
flush函数非常的重要kcp的重要参数都是在调节这个函数的行为这个函数只有一个参数ackOnly意思就是只发送ack如果ackOnly为true的话该函数只遍历ack列表然后发送就完事了。 如果不是也会发送真实数据。 在发送数据前先进行windSize探测如果开启了拥塞控制nc0则每次发送前检测服务端的winsize如果服务端的winsize变小了自身的winsize也要更着变小来避免拥塞。如果没有开启拥塞控制就按设置的winsize进行数据发送。接着循环每个段数据并判断每个段数据的是否该重发还有什么时候重发1. 如果这个段数据首次发送则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间也就是RTO则重传消息。 3. 如果这个段数据的ack丢失累计超过resent次数则重传也就是快速重传机制。这个resent参数由resend参数决定。 4. 如果这个段数据的ack有丢失且没有新的数据段则触发ERER相关信息ER最后通过kcp.output发送消息hellooutput是个回调函数函数的实体是sess.go的func (s *UDPSession) output(buf []byte) {var ecc [][]byte// extend bufs header spaceext : bufif s.headerSize 0 {ext s.ext[:s.headerSizelen(buf)]copy(ext[s.headerSize:], buf)}// FEC stageif s.fecEncoder ! nil {ecc s.fecEncoder.Encode(ext)}// encryption stageif s.block ! nil {io.ReadFull(rand.Reader, ext[:nonceSize])checksum : crc32.ChecksumIEEE(ext[cryptHeaderSize:])binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)s.block.Encrypt(ext, ext)if ecc ! nil {for k : range ecc {io.ReadFull(rand.Reader, ecc[k][:nonceSize])checksum : crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)s.block.Encrypt(ecc[k], ecc[k])}}}// WriteTo kernelnbytes : 0npkts : 0// if mrand.Intn(100) 50 {for i : 0; i s.dup1; i {if n, err : s.conn.WriteTo(ext, s.remote); err nil {nbytes nnpkts}}// }if ecc ! nil {for k : range ecc {if n, err : s.conn.WriteTo(ecc[k], s.remote); err nil {nbytes nnpkts}}}atomic.AddUint64(DefaultSnmp.OutPkts, uint64(npkts))atomic.AddUint64(DefaultSnmp.OutBytes, uint64(nbytes))
}
output函数才是真正的将数据写入内核中在写入之前先进行了fec编码fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon编码以后的hello就不是和原来的hello一样了至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1))dataShards和parityShards这两个参数决定了fec的冗余度冗余度越大抗丢包性就越强。kcp的任务调度器其实这里任务调度器是一个很简单的实现用一个全局变量updater来管理session代码文件为updater.go。其中最主要的函数func (h *updateHeap) updateTask() {var timer -chan time.Timefor {select {case -timer:case -h.chWakeUp:}h.mu.Lock()hlen : h.Len()now : time.Now()if hlen 0 now.After(h.entries[0].ts) {for i : 0; i hlen; i {entry : heap.Pop(h).(entry)if now.After(entry.ts) {entry.ts now.Add(entry.s.update())heap.Push(h, entry)} else {heap.Push(h, entry)break}}}if hlen 0 {timer time.After(h.entries[0].ts.Sub(now))}h.mu.Unlock()}
}
任务调度器实现了一个堆结构每当有新的连接session都会插入到这个堆里接着for循环每隔interval时间遍历这个堆得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。总结这里简单介绍了kcp的整体流程详细介绍了发送数据的流程但未介绍kcp接收数据的流程其实在客户端发送数据后服务端是需要返回ack的而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。转载于:https://www.cnblogs.com/zhangboyu/p/34c07c3577c85e9ae5c3477d7cab5f52.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/928182.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!