Linux :线程 【生产者消费者模型与信号量】
- (一)生产消费模型
- 1、生产消费模式概念
- 2、生产者消费者之间的关系
- 3、生产者消费者模型优点
- (二)基于BlockingQueue的生产者消费者模型
- 1、基于阻塞队列模型
- 2、模拟实现基于阻塞队列的生产消费模型
(一)生产消费模型
1、生产消费模式概念
- 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
- 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
- 这个阻塞队列就是用来给生产者和消费者解耦的。
2、生产者消费者之间的关系
生产者消费者模型是多线程同步与互斥的一个经典场景,最根本特点是 321原则:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区(阻塞队列、环形队列)。
为什么三种关系中都具有互斥关系??
因为容器(交易场所)会有多个执行流进行访问,而我们要保持临界资源的安全,需要加上互斥关系。
生产者和消费者之间为什么会存在同步关系?
如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。
3、生产者消费者模型优点
- 解耦
生产者、消费者、交易场所 各司其职,可以根据具体需求自由设计,很好地做到了 解耦,便于维护和扩展 - 支持并发。
- 支持忙闲不均。
生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置
消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据
(二)基于BlockingQueue的生产者消费者模型
1、基于阻塞队列模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
2、模拟实现基于阻塞队列的生产消费模型
阻塞队列实现代码:
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#define NUM 5using namespace std;template <class T>
class BlockQueue
{
private:queue<T> _q; // 阻塞队列pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _consumers; // 消费者条件变量pthread_cond_t _producer; // 生产者条件变量int _capacity; // 阻塞队列容量
public://初始化锁和条件变量BlockQueue(int capacity = NUM): _capacity(capacity){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumers, nullptr);pthread_cond_init(&_producer, nullptr);}//销毁锁和条件变量~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumers);pthread_cond_destroy(&_producer);}//阻塞队列里插入数据void push(const T &in){pthread_mutex_lock(&_mutex);//当阻塞队列为空时不能再插入变量,需要做到同步,让线程进入到生产者的条件变量中去while (_q.size() == _capacity) //这里一定要是while ,因为唤醒可能造成伪唤醒(如果同时唤醒条件变量中的所有线程,那么可能造成伪唤醒){ pthread_cond_wait(&_producer, &_mutex);}_q.push(in);pthread_cond_signal(&_consumers); //push数据后,阻塞队列一定有数据存在,所以唤醒消费者的条件变量。pthread_mutex_unlock(&_mutex);}//从阻塞队列里拿出数据T pop(){pthread_mutex_lock(&_mutex);//当阻塞队列为空时,执行该代码的线程放入到消费者条件变量中while (_q.size() == 0){pthread_cond_wait(&_consumers, &_mutex);}T data = _q.front();_q.pop();pthread_cond_signal(&_producer); //pop数据后,阻塞队列一定是满的,唤醒生产者的条件变量pthread_mutex_unlock(&_mutex);return data;}
};
使用 互斥锁 + 条件变量 实现互斥与同步
生产者和消费者模型中传递的数据一般都是要经过处理的,例如:
- 生产者:
生成数据 + 传递数据 - 消费者:
获取数据 + 处理数据
为了方便理解我们简单实现一个任务类,Task.hpp代码如下:
#pragma once
#include <iostream>
#include <string>std::string opers = "+-*/%";enum // 规定错误码
{DivZero = 1, // 当 除数为0时ModZero, // 当 %的时候 ,xx % 0时Unknown // 出现错误的符号时
};class Task
{
private:int data1_; //int data2_;char oper_;int exitcode_; // 错误码,判断结果是否合理,默认为0int result_; // 结果public:Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}~Task(){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if (data2_ == 0)exitcode_ = DivZero;elseresult_ = data1_ / data2_;}break;case '%':{if (data2_ == 0)exitcode_ = ModZero;elseresult_ = data1_ % data2_;}break;default:exitcode_ = Unknown;break;}}//两数运算 所指向的任务std::string GetTask(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "= ";return r;}//两数运算的结果std::string GetResult(){std::string r = std::to_string(result_);r += "[code: ";r += std::to_string(exitcode_);r += "]";return r;}
};
该类主要是实现两数之间的运算,使用该类时需要我们提供具体的 两个数字 和 运算符。
接下来准备工作已经完毕,我们看看如何运用阻塞队列,为了方便观察数据我这里是单生产者单消费者的模型,main.cc代码如下:
#pragma Once
#include "BlockQueue.hpp"
#include <vector>
#include <string>
#include <ctime>
#include "Task.hpp"// std::string opers = "+-*/%";
void *Consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 接收任务Task data = bq->pop();usleep(10000);// 处理任务data.run();cout << endl;cout << "处理了任务 " << data.GetTask() << data.GetResult();}return nullptr;
}void *Producer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 产生任务char op = opers[rand() % opers.size()];int data1 = rand() % 20 + 1;int data2 = rand() % 10;usleep(10);Task t(data1, data2, op);// 发送任务bq->push(t);cout << endl;cout << "产生了一个任务=> " << t.GetTask() << "??";sleep(1);}return nullptr;
}int main()
{srand(time(nullptr));BlockQueue<Task> *bq = new BlockQueue<Task>(5); //阻塞队列//创造单生产者 单消费者pthread_t c, p;pthread_create(&c, nullptr, Producer, bq);pthread_create(&p, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;return 0;
}
运行效果如下:
生产消费模型在代码层的实际 就是线程的同步与互斥 。