1 前言
在“突破编程_C++_网络编程(Windows 套接字(处理 TCP 粘包问题))”一文中,已经讲解了 TCP 粘包问题,并且给出了样例代码。但是该样例代码的核心是使用队列(std::queue)做报文的处理。
std::queue 是 C++ 标准模板库(STL)中的一个容器适配器,它提供了一种先进先出(FIFO)的数据结构。在 STL 中,std::queue 并不直接存储元素,而是依赖于一个底层容器来管理元素的存储,这个底层容器通常是 std::deque(双端队列)或者 std::list(双向链表),具体取决于实现。
默认情况下,std::queue 会使用 std::deque 作为底层容器:这是最常见的实现方式。std::deque 提供了从两端快速插入和删除元素的能力,这与 std::queue 的操作特性非常吻合。在 std::deque 上,std::queue 的 push() 操作对应于 std::deque 的 push_back(),而 pop() 对应于 std::deque 的 pop_front()。
如果使用 std::queue 作为处理 TCP 粘包问题,所有接收到的 TCP 数据会先存入队列中,然后再一个一个拿出来处理。一般情况下,这种处理模式是可行的,但是当报文的字节数很大时,这种模式的效率会比较低,所以本文会实现一种高性能的处理方式。
2 回顾一下 TCP 粘包问题
TCP(传输控制协议)是一种面向流的协议,它不保留数据包边界。在TCP连接中,数据被看作是一连串无结构的字节流。TCP 粘包问题指的是接收方在接收数据时,由于发送方发送的数据包较小,或者接收方处理较慢等原因,导致多个数据包粘在一起,形成一个大的数据块,从而使得接收方无法辨认出各个数据包的边界。
2.1 粘包产生的原因
(1)发送方原因:
- 发送数据包较小,尤其是小于TCP协议的MSS(最大报文段长度)。
- 发送数据包的时间间隔太短,导致接收方来不及处理。
(2)接收方原因:
- 接收处理速度慢,导致多个数据包到达后才开始处理。
- 接收缓冲区大小设置不当,可能过大或过小。
(3)网络原因:
- 网络延迟或拥塞,导致数据包传输时间不一致。
2.2 粘包问题的影响
TCP 粘包问题对网络通信的影响是多方面的,它可能会导致数据的不完整、错误解析、性能下降甚至安全问题。以下是 TCP 粘包问题可能带来的一些具体影响:
-
数据完整性受损:由于接收方无法准确识别数据包边界,可能会导致数据包被错误地合并或拆分,从而造成数据丢失或重复。
-
错误解析:粘包可能导致接收方错误地解析数据,比如将两个数据包的内容错误地解释为一个数据包,或者将一个数据包的内容错误地解释为两个数据包。
-
应用逻辑错误:在某些应用中,数据包的内容和顺序是非常重要的。粘包问题可能会导致应用逻辑处理错误,比如在聊天应用中,消息的顺序可能会被打乱。
-
性能下降:为了处理粘包问题,接收方可能需要额外的缓冲区来暂存数据,并进行边界检测,这会增加CPU和内存的使用,从而降低系统的整体性能。
-
延迟增加:在某些情况下,为了等待更多的数据以确定数据包的边界,接收方可能会延迟处理已经接收到的数据,这会增加处理延迟。
-
资源浪费:由于粘包问题,接收方可能需要分配更大的缓冲区来暂存数据,这可能会导致内存资源的浪费。
-
安全问题:粘包问题可能会被恶意利用,比如通过发送特制的数据包来破坏接收方的缓冲区,从而导致缓冲区溢出等安全问题。
-
协议复杂性增加:为了解决粘包问题,可能需要在应用层定义额外的协议来标识数据包的边界,这会增加协议的复杂性。
-
兼容性问题:不同的系统和应用可能采用不同的方法来处理粘包问题,这可能会导致兼容性问题。
-
调试困难:粘包问题可能会使得网络通信的调试变得更加困难,因为错误可能不容易被发现和定位。
2.3 解决粘包问题的方法
(1)固定长度消息:
每个消息都发送固定长度的数据,接收方按照固定长度进行读取和处理。
**(2)消息头和消息体:
消息分为消息头和消息体,消息头中包含消息体的长度信息,接收方根据消息头中的长度来确定消息体的边界。
**(3)使用特殊字符或字节序列:
在消息体中使用特殊的字符或字节序列作为消息边界的标识。
**(4)应用层协议:
定义应用层协议,明确消息的开始和结束,例如使用 HTTP 协议中的 Content-Length 字段。
**(5)使用缓冲区处理:
接收方使用缓冲区暂存接收到的数据,当满足一个完整消息的条件时再进行处理。
**(6)使用 TCP 的紧急数据或带外数据:
通过发送紧急指针或带外数据来标记消息的结束。
3 一种高性能的 TCP 粘包问题的处理方式
本方式采用使用消息头(Header)+消息体长度(Length)+消息体(Body)的方法来做处理:
-
消息头设计:设计一个固定长度的消息头结构,通常包含消息体的长度信息。消息头的长度是固定的,以便于接收方能够快速地识别出消息头并从中读取消息体的长度。
-
消息体长度:在消息头中,最关键的信息是消息体的长度,这个长度值告诉接收方接下来需要读取多少字节的数据来构成一个完整的消息体。
-
消息体发送:发送方在发送消息体之前,先发送包含消息体长度的消息头。这样,接收方在接收到消息头后,就能够知道接下来需要读取多少字节的数据。
-
接收处理:
- 接收方首先读取消息头,解析出消息体的长度。
- 根据消息头中的长度信息,接收方分配一个足够大的缓冲区来存储整个消息体。
- 接收方接着从TCP流中读取指定长度的消息体数据,直到读取完整个消息体。
-
连续读取:在读取完一个完整消息后,接收方再次读取消息头,以确定下一个消息的边界,继续处理后续的消息。
-
缓冲区管理:为了处理可能的粘包情况,接收方可能需要使用一个缓冲区来暂存接收到的数据。当缓冲区中的数据足以构成一个完整的消息时,就从缓冲区中提取出消息并处理,然后将剩余的数据留在缓冲区中,等待构成下一个消息。
-
错误处理:在设计协议时,还应考虑错误处理机制,比如如果接收到的消息头中的长度信息不合理(如长度为负数或过大),则需要采取相应的错误处理措施。
通过这种方式,即使在TCP流中数据包被粘在一起,接收方也能够准确地识别出每个消息的边界,从而有效地解决了粘包问题。这种方法简单、高效,且易于实现,因此在处理TCP粘包问题时被广泛采用。
(1)定义数据缓冲区
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <deque>
#include <tuple>
#include <memory>
using namespace std;
std::deque<tuple<unique_ptr<char[]>, uint64_t>> remainingDatas;
(2)定义工具函数,用来显示报文
string binaryStringToHex(string& binaryStr, string strPre, string strSplit)
{
string ret;
static const char *hex = "0123456789ABCDEF";
uint64_t offset = 0;
for (auto c : binaryStr)
{
ret.append(strPre);
ret.push_back(hex[(c >> 4) & 0xf]); //取二进制高四位
ret.push_back(hex[c & 0xf]); //取二进制低四位
if (offset < binaryStr.length() - 1)
{
ret.append(strSplit);
}
offset++;
}
return ret;
}
string binaryCharToHex(const char* data, uint64_t len, string strPre, string strSplit)
{
string strBinaryVal = string(data, len);
return binaryStringToHex(strBinaryVal, strPre, strSplit);
}
(3)TCP 粘包问题处理函数
void revMsg(const char *data, uint64_t len)
{
if (0 == len) {
return;
}
unique_ptr<char[]> dataTmp = make_unique<char[]>(len);
memcpy(dataTmp.get(), data, len);
remainingDatas.push_back(tuple<unique_ptr<char[]>, uint64_t>(move(dataTmp), len));
while (remainingDatas.size() > 0) {
auto remainingDataPtr = get<0>(remainingDatas[0]).get();
if (0x20 != remainingDataPtr[0]) {
remainingDatas.clear();
return;
}
if (1 == get<1>(remainingDatas[0]) && remainingDatas.size() <= 1) {
return;
}
uint8_t msgLen = 0;
uint64_t totalLen = 0;
for (auto& remainingData : remainingDatas) {
totalLen += get<1>(remainingData);
}
if (1 == get<1>(remainingDatas[0])) {
remainingDataPtr = get<0>(remainingDatas[1]).get();
msgLen = (uint8_t)remainingDataPtr[0];
}
else {
msgLen = (uint8_t)remainingDataPtr[1];
}
if (msgLen + 2 > totalLen) {
return;
}
unique_ptr<char[]> totalData = make_unique<char[]>(totalLen);
uint64_t totalDataOffset = 0;
for (auto& remainingData : remainingDatas) {
uint64_t lenTmp = get<1>(remainingData);
memcpy(&(totalData.get())[totalDataOffset], get<0>(remainingData).get(), lenTmp);
totalDataOffset += lenTmp;
}
remainingDatas.clear();
uint64_t offset = 2;
uint64_t lastMsgOffset = 0; // 最后一帧完整报文的最后一个字节偏移
while (offset + msgLen <= totalLen) {
uint64_t revMsgLen = msgLen + 2;
unique_ptr<char[]> revMsg = make_unique<char[]>(revMsgLen);
memcpy(revMsg.get(), &(totalData.get())[offset - 2], revMsgLen);
// 打印处理粘包后的一帧完整报文
auto strMsg = binaryCharToHex(revMsg.get(), revMsgLen, "", " ");
printf("[recive] %s\n", strMsg.c_str());
offset += msgLen;
lastMsgOffset = offset;
if (offset + 2 < totalLen) {
if (0x20 != totalData[offset]) {
return;
}
offset++;
msgLen = (uint8_t)totalData[offset];
offset++;
}
}
if (lastMsgOffset < totalLen) {
uint64_t lastRemainLen = totalLen - lastMsgOffset;
unique_ptr<char[]> lastRemainData = make_unique<char[]>(lastRemainLen);
memcpy(lastRemainData.get(), &(totalData.get())[lastMsgOffset], lastRemainLen);
remainingDatas.push_back(tuple<unique_ptr<char[]>, uint64_t>(move(lastRemainData), lastRemainLen));
}
}
}
(4)使用模拟的字节流数据做测试
std::vector<std::string> split(const std::string& str, const char delimiter)
{
std::vector<std::string> tokens;
std::istringstream tokenStream(str);
std::string token;
while (std::getline(tokenStream, token, delimiter))
{
tokens.push_back(token);
}
return tokens;
}
tuple<unique_ptr<char[]>, size_t> getBytesFromStr(string str)
{
vector<string> strs = split(str, ' ');
if (strs.size() > 0)
{
unique_ptr<char[]> bytes(new char[strs.size()]);
for (size_t i = 0; i < strs.size(); i++)
{
int val = strtol(strs[i].c_str(), nullptr, 16);
bytes[i] = (char)val;
}
return tuple<unique_ptr<char[]>, size_t>(move(bytes), strs.size());
}
else
{
return tuple<unique_ptr<char[]>, size_t>(unique_ptr<char[]>(nullptr), 0);
}
}
int main() {
// 模拟TCP接收数据
string str = "20";
auto res = getBytesFromStr(str);
revMsg((get<0>(res)).get(), get<1>(res));
str = "04 02 00 00 01 20 04 02 00 00 02 20 04 02 00";
res = getBytesFromStr(str);
revMsg((get<0>(res)).get(), get<1>(res));
str = "00 03 20 04 02 00 00 04 20 04 02 00";
res = getBytesFromStr(str);
revMsg((get<0>(res)).get(), get<1>(res));
return 0;
}
上面代码的输出为:
[recive] 20 04 02 00 00 01
[recive] 20 04 02 00 00 02
[recive] 20 04 02 00 00 03
[recive] 20 04 02 00 00 04