一、前言

本文章的主要目的是笔者在使用Qt进行UDP编程时遇到的一些问题,及其解决方案。

笔者的项目需求是:瞬时大数据传输,传输数据量在100M左右,传输时间在几毫秒之内。在代码编写的过程中,笔者采用了自写的FIFO结构作为数据的缓存位置。实现方式上,笔者依次采用了Qt自带的和Windows操作系统自带的来实现通讯,也依次采用Qt自带的和Windows操作系统自带的来作为收发的线程。具体的实现方式如下,几者之间存在的问题也在代码后会进行描述。

二、数据存储FIFO实现

FIFO的目的是进行数据的缓存,因此笔者在FIFO中并没有添加如判空、判满等函数。其目的在于尽量的精简函数结构。本FIFO的互锁实现,使用的是C++基本库的实现方式。

#include

class cQueueData

{

public:

cQueueData(int capacity, int dataSize);

~cQueueData();

void pushData(uint8_t* data, uint64_t len);

void popData(uint8_t* data, uint64_t& len);

private:

struct DataStruct

{

uint64_t len;

uint8_t* data;

};

int nCapacity, nFront, nRear, nSize;

DataStruct *buffer;

std::mutex _mutex;

};

cQueueData::cQueueData(int capacity, int dataSize)

: nCapacity(capacity),

nFront(0),

nRear(0),

nSize(0)

{

buffer = new DataStruct[nCapacity];

for (int i = 0; i < nCapacity; i++)

{

buffer[i].len = 0;

buffer[i].data = new uint8_t[dataSize];

memset(buffer[i].data, 0, dataSize * sizeof (uint8_t));

}

}

cQueueData::~cQueueData()

{

nFront = 0;

nRear = 0;

nSize = 0;

for (int i = 0; i < nCapacity; i++)

{

buffer[i].len = 0;

delete [] buffer[i].data;

buffer[i].data = nullptr;

}

delete [] buffer;

buffer = nullptr;

}

void cQueueData::pushData(uint8_t *data, uint64_t len)

{

std::lock_guard lock(_mutex);

if (nSize == nCapacity)

return;

buffer[nRear].len = len;

memcpy(buffer[nRear].data, data, len);

nRear = (nRear + 1) % nCapacity;

++nSize;

}

void cQueueData::popData(uint8_t *data, uint64_t& len)

{

std::lock_guard lock(_mutex);

if (nSize == 0)

return;

len = buffer[nFront].len;

memcpy(data, buffer[nFront].data, len);

nFront = (nFront + 1) % nCapacity;

--nSize;

}

三、基于的UDP通讯

笔者构建了一个发送的类,类的主要作用是每隔500ms,发送一次总量为128M的数据。需要注意的是,尽管UDP建议一包数据的数量不超过1572字节,但是依然可以发送较大的数据包,UDP的底层会对包体进行拆解和拼接。

接下来是发送部分的代码

cUdpSend::cUdpSend(int nPort)

: m_bRunning(false),

m_pSender(nullptr),

m_pUdpSend(INVALID_SOCKET)

{

memset(&m_stUdpAddr, 0, sizeof(m_stUdpAddr));

m_nLen = 1024 * 1024 * 128;

m_pData = new uint8_t[m_nLen];

memset(m_pData, 0, m_nLen);

initUdp(true, nPort);

}

cUdpSend::~cUdpSend()

{

initUdp(false);

if (nullptr != m_pData)

{

delete [] m_pData;

m_pData = nullptr;

}

}

void cUdpSend::initUdp(bool bInit, int nPort)

{

if (bInit)

{

WSADATA wsaData;

WORD sockVersion;

sockVersion = MAKEWORD(2, 2);

if (WSAStartup(sockVersion, &wsaData) != 0)

{

WSACleanup();

return;

}

if (INVALID_SOCKET == (m_pUdpSend = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))

{

WSACleanup();

return;

}

unsigned long iMODE = 0;

if (0 != ioctlsocket(m_pUdpSend, FIONBIO, &iMODE))

{

closesocket(m_pUdpSend);

m_pUdpSend = INVALID_SOCKET;

WSACleanup();

return;

}

m_stUdpAddr.sin_family = AF_INET;

m_stUdpAddr.sin_addr.s_addr = INADDR_BROADCAST;

m_stUdpAddr.sin_port = htons(nPort);

int optVal = 5 * 1024 * 1024;

if (setsockopt(m_pUdpSend, SOL_SOCKET, SO_SNDBUF, (const char*)&optVal, sizeof (int)) < 0)

{

closesocket(m_pUdpSend);

m_pUdpSend = INVALID_SOCKET;

WSACleanup();

return;

}

int sockopt = 1;

if(SOCKET_ERROR == setsockopt(m_pUdpSend, SOL_SOCKET, SO_BROADCAST, (char*)&sockopt, sizeof (int)))

{

closesocket(m_pUdpSend);

m_pUdpSend = INVALID_SOCKET;

WSACleanup();

return;

}

m_bRunning = true;

m_pSender = new std::thread(runSender, this);

}

else

{

m_bRunning = false;

if (nullptr != m_pSender)

{

m_pSender->join();

delete m_pSender;

m_pSender = nullptr;

}

if (INVALID_SOCKET != m_pUdpSend)

{

int optVal = 0;

setsockopt(m_pUdpSend, SOL_SOCKET, SO_SNDBUF, (const char*)&optVal, sizeof (int));

closesocket(m_pUdpSend);

m_pUdpSend = INVALID_SOCKET;

WSACleanup();

}

}

}

void cUdpSend::runSender(void *pParam)

{

cUdpSend *udpSender = (cUdpSend *)pParam;

while (udpSender->m_bRunning && INVALID_SOCKET != udpSender->m_pUdpSend)

{

for (int i = 0; i < udpSender->m_nLen; i = i + 32768)

sendto(udpSender->m_pUdpSend, (char*)(udpSender->m_pData + i),

fmin(32768, udpSender->m_nLen - i), 0, (struct sockaddr*)&udpSender->m_stUdpAddr,

sizeof (udpSender->m_stUdpAddr));

std::this_thread::sleep_for(std::chrono::milliseconds(500));

}

}

接下来是关于接收部分的代码

cUdpRecv::cUdpRecv(int nPort)

: m_bRunning(false),

m_pData(new uint8_t[32768]),

m_pRecver(nullptr),

m_pUncode(nullptr),

m_pUdpRecv(INVALID_SOCKET),

m_pQueueData(new cQueueData(32768, 32768))

{

memset(&m_stUdpAddr, 0, sizeof(m_stUdpAddr));

memset(m_pData, 0, 32768);

initUdp(true, nPort);

}

cUdpRecv::~cUdpRecv()

{

initUdp(false);

if (nullptr != m_pData)

{

delete [] m_pData;

m_pData = nullptr;

}

if (nullptr != m_pQueueData)

{

delete m_pQueueData;

m_pQueueData = nullptr;

}

}

void cUdpRecv::initUdp(bool bInit, int nPort)

{

if (bInit)

{

WSADATA wsaData;

WORD sockVersion;

sockVersion = MAKEWORD(2, 2);

if (WSAStartup(sockVersion, &wsaData) != 0)

{

WSACleanup();

return;

}

if (INVALID_SOCKET == (m_pUdpRecv = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)))

{

WSACleanup();

return;

}

unsigned long iMODE = 1;

if (0 != ioctlsocket(m_pUdpRecv, FIONBIO, &iMODE))

{

closesocket(m_pUdpRecv);

m_pUdpRecv = INVALID_SOCKET;

WSACleanup();

return;

}

m_stUdpAddr.sin_family = AF_INET;

m_stUdpAddr.sin_addr.s_addr = INADDR_ANY;

m_stUdpAddr.sin_port = htons(nPort);

if(SOCKET_ERROR == ::bind(m_pUdpRecv, (const sockaddr *)&m_stUdpAddr, sizeof(sockaddr_in)))

{

closesocket(m_pUdpRecv);

m_pUdpRecv = INVALID_SOCKET;

WSACleanup();

return;

}

int optVal = 5 * 1024 * 1024;

if (setsockopt(m_pUdpRecv, SOL_SOCKET, SO_RCVBUF, (const char*)&optVal, sizeof (int)) < 0)

{

closesocket(m_pUdpRecv);

m_pUdpRecv = INVALID_SOCKET;

WSACleanup();

return;

}

int reuseFlag = 1;

if (setsockopt(m_pUdpRecv, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof (int)) < 0)

{

closesocket(m_pUdpRecv);

m_pUdpRecv = INVALID_SOCKET;

WSACleanup();

return;

}

m_bRunning = true;

m_pUncode = new std::thread(runUncode, this);

m_pRecver = new std::thread(runRecver, this);

}

else

{

m_bRunning = false;

if (nullptr != m_pRecver)

{

m_pRecver->join();

delete m_pRecver;

m_pRecver = nullptr;

}

if (nullptr != m_pUncode)

{

m_pUncode->join();

delete m_pUncode;

m_pUncode = nullptr;

}

if (INVALID_SOCKET != m_pUdpRecv)

{

closesocket(m_pUdpRecv);

m_pUdpRecv = INVALID_SOCKET;

WSACleanup();

}

}

}

void cUdpRecv::runRecver(void *pParam)

{

cUdpRecv *udpRecv = (cUdpRecv *)pParam;

int sizeData = -1;

while (udpRecv->m_bRunning && INVALID_SOCKET != udpRecv->m_pUdpRecv)

{

sizeData = ::recv(udpRecv->m_pUdpRecv, (char*)udpRecv->m_pData, 32768, 0);

if (sizeData > 0)

udpRecv->m_pQueueData->pushData(udpRecv->m_pData, sizeData);

else

std::this_thread::sleep_for(std::chrono::nanoseconds(1));

}

}

void cUdpRecv::runUncode(void *pParam)

{

cUdpRecv *udpRecv = (cUdpRecv *)pParam;

uint8_t* pUncodeData = new uint8_t[32768];

memset(pUncodeData, 0, 32768);

uint64_t nDataLen;

while (udpRecv->m_bRunning)

{

nDataLen = 0;

udpRecv->m_pQueueData->popData(pUncodeData, nDataLen);

if (nDataLen)

{

// 做一些解码相关的事情

//

//

//

}

else

std::this_thread::sleep_for(std::chrono::microseconds(5));

}

}

在UDP的接收代码中,笔者构建了一个_hUncode的句柄用于专门的解码,读者如果有需要,可以自行对其回调函数进行定义。该套代码,笔者经过测试,可以稳定的进行发送与接收且不丢包。

四、基于的UDP通讯

接下来是使用Qt自带的类构建的发送端代码,如下:

qUdpSend::qUdpSend(int nPort)

: m_bRunning(false),

m_nPort(-1),

m_pUdpSend(nullptr)

{

m_nLen = 1024 * 1024 * 128;

m_pData = new uint8_t[m_nLen];

memset(m_pData, 0, m_nLen);

initUdp(true, nPort);

}

qUdpSend::~qUdpSend()

{

initUdp(false);

if (nullptr != m_pData)

{

delete [] m_pData;

m_pData = nullptr;

}

}

void qUdpSend::initUdp(bool bInit, int nPort)

{

if (bInit)

{

m_nPort = nPort;

m_pUdpSend = new QUdpSocket(this);

m_pThreadSend = new QThread;

this->moveToThread(m_pThreadSend);

connect(m_pThreadSend, &QThread::started, this, &qUdpSend::runSender);

connect(m_pThreadSend, &QThread::finished, m_pUdpSend, &QUdpSocket::deleteLater);

connect(m_pThreadSend, &QThread::finished, m_pThreadSend, &QThread::deleteLater);

m_bRunning = true;

m_pThreadSend->start();

}

else

{

m_bRunning = false;

if (nullptr != m_pThreadSend)

{

m_pThreadSend->quit();

m_pThreadSend->wait();

}

}

}

void qUdpSend::runSender()

{

while (m_bRunning && nullptr != m_pUdpSend)

{

for (int i = 0; i < m_nLen; i = i + 32768)

m_pUdpSend->writeDatagram((char*)m_pData + i, 32768, QHostAddress::Broadcast, m_nPort);

QThread::usleep(500);

}

}

接下来是接收端的代码。

qUdpRecv::qUdpRecv(int nPort)

: m_pData(new uint8_t[32768]),

m_pUdpRecv(nullptr),

m_pThreadRecv(nullptr),

m_pQueueData(new cQueueData(32768, 32768))

{

memset(m_pData, 0, 32768);

initUdp(true, nPort);

}

qUdpRecv::~qUdpRecv()

{

initUdp(false);

if (nullptr != m_pData)

{

delete [] m_pData;

m_pData = nullptr;

}

if (nullptr != m_pQueueData)

{

delete m_pQueueData;

m_pQueueData = nullptr;

}

}

void qUdpRecv::initUdp(bool bInit, int nPort)

{

if (bInit)

{

m_pUdpRecv = new QUdpSocket(this);

m_pUdpRecv->bind(QHostAddress::Any, nPort);

m_pThreadRecv = new QThread;

this->moveToThread(m_pThreadRecv);

connect(m_pThreadRecv, &QThread::started, this, &qUdpRecv::runRecver);

connect(m_pThreadRecv, &QThread::finished, m_pUdpRecv, &QUdpSocket::deleteLater);

connect(m_pThreadRecv, &QThread::finished, m_pThreadRecv, &QThread::deleteLater);

m_bRunning = true;

this->start();

m_pThreadRecv->start();

}

else

{

m_bRunning = false;

if (nullptr != m_pThreadRecv)

{

m_pThreadRecv->quit();

m_pThreadRecv->wait();

}

if (this->isRunning())

wait();

}

}

void qUdpRecv::runRecver()

{

int sizeData = -1;

while(m_bRunning && nullptr != m_pUdpRecv)

{

if (m_pUdpRecv->hasPendingDatagrams())

{

sizeData = m_pUdpRecv->pendingDatagramSize();

m_pUdpRecv->readDatagram((char*)m_pData, sizeData);

m_pQueueData->pushData(m_pData, sizeData);

}

}

}

void qUdpRecv::run()

{

uint8_t* pUncodeData = new uint8_t[32768];

memset(pUncodeData, 0, 32768);

uint64_t nDataLen;

while (m_bRunning)

{

nDataLen = 0;

m_pQueueData->popData(pUncodeData, nDataLen);

if (nDataLen)

{

// 做一些解码相关的事情

//

//

//

}

else

msleep(5);

}

}

五、测试总结

笔者基于C++与Qt构建了两个UDP的发送端和接收端,将其排列组合依次作为收/发端后,发现一下几个情况:

基于C++构建的cUdpSend和cUdpRecv,在短时间内大量发包和收包,无丢包情况发生;基于Qt构建的qUdpSend,短时间大量发包可以稳定快速发送,且无丢包情况,但当间隔一定时间后,再次发送,会稳定丢失前两包。基于Qt构建的qUdpRecv,存在丢包情况,丢包率在2%。

关于丢包问题主要原因在于:Qt的QUDPSocket是不使用Buffer的,而受限于处理速度和发送速度的限制,必然会产生丢包。其解决方案只能是令发送端降低频率或减小包文长度。

六、结论

综上所述,在Windows端进行UDP编程的过程中,建议使用基于C++构建的cUdpSend和cUdpRecv以满足项目需求。

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。