文章目录

1. 目的2. 实现?验证!makefileQueue 类的 public 成员单元测试

3. 实现 Queue 类的方案

1. 目的

串行的程序只用到单个 CPU 核心, 希望加速整个程序, 考虑使用多线程加速。典型情况下可以找到生产者、消费者,两个角色之间通过队列进行数据交互:

生产者负责把数据放入队列Q消费者负责从队列取出数据Q

要求队列是线程安全的,即:不能有读写冲突等。

2. 实现?验证!

这里并不给出具体实现, 主要原因是网络上有太多的“实现”,也许很强大,但是否正确则有待验证,反倒是怎样验证正确性,总是被忽略:

新手小白,或者“算法工程师”们,往往没怎么写过合格的单元测试验证也许只是粗略跑一下,Thread Sanitizer 这样的有力武器没有被用上

makefile

我是在 Linux 下验证的, 用的 makefile 如下, 重点是 tsan 的设定, 以及 gtest 的配置:

SANITIZER_OPTION=-fsanitize=thread -fno-omit-frame-pointer

#SANITIZER_OPTION=

all:

clang++ test_queue.cpp -I. -g `pkg-config --cflags --libs gtest gtest_main` ${SANITIZER_OPTION}

Queue 类的 public 成员

template

class Queue

{

public:

Queue(unsigned int max_size = 0);

~Queue();

void push(const T& elem);

T pop();

bool empty();

size_t size();

其中:

Queue是模板类,这样可以支持任意数据类型作为队列元素(但队列中所有元素类型需要相同)所有成员函数都不能是 const 的, 尤其是 empty 和 size 函数, 原因是当前线程调用它们时,其他线程可能立即改变队列成员,需要 mutex 锁住, 对于 mutex 的操作导致函数不再是 const 的支持设定队列最大元素数量,如果没指定, 看似用0,实际表示“无限”

单元测试

如下是基于 GoogleTest 和 Queue 的 ADT 给出的单元测试代码。 如果你基于上述 Queue 类的定义, 能通过如下单元测试, 那么程序的正确性应该说比较高了。这部分代码的价值比 Queue 本身的价值要更高, 但往往被人们忽略:

#include

#include

#include

#include

using namespace digimon;

//using namespace Shadow;

TEST(Queue, SingleThread)

{

Queue q;

EXPECT_EQ(q.empty(), true);

q.push(1);

q.push(2);

EXPECT_EQ(q.empty(), false);

int x = q.pop();

EXPECT_EQ(x, 1);

x = q.pop();

EXPECT_EQ(x, 2);

}

class ThreadData

{

public:

ThreadData() {}

ThreadData(Queue* _q, int _start, int _end) :

q(_q), start(_start), end(_end)

{}

public:

Queue* q;

int start;

int end;

};

class ConsumerThreadData

{

public:

ConsumerThreadData(Queue* _q, int _start, int _end) :

q(_q), start(_start), end(_end), sum(0)

{

pthread_mutex_init(&mutex, NULL);

}

~ConsumerThreadData()

{

pthread_mutex_destroy(&mutex);

}

public:

Queue* q;

int start;

int end;

int sum;

pthread_mutex_t mutex;

};

static void* producer(void* _thread_data)

{

ThreadData* thread_data = (ThreadData*)_thread_data;

for (int i = thread_data->start; i < thread_data->end; i++)

{

thread_data->q->push(i);

}

return NULL;

}

TEST(Queue, MultiThread_MultiProducer)

{

Queue q;

pthread_t t1;

ThreadData thread_data1(&q, 0, 10);

pthread_create(&t1, NULL, producer, (void*)&thread_data1);

pthread_t t2;

ThreadData thread_data2(&q, 0, 10);

pthread_create(&t2, NULL, producer, (void*)&thread_data2);

pthread_join(t1, NULL);

pthread_join(t2, NULL);

EXPECT_EQ(q.empty(), false);

EXPECT_EQ(q.size(), 20);

int sum = 0;

while (!q.empty())

{

int x = q.pop();

sum += x;

}

int expected_sum = 90;

EXPECT_EQ(expected_sum, sum);

}

static void* consumer(void* _thread_data)

{

ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

for (int i = thread_data->start; i < thread_data->end; i++)

{

int x = thread_data->q->pop();

thread_data->sum += x;

std::cout << x << std::endl;

}

return NULL;

}

TEST(Queue, MultiThread_SingleProducer_SingleConsumer)

{

Queue q;

pthread_t t1;

ThreadData thread_data1(&q, 0, 10);

pthread_create(&t1, NULL, producer, (void*)&thread_data1);

pthread_t t2;

ConsumerThreadData thread_data2(&q, 0, 10);

pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

pthread_join(t1, NULL);

pthread_join(t2, NULL);

EXPECT_EQ(q.empty(), true);

EXPECT_EQ(q.size(), 0);

}

static void* producer_slow(void* _thread_data)

{

ThreadData* thread_data = (ThreadData*)_thread_data;

for (int i = thread_data->start; i < thread_data->end; i++)

{

sleep(1);

thread_data->q->push(i);

}

return NULL;

}

TEST(Queue, MultiThread_Consumer_Meaningless_Grab_Mutex)

{

Queue q;

pthread_t t1;

ThreadData thread_data1(&q, 0, 3);

pthread_create(&t1, NULL, producer_slow, (void*)&thread_data1);

pthread_t t2;

ConsumerThreadData thread_data2(&q, 0, 3);

pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

pthread_join(t1, NULL);

pthread_join(t2, NULL);

EXPECT_EQ(q.empty(), true);

EXPECT_EQ(q.size(), 0);

EXPECT_EQ(thread_data2.sum, 3);

}

static void* consumer_slow(void* _thread_data)

{

ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

for (int i = thread_data->start; i < thread_data->end; i++)

{

EXPECT_EQ(thread_data->q->size(), 5);

int x = thread_data->q->pop();

thread_data->sum += x;

sleep(1);

std::cout << x << std::endl;

}

return NULL;

}

TEST(Queue, LimitedQueueSize)

{

Queue q(5);

pthread_t t1;

ThreadData thread_data1(&q, 0, 10);

pthread_create(&t1, NULL, producer, (void*)&thread_data1);

pthread_t t2;

ConsumerThreadData thread_data2(&q, 0, 5);

pthread_create(&t2, NULL, consumer_slow, (void*)&thread_data2);

pthread_join(t1, NULL);

pthread_join(t2, NULL);

EXPECT_EQ(q.empty(), false);

EXPECT_EQ(q.size(), 5);

}

3. 实现 Queue 类的方案

可以基于 C++ 11 实现, 不过据说 C++11 的 thread 在华为手机上有问题,传闻中 pthread 能消除问题; 于是乎还有另一个选择: C++03 + pthread 实现 Queue 类。

Windows 平台上可以使用 windows-pthreads 库, 它是基于 Windows threads 模拟实现了 PThread 和 Semaphore 接口。(完)

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。