电销如何介绍网站建设女孩学电子商务专业好就业吗
news/
2025/9/26 20:43:41/
文章来源:
电销如何介绍网站建设,女孩学电子商务专业好就业吗,登录建筑培训网,咪呜瀑布流WordPress模板目录
一、线程同步
1.生产消费模型#xff08;或生产者消费者模型#xff09;
2.认识同步
#xff08;1#xff09;生产消费模型中的同步
#xff08;2#xff09;生产者消费者模型的特点
二、条件变量
1.认识条件变量
2.条件变量的使用
3.代码改造
三、基于阻…
目录
一、线程同步
1.生产消费模型或生产者消费者模型
2.认识同步
1生产消费模型中的同步
2生产者消费者模型的特点
二、条件变量
1.认识条件变量
2.条件变量的使用
3.代码改造
三、基于阻塞队列的生产消费模型
1.阻塞队列类
1阻塞队列
2实现生产者的生产函数
3实现消费者的消费函数
2.pthread_cond_wait为什么要传入锁
3.生产者和消费者线程的执行函数
1执行函数
2试运行
4.部分细节处理
1伪唤醒问题
2解锁与唤醒的顺序
5.处理任务的生产消费模型
1代码改造
2运行
6.生产消费模型为何高效
四、双阻塞队列的生产消费模型
1.编写类代码
2.增加处理保存任务的函数
3.更改线程函数
4.更改main函数 一、线程同步
1.生产消费模型或生产者消费者模型
我们肯定有在超市买东西的经历比如买水。超市的瓶装水是供应商提供的所以供应商是生产者超市从供应商进货超市就是一个交易场所我们从超市买水我们就是消费者。
这些概念也可以转换到线程中我们将读取数据的线程叫做消费者线程消费者将产生数据的线程叫做生产者线程供应商将共享的特定数据结构叫做缓冲区交易场所。
超市中售卖的瓶装水品牌很多所以供应商肯定不止一个这些不同牌子瓶装水的生产者之间的关系就是竞争关系。所以多线程中各个生产者线程之间是互斥关系同一时刻只有一个生产者线程能访问缓冲区。
对于消费者线程也是一样的一个消费者买到了水其他人可能就买不到了。所以消费者线程和消费者线程之间也是互斥关系同一时刻也只有一个消费者线程能访问缓冲区。
超市也总要有补货的时间如果当前仓库里有货但是工作人员还没有将货物摆在货架上那到来的顾客就以为这里没有水。
为了避免出问题应当在超市补货时阻止消费者进入。由于缓冲区数据又被错误覆盖的风险所以最好在消费者线程访问缓冲区时不允许生产者线程访问缓冲区反之亦然。
消费者线程和生产者线程之间也是互斥关系在同一时间内只有一个线程可以访问缓冲区。
2.认识同步
1生产消费模型中的同步
在保证数据安全的前提下让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步。
首先什么叫饥饿状态
我以多线程抢票代码为例如果每个线程抢完票后都没有进行其他处理动作时第一个申请到锁的线程更容易申请到锁。最终大部分票都被一个线程抢走。而其他线程竞争能力弱缺乏调度这些线程就处于饥饿状态。
那什么是同步
而同步就是让所有线程按照一定顺序来抢票尽可能做到人人有份避免线程饥饿问题产生。
再次回到超市买水供应商不能没完没了地向超市供货一方面超市一直关着消费者无法消费另一方面超市又不是四次元口袋总是要装满的。
同样消费者也不能没完没了地买水一方面超市不关门供货商不能进货另一方面瓶装水又肯定会卖完。
所以最好生产者先供货货架摆满了就不进货了。消费者来买当水卖完了再让供应商进货让消费者和生产者协同起来。
所以消费者线程和生产者线程之间也是同步关系。生产者线程和消费者线程需要按照一定顺序去访问缓冲区。
所以我们可以将生产消费模型总结为321原则3种关系、2种角色、1个交易场所。
3种关系生产者和生产者(互斥关系)消费者和消费者(互斥关系)生产者和消费者(互斥和同步关系)2种角色生产者和消费者1个交易场所一段特定结构的缓冲区
生产消费模型的运行本质就是321原则。
2生产者消费者模型的特点
对供货商而言只需要给超市供大量的货即可不用关心消费者什么时候来买。
对消费者而言只需要直接去超市买方便面就行不用等待方便面的生产运输。
对超市而言只需要在水卖完时告诉供货商进货进完货后告诉消费者来买。
生产消费模型中消费者和生产者各自只需要关心自己所做的事情生产与消费线程线程之间完全独立在计算机科学的角度我们称其实现了消费者线程和生产者线程之间的解耦。
我们大部分人在周一到周五都是上班上学所以这些时间去超市买水的人会相对少这个时候超市就可以适时多进货。而周末大家都放假了去超市买水的人变多因为之前进货也很多了就不需要进货了。
就像上面所说的策略生产消费模型解决了生产者线程和消费者线程忙闲不均的问题。
而超市作为交易场所能够储存更多的物品同样缓冲区也能储存更多的数据。如果消费者直接去找供货商供货商一般都不会零售。纵使能够零售直接去找生产者还要等待生成者完成商品生产消耗时间成本高效率低。
生产者消费者模型提高了了生产者线程和消费者线程的执行效率。
二、条件变量
1.认识条件变量
条件变量是用来描述某种临界资源是否就绪的一种数据化描述。
比如说存在一个共享的容器生产者线程负责生产数据到容器内消费者线程负责从容器中中读取数据。消费者线程发现容器为空时就不应当去竞争锁而是阻塞等待直到生产者线程将数据生成到容器中。
要想让消费者线程等待那就必须使用条件变量标识容器的状态那么就需要用到条件变量。
那条件变量到底是什么呢
假设超市的架子进货一次只放一瓶水只有这瓶水被买走后供货商才会进货。
此时又有很多消费者来买水只有竞争能力强的消费者才能买到水甚至他们会不停地买。竞争能力弱的消费者买不到水。放在线程中也是一样的竞争能力弱的消费者线程始终抢不到锁产生了饥饿问题。
为了解决这个问题超市的工作人员设置了一个柜台所有消费者都在这里排队有一瓶水摆上货架工作人员就允许一个消费者进去买没有水所有人就需要在外面等待。而如果消费者想买第二瓶就只能重新排队。而这个柜台和工作人员就相当于条件变量。
多线程互斥访问临界资源时为了让这些线程按一定顺序访问。通常会将这些线程都放在条件变量的等待队列中当其他线程让条件变量符合线程的唤醒条件时队列中的第一个线程就会去访问临界资源。
2.条件变量的使用
条件变量同样是一个类pthread_cond_t由POSIX线程库维护使用的是POSIX标准。它也可以构造对象pthread_cond_t condcond就是条件变量的对象。
以下是条件变量的一些成员函数和使用代码
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
头文件pthread.h
功能初始化条件变量。
参数pthread_cond_t *restrict cond表示需要被初始化的条件变量的地址const pthread_condattr_t *restrict attr表示条件变量的属性一般都为nullptr。
返回值取消成功返回0取消失败返回错误码。
int pthread_cond_destroy(pthread_cond_t *cond);
头文件pthread.h
功能销毁互斥条件变量。
参数pthread_cond_t *cond表示需要被销毁的条件变量的地址。
返回值销毁成功返回0失败返回错误码。
pthread_cond_t cond PTHREAD_COND_INITIALIZER;
如果是全局或static修饰的条件变量使用上面语句初始化。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
头文件pthread.h
功能将调用该接口的线程放入传入的条件变量等待队列中。
参数pthread_cond_t *restrict cond创建的条件变量地址。pthread_mutex_t *restrict mutex互斥锁的地址为什么传锁以后会解释。
返回值放入等待队列成功返回0失败返回错误码。
int pthread_cond_signal(pthread_cond_t *cond);
头文件pthread.h
功能由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。
参数pthread_cond_t *cond表示需要唤醒的线程所在的等待队列的条件变量地址。
返回值唤醒成功返回0失败返回错误码。
int pthread_cond_broadcast(pthread_cond_t *cond);
头文件pthread.h
功能由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。
参数pthread_cond_t *cond表示需要唤醒的线程所在的等待队列的条件变量地址。
返回值唤醒成功返回0失败返回错误码。
3.代码改造
我们使用条件变量使所有进程可以以一定顺序抢票。
#includeiostream
#includepthread.h
#includeunistd.h
#includestdio.h
#includevector
using namespace std;#define NUM 5pthread_mutex_t mutx PTHREAD_MUTEX_INITIALIZER;//构建一个全局锁
pthread_cond_t cond PTHREAD_COND_INITIALIZER;//构建一个全局条件变量
int tickets 10;class pthread_data
{
public:pthread_t tid;char buffer[64];
};void* start_routine(void* args)
{pthread_data* p (pthread_data*)args;string s;s p-buffer;s Remaining tickets:;while(1){pthread_mutex_lock(mutx);//加锁pthread_cond_wait(cond, mutx);//线程进入等待队列if(tickets 0){--tickets;pthread_mutex_unlock(mutx);//解锁printf(%s%d\n, s.c_str(), tickets);//不修改临界资源可以不包含在内}else{pthread_mutex_unlock(mutx);//解锁break;}}pthread_exit(nullptr);
}int main()
{vectorpthread_data* vpd;//创建多个线程for(int i 0; iNUM; i){pthread_data* pd new pthread_data;snprintf(pd-buffer, sizeof(pd-buffer), thread%d buy ticket:,i1);pthread_create((pd-tid), nullptr, start_routine, (void*)pd);vpd.push_back(pd);}//主线程唤醒其他线程for(;;){sleep(1);pthread_cond_signal(cond);printf(main thread wake up a thread\n);}//线程回收for(int i 0; iNUM; i){pthread_join(vpd[i]-tid, nullptr);delete vpd[i];}return 0;
}
条件变量、票数和锁都是全局变量每个线程申请锁成功就进入条件变量等待队列。主线程每个一秒钟唤醒一个等待的线程抢票。
运行结果 可以发现线程按12345的顺序循环抢票。
使用pthread_cond_broadcast()接口可以一次唤醒条件变量等待队列中的的所有线程每隔一秒唤醒一次。 运行结果 仍然是按照一定顺序抢票只是进行抢票的线程是5个同时进行。
三、基于阻塞队列的生产消费模型
既然讲了这么半天的生产消费模型那我们不妨实现一个。
1.阻塞队列类
首先需要搭建模型的框架也就是实现包括生产者线程、消费者线程还有一个储存数据的阻塞队列缓冲区的简单执行代码。
1阻塞队列
阻塞队列的实现有以下注意事项
阻塞队列可使用CSTL中的queue实现。
由于阻塞队列是公共资源所以必须保证它是线程安全的。生产者线程和消费者线程需要互斥访问其实也只需要一把互斥锁就能实现生产者和消费者间的互斥。以后的生产者和生产者消费者和消费者之间的互斥也是这样实现的。
只有阻塞队列中有数据消费者才能读取消费者读取时生产者不能生产必须在等待队列中。
阻塞队列中没有数据或者数据未填满时生产者才能生产消费者在生产的时候消费者不能读取必须在等待队列中。
templateclaas T
class Blockqueue
{
public://构造函数Blockqueue(size_t capcity MAX_NUM):_capcity(capcity){pthread_mutex_init(_mutx, nullptr);pthread_cond_init(_pcond, nullptr);pthread_cond_init(_ccond, nullptr);}//析构函数~Blockqueue(){pthread_mutex_destroy(_mutx);pthread_cond_destroy(_pcond);pthread_cond_destroy(_ccond);}//生产数据void push(const T data);//消费数据void pop(T* data);
private://检测队列是否装满size_t Isfull() const{return (_q.size() _capcity);}std::queueT _q;pthread_mutex_t _mutx;pthread_cond_t _pcond;pthread_cond_t _ccond;size_t _capcity;
};
为了保持生产者和消费者的互斥我们对生产者和消费者各使用一个条件变量用一个锁控制对阻塞队列的访问。
需要给阻塞队列储存的数据量设置一个上限生产者线程不能无限制地生产数据。
构造函数中初始化锁和条件变量在析构函数中释放互斥锁和条件变量。阻塞队列的容量设置一个合适的缺省值。
2实现生产者的生产函数
生产数据我们一般使用push作为函数名。
void push(const T data)
{//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是满的那就需要将生产者线程加入等待队列挂起if(Isfull()){pthread_cond_wait(_pcond, _mutx)}_q.push(data);//唤醒消费者线程消费pthread_cond_signal(_ccond, _mutx);//解锁pthread_mutex_unlock(_mutx);
}
生产者线程调用生产数据接口时先申请锁进入临界区。
当阻塞队列满时在生产者条件变量将线程放入等待队列中挂起。
当阻塞队列不满时生产者生产数据到阻塞队列由于消费者线程全部在等待所以需要唤醒消费者线程消费数据否则生产者会一直生产至满。
3实现消费者的消费函数
消费数据我们一般使用pop作为函数名。
void pop(T* data)
{//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是空的那就需要将消费者线程加入等待队列挂起if(_q.empty()){pthread_cond_wait(_ccond, _mutx)}//将数据输出到data中并删除*data _q.front();_q.pop();//唤醒生产者线程生产pthread_cond_signal(_pcond, _mutx);//解锁pthread_mutex_unlock(_mutx);
}
消费者线程同样先申请到锁后进入临界区。
当阻塞队列为空时没有数据可以消费消费者挂起等待。
当阻塞队列为不为空时消费者消费数据由生产者线程全部在等待所以需要唤醒生产者线程消费数据否则消费者会一直消费至空。
2.pthread_cond_wait为什么要传入锁
使用pthread_cond_wait接口时必须传如一个锁这一点我们没有解释。
线程在条件变量的等待队列中排队等待其目的就是要拿到要访问临界资源的那把锁申请到锁线程就可以进入临界区。
如果一个线程拿到了锁而又发现自己不满足条件需要挂起等待。按照之前的知识该线程应该继续拿着锁进入条件变量的等待队列。即使其他线程被唤醒了因为申请不到到锁无法访问共享资源只能被挂起。
为了解决这个问题pthread_cond_wiat的实现大致分三个步骤挂起该线程-释放锁-记录锁。
也就是说只要是持有锁的线程进入等待队列就自动释放自己持有的锁而释放锁也是原子性操作不会引起线程安全问题。
在最后接口还会记录当前进程挂起时释放的锁这也就解释了为什么唤醒线程的时候pthread_cond_signal(pthread_cond_t* cond)只有一个参数被唤醒线程根据记录就知道它该去申请哪把锁。
我们最终得到以下结论
参数传递的这个锁必须是正在使用的锁。调用pthread_cond_wait函数的进程会以原子性的方式将锁释放并将自己挂起。线程在被唤醒的时候会自动重新获取挂起时传入的锁。
3.生产者和消费者线程的执行函数
1执行函数
生产者不断生成随机数再将数据插入阻塞队列。消费者不断将随机数再从阻塞队列中拿出来。
//生产者
void* Produce(void* args)
{Blockqueueint* bq (Blockqueueint*)args;while(1){sleep(1);int data rand()%10;bq-push(data);printf(生产数据完成数据为%d\n, data);}return nullptr;
}//消费者
void* Consume(void* args)
{Blockqueueint* bq (Blockqueueint*)args;while(1){sleep(1);int data 0;bq-pop(data);printf(消费数据完成数据为%d\n, data);}return nullptr;
}
2试运行
我们只创建一个生产者线程和一个消费者线程并且让生产者每一秒生产一个数据所以最终代码如下
#includeiostream
#includequeue
#includestdlib.h
#includetime.h
#includeunistd.h
using namespace std;#define MAX_NUM 10templateclass T
class Blockqueue
{
public://构造函数Blockqueue(size_t capcity MAX_NUM):_capcity(capcity){pthread_mutex_init(_mutx, nullptr);pthread_cond_init(_pcond, nullptr);pthread_cond_init(_ccond, nullptr);}//析构函数~Blockqueue(){pthread_mutex_destroy(_mutx);pthread_cond_destroy(_pcond);pthread_cond_destroy(_ccond);}//生产数据void push(const T data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是满的那就需要将生产者线程加入等待队列挂起if(Isfull()){pthread_cond_wait(_pcond, _mutx);}_q.push(data);//唤醒消费者线程消费pthread_cond_signal(_ccond);//解锁pthread_mutex_unlock(_mutx);}//消费数据void pop(T* data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是空的那就需要将消费者线程加入等待队列挂起if(_q.empty()){pthread_cond_wait(_ccond, _mutx);}//将数据输出到data中并删除*data _q.front();_q.pop();//唤醒生产者线程生产pthread_cond_signal(_pcond);//解锁pthread_mutex_unlock(_mutx);}private://检测队列是否装满size_t Isfull() const{return (_q.size() _capcity);}std::queueT _q;pthread_mutex_t _mutx;pthread_cond_t _pcond;pthread_cond_t _ccond;size_t _capcity;
};//生产者
void* Produce(void* args)
{Blockqueueint* bq (Blockqueueint*)args;while(1){sleep(1);int data rand()%10;bq-push(data);printf(生产数据完成数据为%d\n, data);}return nullptr;
}//消费者
void* Consume(void* args)
{Blockqueueint* bq (Blockqueueint*)args;while(1){//sleep(1);int data 0;bq-pop(data);printf(消费数据完成数据为%d\n, data);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr));Blockqueueint* bq new Blockqueueint;pthread_t tids[2];pthread_create(tids[0], nullptr, Produce, (void*)bq);pthread_create(tids[1], nullptr, Consume, (void*)bq);pthread_join(tids[0], nullptr);pthread_join(tids[1], nullptr);return 0;
}
运行结果 4.部分细节处理
对于一个消费者和一个生产者的模型而言上面的代码的确足够了。但是生产消费模型的生产者和消费者都应当有多个此时我们就需要对其进行修改。
1伪唤醒问题
如果现在有多个生产者线程在进行数据生产。当阻塞队列满了以后所有生产者线程都会在条件变量的等待队列中等待。
第一种情况某个生产者线程调用挂起接口失败。
pthread_cond_wait即使调用失败它也只会返回错误码并不能阻断执行流继续向下执行。所以即使出错生产者还是会生成数据到阻塞队列中。
第二种情况一个消费者线程一次唤醒所有生产者线程。
比如现在阻塞队列满了消费者线程消费了一个数据并且唤醒了所有的生产者线程那么很多个生产者向阻塞队列的一个空位置生成数据同样会出现上述问题。
上面这种情况被叫做伪唤醒再生产者和消费者线程中都存在这个问题。所以需要让执行流只要不满足队列满或空的条件就循环执行pthread_cond_wait。执行成功了该线程就被挂起了执行流不再运行执行失败了执行流会一直调用pthread_cond_wait执行流也不会向下操作数据。
生产数据 消费数据 2解锁与唤醒的顺序
在生产者生产完数据后它需要做两件事唤醒消费者线程和归还锁。
对于消费者进程也是一样的唤醒生产者线程和归还锁。
由于唤醒线程是不对共享资源进行操作的所以对于唤醒线程和解锁的顺序谁先谁后都可以。只是更建议先唤醒再解锁。 5.处理任务的生产消费模型
1代码改造
我们使用生产消费模型可不是用来保存随机数的而是用它处理任务的。
我们可以写一个保存计算方法的任务类从而实现一个随机数计算器。
任务类
//任务类
class Task
{typedef std::functionint(int,int,char) func_t;
public://默认构造Task(){}//构造函数Task(int a, int b, char op, func_t func):_a(a),_b(b),_op(op),_func(func){}//仿函数string operator()(){int result _func(_a, _b, _op);char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d %d\n, _a, _op, _b, result);string s(buffer);return s;}//显示任务string show_task(){char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d ?\n, _a, _op, _b);string s(buffer);return s;}
private:func_t _func;int _a;int _b;char _op;
};
设置处理任务的函数修改生产者和消费者线程执行的函数。
//计算器函数
const string ops -*/%;
int calculate(int a, int b, char op)
{int result 0;switch(op){case :result a b;break;case -:result a - b;break;case *:result a * b;break;case /:{if(b 0)cerr 除数不能为0\n;elseresult a / b;}break;case %:{if(b 0)cerr 取模的数字不能为0\n;elseresult a % b;}break;default:break;}return result;
}//生产者
void* Produce(void* args)
{BlockqueueTask* bq (BlockqueueTask*)args;while(1){sleep(1);int a rand()%10;int b rand()%10;int opnum rand()%ops.size();Task data(a, b, ops[opnum], calculate);string s 数据生产完成需要计算;bq-push(data);s data.show_task().c_str();cout s;}return nullptr;
}
2运行
在主线程中多创建几个线程就可以运行了。
总代码如下
#includeiostream
#includequeue
#includestdlib.h
#includetime.h
#includeunistd.h
#includefunctional
using namespace std;#define MAX_NUM 10//任务类
class Task
{typedef std::functionint(int,int,char) func_t;
public://默认构造Task(){}//构造函数Task(int a, int b, char op, func_t func):_a(a),_b(b),_op(op),_func(func){}//仿函数string operator()(){int result _func(_a, _b, _op);char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d %d\n, _a, _op, _b, result);string s(buffer);return s;}//显示任务string show_task(){char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d ?\n, _a, _op, _b);string s(buffer);return s;}
private:func_t _func;int _a;int _b;char _op;
};templateclass T
class Blockqueue
{
public://构造函数Blockqueue(size_t capcity MAX_NUM):_capcity(capcity){pthread_mutex_init(_mutx, nullptr);pthread_cond_init(_pcond, nullptr);pthread_cond_init(_ccond, nullptr);}//析构函数~Blockqueue(){pthread_mutex_destroy(_mutx);pthread_cond_destroy(_pcond);pthread_cond_destroy(_ccond);}//生产数据void push(const T data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是满的那就需要将生产者线程加入等待队列挂起while(Isfull()){pthread_cond_wait(_pcond, _mutx);}_q.push(data);//唤醒消费者线程消费pthread_cond_signal(_ccond);//解锁pthread_mutex_unlock(_mutx);}//消费数据void pop(T* data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是空的那就需要将消费者线程加入等待队列挂起while(_q.empty()){pthread_cond_wait(_ccond, _mutx);}//将数据输出到data中并删除*data _q.front();_q.pop();//唤醒生产者线程生产pthread_cond_signal(_pcond);//解锁pthread_mutex_unlock(_mutx);}private://检测队列是否装满size_t Isfull() const{return (_q.size() _capcity);}std::queueT _q;pthread_mutex_t _mutx;pthread_cond_t _pcond;pthread_cond_t _ccond;size_t _capcity;
};//计算器函数
const string ops -*/%;
int calculate(int a, int b, char op)
{int result 0;switch(op){case :result a b;break;case -:result a - b;break;case *:result a * b;break;case /:{if(b 0)cerr 除数不能为0\n;elseresult a / b;}break;case %:{if(b 0)cerr 取模的数字不能为0\n;elseresult a % b;}break;default:break;}return result;
}//生产者
void* Produce(void* args)
{BlockqueueTask* bq (BlockqueueTask*)args;while(1){sleep(1);int a rand()%10;int b rand()%10;int opnum rand()%ops.size();Task data(a, b, ops[opnum], calculate);string s 数据生产完成需要计算;bq-push(data);s data.show_task().c_str();cout s;}return nullptr;
}//消费者
void* Consume(void* args)
{BlockqueueTask* bq (BlockqueueTask*)args;while(1){//sleep(1);Task data;string s 数据消费完成计算结果为;bq-pop(data);string result data();s result;cout s;}return nullptr;
}#define NUM_PRODUCE 3
#define NUM_CONSUME 3int main()
{srand((unsigned int)time(nullptr));BlockqueueTask* bq new BlockqueueTask;pthread_t ptids[NUM_PRODUCE];pthread_t ctids[NUM_CONSUME];//创建多个生产者线程for(int i 0; iNUM_PRODUCE; i){pthread_create(ptids[i], nullptr, Produce, (void*)bq);}//创建多个消费者线程for(int i 0; iNUM_CONSUME; i){pthread_create(ctids[i], nullptr, Consume, (void*)bq);}//回收所有线程for(int i 0; iNUM_PRODUCE; i){pthread_join(ptids[i], nullptr);}for(int i 0; iNUM_CONSUME; i){pthread_join(ctids[i], nullptr);}return 0;
}
运行结果 有一个地方我一直没说像cout 所以我们在打印信息时尽量使用单句printf或者cout 6.生产消费模型为何高效
该模型中多个生产者线程向阻塞队列生成数据多个消费者线程也从阻塞队列中消费数据。
各生产消费者线程之间互斥关系各线程对于阻塞队列的访问是串行的。同一时间访问阻塞队列的线程只有一个拿这样又何来高效呢
注意观察程序的运行逻辑我们能发现只有临界区的代码是串行的其他代码所有线程都是并发执行的。这些非临界区的代码通常耗时长而它们是并发的所以该模型的效率就变得很高。
结论生产消费模型的高效不体现在对临界资源的访问上而是体现在对非临界区代码的并发执行上。
四、双阻塞队列的生产消费模型
我么们可以使用上面的生产者消费者模型将消费者处理完的计算任务保存成日志并储存到磁盘上。
所以该模型有两个阻塞队列一个阻塞队列用于保存计算任务另一个阻塞队列用于保存保存任务。原来的生产者线程还是生产者原来的消费者作为中间的线程既是消费者也是生产者保存线程是消费者。
1.编写类代码
我们需要构建四个类计算任务类、保存任务类、阻塞队列类和多队列集合类。
计算任务类就是之前的Task类我们其他代码不用动对它改个CalTask的名字就可以。保存任务类与计算任务类很相似需要传入一个string表示需要打印的信息还有一个函数对象表示保存数据的具体执行函数。阻塞队列类也不用改多队列集合类是为了线程执行函数的void* args传参设计的包含了两个阻塞队列指针。
//计算任务类
class CalTask
{typedef std::functionint(int,int,char) func_t;
public://默认构造CalTask(){}//构造函数CalTask(int a, int b, char op, func_t func):_a(a),_b(b),_op(op),_func(func){}//仿函数string operator()(){int result _func(_a, _b, _op);char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d %d\n, _a, _op, _b, result);string s(buffer);return s;}//显示任务string show_task(){char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d ?\n, _a, _op, _b);string s(buffer);return s;}
private:func_t _func;int _a;int _b;char _op;
};//保存任务类
class SaveTask
{typedef functionvoid(const string) func_t;
public://默认构造SaveTask(){}//构造函数SaveTask(string message, func_t func):_message(message),_func(func){}//仿函数void operator()(){_func(_message);}
private:string _message;func_t _func;
};//阻塞队列类
templateclass T
class Blockqueue
{
public://构造函数Blockqueue(size_t capcity MAX_NUM):_capcity(capcity){pthread_mutex_init(_mutx, nullptr);pthread_cond_init(_pcond, nullptr);pthread_cond_init(_ccond, nullptr);}//析构函数~Blockqueue(){pthread_mutex_destroy(_mutx);pthread_cond_destroy(_pcond);pthread_cond_destroy(_ccond);}//生产数据void push(const T data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是满的那就需要将生产者线程加入等待队列挂起while(Isfull()){pthread_cond_wait(_pcond, _mutx);}_q.push(data);//唤醒消费者线程消费pthread_cond_signal(_ccond);//解锁pthread_mutex_unlock(_mutx);}//消费数据void pop(T* data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是空的那就需要将消费者线程加入等待队列挂起while(_q.empty()){pthread_cond_wait(_ccond, _mutx);}//将数据输出到data中并删除*data _q.front();_q.pop();//唤醒生产者线程生产pthread_cond_signal(_pcond);//解锁pthread_mutex_unlock(_mutx);}private://检测队列是否装满size_t Isfull() const{return (_q.size() _capcity);}std::queueT _q;pthread_mutex_t _mutx;pthread_cond_t _pcond;pthread_cond_t _ccond;size_t _capcity;
};//多队列集合类
templateclass A,class B
class Blockqueues
{
public:BlockqueueA* _q1;BlockqueueB* _q2;
};
2.增加处理保存任务的函数
我们之前对计算任务有一个处理函数calculate那处理保存任务也同样需要一个函数Savedata。
//保存函数
void Savedata(const string message)
{//需要保存信息的文件char buffer[64] log.txt;//按追加方式打开文件FILE* fp fopen(buffer, a);if(fp nullptr){cerr 文件打开失败 endl;return;}fprintf(fp, %s, message.c_str());fclose(fp);
}
3.更改线程函数
生产者线程此时依旧生产只不过生产的位置是第一个阻塞队列。
消费者线程此时也成为了后一个模型的生产者那就需要添加向第二个队列生产的代码。
保存者线程此时是消费者从第二个队列中取任务执行。
//生产者
void* Produce(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;//生产数据while(1){sleep(1);int a rand()%10;int b rand()%10;int opnum rand()%ops.size();CalTask data(a, b, ops[opnum], calculate);string s 数据生产完成需要计算;bqs-_q1-push(data);s data.show_task().c_str();cout s;}return nullptr;
}//消费者
void* Consume(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;while(1){//消费数据//sleep(1);CalTask data;string s1 数据消费完成计算结果为;bqs-_q1-pop(data);string result data();s1 result;cout s1;//生成待保存的数据string s2 数据保存任务推送完毕\n;SaveTask task SaveTask(result, Savedata);bqs-_q2-push(task);cout s2;}return nullptr;
}void* Save(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;while(1){//sleep(1);SaveTask data;string s 数据保存完成\n;bqs-_q2-pop(data);data();cout s;}return nullptr;
}
4.更改main函数
最后我们在main函数中创建两个队列创建三种线程最后加上回收代码就可以了。
所以总代码如下
produce_consume.h
#includeiostream
#includequeue
#includestdlib.h
#includetime.h
#includeunistd.h
#includefunctional
#includestdio.h
#define MAX_NUM 10
using namespace std;
//计算任务类
class CalTask
{typedef std::functionint(int,int,char) func_t;
public://默认构造CalTask(){}//构造函数CalTask(int a, int b, char op, func_t func):_a(a),_b(b),_op(op),_func(func){}//仿函数string operator()(){int result _func(_a, _b, _op);char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d %d\n, _a, _op, _b, result);string s(buffer);return s;}//显示任务string show_task(){char buffer[64];snprintf(buffer, sizeof(buffer), %d %c %d ?\n, _a, _op, _b);string s(buffer);return s;}
private:func_t _func;int _a;int _b;char _op;
};//保存任务类
class SaveTask
{typedef functionvoid(const string) func_t;
public://默认构造SaveTask(){}//构造函数SaveTask(string message, func_t func):_message(message),_func(func){}//仿函数void operator()(){_func(_message);}
private:string _message;func_t _func;
};//阻塞队列类
templateclass T
class Blockqueue
{
public://构造函数Blockqueue(size_t capcity MAX_NUM):_capcity(capcity){pthread_mutex_init(_mutx, nullptr);pthread_cond_init(_pcond, nullptr);pthread_cond_init(_ccond, nullptr);}//析构函数~Blockqueue(){pthread_mutex_destroy(_mutx);pthread_cond_destroy(_pcond);pthread_cond_destroy(_ccond);}//生产数据void push(const T data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是满的那就需要将生产者线程加入等待队列挂起while(Isfull()){pthread_cond_wait(_pcond, _mutx);}_q.push(data);//唤醒消费者线程消费pthread_cond_signal(_ccond);//解锁pthread_mutex_unlock(_mutx);}//消费数据void pop(T* data){//下面的判断就开始使用共享资源需要加锁pthread_mutex_lock(_mutx);//如果当前队列是空的那就需要将消费者线程加入等待队列挂起while(_q.empty()){pthread_cond_wait(_ccond, _mutx);}//将数据输出到data中并删除*data _q.front();_q.pop();//唤醒生产者线程生产pthread_cond_signal(_pcond);//解锁pthread_mutex_unlock(_mutx);}private://检测队列是否装满size_t Isfull() const{return (_q.size() _capcity);}std::queueT _q;pthread_mutex_t _mutx;pthread_cond_t _pcond;pthread_cond_t _ccond;size_t _capcity;
};//多队列集合类
templateclass A,class B
class Blockqueues
{
public:BlockqueueA* _q1;BlockqueueB* _q2;
};
produce_consume.cc
#includeproduce_consume.h
using namespace std;//计算器函数
const string ops -*/%;
int calculate(int a, int b, char op)
{int result 0;switch(op){case :result a b;break;case -:result a - b;break;case *:result a * b;break;case /:{if(b 0)cerr 除数不能为0\n;elseresult a / b;}break;case %:{if(b 0)cerr 取模的数字不能为0\n;elseresult a % b;}break;default:break;}return result;
}//保存函数
void Savedata(const string message)
{//需要保存信息的文件char buffer[64] log.txt;//按追加方式打开文件FILE* fp fopen(buffer, a);if(fp nullptr){cerr 文件打开失败 endl;return;}fprintf(fp, %s, message.c_str());fclose(fp);
}//生产者
void* Produce(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;//生产数据while(1){sleep(1);int a rand()%10;int b rand()%10;int opnum rand()%ops.size();CalTask data(a, b, ops[opnum], calculate);string s 数据生产完成需要计算;bqs-_q1-push(data);s data.show_task().c_str();cout s;}return nullptr;
}//消费者
void* Consume(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;while(1){//消费数据//sleep(1);CalTask data;string s1 数据消费完成计算结果为;bqs-_q1-pop(data);string result data();s1 result;cout s1;//生成待保存的数据string s2 数据保存任务推送完毕\n;SaveTask task SaveTask(result, Savedata);bqs-_q2-push(task);cout s2;}return nullptr;
}void* Save(void* args)
{BlockqueuesCalTask, SaveTask* bqs (BlockqueuesCalTask, SaveTask*)args;while(1){//sleep(1);SaveTask data;string s 数据保存完成\n;bqs-_q2-pop(data);data();cout s;}return nullptr;
}#define NUM_PRODUCE 3
#define NUM_CONSUME 3
#define NUM_SAVE 3int main()
{srand((unsigned int)time(nullptr));BlockqueuesCalTask, SaveTask* bqs new BlockqueuesCalTask, SaveTask();bqs-_q1 new BlockqueueCalTask();bqs-_q2 new BlockqueueSaveTask();pthread_t ptids[NUM_PRODUCE];pthread_t ctids[NUM_CONSUME];pthread_t stids[NUM_SAVE];//创建多个生产者线程for(int i 0; iNUM_PRODUCE; i){pthread_create(ptids[i], nullptr, Produce, (void*)bqs);}//创建多个消费者线程for(int i 0; iNUM_CONSUME; i){pthread_create(ctids[i], nullptr, Consume, (void*)bqs);}//创建多个保存者线程for(int i 0; iNUM_CONSUME; i){pthread_create(stids[i], nullptr, Save, (void*)bqs);}//回收所有线程for(int i 0; iNUM_PRODUCE; i){pthread_join(ptids[i], nullptr);}for(int i 0; iNUM_CONSUME; i){pthread_join(ctids[i], nullptr);}for(int i 0; iNUM_SAVE; i){pthread_join(stids[i], nullptr);}return 0;
}
运行结果
目录中确实多了一个log.txt也能看出线程的具体执行轨迹。 基于阻塞队列的生产者消费者模型是线程同步与互斥的充分应用现实中很多场景都可以应用是线程中的一大杀器。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/918762.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!