一个项目,要接收 UDP 数据包,解析并获取其中的数据,主要根据解析出来的行号和序号将数据拼接起来,然后将拼接起来的数据(最重要的数据是 R、G、B 三个通道的像素值)显示在窗口中。考虑到每秒钟要接收的数据包的数量较大,Python 的处理速度可能没有那么快,而且之前对 Qt 也比较熟悉了,所以用Qt 作为客户端接收处理数据包,用近期学习的 Python 模拟发送数据包。
数据格式
在 TCP/IP 协议中,UDP 数据包的大小是由限制的,因此用 UDP 传输数据时,还要在 UDP 层上再封装一层自定义的协议。这个自定义的协议比较简单,每个 UDP 包的大小为 1432 个字节,分为几个部分:
上述表格描述的就是一个完整的 UDP 包。这里的一个 UDP 数据包包含的是 RGB 某个通道的某一部分的数据。换种说法:
- 一行数据
- R 通道数据(若干个分包组成)
- G 通道数据(若干个分包组成)
- B 通道数据(若干个分包组成)
所以要生成/解析 UDP 包,最重要的是 PartialCnt、PartialIdx、RGB、LineIdx、Data 这几个部分。清楚了自定义协议就可以开始编写模拟包的生成和相应的接收逻辑了。
使用 Python 模拟 UDP 发包
由于本地开发的时候缺少必要的硬件环境,为了方便开发,用 Python 编写一个简单的 UDPServer,发送模拟生成的数据包。根据上述协议,可以写出如下的 CameraData 类来表示 UDP 数据包:
# -*- coding: utf-8 -*-
DATA_START_MAGIC = bytearray(4)
DATA_START_MAGIC[0] = 0x53 # S
DATA_START_MAGIC[1] = 0x74 # t
DATA_START_MAGIC[2] = 0x61 # a
DATA_START_MAGIC[3] = 0x72 # r
DATA_END_MAGIC = bytearray(4)
DATA_END_MAGIC[0] = 0x54 # T
DATA_END_MAGIC[1] = 0x45 # E
DATA_END_MAGIC[2] = 0x6e # n
DATA_END_MAGIC[3] = 0x64 # d
slice_start_magic = slice(0, 4)
slice_partial_cnt = 4
slice_partial_idx = 5
slice_sample_line = 6
slice_rgb_extern = 7
slice_line_idx = slice(8, 12)
slice_valid_data_len = slice(12, 16)
slice_line_bytes = slice(16, 20)
slice_resv = slice(20, 148)
slice_data = slice(148, 1428)
slice_end_magic = slice(1428, 1432)
import numpy as np
class CameraData(object):
def __init__(self):
# self.new()
# self.rawdata = rawdata
self.dataLow = 10
self.dataHigh = 20
self.new()
def genRandomByte(self, by=4):
r = bytearray(by)
for i in range(by):
r[i] = np.random.randint(0, 255)
def setPackageIdx(self, i = 0):
self.rawdata[slice_partial_idx] = i
def setRGB(self, c = 1):
self.rawdata[slice_rgb_extern] = c
def setLineIdx(self, line):
start = slice_line_idx.start
self.rawdata[start+3] = 0x000000ff & line
self.rawdata[start+2] = (0x0000ff00 & line) >> 8
self.rawdata[start+1] = (0x00ff0000 & line) >> 16
self.rawdata[start+0] = (0xff000000 & line) >> 24
def setValidDataLen(self, len):
start = slice_valid_data_len.start
self.rawdata[start+3] = 0x000000ff & len
self.rawdata[start+2] = (0x0000ff00 & len) >> 8
self.rawdata[start+1] = (0x00ff0000 & len) >> 16
self.rawdata[start+0] = (0xff000000 & len) >> 24
def setLineBytes(self, len):
start = slice_line_bytes.start
self.rawdata[start+3] = 0x000000ff & len
self.rawdata[start+2] = (0x0000ff00 & len) >> 8
self.rawdata[start+1] = (0x00ff0000 & len) >> 16
self.rawdata[start+0] = (0xff000000 & len) >> 24
def randomData(self):
size = slice_data.stop - slice_data.start
arr = np.random.randint(self.dataLow, self.dataHigh, size, dtype=np.uint8)
self.rawdata[slice_data] = bytearray(arr)
def new(self):
"""构造新的数据对象
"""
self.rawdata = bytearray(1432)
self.rawdata[slice_start_magic] = DATA_START_MAGIC
self.rawdata[slice_partial_cnt] = 0x02
self.rawdata[slice_partial_idx] = 0x00
self.rawdata[slice_sample_line] = 0x03
self.rawdata[slice_rgb_extern] = 0x01
self.setLineIdx(0x00)
self.setValidDataLen(1280)
self.setLineBytes(1432)
self.randomData()
self.rawdata[slice_end_magic] = DATA_END_MAGIC
def hex(self):
return self.rawdata.hex()
def __repr__(self):
return '<CameraData@{} hex len: {}>'.format(hex(id(self)), len(self.rawdata))
CameraData 中的 rawdata 是一个 bytearray 对象,它将会被 UdpServer 通过网络接口发送出去。设置 4 个字节大小的整数时(如写 LineIdx 行号),不能直接将数值赋到 rawdata 中,要将其中的 4 个字节分别赋值到对应的地址上才行。
CameraData 中的 randomData 方法是模拟随机数据,更好的做法不是完全随机给每个像素点赋值,而是有规律的变化,这样在接收数据出现问题、分析问题的时候可以直观地看到哪里有问题。
然后我们需要定义一个 UdpServer,用它来将数据对象中包含的信息发送出去。
import socket
class UdpServer( object ):
"""该类功能是处理底层的 UDP 数据包发送和接收,利用队列缓存所有数据
"""
def __init__(self, *args, **kwargs):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self._sock.bind( ('', DATA_PORT+11 ) )
self._sock.settimeout( None ) # never timeout
# self._sock.setblocking( 0 ) # none block
def send_msg( self, msg ):
"""发送消息,
@param msg 字典对象,发送 msg 的 rawdata 字段
"""
self._sock.sendto( msg.rawdata, ('192.168.8.1', DATA_PORT))
这个 UdpServer 非常简单,因为后续会通过这个 UdpServer 不停的发包,但是每次发包必须等待发送端成功将 UDP 包发送出去,这里不要将 socket 对象设置成非阻塞的,否则程序运行时会出现错误提示(尽管可以忽略掉这个错误提示,但是没必要设置成非阻塞的,阻塞模式完全足够了)。
在 github 中可以找到完整的 Python 文件,里面定义了其他类,如 DataSender
、RGBSender
。DataSender
是在一个线程里面发送 RGB 三个通道的值,RGBSender
的一个对象只会发送 RGB 三个通道中的某一个的值。
小结和注意事项
在本地测试的时候,为了方便在任务管理器中看到网络占用率,最初是在 VirtualBox 的 ubuntu 虚拟机上运行这个 Python 程序的,但是受到虚拟机的资源分配和电脑性能影响,调用 singleMain
函数时每秒钟最多只能产生 50MB 的数据量。但是在本地非虚拟机环境运行的时候最多可以达到 80MB 的数据量。所以尽可能地使用本地环境运行该 Python 程序可以最大限度的生成数据包。
如果让 RGB 三个通道分别在三个不同的进程中执行发送过程(注释掉 singleMain
的调用,换用 multiSend
方法),那么每秒钟的数据量可到 200MB,不过 80MB 的数据量已经足够多了(接近千兆网卡的上限了,网络利用率过高的话通过网线传输时会出现严重丢包的情况),不需要使用 multiSend
方法加大数据量。
在 singleMain 方法中,不直接执行 dataSender.serve()
,而是在新进程中执行,可以更好的利用多核优势,发送数据更快:
# singleMain()
dataSender = DataSender()
# dataSender.serve()
p = Process(target=dataSender.serve)
p.start()
实际开发过程并不是这么顺利,因为一开始并不知道在大量数据发送的时候,发送端能否有效地将数据发送出去,实际上是边编写 Python 的模拟发送数据程序,边编写 Qt 获取数据的程序,根据出现的问题逐步解决发送端和接收端的问题的。
编写 Qt 获取数据包的代码及简单的 GUI
Qt 这边作为客户端,只需要将接收到的数据包保存下来,获取其中的有效数据,再将 RGB 数据赋到 QImage 对应的像素上显示出来即可。GUI 部分比较简单,使用 QWidget 中的 label 控件,将 QImage 转换成 QPixmap,显示到 label 上就好了。初始化后的窗口如图:
比较麻烦的是接收数据和拼接。同样地,为了方便表示和解析每个 UDP 包,我们构造一些类来存储这些信息(现在想想似乎直接用结构体表示会更简单)。
定义数据实体
我们在 Qt 中定义 CameraData
类来表示数据包实体:
/**
* @brief The CameraData class
* 对应从下位机接收到的字节数组的类,原始数据包,需要经过处理后变成一行数据
*/
class CameraData : public DataObj
{
Q_OBJECT
public:
enum RGBType {
R = 1,
G = 2,
B = 3,
UNKOWN = 0
};
static const QByteArray DATA_START_MAGIC;
static const QByteArray DATA_END_MAGIC;
static const int PacketSize;
explicit CameraData(QObject *parent = 0);
~CameraData();
bool isPackageValid();
// 获取保留区域的数据
QByteArray getReserved();
// 设置原始数据
void setRawData(const QByteArray &value);
void setRawData(const char *data);
// 获取数据区域内的所有数据,默认获取有效数据
QByteArray getData(bool valid = true);
int getPackageCntInLine();
int getPackageIdxInLine();
int getSampleDiffLine();
int getRGBExtern();
RGBType getRGBType();
int getLineIdx();
int getValidDataLen();
int getLineBytes();
int sliceToInt(int start, int len = 4);
// DataObj interface
void reset();
signals:
public slots:
private:
inline QByteArray slice(int start, int len = -1);
inline QByteArray getStartMagic();
inline QByteArray getEndMagic();
QByteArray data;
int packageCntInLine = -1;
int packegeIdxInLine = -1;
int lineIdx = -1;
int lineBytes = -1;
int rgbType = -1;
};
CameraData
类继承自 DataObj
类,而 DataObj
类又继承自 QObject
,这样方便进行内存管理和对象上的操作。DataObj
是为了方便复用对象而定义的基类,详细代码可参考 github 上的完整代码。
C++ 部分的 CameraData
类与 Python 中定义的 CameraData
类是对应的,不过 C++ 部分的 CameraData
类只需要调用 CameraData::setRawData
传入一个 QByteArray 对象后就可以自动将其中包含的数据解析出来,并且它只提供获取数据的接口而不提供修改数据的接口。
另外我们还需要定义一个类 PreProcessData,来表示一行数据:
/**
* @brief The PreProcessData class
* 预处理数据
*/
class PreProcessData: public DataObj
{
Q_OBJECT
public:
static const int PacketSize;
static const int PacketPerLine;
explicit PreProcessData(QObject *parent = 0, int line = -1);
void put(CameraData *cd);
bool isReady();
void reset();
int line() const;
void setLine(int line);
const QByteArrayList &getDataList() const;
QByteArray repr();
private:
/**
* @brief cameraData
* 每 2 个 CameraData 构成一行的单通道数据,有序存放 RGB 通道数据
* 0-1 存放 R,2-3 存放 G, 4-5 存放 B
*/
QByteArrayList dataList;
int m_line;
int m_readyCount = 0;
int m_duplicateCount = 0;
bool *dataPlaced = 0;
};
目前的协议中,每 2 个数据包(对应 2 个 CameraData
对象)构成某一行的单通道数据,所以 PreProcessData
中至少会包含 6 个 CameraData
对象,处理完 CameraData
对象后,只需要存储 Data 部分即可,所以这里没有用 QList 列表,而是直接使用 QByteArrayList
来存储数据。当三个通道的数据都准备好后,PreProcessData::isReady
就会返回 true,表示该行数据已经准备好,可以显示在窗口中。
在子线程中执行接收 UDP 包和处理过程
我们定义一个 Controller
类用来操作数据接收对象和子线程。用 Qt 的事件槽机制和 QObject::moveToThread
实现多线程非常方便,不重写 QThread 的 run 方法就可以让对象的方法在子线程中执行。
class Controller : public QObject
{
Q_OBJECT
public:
explicit Controller(QObject *parent = 0);
~Controller();
static const int DataPort;
static const int CONTROL_PORT;
static const QStringList BOARD_IP;
void start();
void stop();
DataProcessor *getDataProcessor() const;
signals:
public slots:
private:
CameraDataReceiver *cdr;
QThread recvThread;
QThread recvProcessThread;
QByteArrayList rawdataList;
DataProcessor *dp = 0;
QTimer *statsTimer;
int statsInterval;
};
其中 CameraDataReceiver
对象会被实例化,在子线程中接收 UDP 数据包(因为发送和接收数据的端口是不同的,操作和数据是分离的)。这里将 DataProcessor 通过 getDataProcessor
暴露给上层应用,以便上层应用连接信号槽接收图像。仅到接收数据,就用到了三个线程:分别是 GUI 线程,用于接收 UDP 包的 recvThread 线程和处理 UDP 的 recvProcessThread。
为什么接收 UDP 包和处理 UDP 包不是放在一个线程中执行呢?因为这里的数据量实在太多,最开始实现的时候这两个逻辑代码确实是在同一个线程中执行,然而由于处理数据的代码执行起来也要消耗时间,将会导致无法接收其他的 UDP 包,这样的话就会导致比较严重的丢包。为了保证接收端不会丢包,只好将处理逻辑放在其他的线程中执行。
Qt 接收 UDP 包
将接收数据和处理数据放在不同的线程中执行,确实可以解决丢包问题了,但是会出现新的问题:接收到的包如果不能够及时处理完,并且释放掉相应的资源,那么可能会出现程序将数据缓存下来但无法处理,程序占用的内存越来越大,导致程序运行起来越来越慢。
在编写程序时误以为是 Qt 的事件循环机制过慢导致程序处理不了那么多数据(实际上它的速度足够处理这些数据),因此将程序中使用的 QUdpSocket 对象换成了 [Windows 平台的 Socket 通信代码][winsock demo],并将其改写成类方便调用。实际上是在 QThread 子线程中无限循环地运行 recvfrom(clientSocket, recvedData.data(), recvbuflen, 0, &fromaddr, &addrLen);
这样的接收数据包函数,跳过了 Qt 事件循环机制,然后当接收到包之后再通过回调函数通知数据处理线程进行处理。
但当我写这篇博客,重新用正常的代码进行测试时,发现即便使用 QUdpSocket::readyRead
信号来接收 UDP 数据,只要数据处理进程不堆积数据,就不会出现占用内存越来越多的情况。换句话说,不是 Qt 无法处理实时性的数据,而是自己编写的代码里面有问题。
回想最开始写的程序,在处理 QByteArray 表示的原始数据时,会为每一个接收到的数据包分配地址,而且分配的地址位于堆中。而实际上在堆 heap 中分配回收内存地址相较于在栈 stack 中是慢得多的。为每个到来的数据用 new 构造一个新的 CameraData 对象,然后在处理完后将这个 CameraData delete 掉其实是很慢的,如果你这样做了,并且你在 CameraData 的析构函数中加上 qDebug 语句打印 "CameraData is deleting...",你会发现,当发送方(我们的 Python 模拟发送程序)停止发送数据包后很长一段时间内,Qt 程序在一直打印着 "CameraData is deleting"。
而我最开始就是这么做的,所以发生了 Qt 程序随着数据接收的变多,占用的内存越来越大的情况。当然,这不排除 qDebug 语句输出到控制台上也会占用很多时间。如果每秒钟要调用上万次 qDebug() << "CameraData is deleting"
,那么建议你使用一个计数变量控制 qDebug 的调用次数,因为这条语句的调用也会让数据处理变得缓慢。
处理接收到的 UDP 包
为了让接收端不丢包,需要快速的处理接收到的 UDP 包,并且在处理的代码中不要调用耗时的函数或者 new 操作。为了避免重复调用 new 和 delete 操作符,我们需要构建一个对象池,以便复用池中的对象,减少 new 操作。池的定义比较简单,封装一个 QList
容器类就好了,为了简化和复用池的代码,我用到了 c++ 的 template 特性,但是这个 DataObjPool
中的容器只能是 DataObj 的子类:
template<class T>
class DataObjPool
{
public:
virtual ~DataObjPool() {
qDeleteAll(pool);
numAvailable = 0;
}
T *getAvailable() {
if( numAvailable == 0 ) {
return 0;
}
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(item->isValid()) {
item->setValid(false);
numAvailable -= 1;
return item;
}
}
return 0;
}
T *get(int id) {
return pool[id];
}
inline bool release(T *dobj) {
dobj->reset();
numAvailable += 1;
return true;
}
int releaseTimeout(int now, int timeout = 100) {
int releaseCount = 0;
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(now > item->getGenerateMs() + timeout) {
item->reset();
numAvailable += 1;
releaseCount += 1;
}
}
return releaseCount;
}
void releaseAll() {
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(item->isValid()) {
continue;
}
item->reset();
numAvailable += 1;
}
}
int getNumAvailable() const {
return numAvailable;
}
template<class T2> operator DataObjPool<T2>();
protected:
DataObjPool(int size = 100);
private:
QList<T *> pool;
int numAvailable = 0;
};
class RawDataObjPool: public DataObjPool<CameraData>
{
public:
RawDataObjPool(int size = 100);
};
class LineDataPool : public DataObjPool<PreProcessData>
{
public:
LineDataPool(int size = 100);
};
当然你也可以直接编写两个类 RawDataObjPool
和 LineDataPool
,把池的操作分别复制到两个类中,使用模板特化的好处是改动的时候不需要改动两个类了。前面说过,DataObj
类继承自 QObject
,就是为了简化在对象池中进行的操作。DataObjPool
会在构造时在内存中预分配一定数量的对象,以 RawDataObjPool
为例,构造时传入 size 参数,便会预先在内存中创建 size 个 CameraData,在程序运行过程中,这些对象都会被我们这个 Qt 程序循环利用,直到关闭程序才会释放掉这些 CameraData(如果操作系统的内存不足,过多的对象占用的内存还是会被释放)。
对象池的主要接口有两个:getAvailable
和 release
分别用于获取可用的对象或释放掉池中的对象,注意这里的释放是让对象池对该对象进行标记,以便重复使用,而不是释放掉该对象占用的内存空间或 delete 掉。当对象池中无可用对象时,可以根据需要释放掉超时的对象或者释放掉全部对象。
使用对象池减少 new 操作符的使用后,处理数据的子线程的速度明显加快。正常情况下就可以看到如下的图片:
这里数据显示的部分还有待完善,因为发送端的发送数据大小不够凑成一行,所以图片的右侧部分是空白的。
数据的复制
这里说一下数据的复制,从 Socket 接口中传上来的数据,我们用 QByteArray
对象保存了底层的数据,即便在 UDP 数据包中含有很多个 \x00
这样的数据,QByteArray 也会正确识别出字符串的结束位置。
在设置 CameraData::setRawData(const QByteArray &value)
函数中,尽量避免手动调用 memcpy(data.data(), value, value.size());
这个底层 API,因为你不知道它会将 QByteArray 对象 CameraData.data
中的 char * data()
指针指向哪个位置。
我在 CameraData.cpp
文件中将它注释掉了,因为在程序运行和调试时它给我带来了巨大的困惑:经常出现 invalid address specified to rtlvalidateheap
这种类型的错误。经过很长时间的排查后发现注释掉这行代码,程序就能一直稳定运行。
总结
- 在 c++ 程序中要使用大量可重用的对象时,尽量避免频繁地使用 new 操作符新建对象,使用对象池来获取对象,这样可以加快程序的运行速度。
- Qt 的事件循环机制实际上运行地足够快,是可以处理实时性的数据的,在程序出现问题时,还是应该多找找自己编写的代码中的问题。
- 对于 memcpy 这类的底层 API,不熟悉的话尽量少用,否则出现问题很难 debug。
完整的项目代码可以在 github 中找到。