dcf写入机制

dcf写入机制

写入

dcf提供如下两个写入接口:

  • dcf_write

    int dcf_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);
    

    仅在leader节点调用。

  • dcf_universal_write

    int dcf_universal_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);
    

    可以在任意节点调用。

确认

  • dcf_register_after_writer

    用于注册leader节点写入成功回调函数。

    int dcf_register_after_writer(usr_cb_after_writer_t cb_func)
    {return rep_register_after_writer(ENTRY_TYPE_LOG, cb_func);
    }
    

    最终将回调函数注册到全局变量

    int rep_register_after_writer(entry_type_t type, usr_cb_after_writer_t cb_func)
    {g_cb_after_writer[type] = cb_func;return CM_SUCCESS;
    }
    

    调用流程如下:

启动个线程来执行apply
rep_common_init
cm_create_thread
rep_apply_thread_entry
rep_apply_proc
g_cb_after_writer

线程在等待条件变量释放,并执行apply。

// 先查出有多少条流
if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {LOG_DEBUG_ERR("[REP]md_get_stream_list failed");return;}
while (!thread->closed) { // 若线程没有关闭则循环等待if (!exists_log) { // 首先判断是否存在日志,不存在就休眠等待唤醒(void)cm_event_timedwait(&g_apply_cond, CM_SLEEP_500_FIXED);}LOG_TRACE(g_rep_tracekey, "apply_thread work");exists_log = CM_FALSE;for (uint32 i = 0; i < stream_count; i++) { // 遍历每一条流uint32 stream_id = streams[i];bool8 stream_exists_log = CM_FALSE;LOG_TIME_BEGIN(rep_apply_proc);// 执行applyif (rep_apply_proc(stream_id, &stream_exists_log) != CM_SUCCESS) {LOG_DEBUG_ERR_EX("[REP]rep_apply_proc failed.");}LOG_TIME_END(rep_apply_proc);exists_log = (exists_log || stream_exists_log);}}

其中等待的条件变量为,在如下的地方唤醒。

void rep_apply_trigger()
{LOG_DEBUG_INF("[REP]rep_apply_trigger");LOG_TRACE(g_rep_tracekey, "common:rep_apply_trigger.");cm_event_notify(&g_apply_cond);
}

rep_apply_trigger的调用栈如下;

leader
follower
rep_acceptlog_proc
rep_leader_acceptlog_proc
rep_try_commit_log
rep_apply_trigger
rep_accept_thread_entry
rep_follower_acceptlog_proc

经过上面调用流程可以看到,apply线程是由accept线程唤醒的。accept线程与apply线程类似,同样是等待条件变量将线程唤醒。

if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {LOG_DEBUG_ERR("[REP]md_get_stream_list failed");return;}while (!thread->closed) {if (!exists_log) {LOG_TRACE(g_rep_tracekey, "accept_thread wait.");(void)cm_event_timedwait(&g_accept_cond, CM_SLEEP_500_FIXED);}LOG_TRACE(g_rep_tracekey, "accept_thread work.");exists_log = CM_FALSE;for (uint32 i = 0; i < stream_count; i++) {uint32 stream_id = streams[i];date_t now = g_timer()->now;exists_log = (exists_log || g_common_state[stream_id].accept_log);if (g_common_state[stream_id].accept_log ||now - g_common_state[stream_id].last_accept_time > CM_DEFAULT_HB_INTERVAL*MICROSECS_PER_MILLISEC) {LOG_TRACE(g_rep_tracekey, "accept_thread do work.");g_common_state[stream_id].accept_log = CM_FALSE;g_common_state[stream_id].last_accept_time = now;if (rep_acceptlog_proc(stream_id) != CM_SUCCESS) {LOG_DEBUG_ERR("[REP]rep_acceptlog_proc failed.");}} else {LOG_TRACE(g_rep_tracekey, "accept_thread no work.");}}}

accept线程等待的条件变量为g_accept_cond,其唤醒流程如下:

void rep_set_accept_flag(uint32 stream_id)
{LOG_DEBUG_INF("rep_set_accept_flag.");g_common_state[stream_id].accept_log = CM_TRUE;cm_event_notify(&g_accept_cond);
}

rep_set_accept_flag调用堆栈如下:

stg_register_cb
rep_accepted_trigger

可以看已看到accept线程的唤醒是通过注册回调函数来触发的。

    if (stg_register_cb(ENTRY_TYPE_LOG, rep_accepted_trigger) != CM_SUCCESS) {LOG_DEBUG_ERR("[REP]rep register stg callback failed");return CM_ERROR;}
status_t stg_register_cb(entry_type_t type, void *func)
{switch (type) {case ENTRY_TYPE_CONF:g_write_conf_func = (write_conf_func_t)func;break;case ENTRY_TYPE_LOG:g_notify_rep_func = (notify_rep_func_t)func;break;default:LOG_RUN_ERR("[STG]Register callback failed");return CM_ERROR;}return CM_SUCCESS;
}

可以看到回调函数最终被注册到了g_notify_rep_func。

append线程
disk_thread_entry
process_append_action
stream_append_entry_impl
callback_rep_func
g_notify_rep_func
stream_batcher_flush

可以看到其中一个回调函数是由append线程触发的,append线程会等待stream->disk_event条件变量被唤醒。stream->disk_event在stream_append_entry中被唤醒。

rep_appendlog_req_proc
rep_follower_process
rep_follower_appendlog
stg_append_entry
stream_append_entry
rep_write
register_msg_process(MEC_CMD_APPEND_LOG_RPC_REQ, rep_appendlog_req_proc, PRIV_LOW);

MEC_CMD_APPEND_LOG_RPC_REQ在rep_appendlog_node中发送。

rep_appendlog_thread_entry
rep_appendlog_stream
rep_appendlog_node

rep_appendlog_thread_entry有条件变量g_appendlog_cond唤醒。g_appendlog_cond又通过rep_appendlog_trigger唤醒。

rep_write
rep_appendlog_trigger
rep_appendlog_ack_proc
rep_rematch_proc

rep_appendlog_trigger由MEC_CMD_APPEND_LOG_RPC_ACK消息触发。

register_msg_process(MEC_CMD_APPEND_LOG_RPC_ACK, rep_appendlog_ack_proc, PRIV_LOW);

dd

  • dcf_register_consensus_notify

    用于注册follower节点写入数据成功的回调函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/224223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码审计的未来趋势

代码审计的未来发展趋势&#xff0c;如人工智能、区块链、云计算等代码审计是一项非常重要的工作&#xff0c;可以帮助团队发现潜在的安全漏洞和缺陷。随着技术的不断发展&#xff0c;代码审计也在不断地发展和改进。本文将介绍代码审计的未来发展趋势&#xff0c;包括人工智能…

C++ 标准库-chrono 基本用法

文章目录 再C开发中&#xff0c;经常会遇到时间相关的问题&#xff0c;这里可以借助 <chrono>来帮我们解决问题。 <chrono> 是 C 标准库中的一个头文件&#xff0c;它提供了处理时间和日期的功能。这个库引入了一组类型和函数&#xff0c;用于在程序中进行时间点、…

Linux(22):X Window 设定介绍

X Window System X Window System 是个非常大的架构&#xff0c;他还用到网络功能。也就是说&#xff0c;其实 X 窗口系统是能够跨网络与跨操作系统平台的。 X Window系统最早是由 MIT (Massachusetts Institute of Technology&#xff0c;麻省理工学院) 在1984年发展出来的&…

day13 栈与队列(三)

day13 2023.12.11 代码随想录 今天刚出差回来&#xff0c;拉下了很多天的博客&#xff0c;慢慢补吧&#xff0c;每天做当天的任务&#xff0c;再补一篇博客。 1. 239滑动窗口最大值 本题就是每次窗口内容放在一个单调队列中&#xff0c;那么每次直接返回队头元素&#xff08;最…

register_chrdev函数使用

static struct class *led_class;static int major 0; /*记得static 不然可能会出现外部.c调用这个变量的可能*/static struct file_operations led_drv {.owner THIS_MODULE,.open led_drv_open,.read led_drv_read,.write led_drv_write,.release led_drv…

Python中的继承:概念、用法与示例

目录 一、引言 二、继承的概念 三、继承的用法 1、继承父类的属性和方法 2、添加新的属性和方法 3、覆盖父类的方法 四、示例代码展示 五、继承中的多态性 六、继承中的封装和抽象 七、继承中的多重继承 总结 一、引言 面向对象编程&#xff08;OOP&#xff09;是一…

「完美世界」石昊被诓入至尊道场,修炼无敌道,打跑天仙书院弟子

Hello,小伙伴们&#xff0c;我是拾荒君。 《完美世界》这部国漫&#xff0c;在粉丝的翘首期盼中&#xff0c;终于迎来了第141集的更新。这一集的内容&#xff0c;对于喜欢石昊和至尊道场劫难的观众来说&#xff0c;可谓是扣人心弦&#xff0c;让人目不转睛。 在这一集中&#…

【信息学奥赛】拼在起跑线上,想入道就别落下自己!

编程无难事&#xff0c;只怕有心人&#xff0c;学就是了&#xff01; 文章目录 1 信息学奥赛简介2 信息学竞赛的经验回顾3 优秀参考图书推荐《信息学奥赛一本通关》4 高质量技术圈开放 1 信息学奥赛简介 信息学奥赛&#xff0c;作为全国中学生学科奥林匹克“五大学科竞赛”之一…

ChatGPT使用:一个发包机器人的提示词

发包机器人&#xff1a; 设想&#xff1a;目前项目组有n条打包线会输出多个包&#xff0c;用户想获取最新的包是比较困难的&#xff0c;难点在于 1. 分支多&#xff1a;trunk&#xff0c;release&#xff0c;outer等&#xff0c;至少有3个分支&#xff1b; 2. 多平台&#x…

C++ 面向对象模型 小知识点

空类 大小 空类的 sizeof 结果是 1。原因&#xff1a;每个对象都应该在内存上有独一无二的地址&#xff0c;因此给空对象分配 1 个字节空间。 当定义了一个变量后&#xff0c;则类的大小为这个变量的大小。 类中 变量和函数是分开存储的。 静态成员变量&#xff0c;静态成员函数…

浅谈“前端已死”论

自我介绍一下&#xff0c;我目前是一个C#后端开发工程师&#xff0c;题目中所述的java和前端貌似跟我没有太大的关系&#xff0c;我想说&#xff0c;在这IT行业内&#xff0c;所有的东西都是互通的&#xff0c;最近公司也在搞B/S&#xff0c;也是使用了javavue的模式&#xff0…

Spring+SpringMVC+SpringBoot

Spring bean bean基础配置 bean别名配置 注意事项&#xff1a; 获取bean无论是通过id还是name获取。如果无法获取到&#xff0c;将抛出异常NoSuchBeanDefinitionException bean的作用范围配置 适合交给容器进行管理的bean 表现层对象、业务层对象、数据层对象、工具对象 不…

Vue3+ts实现页面跳转及参数传递

## 列表页 <script lang"ts" setup> import { reactive, toRefs } from vue // 1 引入useRouter路由信息方法 import { useRouter } from vue-router // 2 获取实例 const router useRouter()const gotoDetail (index: string) > {router.push({path: …

《opencv实用探索·十九》光流法检测运动目标

前言 光流法&#xff08;Optical Flow&#xff09;是计算机视觉中的一种技术&#xff0c;用于估计图像中相邻帧之间的像素位移或运动。它是一种用于追踪图像中物体运动的技术&#xff0c;可以在视频中检测并测量物体的运动轨迹。 光流的直观理解&#xff1a; 光流是一个视频中两…

智能物联网(IoT)VS AI物联网(AIoT)

#IoT# #AIoT# 智能物联网&#xff08;IoT&#xff09;和AI物联网&#xff08;AIoT&#xff09;区别 概念&#xff1a; 物联网&#xff08;IoT&#xff09;&#xff1a;即“万物相连的互联网”&#xff0c;是在互联网基础上延伸和扩展的网络&#xff0c;将各种信息传感设备与网…

离散型制造企业MES系统行业应用

离散型制造企业具有产品种类多、生产周期长、生产过程复杂等特点&#xff0c;因此&#xff0c;采用先进的生产管理系统对于提高企业的生产效率和管理水平至关重要。其中&#xff0c;制造执行系统&#xff08;MES&#xff09;在离散型制造企业中得到了广泛应用&#xff0c; 一、…

TensorBoard使用和问题解决

一、什么是TensorBoard? TensorBoard 是一组用于数据可视化的工具&#xff0c;它包含在流行的开源机器学习库 Tensorflow 中。TensorBoard 的主要功能包括&#xff1a; 可视化模型的网络架构跟踪模型指标&#xff0c;如损失和准确性等检查机器学习工作流程中权重、偏差和其他…

PDI/Kettle-9.2.0.0-R(对应jdk1.8)源码编译问题记录及源码结构简介

目录 &#x1f4da;第一章 前言&#x1f4d7;背景&#x1f4d7;目的&#x1f4d7;总体方向 &#x1f4da;第二章 代码结构初识基本结构&#x1f4d7;代码模块详情 ⁉️问题记录❓问题一&#xff1a;代码分支哪些是发布版本❗答&#xff1a;后缀-R的版本 ❓问题二&#xff1a;50…

Milesight VPN server.js 任意文件读取漏洞(CVE-2023-23907)

0x01 产品简介 MilesightVPN 是一款软件&#xff0c;一个 Milesight 产品的 VPN 通道设置过程更加完善&#xff0c;并可通过网络服务器界面连接状态。 0x02 漏洞概述 MilesightVPN server.js接口处存在文件读取漏洞&#xff0c;攻击者可通过该漏洞读取系统重要文件&#xff…

dockerfite创建镜像---INMP+wordpress

目录 搭建dockerfile---lnmp 创建nginx镜像 运行 创建数据库镜像 运行 ​编辑 创建php镜像 运行 搭建dockerfile---lnmp 在192.168.10.201 服务IP地址nginx 172.111.0.10 dockernginxmysql172.111.0.20dockermysqlphp172.111.0.30dockerphp 创建nginx镜像 路径 vim /…