<翻译> C++多线程编程(第6-10部分)

第6-7部分 事件处理与条件变量

现在我们需要设计一个网络应用,这个应用需要完成如下任务:

  1. 和服务器交互
  2. 从XML文件中加载一些数据
  3. 处理第2个任务中得到的数据

显然任务1不依赖于其他两个任务,而任务3依赖于任务2。如果这个应用只有一个线程,那么任务2中读取XML文件的操作会拖慢应用的速度。为了提高性能,我们可以将任务2设计成单独的子线程,而任务1和3设计成主线程。具体来说,线程1负责:

  1. 和服务器交互
  2. 等待XML数据被线程2加载
  3. 收到数据就作数据处理

而线程2负责:

  1. 加载XML数据
  2. 告知线程1数据是否完成

可以从下图直观的看出程序的执行流程:

图中线程1需要等待线程2发送完成数据读取的信号。一旦收到信号,便会执行数据处理操作。这里,多线程的好处在于,当线程1忙于与服务器交互的时候,线程2可以同步加载数据,这样就减小了总的等待时间。那么如何实现这个程序呢?

第一种方法

最简单的想法是自定义一个全局变量,作为线程2是否完成数据读取的信号。初始化这个变量为false,当线程2完成数据读取就设置为true。线程1轮询这个变量,当读到这个变量为true时,就处理数据。但是由于这个变量被两个线程共用,因此需要使用互斥锁来进行线程同步。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<iostream>
#include<thread>
#include<mutex>
class Application
{
std::mutex m_mutex;
bool m_bDataLoaded;
public:
Application()
{
m_bDataLoaded = false;
}
void loadData()
{
// Make This Thread sleep for 1 Second
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout<<"Loading Data from XML"<<std::endl;
// Lock The Data structure
std::lock_guard<std::mutex> guard(m_mutex);
// Set the flag to true, means data is loaded
m_bDataLoaded = true;
}
void mainTask()
{
std::cout<<"Do Some Handshaking"<<std::endl;
// Acquire the Lock
m_mutex.lock();
// Check if flag is set to true or not
while(m_bDataLoaded != true)
{
// Release the lock
m_mutex.unlock();
//sleep for 100 milli seconds
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Acquire the lock
m_mutex.lock();
}
// Release the lock
m_mutex.unlock();
//Doc processing on loaded Data
std::cout<<"Do Processing On loaded Data"<<std::endl;
}
};

int main()
{
Application app;
std::thread thread_1(&Application::mainTask, &app);
std::thread thread_2(&Application::loadData, &app);
thread_2.join();
thread_1.join();
return 0;
}

输出如下:

然而这种做法存在缺陷

反复的上锁和解锁会消耗大量不必要的时间。有没有更好的解决方法呢?

第二种方法

为了杜绝反复的轮询,我们希望线程1能够在事件未发生时阻塞,只有当事件发生后才继续。这种想法可以通过条件变量实现。

条件变量是一种用于两个线程交互的变量。一个线程等待它被激活,另一个线程则负责激活它。

使用条件变量时需要添加头文件:

1
#include <condition_variable>

让我们看一下条件变量如何工作:

  • 首先定义一个互斥锁,用于条件变量的工作。
  • 线程1调用条件变量的wait()函数,这个函数在其内部查询互斥锁,检查是否满足了指定的条件。
  • 如果没有满足指定条件,条件变量释放这个互斥锁并阻塞线程1。
  • 线程2负责在指定条件满足是,将条件变量激活。
  • 一旦条件变量激活,线程1恢复运转。它首先检查指定条件是否真正满足(因为一个更高等级的调用也可能使线程1恢复运转)。
  • 如果是一个更高级的调用,那么线程1再次调用wait()函数。

具体的代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
using namespace std::placeholders;
class Application
{
std::mutex m_mutex;
std::condition_variable m_condVar;
bool m_bDataLoaded;
public:
Application()
{
m_bDataLoaded = false;
}
void loadData()
{
// Make This Thread sleep for 1 Second
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout<<"Loading Data from XML"<<std::endl;
// Lock The Data structure
std::lock_guard<std::mutex> guard(m_mutex);
// Set the flag to true, means data is loaded
m_bDataLoaded = true;
// Notify the condition variable
m_condVar.notify_one();
}
bool isDataLoaded()
{
return m_bDataLoaded;
}
void mainTask()
{
std::cout<<"Do Some Handshaking"<<std::endl;
// Acquire the lock
std::unique_lock<std::mutex> mlock(m_mutex);
// Start waiting for the Condition Variable to get signaled
// Wait() will internally release the lock and make the thread to block
// As soon as condition variable get signaled, resume the thread and
// again acquire the lock. Then check if condition is met or not
// If condition is met then continue else again go in wait.
m_condVar.wait(mlock, std::bind(&Application::isDataLoaded, this));
std::cout<<"Do Processing On loaded Data"<<std::endl;
}
};

int main()
{
Application app;
std::thread thread_1(&Application::mainTask, &app);
std::thread thread_2(&Application::loadData, &app);
thread_2.join();
thread_1.join();
return 0;
}