因为旅游、配环境问题等原因这个 Lab4 拖得有点久,现在终于完成了,虽然性能还不是最优。
Lab4 介绍
TCPConnection
需要将 TCPSender
和 TCPReceiver
结合,实现成一个 TCP
终端,同时收发数据。
据说这应该是整个 Project 中最难的 Lab 了,完成这个 Lab 后将完整实现 TCP 协议。文档表示如果你之前的 Lab 写得足够 Robust,通过拼凑调用之前的代码,你可以不超过 100 行代码完成它。(实测我大概也是 100 行多一点吧,只算实际写的话,当然借鉴了网上先辈的智慧)
因为 TCP 协议是全双工的,TCPConnection
与 TCPSender
,TCPReceiver
的关系就如下图,
两个对等方 A 和 B 被称作 “endpoints” of the connection 或 “peers”,TCPConnection
需要同时负责发送和接收 segments,下面是 TCPConnection
需要完成的工作:
接收数据段:
- 如果
RST
标志位被设置,那么立刻将 inbound 和 outbound 流置于 error state,并永久关闭当前连接 -
将 segment 交给
TCPReceiver
,以便它可以检查传入段上关心的字段:seqno、SYN、payload
和FIN
-
如果
ACK
标志位被设置,告诉TCPSender
关心的ackno
和window_size
字段 -
如果 segment 占任何序列号,
TCPConnection
确保至少发送一个段作为响应,以反映ackno
和window_size
的更新(只有收到的 TCP 段占序列空间,才 ack) -
Keep-alive 机制:在
TCPConnection
的segment_received()
方法中,还有一个特殊情况需要处理:响应“keep-alive”段。对等方可能会选择发送一个具有无效序列号的段,以查看你的 TCP 实现是否仍然活动(如果是的话,当前的窗口是多少)。尽管这些“keep-alive”段不占用任何序列号,你的TCPConnection
应该对其进行回复实现代码可以是如下:
if (_receiver.ackno().has_value() and (seg.length_in_sequence_space() == 0) and seg.header().seqno == _receiver.ackno().value() - 1) { _sender.send_empty_segment(); }
发送数据段:TCPConnection
将在互联网上发送 TCPSegments:
- 每当
TCPSender
将一个段推送到其出站队列上时,它会在传出段上设置它负责的字段:seqno、SYN、payload 和 FIN
- 在发送段之前,
TCPConnection
将向TCPReceiver
请求它负责的出站段上的字段:ackno
和window_size
。如果存在ackno
,它将设置ACK
标志和 TCPSegment 中的字段
时间流逝时:TCPConnection
具有一个 tick
方法,操作系统会定期调用该方法。当发生这种情况时,TCPConnection
需要执行以下操作:
- 通知
TCPSender
时间的流逝
回想 Lab3,
TCPSender
会超时重传没有 ack 的数据报,并记录连续重传次数 - 如果连续重传的次数超过上限
TCPConfig::MAX_RETX_ATTEMPTS
,中止连接并向对等方发送复位段(一个带有RST
标志的空段)。 -
如有必要,clean shutdown
关闭 TCP 连接
文档指出 Lab 最难的部分是决定何时完全终止 TCPConnection
并声明其不再 active。
关闭 TCP 连接分为 unclean shutdown 和 clean shutdown 两种情况。
unclean shutdown:当发送或收到带 RST
的数据报时,立刻将 inbound 和 outbound 流置于 error state,并永久关闭当前连接。或者如果在 TCPConnection
析构时依然处于 active 状态,则立刻向对等方发送 RST
。
clean shutdown:这是一个比较棘手的问题了,因为 Two Generals Problem, 证明了以不可靠的通信渠道交换消息并达成共识是难以实现的,但 TCP 协议几乎接近实现了这一目标。要关闭 TCP 连接,需要满足以下 4 个q前提条件:
- 前提条件#1:输入流的报文已经结束并全部组装完毕
- 前提条件#2:输出流的报文已经结束并且已经全部发送给对方(包括
FIN
) -
前提条件#3:收到了远程方的确认 ack
-
前提条件#4:自己的确认 ack 被对方收到
这里的“确认 ack“均指对对方
FIN
的 ack
前提条件#1-3 是容易满足的,但是条件#4 却不太好办,因为 TCP 协议不会对 ack 进行 ack。
clean shutdown 又分两种情况:
- 主动关闭:在输入流和输出流都结束后会徘徊等待一段时间后结束连接。首先需要满足前提条件#1-3,然后需要等待一段时间(lingering)再关闭。通常等待时间为 2 倍 MSL(Maximum Segment Lifetime),可能为 60 或 120 秒。在本实验中,等待时间要求为 10 倍 initial retransmission timeout。
- 被动关闭:对方是第一个结束流的,则满足前提条件#1-3 后可以直接关闭连接,无需等待。因为自己能 100% 确定对等方是满足条件3的,也即自己满足条件4。
个人理解,因为两军问题,想从理论上满足条件#4 是不可能的,因此用等待一段时间(lingering)满足条件#4 是一种近似的实现。首先发完数据报的一方进入主动关闭状态,等待一段时间,如果没有新的数据报到来可以近似说明对方已经收到了自己的确认 ack,否则对方肯定会重发。由此来近似满足条件#4,关闭连接。而后发完的一方,只要收到对方的 ack(满足条件#3),就可以直接被动关闭了,因为对方是先结束流的,说明对方满足条件#3,则自己满足条件#4。
这个问题比较烧脑,可能我也没理解明白或者说明白。
经典好图
上面是三次握手和四次挥手的状态图,下面是 TCP 的 FSM 图。上面的状态名称(如 SYN_SENT
等)与下面的图是对应一致的。
图中的箭头上类似 CONNECT/SYN
这种的,’/’ 左边是发生的事件,’/’ 右边是对应的动作。有颜色的线路是主要情况,而 unusaul event 其实基本就多了同时关闭和同时打开连接的情况。
下面的图是实验文档中定义的状态图,与上面的 FSM 不完全对应。
代码实现
其实实现的时候并不需要把自己的思维套进自动机里面,根据什么状态作出什么动作。正常自然的写,其实就能完成自动机中那些动作。
实现的时候还是会有个疑问,上层会保证在 active()
的状态下才调用 TCPConnection::segment_received
等成员函数吗?
我不知道上层是如何调用的,不过我猜测上层应该会保证吧。
第一版实现测试时前面的 36,37 Tests 出现了段错误,原因是 C++ 技术太拉了,还是没有完全理解右值、右值引用和 std::move
auto &&seg = std::move(_sender.segments_out().front());
应用 auto seg = std::move(_sender.segments_out().front());
之后又修了点逻辑 BUG,可以通过 1-48 的测试样例了。其实代码已经没有问题了,不能通过后面的测试样例是环境问题,我下面会说明,这里先放出代码。
tcp_connection.hh
#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH
#define SPONGE_LIBSPONGE_TCP_FACTORED_HH
#include "tcp_config.hh"
#include "tcp_receiver.hh"
#include "tcp_sender.hh"
#include "tcp_state.hh"
//! \brief A complete endpoint of a TCP connection
class TCPConnection {
private:
TCPConfig _cfg;
TCPReceiver _receiver{_cfg.recv_capacity};
TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};
//! outbound queue of segments that the TCPConnection wants sent
std::queue<TCPSegment> _segments_out{};
//! Should the TCPConnection stay active (and keep ACKing)
//! for 10 * _cfg.rt_timeout milliseconds after both streams have ended,
//! in case the remote TCPConnection doesn't know we've received its whole stream?
bool _linger_after_streams_finish{true};
//! Number of milliseconds since the last segment was received
size_t _time_since_last_segment_received = 0;
//! Is the connection still alive in any way?
bool _is_active = true;
//! 置为 RST 状态,如果 send_rst 为 true,则发送 RST 包
void _set_rst_state(const bool send_rst);
//! 将待发送的包添加上 ackno 和 window_size 发送出去
void _add_ackno_and_window_to_send();
public:
//! \name "Input" interface for the writer
//!@{
//! \brief Initiate a connection by sending a SYN segment
void connect();
//! \brief Write data to the outbound byte stream, and send it over TCP if possible
//! \returns the number of bytes from `data` that were actually written.
size_t write(const std::string &data);
//! \returns the number of `bytes` that can be written right now.
size_t remaining_outbound_capacity() const;
//! \brief Shut down the outbound byte stream (still allows reading incoming data)
void end_input_stream();
//!@}
//! \name "Output" interface for the reader
//!@{
//! \brief The inbound byte stream received from the peer
ByteStream &inbound_stream() { return _receiver.stream_out(); }
//!@}
//! \name Accessors used for testing
//!@{
//! \brief number of bytes sent and not yet acknowledged, counting SYN/FIN each as one byte
size_t bytes_in_flight() const;
//! \brief number of bytes not yet reassembled
size_t unassembled_bytes() const;
//! \brief Number of milliseconds since the last segment was received
size_t time_since_last_segment_received() const;
//!< \brief summarize the state of the sender, receiver, and the connection
TCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; };
//!@}
//! \name Methods for the owner or operating system to call
//!@{
//! Called when a new segment has been received from the network
void segment_received(const TCPSegment &seg);
//! Called periodically when time elapses
void tick(const size_t ms_since_last_tick);
//! \brief TCPSegments that the TCPConnection has enqueued for transmission.
//! \note The owner or operating system will dequeue these and
//! put each one into the payload of a lower-layer datagram (usually Internet datagrams (IP),
//! but could also be user datagrams (UDP) or any other kind).
std::queue<TCPSegment> &segments_out() { return _segments_out; }
//! \brief Is the connection still alive in any way?
//! \returns `true` if either stream is still running or if the TCPConnection is lingering
//! after both streams have finished (e.g. to ACK retransmissions from the peer)
bool active() const;
//!@}
//! Construct a new connection from a configuration
explicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {}
//! \name construction and destruction
//! moving is allowed; copying is disallowed; default construction not possible
//!@{
~TCPConnection(); //!< destructor sends a RST if the connection is still open
TCPConnection() = delete;
TCPConnection(TCPConnection &&other) = default;
TCPConnection &operator=(TCPConnection &&other) = default;
TCPConnection(const TCPConnection &other) = delete;
TCPConnection &operator=(const TCPConnection &other) = delete;
//!@}
};
#endif // SPONGE_LIBSPONGE_TCP_FACTORED_HH
tcp_connection.cc
#include "tcp_connection.hh"
#include <iostream>
// Dummy implementation of a TCP connection
// For Lab 4, please replace with a real implementation that passes the
// automated checks run by `make check`.
template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}
using namespace std;
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }
void TCPConnection::segment_received(const TCPSegment &seg) {
// 计时清零
_time_since_last_segment_received = 0;
const auto& header = seg.header();
// 接收到 RST 包
if (header.rst) {
_set_rst_state(false);
return;
}
// 将包交给 TCPReceiver,由于代码足够鲁棒,可以不经过任何过滤
_receiver.segment_received(seg);
// 是否需要发送一个不占序列空间的空 ack 包,因为收到任何占序列空间的 TCP 段都需要 ack,或者 keep-alive 也需要空 ack 包
bool need_empty_ack = seg.length_in_sequence_space() > 0;
// 如果设置了 ack,交给 TCPSender 处理 ack
if (header.ack) {
// 实际上在 ack_received 的时候就已经 fill_window() 了
_sender.ack_received(header.ackno, header.win);
// 发送了新的数据包,可以顺带 ack,那么可以不必再发空 ack 包了
if (need_empty_ack && !_segments_out.empty())
need_empty_ack = false;
}
// LISTEN 时收到 SYN,进入 FSM 的 SYN RECEIVED 状态
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::SYN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::CLOSED) {
connect();
return;
}
// 判断是否为 Passive CLOSE,并进入 FSM 的 CLOSE WAIT 状态
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::SYN_ACKED) {
_linger_after_streams_finish = false;
}
// Passive CLOSE,判断是否进入 FSM 的 CLOSED 状态
if (!_linger_after_streams_finish &&
TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED) {
_is_active = false;
return;
}
// Keep-alive 判断
if (_receiver.ackno().has_value() && (seg.length_in_sequence_space() == 0)
&& seg.header().seqno == _receiver.ackno().value() - 1) {
need_empty_ack = true;
}
// 发送 empty ack
if (need_empty_ack) {
_sender.send_empty_segment();
}
// 将待发送的包添加上 ackno 和 window_size 发送出去
_add_ackno_and_window_to_send();
}
bool TCPConnection::active() const { return _is_active; }
size_t TCPConnection::write(const string &data) {
auto ret = _sender.stream_in().write(data);
_sender.fill_window();
_add_ackno_and_window_to_send();
return ret;
}
//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
_time_since_last_segment_received += ms_since_last_tick;
// 调用 _sender 的 tick
_sender.tick(ms_since_last_tick);
// 连续重传次数超过阈值,发送 RST 包
if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
// 清除本应该重发的包
while (!_sender.segments_out().empty()) _sender.segments_out().pop();
// 发送 RST 包
_set_rst_state(true);
return;
}
// 调用 _sender.tick 可能导致有新数据包需要发送
_add_ackno_and_window_to_send();
// Active CLOSE,判断是否等待时间完成进入 CLOSED 状态
if (_linger_after_streams_finish &&
TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED &&
_time_since_last_segment_received >= 10 * _cfg.rt_timeout) {
_is_active = false;
_linger_after_streams_finish = false;
}
}
void TCPConnection::end_input_stream() {
_sender.stream_in().end_input();
// 流结束后可能需要发送 FIN
_sender.fill_window();
_add_ackno_and_window_to_send();
}
void TCPConnection::connect() {
// 第一次调用 fill_window() 会发送一个 SYN 数据包
_sender.fill_window();
_add_ackno_and_window_to_send();
}
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
// Your code here: need to send a RST segment to the peer
_set_rst_state(true);
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}
void TCPConnection::_set_rst_state(const bool send_rst) {
if (send_rst) {
TCPSegment seg;
seg.header().seqno = _sender.next_seqno();
seg.header().rst = true;
// 直接把包送到 _segments_out,不需要再调用 _add_ackno_and_window_to_send 发送出去了
_segments_out.emplace(std::move(seg));
}
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_linger_after_streams_finish = false;
_is_active = false;
}
void TCPConnection::_add_ackno_and_window_to_send() {
while (!_sender.segments_out().empty()) {
auto seg = std::move(_sender.segments_out().front());
_sender.segments_out().pop();
if (_receiver.ackno().has_value()) {
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
}
seg.header().win = min(static_cast<size_t>(numeric_limits<uint16_t>::max()), _receiver.window_size());
_segments_out.emplace(std::move(seg));
}
}
测试遇到的问题与解决办法
测试时我遇到了如下问题,因为环境问题被卡了两天。我在网上没有找到一点关于该问题的原因和解决办法。
我判断该问题可能与我实现代码没有关系,getaddrinfo
显然与 socket 系统调用函数相关。而且看着错误很诡异,.144.1, XXXXX)
和 getaddrinfo(169.254
,很奇怪。
于是我又在我 RackNerd 和 Olink 的 VPS 上搭建了实验环境来测试,下面是 VScode SFTP 的配置,插件还不错
{
"name": "RackNerd",
"host": "blog.lrl52.top",
"protocol": "sftp",
"port": 22,
"username": "lrl52",
"remotePath": "/home/lrl52/sponge",
"uploadOnSave": true,
"useTempFile": false,
"openSsh": false,
"sshConfigPath": "/home/lrl52/.ssh",
"privateKeyPath": "/home/lrl52/.ssh/id_rsa",
"ignore": [
".git",
"build",
".cache"
]
}
我在 VPS 上克隆别人的项目进行测试,测试了很多次,得出的结论是有的能跑过 1-98,有的还是只能跑过 1-48,99 及以后报错的问题如下
当时我很困惑,网上也没有找到有遇到这种情况的先例,甚至怀疑是不是与今年 sponge 网站关闭了有关系。
没办法,只能尝试装下 VirtualBox 使用它给的镜像进行实验。
很好,使用它给的镜像环境,我成功跑通了别人的作业,但是我的作业依然只能跑通 1-48。于是我开始怀疑是可能是我的实验框架问题。我把我的代码(也就是 libsponge
里面的部分)复制到别人的项目中,嘿嘿,果真测试通过了😅。
接下来我通过“移花接木”的办法,成功找出了问题的原因:
通过与能跑过的项目框架文件的仔细比对,我发现仅有下面 4 个文件不同(sftp.json
不算)。两个 cc 文件只是改动了下对齐和缩进,更为好看,并没有本质不同。cflags.cmake
把原来 Debug
模式的 -Og
选项改为了 -O0
。
罪魁祸首竟是一个看似相同的纯文本文件 tunconfig
,通过 diff 可以发现两者是不同的,
真相就是 CRLF 的锅,CRLF 导致了 49-98 的测试样例无法通过!而 LF 就没有问题了。
其实之前我就发现了一个临时解决办法,在 txrx.sh
中,有如下几行:
. "$(dirname "$0")"/etc/tunconfig
REF_HOST=${TUN_IP_PREFIX}.144.1
TEST_HOST=${TUN_IP_PREFIX}.144.1
SERVER_PORT=$(($((RANDOM % 50000)) + 1025))
在样例 49 跑不过的情况下你会发现,你 echo ${TUN_IP_PREFIX}
会发现,得到的是空值,进而导致 REF_HOST
和 TEST_HOST
的值变为 ".144.1"
,所以当然就出现了上面的问题。临时解决办法就是直接插入一行
TUN_IP_PREFIX=169.254
这样就可以跑过 99 之前的测试了。但 99 及其之后的测试跑不过,目前我还没找到原因,而且不止是 WSL2 Ubuntu 22.04
上跑不了,两台 ubuntu 22.04
和 ubuntu 20.04
的 VPS 也跑不了。
省流总结:如果 Test 49 跑不过,把 tunconfig 由 CRLF 改为 LF 就可以了。如果 Test 99 跑不过,那只能换环境,使用官方提供的镜像 + VirtualBox 是可以的。
其它问题与方法收获
切换 g++ 版本
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 10
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-8 20
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-9 10
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-8 20
sudo update-alternatives --install /usr/bin/cc cc /usr/bin/gcc 30
sudo update-alternatives --set cc /usr/bin/gcc
sudo update-alternatives --install /usr/bin/c++ c++ /usr/bin/g++ 30
sudo update-alternatives --set c++ /usr/bin/g++
给的镜像是很新的 Ubuntu 22.10,默认使用的 g++ 是 g++12,g++12 如遇编译失败,需要在 address.cc
中添加 include <array>
;
如果使用 Ubuntu 22.04,则默认使用 g++11,g++11 如遇编译失败,需要在 buffer.hh
中添加 #include <stdexcept>
;
而如果使用更老的 Ubuntu 20.04,则默认使用的可能是 g++9 或者 g++8,会发现反而不会编译出错。你发现了什么?g++ 版本越高反而对头文件的要求越严格😄。
编译卡住
在 1H1G 或者甚至 4H4G 的机子上直接 make -j
会导致编译卡住,甚至直接报错退出。这不是因为代码的问题,而是编译爆内存或 CPU 了!
实践得出,在 1H 的机子上只能 make -j2
,在 4H 的机子上只能 make -j4
。
❗❗ 另外,不要妄想在 1H1G 的小鸡上尝试搭建 VsCode 远程开发环境,真的会炸内存!😭🤡
CRLF to LF
可以通过 git 工具巧妙将所有 CRLF 变为 LF,
git config core.autocrlf false
git rm --cached -r .
git reset --hard
其实删了项目后再在 Linux 下重新从 GitHub 上 clone 一遍,也可以解决问题。CRLF 应该是 Windows 平台上出的问题,因为我 WSL2 和 Windows 文件是互通共享的,当时没注意就在 Windows 上 clone 了。
WSL2 与宿主机的通信
如下图,WSL2 可以通过 172.27.144.1
访问宿主机上的端口,而宿主机可以通过 172.27.156.187
访问到 WSL2 上的端口。
测试
WSL2 下发的包对方根本收不到,原因尚不清楚
虚拟机上是正常的,
通过所有测试用例,
性能测试,我是慢的那个,不算优秀,快的那个是 Vixbob 博主的代码。等未来有时间了再来优化吧,现在不想做了。
请问一下,lab4使用的镜像是cs144-2023年的镜像吗?还是cs144-2021年的镜像
应该是 CS144 2023 年的镜像,就在 Setting up your CS144 VM using VirtualBox 上下载的,是 Ubuntu 22.10