参考《c++ Concurrency In Action 》第二章做的笔记
目录
- 传递参数
- 量产线程
- 线程标识
传递参数
thread构造函数的附加参数会拷贝至新线程的内存空间中,即使函数中的采纳数是引用类型,拷贝操作也会执行。如果我们期待传入一个引用,必须使用std::ref
将参数转换成引用形式:
如下:
void update_weight(weight_id w,weight_data& data); //1
void oops(weight_id w)
{weight_data data;//错误方式std::thread t(update_weight,w,data);//正确方式std::thread t(update_weight,w,std::ref(data));display_status();t.join();
}
这样就能收到data的引用,而非data的拷贝副本。
与std:bind
的传参机制相同,使用std::thread
创建线程时,传递参数的过程如下:
- 向
std::thread
构造函数传参:一般实参会被拷贝至新线程的内存空间。具体拷贝的过程是由调用线程(主线程)在堆上创建并交由子线程管理,在子线程结束时同时被释放。 - 向线程函数传参:由于
std::thread
对象里一般保存的是参数的副本,为了效率同时兼顾一些只移动类型的对象,所有的副本均被std::move
到线程函数,即以右值的形式传入。
示例:std::move转移动态对象的所有权到线程中去:
void process_big_object(std::unique_ptr<big_object>);std::unique_ptr<big_object> p(new big_object);
p->prepare_data(47);
std::thread t(process_big_object,std::move(p));
在thread构造函数中执行move,big_object对象的所有权首先转移到新创建线程的内部存储中,之后再传递给process_big_object函数。
量产线程
void do_work(unsigned id);void f()
{std::vector<std::thread> threads;for(unsigned i = 0; i < 20; i++)threads.emplace_back(do_work,i); //产生线程for(auto& entry : threads) //对每个线程调用join()entry.join();
}
使用线程去分割一个算法的工作总量,所以在算法结束之前,所有线程必须结束。线程所做的工作都是独立的,并且结果仅会受到共享数据的影响。如果f有返回值,在写入返回值之前,程序会检查使用共享数据的线程是否终止。
下面函数,会返回并发线程的数量
std::thread::hardware_concurrency()
下面展示并行版本的std::accumulate
。代码将整体工作拆分成小任务,交给每个线程去做,并设置最小任务数,避免产生太多的线程,程序会在操作数量为0时抛出异常。
template<typename Iterator,typename T>
struct accumulate_block
{void operator()(Iterator first,Iterator last,T& result){result = std::accumulate(first,last,result);}
};template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length = std::distance(first,last);if(!length) //1return init;unsigned long const min_per_thread = 25;unsigned long const max_threads = (length+min_per_thread-1)/min_per_thread; //2unsigned long const hardware_threads = std::thread::hardware_concurrency();unsigned long const num_threads = //3std::min(hardware_threads != 0 ? hardware_threads : 2,max_threads);unsigned long const block_size = length / num_threads; //4std::vector<T> results(num_threads);std::vector<std::thread> threads(num_threads - 1); //5Iterator block_start = first;for(unsigned long i = 0; i < num_threads-1; ++i){Iterator block_end = block_start;std::advance(block_end,block_size); //6threads[i] = std::thread( //7accumulate_block<Iterator,T>(),block_start,block_end,std::ref(result[i]));block_start = block_end; //8}accumulate_block<Iterator,T>(),block_start,last,results[num_threads-1]); //9for(auto& entry : threads)entry.join(); //10return std::accumulate(results.begin(),results.end(),init); //11
}
函数解释:
如果输入范围为空1,就会得到init值。如果范围内的元素多于1个,需要用范围内元素的总数量除以线程块中最小任务数,从而确定启动线程的最大数量。由于上下文切换可能会降低线程性能,所以计算最大线程数以及硬件支持线程数,选择较小值作为启动线程数量3。如果为0的话,选择2替代。
每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的4。
既然确定了线程个数,创建一个std::vector<T>
容器存放中间结果,并为线程创建一个std::vector<std::thread>
容器。因为在启动之前已经有了一个主线程,所以启动线程数是num_threads-1。
使用循环启动线程,block_end指向当前块的末尾6,并启动一个新线程为当前块累加结果7.当迭代器指向当前块的末尾时,启动下一个块8.
启动所有县城后,9中线程会处理最终块的结果,累加最终块结果后,等待std::for_each创建线程10.之后使用std::accumulate将所有结果进行累加11
线程标识
线程标识为std::thread::id
类型,可以通过成员函数get_id()
来获取。如果thread对象没有和任何执行线程相关联,将返回一个默认构造值,表示“无线程”。
如果两个线程id相等,说明是同一个线程,或者都是“无线程”。
std::thread::id
实例常用作检测线程是否需要进行一些操作,比如,当用线程来分割一项工作,主线程可能要做一些与其他线程不同的工作,启动其他线程前,可以通过get_id得到自己线程id,每个线程都检查一下,其拥有线程id是否与初始线程id相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{if(std::this_thread::get_id() == master_thread)do_master_thread_work();elsedo_common_work();
}