Lab2 介绍
在 Lab2,我们将实现一个 TCPReceiver,用以接收传入的 TCP Segment 并将其转换成用户可读的数据流。
TCPReceiver
除了将读入的数据写入至 ByteStream
中以外,它还需要告诉发送者两个属性:
- 第一个未组装的字节索引,称为确认号 ackno,它是接收者需要的第一个字节的索引。
- 第一个未组装的字节索引和第一个不可接受的字节索引之间的距离,称为 窗口大小 window size。
ackno
和 window size
共同描述了接收者当前的接收窗口。接收窗口是 发送者允许发送数据的一个范围,通常 TCP 接收方使用接收窗口来进行流量控制,限制发送方发送数据。
还是 Lab1 那张示意图
三种序号及转换
这个 Lab 会涉及到 3 种序号,以及它们之间的转换。下面是原文档关于 3 种序号的摘录:
$$
\begin{array}{l|l|l}
\text { Sequence Numbers } & \text { Absolute Sequence Numbers } & \text { Stream Indices } \\
\text { • Start at the ISN } & \text { • Start at 0 } & \text { • Start at 0 } \\
\text { • Include SYN/FIN } & \text { • Include SYN/FIN } & \text { • Omit SYN/FIN } \\
\text { • } 32 \text { bits, wrapping } & \text { • } 64 \text { bits, non-wrapping } & \text { • } 64 \text { bits, non-wrapping } \\
\text { • “seqno” } & \text { • “absolute seqno” } & \text { • “stream index” }
\end{array}
$$
示例:
$$
\begin{array}{r|c|c|c|c|c}
\text { element } & \text { SYN } & \text { c } & \text { a } & \text { t } & \text { FIN } \\
\hline \textbf { seqno } & 2^{32}-2 & 2^{32}-1 & 0 & 1 & 2 \\
\hline \textbf { absolute seqno } & 0 & 1 & 2 & 3 & 4 \\
\hline \textbf { stream index } & & 0 & 1 & 2 &
\end{array}
$$
补充一点:
seqno
:32 bit,初始为随机值,是 TCP 报文段中使用的,对应WrappingInt32
absolute seqno
:64 bit,初始为 0,可以理解为相对于seqno
的绝对偏移量stream index
:64 bit,是StreamReassembler
中使用的,不计SYN
和FIN
wrap
这部分很容易,因为
$$
\text{seqno} \equiv \text{ISN} + \text{absolute\_seqno} \pmod{2^{32}}
$$
而 ISN
和 absolute_seqno
已知,直接计算即可
unwrap
这部分需要推导一下,假设待求的 seqno
为 $x$,因为
$$
\begin{align}
n &\equiv \text{ISN} + x \pmod{2^{32}} \\
x &\equiv n -\ \text{ISN} \pmod{2^{32}}
\end{align}
$$
令 $t = (n -\ \text{ISN}) \bmod {2^{32}}$,则
$$
x = t + k \cdot 2^{32}, k \in \mathbb{Z}
$$
提供了 checkpoint
,表示 the index ofthe last reassembled byte,而 $x$ 取与 checkpoint
最接近的值。满足
$$
\text{checkpoint} – 2^{31} \le x \le \text{checkpoint} + 2^{31}
$$
这里需要一点观察,其实 $x$ 的低 32 bit 已经确定,而 $\text{checkpoint} \pm 2^{31}$ 至多使高 32 bit “+1” 或 “-1”,并且不会同时发生。因此只需要分两种情况,拼接 $x$ 的低 32 bit 和 $\text{checkpoint} \pm 2^{31}$ 的高 32 bit,取最接近 checkpoint
的值即是答案。
代码实现
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
return WrappingInt32{static_cast<uint32_t>(n) + isn.raw_value()};
}
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
auto low32bit = static_cast<uint32_t>(n - isn);
auto high32bit1 = (checkpoint + (1 << 31)) & 0xFFFFFFFF00000000;
auto high32bit2 = (checkpoint - (1 << 31)) & 0xFFFFFFFF00000000;
auto res1 = low32bit | high32bit1, res2 = low32bit | high32bit2;
if (max(res1, checkpoint) - min(res1, checkpoint) < max(res2, checkpoint) - min(res2, checkpoint))
return res1;
return res2;
}
TCPReceiver 实现
本实现主要参考了这篇博客,实现简洁而清晰,很多博客实现得很复杂
对于 TCPReceiver
来说,除了错误状态以外,它一共有 3 种状态,分别是:
LISTEN
:等待 SYN 包的到来。若在 SYN 包到来前就有其他数据到来,则必须丢弃。SYN_RECV
:获取到了 SYN 包,此时可以正常的接收数据包FIN_RECV
:获取到了 FIN 包,此时务必终止ByteStream
数据流的输入。
在每次 TCPReceiver
接收到数据包时,我们该如何知道当前接收者处于什么状态呢?可以通过以下方式快速判断:
- 当
isn
还没设置时,肯定是LISTEN
状态 - 当
ByteStream.input_ended()
,则肯定是FIN_RECV
状态 - 其他情况下,是
SYN_RECV
状态
实际实现过程中不需要区分 FIN_RECV
和 SYN_RECV
,只管往 reassember
里塞字符片段即可,因为 reassember
足够鲁棒,如果是 FIN_RECV
状态了,塞进去了自然会被丢弃。
Window Size
是当前的 capacity
减去 ByteStream
中尚未被读取的数据大小,即 reassembler
可以存储的尚未装配的子串索引范围。
ackno
的计算必须考虑到 SYN 和 FIN 标志,因为这两个标志各占一个 seqno
。故在返回 ackno
时,务必判断当前 接收者处于什么状态,然后依据当前状态来判断是否需要对当前的计算结果加1或加2。而这条准则对 push_substring
时同样适用。
这里先把那位博主的代码贴过来,以防丢失,他的代码和分注释写得很清晰
/**
* \brief 当前 TCPReceiver 大体上有三种状态, 分别是
* 1. LISTEN,此时 SYN 包尚未抵达。可以通过 _set_syn_flag 标志位来判断是否在当前状态
* 2. SYN_RECV, 此时 SYN 抵达。只能判断当前不在 1、3状态时才能确定在当前状态
* 3. FIN_RECV, 此时 FIN 抵达。可以通过 ByteStream end_input 来判断是否在当前状态
*/
void TCPReceiver::segment_received(const TCPSegment &seg) {
// 判断是否是 SYN 包
const TCPHeader &header = seg.header();
if (!_set_syn_flag) {
// 注意 SYN 包之前的数据包必须全部丢弃
if (!header.syn)
return;
_isn = header.seqno;
_set_syn_flag = true;
}
uint64_t abs_ackno = _reassembler.stream_out().bytes_written() + 1;
uint64_t curr_abs_seqno = unwrap(header.seqno, _isn, abs_ackno);
//! NOTE: SYN 包中的 payload 不能被丢弃
//! NOTE: reassember 足够鲁棒以至于无需进行任何 seqno 过滤操作
uint64_t stream_index = curr_abs_seqno - 1 + (header.syn);
_reassembler.push_substring(seg.payload().copy(), stream_index, header.fin);
}
optional<WrappingInt32> TCPReceiver::ackno() const {
// 判断是否是在 LISTEN 状态
if (!_set_syn_flag)
return nullopt;
// 如果不在 LISTEN 状态,则 ackno 还需要加上一个 SYN 标志的长度
uint64_t abs_ack_no = _reassembler.stream_out().bytes_written() + 1;
// 如果当前处于 FIN_RECV 状态,则还需要加上 FIN 标志长度
if (_reassembler.stream_out().input_ended())
++abs_ack_no;
return WrappingInt32(_isn) + abs_ack_no;
}
size_t TCPReceiver::window_size() const { return _capacity - _reassembler.stream_out().buffer_size(); }
我在实现的时候通过现学到的 std::optional
,使用 std::optional<WrappingInt32> _isn
,正好去掉 _set_syn_flag
变量。
TCPReceiver
一开始测试的时候有一个测试点没有通过,
测试点数据如下:
// credit for test: Jared Wasserman
{
// A byte with invalid stream index should be ignored
size_t cap = 4;
uint32_t isn = 23452;
TCPReceiverTestHarness test{cap};
test.execute(SegmentArrives{}.with_syn().with_seqno(isn).with_result(SegmentArrives::Result::OK));
test.execute(SegmentArrives{}.with_seqno(isn).with_data("a").with_result(SegmentArrives::Result::OK));
test.execute(ExpectTotalAssembledBytes{0});
}
这种情况在文档里没说呀,序列号都是错的😓,我也不知道如何判断。还是参考那位博主,稍微修改了下 stream_index
计算的实现,这样在上面的数据下 stream_index
会被置为 uint64_t
的 -1
,也即是 0xFFFFFFFFFFFFFFFF
,这样就超出了 reassambler
的接收边界,就不会接收错误数据了。
对了,今天在博主 Kiprey 的仓库里找到了 Fall 2021
的文档,我也是用的它仓库的初始代码。
代码
声明部分:
class TCPReceiver {
//! Our data structure for re-assembling bytes.
StreamReassembler _reassembler;
//! The maximum number of bytes we'll store.
size_t _capacity;
//! The initial sequence number of the sender.
std::optional<WrappingInt32> _isn;
// bool _set_syn_flag;
...
实现部分:
void TCPReceiver::segment_received(const TCPSegment &seg) {
const auto &header = seg.header();
if (!_isn.has_value()) {
if (!header.syn) return; // SYN 之前的包都丢弃
_isn = header.seqno; // 建立连接的 seqno 即是 ISN,也是 SYN 对应的 seqno
}
uint64_t checkpoint = _reassembler.stream_out().bytes_written();
uint64_t abs_seq = unwrap(header.seqno, _isn.value(), checkpoint);
uint64_t stream_index = abs_seq - 1 + (header.syn ? 1 : 0);
_reassembler.push_substring(seg.payload().copy(), stream_index, header.fin);
}
optional<WrappingInt32> TCPReceiver::ackno() const {
if (!_isn.has_value()) return nullopt;
uint64_t abs_seq = _reassembler.stream_out().bytes_written() + 1 + \
(_reassembler.stream_out().input_ended() ? 1 : 0);
return wrap(abs_seq, _isn.value());
}
size_t TCPReceiver::window_size() const {
return _capacity - _reassembler.stream_out().buffer_size();
}
测试
Lab2 一开始没有编译通过,错误提示如下:
需要安装 libpcap-dev
库,
sudo apt update
sudo apt install libpcap-dev
Easy,轻松通过 wrap
测试
完整 Lab2 测试,
1 << 32 吧
忽略,我搞错了