Pthread条件变量同步

前面说到用互斥量做同步结果不怎么理想。因为OS系统对线程调度随机的。自己实现的resume/yield 原语不是原子的。操作系统可以扣住原语中的一部分语句搁置起来。这可以导致实际运行中,两个原语对sched互斥量的上锁、解锁,以逆序的方式出现。这时,代码中设计的调度线程挂起就会失效。如果操作系统继续扣住读写线程yield的最后一个mutex_lock()语句。它的作用是让读写线程挂起。

那么是是什么情况。没有线程挂在互斥量上!互斥量的上锁、解锁操作脱离了咒语的魔法。调度线程对一堆离线互斥量自以为是的做了上锁、解锁操作,什么效果都没有。又因为调度线程对sched互斥量的上锁、解锁的积累效果为0。如果调度线程一个人跑了一圈没有挂起,那么再跑第二圈也不会挂起…。如果经过奇数次空转,操作系统终于把扣住mutex_lock()语句放行了,把读写线程放了进来。咒语恢复了魔力。读写线程终于被调度线程解锁。这时读写线程会看到,bx缓冲区仍然是上次它休眠前的样子,无法做操作,只能再次yield让出处理机。这就是前文那个if需要改为while的原因。

究其原因,调度线程无法测试读写线程是否执行了mutex_lock(),把自己挂在互斥量,因而暂停了运行,还是仅仅因为操作系统扣住了它。所以想到了要用条件变量。这是唯一办法。条件变量用pthread_cond_wait()挂起线程,用pthread_cond_signal()唤醒线程。用pthread_cond_wait()挂起线程的时候,同时也解锁了互斥量。这样测试互斥量就可以知道线程有没有挂起。正好解决了前面遇到的难题。注意!除了开始的初始化语句。读写线程是全文上锁的,除非用pthread_cond_wait()进入休眠了。所以可以做这个测试。别的线程测到互斥量可以打开,就是这个线程已经休眠了。

现在调度的数据结构改成这样了:

typedef struct _Context_ {int finish;struct {pthread_mutex_t mutex;pthread_cond_t cond;volatile int wait_sched_lock_thread;int state;} thread[2];
} Context;

读写线程用条件变量睡眠,调度线程用对应的signal语句唤醒它。调度线程(resume)下面的mutex_lock()会挂起,最后被读写线程的pthread_cond_wait()唤醒。

void resume(int i)
{pthread_cond_signal(&gCtx.thread[i].cond);while (gCtx.thread[i].wait_sched_lock_thread == 0) {sched_yield();}gCtx.thread[i].wait_sched_lock_thread = 0;mutex_lock(&gCtx.thread[i].mutex);mutex_unlock(&gCtx.thread[i].mutex);
}void yield(int i)
{
//printf("thread %d yield\n", th);pthread_cond_wait(&gCtx.thread[i].cond, &gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;
}

唤醒之后还是需要自己同步一下的。resume()中的下一条语句 mutex_lock();会和pthread_cond_wait()醒来时的互斥量上锁产生竞争。如果resume()先得到锁,resume()不会暂停,而读写线程也不能恢复执行。做这个同步就比较容易了,只要用个变量做个标记检查一下就行了。

具体的代码贴上:

#include <stdio.h>
#include <string.h>
#include <setjmp.h>
#include <stdlib.h>
#include <pthread.h>int mutex_lock(pthread_mutex_t * mutex)
{int i;i= pthread_mutex_lock(mutex);if(i!=0) {printf("LOCK FAILED\n");}return i;
}int mutex_unlock(pthread_mutex_t * mutex)
{return pthread_mutex_unlock(mutex);
}typedef int BOOL;
#define TRUE    1
#define FALSE   0extern pthread_mutex_t bx_mutex;typedef struct _Context_ {int finish;struct {pthread_mutex_t mutex;pthread_cond_t cond;volatile int wait_sched_lock_thread;int state;} thread[2];
} Context;#define  WRITE_THREAD 0
#define  READ_THREAD 1Context gCtx;
static int next_thread;
int run;void resume(int i)
{pthread_cond_signal(&gCtx.thread[i].cond);while (gCtx.thread[i].wait_sched_lock_thread == 0) {sched_yield();}gCtx.thread[i].wait_sched_lock_thread = 0;mutex_lock(&gCtx.thread[i].mutex);mutex_unlock(&gCtx.thread[i].mutex);
}void yield(int i)
{
//printf("thread %d yield\n", th);pthread_cond_wait(&gCtx.thread[i].cond, &gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;
}#define MAXBUFS 10
struct buffer {char buf[MAXBUFS][128];int pos;int bc;int eobf;
};
struct buffer bx;void bx_read(char *s, int n)
{restart:if (bx.bc) {strncpy(s, bx.buf[bx.pos], n);s[n - 1] = '\0';bx.bc--;bx.pos++;if (bx.pos >= MAXBUFS)bx.pos = 0;return;} else if (bx.eobf) {gCtx.finish = 1;gCtx.thread[run].state = 1;yield(READ_THREAD);} else {yield(READ_THREAD);goto restart;}
}void bx_write(char *s, int n)
{int wpos;while (bx.bc >= MAXBUFS) {yield(WRITE_THREAD);}wpos = bx.pos + bx.bc;if (wpos >= MAXBUFS)wpos -= MAXBUFS;if (n < 128) {strcpy(bx.buf[wpos], s);}else {strncpy(bx.buf[wpos], s, 128);bx.buf[wpos][127] = '\0';}bx.bc++;
}typedef void *(*pf) (void *);
BOOL start_coroutine(pf func, void *arg);
void coroutine_read(void *arg);void coroutine_write(void *arg)
{FILE *fp;char s[128];int len;int i = (int) arg;i = WRITE_THREAD;mutex_lock(&gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;pthread_cond_wait(&gCtx.thread[i].cond, &gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;fp = fopen("b.c", "r");if (!fp) {printf("can not open `b.c'\n");exit(0);}while (fgets(s, 128, fp) != NULL) {len = strlen(s);if (len) {--len;if (s[len] == '\n') s[len] = '\0'; else ++len;}if (len) {--len;if (s[len] == '\r') s[len] = '\0'; else ++len;}bx_write(s, len + 1);}bx.eobf = 1;gCtx.thread[WRITE_THREAD].state = 1;fclose(fp);
printf("[WRITE_THREAD FINISHED]\n");yield(WRITE_THREAD);
}void coroutine_read(void *arg)
{char buf[128];int i = (int) arg;i = READ_THREAD;mutex_lock(&gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;pthread_cond_wait(&gCtx.thread[i].cond, &gCtx.thread[i].mutex);gCtx.thread[i].wait_sched_lock_thread = 999;while (!bx.eobf || bx.bc) {bx_read(buf, 128);printf("%s\n", buf);}gCtx.thread[READ_THREAD].state = 1;
printf("[READ_THREAD FINISHED]\n");yield(READ_THREAD);
}BOOL start_coroutine(pf func, void *arg)
{int thread_id;pthread_t pt_id;thread_id = next_thread++;pthread_create(&pt_id, NULL, func, (void *) thread_id);pthread_detach(pt_id);return TRUE;
}int sched()
{int s;if (run == 0) s = 1; else s = 0;if (gCtx.thread[s].state == 0) run = s;return run;
}int main()
{int i;pthread_mutexattr_t mat;pthread_mutexattr_init(&mat);pthread_mutexattr_settype(&mat, PTHREAD_MUTEX_NORMAL);pthread_mutex_init(&gCtx.thread[0].mutex, &mat);pthread_mutex_init(&gCtx.thread[1].mutex, &mat);pthread_cond_init(&gCtx.thread[0].cond, NULL);pthread_cond_init(&gCtx.thread[1].cond, NULL);start_coroutine((pf) coroutine_write, NULL);start_coroutine((pf) coroutine_read, NULL);sched_yield();while (gCtx.thread[0].wait_sched_lock_thread == 0) {sched_yield();}gCtx.thread[0].wait_sched_lock_thread = 0;mutex_lock(&gCtx.thread[0].mutex);mutex_unlock(&gCtx.thread[0].mutex);while (gCtx.thread[1].wait_sched_lock_thread == 0) {sched_yield();}gCtx.thread[1].wait_sched_lock_thread = 0;mutex_lock(&gCtx.thread[0].mutex);mutex_unlock(&gCtx.thread[0].mutex);run = 1;while (TRUE) {resched:i = sched();
printf("sched thread %d to run\n", i);if (gCtx.thread[i].state == 0) {resume(i);} else break;}printf("done\n");return 0;
}

调度线程用条件变量唤醒读写线程。读写线程用解锁互斥量唤醒调度线程。同步问题得到了解决。

然而在最后结束之前,仍有一个问题需要考察一下。既然调度线程用条件变量唤醒读写线程,读写线程用解锁互斥量唤醒调度线程,这样问题就得到了解决。那么反过来,读写线程用条件变量唤醒调度线程,调度线程用解锁互斥量唤醒读写线程是否也可以呢?可以的。

#include <stdio.h>
#include <string.h>
#include <setjmp.h>
#include <stdlib.h>
#include <pthread.h>int mutex_lock(pthread_mutex_t * mutex)
{int i;i= pthread_mutex_lock(mutex);if(i!=0) {printf("LOCK FAILED\n");}return i;
}int mutex_unlock(pthread_mutex_t * mutex)
{return pthread_mutex_unlock(mutex);
}typedef int BOOL;
#define TRUE    1
#define FALSE   0extern pthread_mutex_t bx_mutex;typedef struct _Context_ {pthread_cond_t cond;int finish;struct {pthread_mutex_t mutex;volatile int wait_sched_lock_thread;int state;} thread[2];
} Context;#define  WRITE_THREAD 0
#define  READ_THREAD 1Context gCtx;
static int next_thread;
int run;void resume(int i)
{pthread_cond_wait(&gCtx.cond, &gCtx.thread[i].mutex);
}void yield(int i)
{
//printf("thread %d yield\n", i);mutex_unlock(&gCtx.thread[i].mutex);pthread_cond_signal(&gCtx.cond);mutex_lock(&gCtx.thread[i].mutex);mutex_unlock(&gCtx.thread[i].mutex);mutex_lock(&gCtx.thread[i].mutex);
}#define MAXBUFS 10
struct buffer {char buf[MAXBUFS][128];int pos;int bc;int eobf;
};
struct buffer bx;void bx_read(char *s, int n)
{restart:if (bx.bc) {strncpy(s, bx.buf[bx.pos], n);s[n - 1] = '\0';bx.bc--;bx.pos++;if (bx.pos >= MAXBUFS)bx.pos = 0;return;} else if (bx.eobf) {gCtx.finish = 1;gCtx.thread[run].state = 1;yield(READ_THREAD);} else {yield(READ_THREAD);goto restart;}
}void bx_write(char *s, int n)
{int wpos;while (bx.bc >= MAXBUFS) {yield(WRITE_THREAD);}wpos = bx.pos + bx.bc;if (wpos >= MAXBUFS)wpos -= MAXBUFS;if (n < 128) {strcpy(bx.buf[wpos], s);}else {strncpy(bx.buf[wpos], s, 128);bx.buf[wpos][127] = '\0';}bx.bc++;
}typedef void *(*pf) (void *);
BOOL start_coroutine(pf func, void *arg);
void coroutine_read(void *arg);void coroutine_write(void *arg)
{FILE *fp;char s[128];int len;int i = (int) arg;i = WRITE_THREAD;mutex_lock(&gCtx.thread[i].mutex);fp = fopen("b.c", "r");if (!fp) {printf("can not open `b.c'\n");exit(0);}while (fgets(s, 128, fp) != NULL) {len = strlen(s);if (len) {--len;if (s[len] == '\n') s[len] = '\0'; else ++len;}if (len) {--len;if (s[len] == '\r') s[len] = '\0'; else ++len;}bx_write(s, len + 1);}bx.eobf = 1;gCtx.thread[WRITE_THREAD].state = 1;fclose(fp);
printf("[WRITE_THREAD FINISHED]\n");yield(WRITE_THREAD);mutex_unlock(&gCtx.thread[WRITE_THREAD].mutex);
}void coroutine_read(void *arg)
{char buf[128];int i = (int) arg;i = READ_THREAD;mutex_lock(&gCtx.thread[i].mutex);while (!bx.eobf || bx.bc) {bx_read(buf, 128);printf("%s\n", buf);}gCtx.thread[READ_THREAD].state = 1;
printf("[READ_THREAD FINISHED]\n");yield(READ_THREAD);mutex_unlock(&gCtx.thread[READ_THREAD].mutex);
}BOOL start_coroutine(pf func, void *arg)
{int thread_id;pthread_t pt_id;thread_id = next_thread++;pthread_create(&pt_id, NULL, func, (void *) thread_id);pthread_detach(pt_id);return TRUE;
}int sched()
{int s;if (run == 0) s = 1; else s = 0;if (gCtx.thread[s].state == 0) run = s;return run;
}int main()
{int i;pthread_mutexattr_t mat;pthread_mutexattr_init(&mat);pthread_mutexattr_settype(&mat, PTHREAD_MUTEX_NORMAL);pthread_mutex_init(&gCtx.thread[0].mutex, &mat);pthread_mutex_init(&gCtx.thread[1].mutex, &mat);pthread_cond_init(&gCtx.cond, NULL);mutex_lock(&gCtx.thread[0].mutex);mutex_lock(&gCtx.thread[1].mutex);start_coroutine((pf) coroutine_write, NULL);start_coroutine((pf) coroutine_read, NULL);run = 1;while (TRUE) {i = sched();
printf("sched thread %d to run\n", i);if (gCtx.thread[i].state == 0) {resume(i);} else break;}printf("done\n");return 0;
}

稍微需要注意下,这里读写线程在yield()里面用了额外的一对mutex_unlock()和mutex_lock()来避免和resume()中的 pthread_cond_wait(),“退出休眠并上锁互斥量”发生竞争。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/50781.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

选择排序思路和算法实现

选择排序 在未排序的数组中&#xff0c;用第一个数去和后面的数比较&#xff0c;找出最小的数&#xff0c;和第一个数交换。第一个数已为已排序的数。 相当于0~7 从0~7中找到最小的数放在0 从1~7中找到最小的数放在1 从2~7中找到最小的数放在2 ...以此类推 从6~7中找到最…

JAVA学习-练习试用Java实现“最长有效括号”

问题&#xff1a; 最长有效括号 给定一个只包含 ( 和 ) 的字符串&#xff0c;找出最长有效&#xff08;格式正确且连续&#xff09;括号子串的长度。 示例 1&#xff1a; 输入&#xff1a;s "(()" 输出&#xff1a;2 解释&#xff1a;最长有效括号子串是 "(…

从简单到复杂:9款画底纹软件让设计更生动

底纹不知道怎么画&#xff1f;快来试一试这9款软件吧&#xff0c;可以让你的设计作品更加出彩&#xff0c;相信我&#xff0c;用了就是赚到&#xff0c;分别是即时设计、Adobe Illustrator、CorelDRAW、Photoshop、AutoCAD、Inkscape、GIMP、ZebraDesigner、LabelJoy。 1、即时…

软件定义AI算力:解锁AI算力的新时代

从1964年发明CPU——即IBM System 360发布的那一年算起&#xff0c;已经过去60年&#xff0c;我们已经从传统的CPU算力时代跨越到了以GPU算力为代表的加速计算时代。尤其是随着大模型的广泛应用和AIGC技术的崛起&#xff0c;行业对GPU等AI算力的需求呈现出爆炸式的增长。据华为…

7.29爬虫修复

1、加密算法sha、与时间加密。f12前端页面&#xff0c;所有文件中查询到指定字段。 2、加锁维护token或jwt之类的凭证&#xff0c;双if嵌套判断 3、熟悉公司整体项目框架&#xff0c;前端、nginx服务器&#xff0c;后端服务器 4、scrapy的爬虫过滤&#xff0c;注意重复url在src…

微短剧出海CPS分销推广影视平台系统搭建思维逻辑介绍

随着国内短剧市场的蓬勃发展&#xff0c;其独特的魅力与影响力已跨越国界&#xff0c;成为海外观众的新宠。这一趋势不仅推动了短剧内容的全球化传播&#xff0c;也为海外市场的CPS&#xff08;按销售分润&#xff09;分销模式提供了广阔舞台。连接内容创作者、平台运营者、系统…

HIVE调优方式及原因

3.HIVE 调优&#xff1a; 需要调优的几个方面&#xff1a; 1.HIVE语句执行不了 2.HIVE查询语句&#xff0c;在集群中执行时&#xff0c;数据无法落地 HIVE执行时&#xff0c;一开始语句检查没有问题&#xff0c;生成了多个JOB&#xff0c; …

VMware 的网络模式详解

VMware 的网络模式详解 使用 VMware 创建虚拟机&#xff0c;配置虚拟机网络时&#xff0c;主要有三个选项&#xff0c;分别是桥接模式、NAT 模式、仅主机模式 这三个模式到底有什么含义&#xff1f; VMware 是通过虚拟网络&#xff0c;即虚拟交换机&#xff0c;来连接物理机&…

泥球代码是什么,如何预防?typescript例

“泥球代码”&#xff08;Spaghetti Code&#xff09;通常是指结构混乱、复杂度高且难以理解或维护的代码。这样的代码往往缺乏清晰的设计和规划&#xff0c;看起来就像一团乱糟糟的意大利面&#xff0c;因此得名。 在软件开发中&#xff0c;避免产生泥球代码是非常重要的&…

angular入门基础教程(五)父子组件的数据通信

组件之间的通信是我们业务开发中少不了的,先了解下父子组件的通信 父组件传数据给子组件 前面&#xff0c;我们学会会动态属性的绑定&#xff0c;所以在父组件中给子组件绑定属性&#xff0c;在子组件中就可以使用这个属性了。 父组件中声明然后赋值 export class AppCompon…

设计模式-备忘录

备忘录&#xff08;Memento&#xff09;设计模式是为了保存对象当前状态&#xff0c;并在需要的时候恢复到之前保存的状态。以下是一个简单的C#备忘录模式的实现&#xff1a; // Originator 类&#xff0c;负责创建和恢复备忘录 class Originator {private string state;publi…

智能开关助力酒店管理提升

随着科技的迅猛跃进&#xff0c;智能化浪潮席卷全球&#xff0c;酒店业亦不例外地迎来了智能化转型的新纪元。智能开关&#xff0c;作为这股浪潮中的先锋&#xff0c;凭借其尖端的通信技术和智能控制逻辑&#xff0c;正深刻改变着酒店的运营模式与顾客体验。 它不仅赋予了酒店远…

如何理解tcp的三次握手?

TCP&#xff08;传输控制协议&#xff09;是一种网络通信协议&#xff0c;用于可靠地传输数据。它是互联网协议套件&#xff08;TCP/IP&#xff09;中的一部分&#xff0c;负责将数据分割成小块&#xff08;称为数据包&#xff09;&#xff0c;通过网络传输&#xff0c;然后在接…

IP地址证书签发之后可以绑定到指定端口访问吗?

IP地址证书概述 IP地址证书&#xff0c;也称为IP SSL证书&#xff0c;是为互联网协议(IP)地址提供安全认证的一种证书。它包含公钥、所有者信息以及由可信的证书颁发机构(CA)签发的数字签名。通过使用公钥基础设施(PKI)&#xff0c;IP地址证书确保了网络实体之间的信任和验证。…

语音合成大模型汇总

https://www.speechhome.com/blogs/news/1810969234071752704 阿里CosyVoice&#xff1a; https://github.com/FunAudioLLM/CosyVoice NeuCo v2声音克隆首发&#xff0c;一键AI翻唱&#xff0c;无需训练&#xff01;在线版SoVITS&#xff0c;在线使用&#xff0c;只需10秒音频…

React基础知识 精简全面 推荐

这篇博文主要对一些刚入门react框架的同学&#xff0c;以及对react基本知识进行巩固的&#xff0c;最后就是精简一下基本知识&#xff0c;以方便自己查看&#xff0c;感谢参考&#xff0c;有问题评论区交流&#xff0c;谢谢。 目录 1.JSX 2.Props 和 State 3.组件生命周期…

基于JAVA的美妆购物商城系统/美妆销售系统的设计与实现/美妆网站的设计与开发/在线美妆购物平台

摘 要 本毕业设计的内容是设计并且实现一个基于SSM框架的美妆购物商城系统。它是在Windows下&#xff0c;JSP技术&#xff0c;以MYSQL为数据库开发平台&#xff0c;Tomcat网络信息服务作为应用服务器。美妆购物商城系统的功能已基本实现&#xff0c;主要包括用户、商品信息、…

2024电赛H题参考方案(+视频演示+核心控制代码)——自动行使小车

目录 一、题目要求 二、参考资源获取 三、参考方案 1、环境搭建及工程移植 2、相关模块的移植 4、整体控制方案视频演示 5、视频演示部分核心代码 总结 一、题目要求 小编自认为&#xff1a;此次H题属于控制类题目&#xff0c;相较于往年较为简单&#xff0c;功能也算单一&…

科普文:『 码到三十五 』Java微服务中Token鉴权设计的4种方案

吐槽~~~~~~~ Java微服务中Token鉴权设计的几种方案: 1. JWT鉴权 「概述」&#xff1a;JWT是一种用于双方之间安全传输信息的简洁的、URL安全的令牌标准。它基于JSON格式&#xff0c;包含三个部分&#xff1a;头部&#xff08;Header&#xff09;、负载&#xff08;Payload&a…

ubuntu Ubuntu 20.04.6 LTS 安装vtk 报错缺少opengl

ubuntu编译9.3.0 vtk报错 -- Could NOT find OpenGL (missing: OPENGL_opengl_LIBRARY OPENGL_glx_LIBRARY OpenGL) CMake Error at CMake/vtkModule.cmake:4793 (message): Could not find the OpenGL external dependency. Call Stack (most recent call first): CMake/vtkMo…