磨了 3 天才搞完,第 4 天才来得及写文档 😭
Lab3 介绍
本次实验是完成 TCPSender 部分,我也把文档来回读了好几遍才弄懂各个细节
TCPSender 负责将数据以 TCP 报文的形式发送,其需要完成的功能有:
- 将 ByteStream 中的数据以 TCP 报文形式持续发送给接收者。
- 处理 TCPReceiver 传入的 ackno 和 window size,以追踪接收者当前的接收状态,以及检测丢包情况。
- 若经过一个超时时间后仍然没有接收到 TCPReceiver 发送的针对某个数据包的 ack 包,则重传对应的原始数据包。
ARQ(Automatic Repeat Request)原则:发送接收方允许我们发送的任何内容(填充窗口),并不断重传,直到接收方确认每段内容。发送方的工作是确保接收方至少获得每个字节一次
TCPSender 的状态图(可以读 /lib_sponge/tcp_helper/tcp_state.cc
帮助理解状态变化):
TCPSender 如何检测丢包
TCP 使用超时重传机制。TCPSender 除了将原始数据流分解成众多 TCP 报文并发送以外,它还会追踪每个已发送报文(已被发送但还未被接收)的发送时间。如果某些已发送报文太久没有被接收方确认(即接收方接收到对应的 ackno),则该数据包必须重传。TCPSender 并不会为每一个发送的数据报都启动一个定时器,相反,TCPSender 只有一个定时器,它只为最早未确认的数据报启动定时器
接收方确认某个报文,指的是该报文的所有字节索引都已被确认。这意味着如果该报文只有部分被确认,则不能说明该报文已被完全确认。
另外,我们把已经发送出去但还未成功确认的数据报称作 outstanding segments
TCP 的超时机制比较麻烦,这是因为超时机制直接影响到应用程序从远程服务器上读取数据的响应时间,以及影响到网络拥堵的程度。以下是实现 TCPSender 时需要注意的一些点:
- 每隔几毫秒,TCPSender 的 tick 函数将会被调用,其参数声明了过去的时间。这是 TCPSender 唯一能调用的超时时间相关函数。因为直接调用 clock 或者 time 将会导致测试套件不可用。
-
TCPSender 在构造时会被给予一个重传超时时间 RTO 的初始值。RTO 是在重新发送未完成 TCP 段之前需要等待的毫秒数。RTO 值将会随着时间的流逝(或者更应该说是网络环境的变化)而变化,但初始的RTO将始终不变。
-
在 TCPSender 中,我们需要实现一个重传计时器。该计时器将会在 RTO 结束时进行一些操作。
-
当每次发送包含数据的数据包时,都需要启动重传计时器,并让它在 RTO 毫秒后超时。若所有发送中报文均被确认,则终止重传计时器。
-
如果重传计时器超时,则需要进行以下几步(稍微有点麻烦)
- 重传尚未被 TCP 接收方完全确认的最早报文(序列号最小的报文)。这一步需要我们将发送中的报文数据保存至一个新的数据结构中,这样才可以追踪正处于发送状态的数据。
-
如果接收者的 window size 不为 0,即可以正常接收数据,则
-
增加连续重传次数。过多的重传次数可能意味着网络的中断,需要立即停止重传。
- 将 RTO 的值设置为先前的两倍,以降低较差网络环境的重传速度,以避免加深网络环境的拥堵。
⚠ 接收者 window size 为 0 只是表示接收者缓存空间不够了,并不意味着网络拥塞,所以不需要增加连续重传计数器和加倍 RIO
- 重置并重启重传计时器
- 当接收者给发送者一个确认成功接收新数据的 ack 包时(absolute ack seqno 比之前接收到的 ackno 更大):
- 将 RTO 设置回初始值
- 如果发送方存在尚未确认的数据,则重新启动重传定时器,否则关闭定时器
- 将连续重传计数清零。
TCPSender 的具体实现
Timer
我自己封装实现了一个简单的 Timer,可以满足开关重启、超时检测等基本需求
//! \brief TCPSender 的计时器,最长定时时间(ms)不能超过 uint32
class Timer {
private:
uint32_t _time_count = 0;
uint32_t _time_out = 0;
bool _is_running = false;
public:
Timer() = default;
Timer(const uint32_t time_out) : _time_out(time_out) {}
// void start() { _is_running = true; }
void stop() { _is_running = false; }
void set_time_out(const uint32_t time_out) { _time_out = time_out; }
uint32_t get_time_out() const { return _time_out; }
void restart() { _is_running = true, _time_count = 0; }
void tick(const size_t ms_since_last_tick) {
if (_is_running)
_time_count += ms_since_last_tick;
}
bool check_time_out() const { return _is_running && _time_count >= _time_out; }
bool is_running() const { return _is_running; }
};
TCPSender 的私有成员
//! \brief The "sender" part of a TCP implementation.
//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};
//! 重传定时器
Timer _timer;
//! 已经发出但还未收到 ACK 确认的 TCPSegment 队列
std::queue<std::pair<uint64_t, TCPSegment> > _outstanding_seg{};
//! 连续重传次数
uint32_t _consecutive_retransmissions_count = 0;
//! 已经发送出去但还未收到 ACK 确认的字节数
size_t _bytes_in_flight = 0;
//! 窗口大小,根据文档初始值应为 1
uint16_t _window_size = 1;
//! 是否发送带 SYN/FIN 的包
bool _set_syn_flag = false, _set_fin_flag = false;
/*...*/
};
fill_window()
fill_window()
的作用是尽可能填满 TCP 的发送窗口。
首先获取到当前窗口大小,如果窗口大小为 0 则视为 1。接下来通过 bytes_in_flight
和 window_size
的大小关系来判断能发多少字节,循环发包。
第一次会发 SYN 包,不含 payload(因为初始时 window_size 为 1)。
填入 payload 后发送出去。如果定时器关闭,则启动定时器。同时把数据报放入 outstanding_seg
的队列中。
ack_received()
首先将收到的 ackno
转化为 absolute ackno
,便于后续处理已经收到的包。
如果传入的 ackno
是不可靠的,直接丢弃。
接着把所有序列号空间小于 absolute ackno
的包从 outstanding_seg
队列中删除,表示这些数据报已经成功发送。
只要有成功确认的包,则重置定时器,清零连续重传次数。如果所有包已经成功确认,则关闭定时器。
最后更新 window_size
,调用 fill_window()
填满发送窗口。
tick()
首先调用 _timer.tick()
表示一段时间流逝。
如果定时器超时(已经确保定时器已经打开),如果定时器关闭不会超时检查不会返回 true
。重传最早的报文。如果 window_size
非 0,增加连续重传计数器和加倍 RIO。最后重启定时器。
需要注意的几个细节
bytes_in_flight()
:已经发送出去但还未收到 ACK 确认的字节数-
next_seqno_absolute()
: 从另一个角度可以理解为已经发送的载荷字节数(包括SYN
和FIN
各占一个字节) -
MAX_PAYLOAD_SIZE
只限制字符串长度并不包括SYN
和FIN
,但是window_size
包括SYN
和FIN
-
发出的
segment
要在满足ackno
和window_size
的要求 -
FIN 包的发送必须满足三个条件:
- 从来没发送过 FIN。这是为了防止发送方在发送 FIN 包并接收到 FIN ack 包之后,循环用 FIN 包填充发送窗口的情况。
- 输入字节流处于 EOF
- window 减去 payload 大小后,仍然可以存放下 FIN
- TCP 建立连接的时候能携带
payload
吗?
在 TCP 握手过程中,SYN 报文通常不会携带数据。三次握手的主要目的是同步双方的序列号并建立连接,而不是传输数据。虽然理论上 SYN 报文可以包含数据,但是由于连接在握手期间尚未建立,因此接收方的 TCP 实现可能会丢弃该数据或者不知道如何处理它。大多数现代 TCP 实现都不会在 SYN 报文中发送数据。
然而,在某些特定的实现和配置下,这种行为可能会有所不同。有些非标准的 TCP 扩展或修改可能允许在 SYN 报文中携带数据,但这种行为在通用的 TCP 实现中并不常见。如果在 SYN 报文中携带数据,可能会导致兼容性问题或者意料之外的行为。所以在标准的 TCP 握手过程中,通常不会在包含 SYN 标志的报文中携带有效载荷。
- TCP 关闭连接的 FIN 的数据报可以携带数据吗?
是的,包含 FIN 标志的 TCP 数据报可以携带数据。当一个 TCP 连接关闭时,发送方可以在同一个报文中发送 FIN 标志和有效载荷(数据)。接收方必须首先处理任何附带的数据,然后再处理 FIN 标志。
- 定时器什么时候关?
当所有已经发送的包均已确认时,定时器应该关闭。关于 Timer 的更多疑问可以参考这个 RFC6298 第 5 小节的内容。
-
注意读
/lib_sponge/tcp_helper/tcp_state.cc
帮助理解状态变化
代码
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _timer(retx_timeout) {}
size_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }
void TCPSender::fill_window() {
uint16_t window_size = max(_window_size, static_cast<uint16_t>(1));
while (_bytes_in_flight < window_size) {
TCPSegment seg;
// 首先发 SYN 包,不含 payload(因为初始时 window_size 为 1)
if (!_set_syn_flag) {
seg.header().syn = true;
_set_syn_flag = true;
}
// MAX_PAYLOAD_SIZE 只限制字符串长度并不包括 SYN 和 FIN,但是 window_size 包括 SYN 和 FIN
auto payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, \
min(window_size - _bytes_in_flight - seg.header().syn, _stream.buffer_size()));
auto payload = _stream.read(payload_size);
seg.payload() = Buffer(move(payload));
// 如果读到 EOF 了且 window_size 还有空位
if (!_set_fin_flag && _stream.eof() && _bytes_in_flight + seg.length_in_sequence_space() < window_size) {
seg.header().fin = true;
_set_fin_flag = true;
}
// 空数据报就不发送了
uint64_t length;
if ((length = seg.length_in_sequence_space()) == 0) break;
// 发送
seg.header().seqno = next_seqno(); // next_seqno() 是 TCP seqno
_segments_out.push(seg);
// 如果定时器关闭,则启动定时器
if (!_timer.is_running()) _timer.restart();
// 保存备份,重发时可能会用
_outstanding_seg.emplace(_next_seqno, move(seg));
// 更新序列号和发出但未 ACK 的字节数
_next_seqno += length; // _next_seqno 是 absolute seqno
_bytes_in_flight += length;
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
auto abs_ackno = unwrap(ackno, _isn, next_seqno_absolute());
if (abs_ackno > next_seqno_absolute()) return; // 传入的 ACK 是不可靠的,直接丢弃
int is_successful = 0;
// 处理已经收到的包(序列号空间要小于 ACK)
while (!_outstanding_seg.empty()) {
auto &[abs_seq, seg] = _outstanding_seg.front();
if (abs_seq + seg.length_in_sequence_space() - 1 < abs_ackno) {
is_successful = 1;
_bytes_in_flight -= seg.length_in_sequence_space();
_outstanding_seg.pop();
} else {
break;
}
}
// 有成功 ACK 的包,则重置定时器,清零连续重传次数
if (is_successful) {
_consecutive_retransmissions_count = 0;
_timer.set_time_out(_initial_retransmission_timeout);
_timer.restart();
}
// 没有等待 ACK 的包了,则关闭定时器
if (_bytes_in_flight == 0) {
_timer.stop();
}
// 更新 window_size,并尝试填满窗口
_window_size = window_size;
fill_window();
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timer.tick(ms_since_last_tick);
// 定时器超时(已经确保定时器已经打开),如果定时器关闭不会超时检查不会返回 true
// 理论上不用检测 _outstanding_seg 非空,但为了鲁棒性就检测下吧
if (_timer.check_time_out() && !_outstanding_seg.empty()) {
// 重传最早的报文
_segments_out.push(_outstanding_seg.front().second);
// window_size 非 0 对应的操作
if (_window_size > 0) {
++_consecutive_retransmissions_count;
_timer.set_time_out(_timer.get_time_out() * 2);
}
// 重启定时器
_timer.restart();
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions_count; }
void TCPSender::send_empty_segment() {
// 发送空数据报,可以用于仅仅 ACK
TCPSegment seg;
seg.header().seqno = next_seqno();
_segments_out.emplace(move(seg));
}