【中间件】bthread_基础_TaskControl

TaskControl

  • 1 Definition
  • 2 Introduce
      • **核心职责**
  • 3 成员解析
    • **3.1 数据结构与线程管理**
    • **3.2 任务调度与负载均衡**
    • **3.3 线程停放与唤醒(ParkingLot)**
    • **3.4 统计与监控**
  • 4 **工作流程**
  • 5 **设计亮点**
  • 6 **使用场景示例**
  • 7 **总结**
  • 8 学习过程中的疑问
    • 8.1 init函数为什么不在构造函数中调用

1 Definition

class TaskControl {friend class TaskGroup; // 友元类friend void wait_for_butex(void*); // 友元函数
#ifdef BRPC_BTHREAD_TRACERfriend bthread_t init_for_pthread_stack_trace(); // 友元函数
#endif // BRPC_BTHREAD_TRACERpublic:TaskControl();~TaskControl();// Must be called before using. `nconcurrency' is # of worker pthreads.int init(int nconcurrency);// Create a TaskGroup in this control.TaskGroup* create_group(bthread_tag_t tag);// Steal a task from a "random" group.bool steal_task(bthread_t* tid, size_t* seed, size_t offset);// Tell other groups that `n' tasks was just added to caller's runqueuevoid signal_task(int num_task, bthread_tag_t tag);// Stop and join worker threads in TaskControl.void stop_and_join();// Get # of worker threads.int concurrency() const { return _concurrency.load(butil::memory_order_acquire); }int concurrency(bthread_tag_t tag) const { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }void print_rq_sizes(std::ostream& os);double get_cumulated_worker_time();double get_cumulated_worker_time_with_tag(bthread_tag_t tag);int64_t get_cumulated_switch_count();int64_t get_cumulated_signal_count();// [Not thread safe] Add more worker threads.// Return the number of workers actually added, which may be less than |num|int add_workers(int num, bthread_tag_t tag);// Choose one TaskGroup (randomly right now).// If this method is called after init(), it never returns NULL.TaskGroup* choose_one_group(bthread_tag_t tag);#ifdef BRPC_BTHREAD_TRACER// A stacktrace of bthread can be helpful in debugging.void stack_trace(std::ostream& os, bthread_t tid);std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACERprivate:typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;static const int PARKING_LOT_NUM = 4;typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;// Add/Remove a TaskGroup.// Returns 0 on success, -1 otherwise.int _add_group(TaskGroup*, bthread_tag_t tag);int _destroy_group(TaskGroup*);// Tag groupTaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }// Tag ngroupbutil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }// Tag parking slotTaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }static void delete_task_group(void* arg);static void* worker_thread(void* task_control);template <typename F>void for_each_task_group(F const& f);bvar::LatencyRecorder& exposed_pending_time();bvar::LatencyRecorder* create_exposed_pending_time();bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);std::vector<butil::atomic<size_t>> _tagged_ngroup;std::vector<TaggedGroups> _tagged_groups;butil::Mutex _modify_group_mutex;butil::atomic<bool> _init;  // if not init, bvar will case coredumpbool _stop;butil::atomic<int> _concurrency;std::vector<pthread_t> _workers;butil::atomic<int> _next_worker_id;bvar::Adder<int64_t> _nworkers;butil::Mutex _pending_time_mutex;butil::atomic<bvar::LatencyRecorder*> _pending_time;bvar::PassiveStatus<double> _cumulated_worker_time;bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;bvar::PassiveStatus<int64_t> _cumulated_switch_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;bvar::PassiveStatus<int64_t> _cumulated_signal_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;bvar::PassiveStatus<std::string> _status;bvar::Adder<int64_t> _nbthreads;std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;std::vector<TaggedParkingLot> _pl;#ifdef BRPC_BTHREAD_TRACERTaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER};

2 Introduce

TaskControl作为任务调度控制中心,管理多个任务组(TaskGroup)并协调工作线程的高效运作,适用于BRPC的bthread协程库:

核心职责

  1. 任务组管理:创建、销毁任务组,支持按标签(bthread_tag_t)分类管理。
  2. 线程池调度:动态调整工作线程数量,实现任务窃取(Work Stealing)以平衡负载。
  3. 同步与唤醒:通过停放区(ParkingLot)管理线程的休眠与唤醒。
  4. 性能监控:集成统计模块(bvar)跟踪任务处理时间、切换次数等指标。

3 成员解析

3.1 数据结构与线程管理

  • _tagged_groups
    类型:std::vector<TaggedGroups>
    作用:按标签存储任务组指针数组,每个标签对应一个TaggedGroups(固定大小为BTHREAD_MAX_CONCURRENCY的数组)。
    示例:标签可用于区分不同业务优先级或租户的任务。

  • _tagged_ngroup
    类型:std::vector<butil::atomic<size_t>>
    作用:记录每个标签下的任务组数量,原子操作保证线程安全。

  • _workers
    类型:std::vector<pthread_t>
    作用:保存所有工作线程的ID,用于线程生命周期管理(启动、停止、回收)。

  • _concurrency
    类型:butil::atomic<int>
    作用:总工作线程数,支持原子读写,动态调整并发度。

3.2 任务调度与负载均衡

  • steal_task(bthread_t* tid, size_t* seed, size_t offset)
    作用:从其他任务组窃取任务,避免工作线程空闲。
    实现:

    • 使用seed随机选择目标组,结合offset避免多个线程竞争同一队列。
    • 窃取成功返回true,任务ID存入tid
  • signal_task(int num_task, bthread_tag_t tag)
    作用:通知任务组有新任务加入,触发唤醒机制。
    场景:当任务被添加到队列时,调用此方法唤醒可能休眠的线程。

  • choose_one_group(bthread_tag_t tag)
    作用:根据标签选择一个任务组,用于任务分发或负载均衡。
    策略:可能采用轮询或随机算法选择组,确保任务均匀分配。

3.3 线程停放与唤醒(ParkingLot)

  • _pl
    类型:std::vector<TaggedParkingLot>
    作用:按标签管理的停放区数组,每个标签对应PARKING_LOT_NUM个停放区。
    机制:
    • 工作线程无任务时进入停放区等待,减少CPU空转。
    • 新任务到达时,通过停放区唤醒线程,降低延迟。

3.4 统计与监控

  • bvar集成
    关键指标:

    • _cumulated_worker_time:累计任务处理时间。
    • _cumulated_switch_count:上下文切换次数。
    • _signal_per_second:每秒任务唤醒次数。
      作用:通过BRPC的bvar库暴露性能指标,方便实时监控与调优。
  • 标签化统计
    成员如_tagged_nworkers_tagged_cumulated_worker_time等,按标签细分统计,支持多维分析。

4 工作流程

  1. 初始化

    • 调用init(nconcurrency)创建指定数量工作线程,每个线程执行worker_thread函数。
    • 工作线程通过choose_one_group选择任务组,执行任务循环。
  2. 任务执行

    • 线程从本地任务组获取任务,若队列为空,尝试从其他组窃取(steal_task)。
    • 无任务可执行时,进入停放区(ParkingLot)休眠。
  3. 任务通知

    • 添加新任务时,调用signal_task递增信号计数器,唤醒停放区线程。
  4. 动态扩缩容

    • add_workers动态增加指定标签的工作线程,适应负载变化。
  5. 停止与清理

    • stop_and_join设置_stop标志,通知所有线程退出,并回收资源。

5 设计亮点

  • 标签化分组
    支持多维度任务分类,适用于混合部署场景(如不同服务优先级)。
  • 任务窃取
    避免工作线程闲置,提升CPU利用率,降低任务处理延迟。
  • 高效同步
    结合原子操作与停放区,减少锁竞争,保证高吞吐量。
  • 精细化监控
    通过bvar提供详尽的运行时指标,助力性能分析与优化。

6 使用场景示例

高并发服务

  • Web服务器接收请求后,封装为bthread任务,按业务类型打标签。
  • TaskControl根据标签分发任务到不同组,保证关键业务优先调度。
  • 工作线程动态扩展应对流量高峰,空闲时自动缩减节省资源。
  • 监控指标实时反馈系统负载,辅助容量规划。

7 总结

TaskControl是BRPC bthread调度系统的核心,通过高效的任务组管理、工作线程调度及细粒度监控,实现了高并发、低延迟的协程任务处理。其设计充分考虑了扩展性、性能与可观测性,是构建高性能C++服务的基石组件。

8 学习过程中的疑问

8.1 init函数为什么不在构造函数中调用

疑问:init函数在注释中声明需要在使用前调用,为什么不能将其放在构造函数中直接调用呢?
回答:

  • 可能原因:

      1. 初始化可能失败,需要错误处理
      • 构造函数没有返回值,若在构造函数中执行可能的失败操作(eg. 创建线程、分配资源),只能通过异常或设置内部状态标记错误。brpc代码风格倾向于避免异常,习惯于返回错误码。参考代码8-1
      1. 需要依赖外部参数
      • 构造时参数可能不完整,TaskControl的初始化需要并发线程数等参数,可能在运行时动态确定,无法在编译器硬编码。
      • 更复杂的重载,如果后续需要扩展初始化参数(eg. 增加timeout/policy等配置),显式init()更容易扩展,而构造函数重载会膨胀。
    • 3 支持对象的复用
      • 销毁后重新初始化,某些场景下,用户可能希望销毁TaskControl后重新初始化(eg. 动态调整线程池的大小)。若初始化逻辑在构造函数中,则需要先析构对象再重新构造,而显式init/stop_and_join允许复用同一对象。参考代码8-2
    • 4 明确的二段式生命周期
      • 分离资源分配与初始化,二段式设计(构造+init())将对喜爱嗯的内存分配和资源初始化解耦:
        • 构造阶段:仅进行内存布局、简单成员初始化;
        • 初始化阶段:执行重量级操作(eg. 创建线程、连接资源);
      • 更符合RAII的变体模式,尤其是在需要延迟初始化时。
    • 5 避免隐藏的副作用
      • 隐式初始化可能引入意外行为,若构造函数自动初始化,用户可能在不知情的情况下触发资源分配(eg. 线程创建)。显式init()强制用户主动控制初始化时机,避免副作用。
    • 6 与brpc其他组件的设计一致性
      • 统一风格,brpc很多组件(eg. Channel / Server)均采用类似的二段式模式(先构造,再调用init() / start()),保持代码风格统一,降低用户学习成本。
  • 什么情况下应在构造函数中初始化?

    • 轻量级且无失败可能的操作,eg. 设置默认参数、初始化原子计数器等。
    • 强制一次性初始化,若对象必须在构造时完全初始化,且不允许重新初始化。
  • 两种方式对比,见代码8-3

  • 二段式的作用

    • 清晰的错误处理:通过返回int明确传递错误
    • 参数灵活性:允许运行时动态决定初始化参数
    • 对象复用:支持重新初始化而不重新构造
    • 代码一致性:符合BRPC设计惯例

代码8-1:

// 当前使用方法
TaskControl ctl;
if (ctl.init(32) != 0) {// 处理初始化失败
}// 如果放在构造函数中
TaskControl ctl(32);
if (!ctl.is_initialized()) {// 处理错误
}

代码8-2

TaskControl ctl;
ctl.init(16);
ctl.stop_and_join();
ctl.init(32);

代码8-3

// 二段式
class TaskControl {
public:TaskControl();  // 轻量构造~TaskControl();int init(int nconcurrency);  // 显式初始化// ...
};// 使用方式
TaskControl ctl;
if (ctl.init(32) != 0) {LOG(ERROR) << "Failed to initialize TaskControl";return -1;
}// 合并到构造方式
class TaskControl {
public:explicit TaskControl(int nconcurrency);  // 可能抛出异常~TaskControl();bool is_initialized() const;  // 需额外状态检查// ...
};// 使用方式
try {TaskControl ctl(32);
} catch (const std::exception& e) {LOG(ERROR) << "Construction failed: " << e.what();
}
if (!ctl.is_initialized()) {  // 需要额外检查// 处理错误
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/79232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win11 终端 安装ffmpeg 使用终端Scoop

1、安装scoop (Windows 包管理器) Set-ExecutionPolicy RemoteSigned -Scope CurrentUser iwr -useb get.scoop.sh | iex 2、使用scoop来安装ffmpeg scoop install ffmpeg 3、测试一下ffmpeg&#xff0c;将Mp3文件转为Wav文件 ffmpeg -i A.mp3 A.wav 然后我们就看到A.wav生成…

力扣838.推多米诺随笔

“生活就像海洋&#xff0c;只有意志坚强的人&#xff0c;才能到达彼岸。”—— 马克思 题目 n 张多米诺骨牌排成一行&#xff0c;将每张多米诺骨牌垂直竖立。在开始时&#xff0c;同时把一些多米诺骨牌向左或向右推。 每过一秒&#xff0c;倒向左边的多米诺骨牌会推动其左侧…

超级好用的​​参数化3D CAD 建模​​图形库 (CadQuery库介绍)

CadQuery 库详细介绍​​ ​​CadQuery​​ 是一个基于 ​​Python​​ 的 ​​参数化 3D CAD 建模​​ 库&#xff0c;允许用户通过编写代码&#xff08;而不是传统 GUI&#xff09;来创建精确的 ​​3D 模型​​。它特别适用于 ​​自动化设计、机械工程、3D 打印​​ 等场景…

HBM的哪些事

命令操作 这也许是DDR往HBM演进的一些奇淫技巧。 本篇内容属于杂谈&#xff0c;关于HBM的奇淫技巧&#xff0c;随后出专题介绍。

Python基于深度学习的网络舆情分析系统(附源码,部署)

大家好&#xff0c;我是Python徐师兄&#xff0c;一个有着7年大厂经验的程序员&#xff0c;也是一名热衷于分享干货的技术爱好者。平时我在 CSDN、掘金、华为云、阿里云和 InfoQ 等平台分享我的心得体会。 &#x1f345;文末获取源码联系&#x1f345; 2025年最全的计算机软件毕…

滑动窗口leetcode 209和76

一、leetcode 209. 长度最小的子数组 代码&#xff1a; class Solution { public:int minSubArrayLen(int target, vector<int>& nums) {int n nums.size();int left 0;int sum 0;int res 100001;for(int right 0;right <n;right){sum nums[right];while(s…

node.js 实战——mongoDB 续一

mongoDB的基本指令 进入mongodb mongo显示当前的所有数据库 show dbs # 或者 show databases切换数据库/进入指定数据库 使用这个命令的时候&#xff0c;是不要求这个数据库是否创建 use 数据库名显示当前数据库 db显示数据库中所有集合 show collections数据库的CRUD的…

SVMSPro平台获取Websocket视频流规则

SVMSPro平台获取Websocket视频流规则 Websocket 的服务端口为&#xff1a;53372&#xff0c;如需要公网访问需要开启这个端口 这里讲的是如何获取长效URL&#xff0c;短效&#xff08;时效性&#xff09;URL也支持&#xff0c;下回讲 一、如何获取Websocket实时流视频 ws:/…

Arduino按键开关编程详解

一、按键开关的基本原理与硬件连接 1.1 按键开关的工作原理 按键开关是一种常见的输入设备&#xff0c;其核心原理基于机械触点的闭合与断开。当用户按下按键时&#xff0c;内部的金属片会连接电路两端&#xff0c;形成通路&#xff1b;松开按键后&#xff0c;金属片在弹簧作…

我的日记杂文

Sequoia sempervirens 北美红杉树 Troll 洞穴巨人 喜欢在网上搞事的人 piss off 滚开 让人恼火的 欧洲美甲 60euor - 30euro 拖车 mobie house Motel 汽车旅馆 Minoxidil 米诺地尔 Health insurance 医疗保险 casetify 香港手机品牌 coolant 汽车防冻液 Auto tint film 汽车贴…

数字智慧方案5867丨智慧建造(BIM技术智慧工地)在施工阶段的实践与应用方案(90页PPT)(文末有下载方式)

资料解读&#xff1a;智慧建造(BIM技术智慧工地)在施工阶段的实践与应用方案 详细资料请看本解读文章的最后内容。 在当今的建筑行业中&#xff0c;智慧建造已成为提升施工效率和质量的关键手段。随着科技的进步&#xff0c;智慧建造结合了物联网、大数据、人工智能等技术&am…

机器学习中的标签策略:直接标签、代理标签与人工数据生成

机器学习中的标签策略&#xff1a;直接标签、代理标签与人工数据生成 摘要 本文深入探讨了机器学习领域中标签的关键概念&#xff0c;包括直接标签与代理标签的定义、优缺点比较&#xff0c;以及人工生成数据的相关内容。通过详细实例和练习&#xff0c;帮助读者理解如何选择…

从0搭建Transformer

1. 位置编码模块&#xff1a; import torch import torch.nn as nn import mathclass PositonalEncoding(nn.Module):def __init__ (self, d_model, dropout, max_len5000):super(PositionalEncoding, self).__init__()self.dropout nn.Dropout(pdropout)# [[1, 2, 3],# [4, 5…

【Bootstrap V4系列】学习入门教程之 表格(Tables)和画像(Figure)

Bootstrap V4系列 学习入门教程之 表格&#xff08;Tables&#xff09;和画像&#xff08;Figure&#xff09; 表格&#xff08;Tables&#xff09;一、Examples二、Table head options 表格头选项三、Striped rows 条纹行四、Bordered table 带边框的表格五、Borderless table…

在C# WebApi 中使用 Nacos02: 配置管理、服务管理实战

一、配置管理 1.添加一个新的命名空间 这里我都填写为publicdemo 2.C#代码配置启动 appsetting.json加上&#xff1a; (nacos默认是8848端口) "NacosConfig": {"ServerAddresses": [ "http://localhost:8848" ], // Nacos 服务器地址"Na…

如何搭建spark yarn 模式的集群集群。

下载 App 如何搭建spark yarn 模式的集群集群。 搭建Spark on YARN集群的详细步骤 Spark on YARN模式允许Spark作业在Hadoop YARN资源管理器上运行&#xff0c;利用YARN进行资源调度。以下是搭建步骤&#xff1a; 一、前提条件 已安装并配置好的Hadoop集群&#xff08;包括HDF…

C++--入门基础

C入门基础 1. C的第一个程序 C继承C语言许多大多数的语法&#xff0c;所以以C语言实现的hello world也可以运行&#xff0c;C中需要把文件定义为.cpp&#xff0c;vs编译器看是.cpp就会调用C编译器编译&#xff0c;linux下要用g编译&#xff0c;不再是gcc。 // test.cpp #inc…

从实列中学习linux shell9 如何确认 服务器反应迟钝是因为cpu还是 硬盘io 到底是那个程序引起的。cpu负载多高算高

在 Linux 系统中,Load Average(平均负载) 是衡量系统整体压力的关键指标,但它本身没有绝对的“高/低”阈值,需要结合 CPU 核心数 和 其他性能指标 综合分析。以下是具体判断方法: 一、Load Average 的基本含义 定义:Load Average 表示 单位时间内处于可运行状态(R)和不…

聊一聊接口测试更侧重于哪方面的验证

目录 一、功能性验证 输入与输出正确性 参数校验 业务逻辑覆盖 二、数据一致性验证 数据格式规范 数据完整性 数据类型与范围 三、异常场景验证 容错能力测试 边界条件覆盖 错误码与信息清晰度 四、安全与权限验证 身份认证 数据安全 防攻击能力 五、性能与可…

Fiddler抓取APP端,HTTPS报错全解析及解决方案(一篇解决常见问题)

环境&#xff1a;雷电模拟器Android9系统 ​ 你所遇到的fiddler中抓取HTTPS的问题可以分为三类&#xff1a;一类是你自己证书安装上逻辑错误&#xff0c;另一种是APP中使用了“证书固定”的手段。三类fiddler中生成证书时的参数过程。 1.Fiddler证书安装上的逻辑错误 更新Opt…