一、前言
本文主要记录笔者在使用Qt进行UDP编程时遇到的一些问题及其解决方案。
笔者的项目需求是:瞬时大数据传输,传输数据量在100M左右,传输时间在几毫秒之内。在代码编写的过程中,笔者采用了自写的FIFO结构作为数据的缓存位置。实现方式上,笔者依次采用了基于WinSock API的C++原生套接字和Qt自带的QUdpSocket类两种方案,并对二者进行了对比测试。
二、数据存储FIFO实现
FIFO的目的是进行数据的缓存,因此笔者在FIFO中并没有添加如判空、判满等函数。其目的在于尽量的精简函数结构。本FIFO的互锁实现,使用的是C++基本库的实现方式。
#include <mutex>
#include <cstring>
#include <cstdint>

// Fixed-capacity ring buffer (FIFO) of byte packets, protected by a mutex.
// By design there are no empty/full query functions (the author deliberately
// kept the interface minimal): pushData() silently drops the packet when the
// queue is full, and popData() leaves `len` untouched when the queue is empty,
// so callers should pre-set len to 0 and test it afterwards.
class cQueueData
{
public:
    // capacity: number of slots; dataSize: fixed byte capacity of each slot.
    cQueueData(int capacity, int dataSize);
    ~cQueueData();
    // Copies `len` bytes of `data` into the next free slot.
    // The packet is dropped if the queue is full, or if `len` exceeds the
    // per-slot capacity (the original memcpy could overflow the slot).
    void pushData(uint8_t* data, uint64_t len);
    // Copies the oldest packet into `data` and sets `len` to its size.
    // `len` is left unchanged when the queue is empty.
    void popData(uint8_t* data, uint64_t& len);
private:
    struct DataStruct
    {
        uint64_t len;   // number of valid bytes currently stored in this slot
        uint8_t* data;  // slot storage, nDataSize bytes
    };
    int nCapacity, nFront, nRear, nSize;
    int nDataSize;      // per-slot capacity, used to reject oversize pushes
    DataStruct *buffer;
    std::mutex _mutex;
};

cQueueData::cQueueData(int capacity, int dataSize)
    : nCapacity(capacity),
      nFront(0),
      nRear(0),
      nSize(0),
      nDataSize(dataSize)
{
    buffer = new DataStruct[nCapacity];
    for (int i = 0; i < nCapacity; i++)
    {
        buffer[i].len = 0;
        buffer[i].data = new uint8_t[dataSize];
        memset(buffer[i].data, 0, dataSize * sizeof (uint8_t));
    }
}

cQueueData::~cQueueData()
{
    nFront = 0;
    nRear = 0;
    nSize = 0;
    for (int i = 0; i < nCapacity; i++)
    {
        buffer[i].len = 0;
        delete [] buffer[i].data;
        buffer[i].data = nullptr;
    }
    delete [] buffer;
    buffer = nullptr;
}

void cQueueData::pushData(uint8_t *data, uint64_t len)
{
    // Restored: the published article lost the template argument and the
    // lock variable on this line, which does not compile as printed.
    std::lock_guard<std::mutex> lock(_mutex);
    if (nSize == nCapacity)
        return;                                 // queue full: drop (by design)
    if (len > static_cast<uint64_t>(nDataSize))
        return;                                 // oversize: would overflow the slot
    buffer[nRear].len = len;
    memcpy(buffer[nRear].data, data, len);
    nRear = (nRear + 1) % nCapacity;
    ++nSize;
}

void cQueueData::popData(uint8_t *data, uint64_t& len)
{
    std::lock_guard<std::mutex> lock(_mutex);
    if (nSize == 0)
        return;                                 // queue empty: len left unchanged
    len = buffer[nFront].len;
    memcpy(data, buffer[nFront].data, len);
    nFront = (nFront + 1) % nCapacity;
    --nSize;
}
三、基于WinSock API的C++实现
笔者构建了一个发送的类,类的主要作用是每隔500ms,发送一次总量为128M的数据。需要注意的是,尽管一般建议UDP单包数据长度不超过1472字节(以太网MTU 1500字节减去IP与UDP报头共28字节),但是依然可以发送较大的数据包,UDP的底层会对包体进行拆解和拼接。
接下来是发送部分的代码
// Allocates the 128 MiB zero-filled payload buffer and brings the
// broadcast socket (and its sender thread) up on the given port.
cUdpSend::cUdpSend(int nPort)
    : m_bRunning(false),
      m_pSender(nullptr),
      m_pUdpSend(INVALID_SOCKET)
{
    m_nLen = 1024 * 1024 * 128;        // total payload per burst: 128 MiB
    m_pData = new uint8_t[m_nLen];
    memset(m_pData, 0, m_nLen);
    memset(&m_stUdpAddr, 0, sizeof(m_stUdpAddr));
    initUdp(true, nPort);
}
// Stops the sender thread and closes the socket, then frees the payload.
cUdpSend::~cUdpSend()
{
    initUdp(false);
    delete [] m_pData;    // delete[] on nullptr is a no-op, no guard needed
    m_pData = nullptr;
}
// Creates (bInit==true) or tears down (bInit==false) the broadcast send
// socket. On the create path every failure releases whatever was acquired
// so far and returns, leaving m_pUdpSend == INVALID_SOCKET; on success a
// blocking-mode socket is configured and the sender thread is started.
// nPort is only used on the create path.
void cUdpSend::initUdp(bool bInit, int nPort)
{
if (bInit)
{
WSADATA wsaData;
WORD sockVersion;
// Request WinSock version 2.2.
sockVersion = MAKEWORD(2, 2);
if (WSAStartup(sockVersion, &wsaData) != 0)
{
WSACleanup();
return;
}
if (INVALID_SOCKET == (m_pUdpSend = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))
{
WSACleanup();
return;
}
// FIONBIO with 0 keeps the socket in blocking mode: sendto() will wait
// rather than fail when the kernel buffer is momentarily full.
unsigned long iMODE = 0;
if (0 != ioctlsocket(m_pUdpSend, FIONBIO, &iMODE))
{
closesocket(m_pUdpSend);
m_pUdpSend = INVALID_SOCKET;
WSACleanup();
return;
}
// Destination: IPv4 broadcast on the requested port.
m_stUdpAddr.sin_family = AF_INET;
m_stUdpAddr.sin_addr.s_addr = INADDR_BROADCAST;
m_stUdpAddr.sin_port = htons(nPort);
// 5 MB kernel send buffer to absorb the 128 MB bursts.
int optVal = 5 * 1024 * 1024;
if (setsockopt(m_pUdpSend, SOL_SOCKET, SO_SNDBUF, (const char*)&optVal, sizeof (int)) < 0)
{
closesocket(m_pUdpSend);
m_pUdpSend = INVALID_SOCKET;
WSACleanup();
return;
}
// Sending to a broadcast address requires SO_BROADCAST to be enabled.
int sockopt = 1;
if(SOCKET_ERROR == setsockopt(m_pUdpSend, SOL_SOCKET, SO_BROADCAST, (char*)&sockopt, sizeof (int)))
{
closesocket(m_pUdpSend);
m_pUdpSend = INVALID_SOCKET;
WSACleanup();
return;
}
m_bRunning = true;
m_pSender = new std::thread(runSender, this);
}
else
{
// Teardown order: stop the loop flag, join the thread, close the socket.
m_bRunning = false;
if (nullptr != m_pSender)
{
m_pSender->join();
delete m_pSender;
m_pSender = nullptr;
}
if (INVALID_SOCKET != m_pUdpSend)
{
// Shrink the send buffer back to 0 before closing (original behaviour).
int optVal = 0;
setsockopt(m_pUdpSend, SOL_SOCKET, SO_SNDBUF, (const char*)&optVal, sizeof (int));
closesocket(m_pUdpSend);
m_pUdpSend = INVALID_SOCKET;
WSACleanup();
}
}
}
// Sender thread entry point: every 500 ms, broadcasts the whole
// m_nLen-byte buffer in 32 KiB datagrams. pParam is the owning cUdpSend.
void cUdpSend::runSender(void *pParam)
{
    cUdpSend *udpSender = (cUdpSend *)pParam;
    const int chunk = 32768;
    while (udpSender->m_bRunning && INVALID_SOCKET != udpSender->m_pUdpSend)
    {
        for (int i = 0; i < udpSender->m_nLen; i = i + chunk)
        {
            // Fix: the original used fmin(), which round-trips the lengths
            // through double; an integer min is exact and avoids the
            // float conversion entirely.
            int sendLen = (udpSender->m_nLen - i < chunk)
                              ? (int)(udpSender->m_nLen - i)
                              : chunk;
            sendto(udpSender->m_pUdpSend, (char*)(udpSender->m_pData + i),
                   sendLen, 0, (struct sockaddr*)&udpSender->m_stUdpAddr,
                   sizeof (udpSender->m_stUdpAddr));
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}
接下来是关于接收部分的代码
// Allocates the 32 KiB receive scratch buffer and the packet FIFO
// (32768 slots of 32768 bytes), then brings the receive socket and its
// worker threads up on the given port.
cUdpRecv::cUdpRecv(int nPort)
    : m_bRunning(false),
      m_pData(new uint8_t[32768]),
      m_pRecver(nullptr),
      m_pUncode(nullptr),
      m_pUdpRecv(INVALID_SOCKET),
      m_pQueueData(new cQueueData(32768, 32768))
{
    memset(m_pData, 0, 32768);
    memset(&m_stUdpAddr, 0, sizeof(m_stUdpAddr));
    initUdp(true, nPort);
}
// Stops both worker threads and the socket first, then releases the
// scratch buffer and the FIFO.
cUdpRecv::~cUdpRecv()
{
    initUdp(false);
    delete [] m_pData;      // null-safe
    m_pData = nullptr;
    delete m_pQueueData;    // null-safe
    m_pQueueData = nullptr;
}
// Creates (bInit==true) or tears down (bInit==false) the non-blocking
// receive socket bound to nPort on all interfaces. Every failure on the
// create path releases whatever was acquired and returns with
// m_pUdpRecv == INVALID_SOCKET; on success the decode and receive threads
// are started. nPort is ignored on the teardown path.
void cUdpRecv::initUdp(bool bInit, int nPort)
{
    if (bInit)
    {
        WSADATA wsaData;
        WORD sockVersion;
        sockVersion = MAKEWORD(2, 2);   // request WinSock 2.2
        if (WSAStartup(sockVersion, &wsaData) != 0)
        {
            WSACleanup();
            return;
        }
        if (INVALID_SOCKET == (m_pUdpRecv = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))
        {
            WSACleanup();
            return;
        }
        unsigned long iMODE = 1;   // FIONBIO with 1 = non-blocking mode
        if (0 != ioctlsocket(m_pUdpRecv, FIONBIO, &iMODE))
        {
            closesocket(m_pUdpRecv);
            m_pUdpRecv = INVALID_SOCKET;
            WSACleanup();
            return;
        }
        // Fix: socket options are now applied BEFORE bind(). The original
        // set SO_REUSEADDR after bind(), where it cannot affect the address
        // that was just bound; SO_RCVBUF is likewise best enlarged before
        // traffic starts arriving.
        int optVal = 5 * 1024 * 1024;   // 5 MB kernel buffer for burst traffic
        if (setsockopt(m_pUdpRecv, SOL_SOCKET, SO_RCVBUF, (const char*)&optVal, sizeof (int)) < 0)
        {
            closesocket(m_pUdpRecv);
            m_pUdpRecv = INVALID_SOCKET;
            WSACleanup();
            return;
        }
        int reuseFlag = 1;
        if (setsockopt(m_pUdpRecv, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof (int)) < 0)
        {
            closesocket(m_pUdpRecv);
            m_pUdpRecv = INVALID_SOCKET;
            WSACleanup();
            return;
        }
        m_stUdpAddr.sin_family = AF_INET;
        m_stUdpAddr.sin_addr.s_addr = INADDR_ANY;   // listen on every interface
        m_stUdpAddr.sin_port = htons(nPort);
        if(SOCKET_ERROR == ::bind(m_pUdpRecv, (const sockaddr *)&m_stUdpAddr, sizeof(sockaddr_in)))
        {
            closesocket(m_pUdpRecv);
            m_pUdpRecv = INVALID_SOCKET;
            WSACleanup();
            return;
        }
        m_bRunning = true;
        // Start the decode thread first so the FIFO is drained as soon as
        // the receive thread begins pushing packets.
        m_pUncode = new std::thread(runUncode, this);
        m_pRecver = new std::thread(runRecver, this);
    }
    else
    {
        m_bRunning = false;   // both worker loops poll this flag and exit
        if (nullptr != m_pRecver)
        {
            m_pRecver->join();
            delete m_pRecver;
            m_pRecver = nullptr;
        }
        if (nullptr != m_pUncode)
        {
            m_pUncode->join();
            delete m_pUncode;
            m_pUncode = nullptr;
        }
        if (INVALID_SOCKET != m_pUdpRecv)
        {
            closesocket(m_pUdpRecv);
            m_pUdpRecv = INVALID_SOCKET;
            WSACleanup();
        }
    }
}
// Receive thread entry point: drains datagrams from the non-blocking
// socket into the FIFO as fast as possible. pParam is the owning cUdpRecv.
void cUdpRecv::runRecver(void *pParam)
{
    cUdpRecv *udpRecv = (cUdpRecv *)pParam;
    while (udpRecv->m_bRunning && INVALID_SOCKET != udpRecv->m_pUdpRecv)
    {
        int sizeData = ::recv(udpRecv->m_pUdpRecv, (char*)udpRecv->m_pData, 32768, 0);
        if (sizeData > 0)
        {
            udpRecv->m_pQueueData->pushData(udpRecv->m_pData, sizeData);
        }
        else
        {
            // Nothing pending on the non-blocking socket: yield briefly.
            std::this_thread::sleep_for(std::chrono::nanoseconds(1));
        }
    }
}
// Decode thread entry point: pops packets from the FIFO and hands them to
// the decode hook; naps 5 us when the FIFO is empty. pParam is the owning
// cUdpRecv instance.
void cUdpRecv::runUncode(void *pParam)
{
    cUdpRecv *udpRecv = (cUdpRecv *)pParam;
    uint8_t* pUncodeData = new uint8_t[32768];
    memset(pUncodeData, 0, 32768);
    uint64_t nDataLen;
    while (udpRecv->m_bRunning)
    {
        nDataLen = 0;
        udpRecv->m_pQueueData->popData(pUncodeData, nDataLen);
        if (nDataLen)
        {
            // Decoding work goes here (left to the reader, as in the article).
            //
            //
            //
        }
        else
            std::this_thread::sleep_for(std::chrono::microseconds(5));
    }
    delete [] pUncodeData;   // fix: this buffer was leaked in the original
}
在UDP的接收代码中,笔者构建了一个专门的解码线程(即代码中的m_pUncode所运行的runUncode函数),读者如果有需要,可以自行在其中补充解码逻辑。该套代码,笔者经过测试,可以稳定的进行发送与接收且不丢包。
四、基于Qt的QUdpSocket实现
接下来是使用Qt自带的类构建的发送端代码,如下:
// Builds the 128 MiB zero-filled payload and starts the Qt-based sender.
qUdpSend::qUdpSend(int nPort)
    : m_bRunning(false),
      m_nPort(-1),
      m_pUdpSend(nullptr)
{
    m_nLen = 1024 * 1024 * 128;        // 128 MiB burst payload
    m_pData = new uint8_t[m_nLen]();   // value-initialised: every byte is zero
    initUdp(true, nPort);
}
// Shuts the worker thread down, then releases the payload buffer.
qUdpSend::~qUdpSend()
{
    initUdp(false);
    delete [] m_pData;   // delete[] on nullptr is harmless
    m_pData = nullptr;
}
// Wires this object onto a dedicated QThread and starts the send loop
// (bInit==true), or stops the loop and waits for the thread (bInit==false).
// The queued finished() connections dispose of the socket and thread objects.
void qUdpSend::initUdp(bool bInit, int nPort)
{
if (bInit)
{
m_nPort = nPort;
m_pUdpSend = new QUdpSocket(this);
m_pThreadSend = new QThread;
// Move this object (and its child socket, which follows its parent) to
// the worker thread so runSender executes there once the thread starts.
this->moveToThread(m_pThreadSend);
connect(m_pThreadSend, &QThread::started, this, &qUdpSend::runSender);
// On thread finish, delete the socket and then the thread object itself.
connect(m_pThreadSend, &QThread::finished, m_pUdpSend, &QUdpSocket::deleteLater);
connect(m_pThreadSend, &QThread::finished, m_pThreadSend, &QThread::deleteLater);
m_bRunning = true;
m_pThreadSend->start();
}
else
{
// Clearing the flag makes runSender's while-loop return, which frees the
// worker thread to process quit(); wait() then blocks until it is done.
m_bRunning = false;
if (nullptr != m_pThreadSend)
{
m_pThreadSend->quit();
m_pThreadSend->wait();
}
}
}
// Send loop (runs in m_pThreadSend): broadcasts the whole buffer in 32 KiB
// datagrams, then pauses 500 ms.
// Fix: the original called QThread::usleep(500), which sleeps 500 *micro*-
// seconds — the article and the WinSock sender both specify a 500 ms
// interval, so the Qt version was bursting ~1000x more often than intended.
void qUdpSend::runSender()
{
    while (m_bRunning && nullptr != m_pUdpSend)
    {
        for (int i = 0; i < m_nLen; i = i + 32768)
            m_pUdpSend->writeDatagram((char*)m_pData + i, 32768, QHostAddress::Broadcast, m_nPort);
        QThread::msleep(500);   // 500 ms between bursts
    }
}
接下来是接收端的代码。
// Creates the 32 KiB receive scratch buffer and the packet FIFO, then
// starts the Qt socket, the receive loop and the decode thread.
qUdpRecv::qUdpRecv(int nPort)
    : m_pData(new uint8_t[32768]()),   // value-initialised: all bytes zero
      m_pUdpRecv(nullptr),
      m_pThreadRecv(nullptr),
      m_pQueueData(new cQueueData(32768, 32768))
{
    initUdp(true, nPort);
}
// Stops both loops and threads first, then releases the buffer and FIFO.
qUdpRecv::~qUdpRecv()
{
    initUdp(false);
    delete [] m_pData;      // null-safe
    m_pData = nullptr;
    delete m_pQueueData;    // null-safe
    m_pQueueData = nullptr;
}
// Creates the QUdpSocket bound to nPort, moves this object onto a worker
// QThread for the receive loop, and launches the QThread-inherited run()
// as the decode loop (bInit==true); or stops both loops via m_bRunning and
// waits for both threads (bInit==false).
// NOTE(review): start()/isRunning()/wait() on `this` imply this class
// inherits QThread while also owning m_pThreadRecv — two threads total.
void qUdpRecv::initUdp(bool bInit, int nPort)
{
if (bInit)
{
m_pUdpRecv = new QUdpSocket(this);
m_pUdpRecv->bind(QHostAddress::Any, nPort);
m_pThreadRecv = new QThread;
// Move this object (and its child socket) to the worker thread so that
// runRecver executes there when the thread's started() signal fires.
this->moveToThread(m_pThreadRecv);
connect(m_pThreadRecv, &QThread::started, this, &qUdpRecv::runRecver);
// On thread finish, delete the socket and then the thread object itself.
connect(m_pThreadRecv, &QThread::finished, m_pUdpRecv, &QUdpSocket::deleteLater);
connect(m_pThreadRecv, &QThread::finished, m_pThreadRecv, &QThread::deleteLater);
m_bRunning = true;
// this->start() launches the inherited run() (decode loop);
// m_pThreadRecv->start() triggers runRecver via started().
this->start();
m_pThreadRecv->start();
}
else
{
m_bRunning = false;
if (nullptr != m_pThreadRecv)
{
m_pThreadRecv->quit();
m_pThreadRecv->wait();
}
// Also wait for the decode thread (run()) to finish.
if (this->isRunning())
wait();
}
}
void qUdpRecv::runRecver()
{
int sizeData = -1;
while(m_bRunning && nullptr != m_pUdpRecv)
{
if (m_pUdpRecv->hasPendingDatagrams())
{
sizeData = m_pUdpRecv->pendingDatagramSize();
m_pUdpRecv->readDatagram((char*)m_pData, sizeData);
m_pQueueData->pushData(m_pData, sizeData);
}
}
}
// Decode thread body (inherited QThread::run): pops packets from the FIFO
// and hands them to the decode hook; naps 5 ms when the FIFO is empty.
// Fix: release pUncodeData on exit — the original leaked it.
void qUdpRecv::run()
{
    uint8_t* pUncodeData = new uint8_t[32768];
    memset(pUncodeData, 0, 32768);
    uint64_t nDataLen;
    while (m_bRunning)
    {
        nDataLen = 0;
        m_pQueueData->popData(pUncodeData, nDataLen);
        if (nDataLen)
        {
            // Decoding work goes here (left to the reader, as in the article).
            //
            //
            //
        }
        else
            msleep(5);
    }
    delete [] pUncodeData;   // fix: this buffer was leaked in the original
}
五、测试总结
笔者基于C++与Qt构建了两个UDP的发送端和接收端,将其排列组合依次作为收/发端后,发现以下几个情况:
基于C++构建的cUdpSend和cUdpRecv,在短时间内大量发包和收包,无丢包情况发生;基于Qt构建的qUdpSend,短时间大量发包可以稳定快速发送,且无丢包情况,但当间隔一定时间后,再次发送,会稳定丢失前两包。基于Qt构建的qUdpRecv,存在丢包情况,丢包率在2%。
关于丢包问题,主要原因在于:Qt的QUdpSocket不使用额外的缓冲区,而受限于处理速度和发送速度的限制,必然会产生丢包。其解决方案只能是令发送端降低频率或减小报文长度。
六、结论
综上所述,在Windows端进行UDP编程的过程中,建议使用基于C++构建的cUdpSend和cUdpRecv以满足项目需求。
推荐阅读
发表评论