丹灶网站建设seo搜狗
web/
2025/9/29 9:26:35/
文章来源:
丹灶网站建设,seo搜狗,淘宝网站建设策划书,学校做网站需要什么目录
线程的声明
线程创建过程
向线程中投递消息
从消息队列中取消息的具体实现
处理线程消息 webrtc线程模块的实现逻辑在 rtc_base\thread.h 文件中
比如想创建一个线程#xff1a;
//声明要创建的线程指针#xff0c;通过智能指针管理
std::unique_ptrrtc::Thr…目录
线程的声明
线程创建过程
向线程中投递消息
从消息队列中取消息的具体实现
处理线程消息 webrtc线程模块的实现逻辑在 rtc_base\thread.h 文件中
比如想创建一个线程
//声明要创建的线程指针通过智能指针管理
std::unique_ptrrtc::Thread video_thread_;
// 创建线程
video_thread_ rtc::Thread::Create();
//设置新创建的线程名
video_thread_-SetName(video_thread_, video_thread_.get());
//开启线程
video_thread_-Start();
//向线程投递要处理的消息video_thread_-Post(RTC_FROM_HERE, this, MESSAGE_ID);// MESSAGE_ID 自定义的消息id//向线程投入带有消息体的消息video_thread_-Post(RTC_FROM_HERE, this, VIDEO_INFO,new rtc::TypedMessageDataVIDEO_INFO_MEESAGE(r));//其中RTC_FROM_HERE 是个宏定义标记线程调用的原位置
// Define a macro to record the current source location.
#define RTC_FROM_HERE RTC_FROM_HERE_WITH_FUNCTION(__FUNCTION__)
下面看下线程的具体实现
线程的声明
//线程继承自一个任务队列并且有两个存储消息的消息队列
//普通消息 messages_延时消息 delayed_messages_
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptrSocketServer ss);privateMessage msgPeek_;//声明对应的消息//MessageList 具体的定义//typedef std::listMessage MessageList;MessageList messages_ RTC_GUARDED_BY(crit_); //延时队列继承自 std::priority_queueDelayedMessage PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
}
创建线程的实现
//具体的创建函数
//构造中传入一个 NullSocketServer() 作为参数
std::unique_ptrThread Thread::Create() {return std::unique_ptrThread(new Thread(std::unique_ptrSocketServer(new NullSocketServer())));
}//最终调用到这里线程构造函数
Thread::Thread(SocketServer* ss, bool do_init): fPeekKeep_(false),delayed_next_num_(0),fInitialized_(false),fDestroyed_(false),stop_(0),ss_(ss) {RTC_DCHECK(ss);//把当前线程的this指针传给 NullSocketServerss_-SetMessageQueue(this);
//设置线程的初始名字SetName(Thread, this); // default nameif (do_init) {DoInit();}
}void Thread::DoInit() {if (fInitialized_) {return;}fInitialized_ true;//把当前线程的this指针对象传给ThreadManagerThreadManager::Add(this);
}
//ThreadManager会把当前线程放到一个 message_queues_ 中统一管理
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}
引入了一个新的对象 ThreadManager
//ThreadManager是线程管理类是一个单例
//保存创建的所有线程对象
class RTC_EXPORT ThreadManager {// Singleton, constructor and destructor are private.static ThreadManager* Instance();//保存线程的消息队列其实是个vector不是queue。//很多服务都喜欢用vector代替queuesrs也是把vector当queue用// This list contains all live Threads.std::vectorThread* message_queues_ RTC_GUARDED_BY(crit_);}
//创建单例 ThreadManager饿汉模式
ThreadManager* ThreadManager::Instance() {static ThreadManager* const thread_manager new ThreadManager();return thread_manager;
}
//把线程指针加入到消息队列中
void ThreadManager::Add(Thread* message_queue) {return Instance()-AddInternal(message_queue);
}
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}
线程创建过程
线程的Start()函数才是真正创建线程的地方只看android即linux端。
具体的实现是用的pthread而没有用标准的std::thread
bool Thread::Start() {pthread_attr_t attr;pthread_attr_init(attr);//创建线程调用的是pthread_create//并传入线程函数 PreRunint error_code pthread_create(thread_, attr, PreRun, this);if (0 ! error_code) {RTC_LOG(LS_ERROR) Unable to create pthread, error error_code;thread_ 0;return false;}
}void* Thread::PreRun(void* pv) {Thread* thread static_castThread*(pv);ThreadManager::Instance()-SetCurrentThread(thread);rtc::SetCurrentThreadName(thread-name_.c_str());//调用一个Run()函数thread-Run();}void Thread::Run()
{
// Forever 模式一直轮训处理ProcessMessages(kForever);
}
//真正处理消息的函数下文会详细介绍
bool Thread::ProcessMessages(int cmsLoop)
{while (true) {Message msg;// Get()函数从消息队列中取消息 if (!Get(msg, cmsNext))return !IsQuitting();//取出消息后调用Dispatch()进行处理Dispatch(msg);if (cmsLoop ! kForever){cmsNext static_castint(TimeUntil(msEnd));if (cmsNext 0)return true;}} }向线程中投递消息 // |time_sensitive| is deprecated and should always be false.virtual void Post(const Location posted_from,//是从哪个函数向线程中投递消息MessageHandler* phandler,//消息处理的类一般是向线程抛消息的类的this指针当线程轮训到该消息时通过该this指针再回调对应的处理函数uint32_t id 0,//消息idMessageData* pdata nullptr, //消息体bool time_sensitive false);//废弃的参数virtual void PostDelayed(const Location posted_from, //支持向线程抛入延迟消息int delay_ms,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr);virtual void PostAt(const Location posted_from, int64_t run_at_ms,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr);// 看下Post的具体实现void Thread::Post(const Location posted_from,MessageHandler* phandler,uint32_t id,MessageData* pdata,bool time_sensitive) {RTC_DCHECK(!time_sensitive);if (IsQuitting()) {delete pdata;return;}// Keep thread safe// Add the message to the end of the queue// Signal for the multiplexer to return{//注意这个大括号哈//数据进队列加锁内部用的 pthread_mutex_lock(mutex_)//CritScope对 mutex_进行了封装构造函数加锁、析构函数解锁CritScope cs(crit_);Message msg;//构造消息体msg.posted_from posted_from;msg.phandler phandler;msg.message_id id;msg.pdata pdata;messages_.push_back(msg);}//CritScope退出作用区域后调用对应的析构函数解锁//即pthread_mutex_unlock(mutex_);函数//这种实现方式一方面缩小了锁的范围锁的范围仅仅局限于大括号内部而不是整个Post()函数//同时退出临界区后自动调用析构函数释放锁也避免了死锁的可能性//这个WakeUp* 函数是重点它会唤醒当前等待的线程WakeUpSocketServer();
}
//看一下 WakeUpSocketServer()的实现
//最终是通过 pthread_cond_broadcast()
//唤醒当前所有处于pthread_cond_wait()的线程void Thread::WakeUpSocketServer() {ss_-WakeUp();
}
void NullSocketServer::WakeUp() {event_.Set();
} void Event::Set() {pthread_mutex_lock(event_mutex_);event_status_ true;//广播唤醒所有处于 pthread_cond_wait()的线程pthread_cond_broadcast(event_cond_);pthread_mutex_unlock(event_mutex_);
}
从消息队列中取消息的具体实现
//消息处理是从 Thread 的Run()函数开始
void Thread::Run() {// KForever字段一直轮训取数据//没有数据时会 wait() 阻塞等待ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {int cmsNext cmsLoop;while (true) {Message msg;//从消息队列中取消//取出来后交给 Dispatch()进行处理if (!Get(msg, cmsNext))return !IsQuitting();Dispatch(msg);if (cmsLoop ! kForever) {cmsNext static_castint(TimeUntil(msEnd));if (cmsNext 0)return true;}}
}
//取消息的过程
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {// Return and clear peek if present// Always return the peek if it exists so there is Peek/Get symmetryif (fPeekKeep_) {*pmsg msgPeek_;fPeekKeep_ false;return true;}// Get w/wait timer scan / dispatch socket / event multiplexer dispatchint64_t cmsTotal cmsWait;int64_t cmsElapsed 0;int64_t msStart TimeMillis();int64_t msCurrent msStart;while (true) {// Check for posted eventsint64_t cmsDelayNext kForever; //一直训练bool first_pass true;//具体实现是两层while(true)。内部的while负责取消息//取不到时外部while负责wait()阻塞等待while (true) {// All queue operations need to be locked, but nothing else in this loop// (specifically handling disposed message) can happen inside the crit.// Otherwise, disposed MessageHandlers will cause deadlocks.{//和像线程中投递消息类似取消息时也先加锁CritScope cs(crit_);// On the first pass, check for delayed messages that have been// triggered and calculate the next trigger time.if (first_pass) {//线程被唤醒后只从延时队列中取一次//并且这一次会把所有到时需要处理的延时消息取完//取出的延时消息放到messages_队列和普通消息一样进行处理first_pass false;while (!delayed_messages_.empty()) {//当前时间小于延时队列中第一条消息时间//说明还没有到需要处理延时消息的时间,if (msCurrent delayed_messages_.top().run_time_ms_) {//cmsDelayNext计算出需要等待的时间,//也是后面线程wait()时需要等待的最大时间//因为到了这个时间即便没有普通消息到来//延时队列中的消息也到时间需要处理了cmsDelayNext TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);break;}//把到时需要处理的延时消息放到普通队列中一起处理messages_.push_back(delayed_messages_.top().msg_);//延时消息出队列delayed_messages_.pop();}}// Pull a message off the message queue, if available.if (messages_.empty()) {break;} else {//真正获得需要处理消息的地方*pmsg messages_.front();messages_.pop_front();}} // crit_ is released here.// If this was a dispose message, delete it and skip it.//如果是dispose废除的消息就会删除//然后 continue()继续去取if (MQID_DISPOSE pmsg-message_id) {RTC_DCHECK(nullptr pmsg-phandler);delete pmsg-pdata;*pmsg Message();continue;}//如果是需要处理的消息就return退出当前 Get()函数//进行后面的Disptch()处理return true;}if (IsQuitting())break;// Which is shorter, the delay wait or the asked wait?int64_t cmsNext;if (cmsWait kForever) {cmsNext cmsDelayNext;} else {cmsNext std::maxint64_t(0, cmsTotal - cmsElapsed);if ((cmsDelayNext ! kForever) (cmsDelayNext cmsNext))cmsNext cmsDelayNext;}// 如果延时消息队列和普通的消息队列中都没有消息//内部while(true)会调用 break退出//然后就调用到这里,因为我们是 KForever一直轮训模式//所以当队列中没有消息时防止一直遍历查询,//会通过wait()挂起当前线程让出时间片{// Wait and multiplex in the meantime//内部调用的是 pthread_cond_wait//并且在wait()时也加了锁if (!ss_-Wait(static_castint(cmsNext), process_io))return false;}// If the specified timeout expired, returnmsCurrent TimeMillis();cmsElapsed TimeDiff(msCurrent, msStart);if (cmsWait ! kForever) {if (cmsElapsed cmsWait)return false;}}return false;
}
处理线程消息
从消息队列中Get(获取消息后会调用 Dispatch()处理消息。具体实现就是回调向线程中抛消息的类的OnMessage(pmg)函数然后进行具体消息的处理 void Thread::Dispatch(Message* pmsg) {TRACE_EVENT2(webrtc, Thread::Dispatch, src_file,pmsg-posted_from.file_name(), src_func,pmsg-posted_from.function_name());int64_t start_time TimeMillis();//回调对应OnMessage(pmsg)函数进行消息处理pmsg-phandler-OnMessage(pmsg);int64_t end_time TimeMillis();int64_t diff TimeDiff(end_time, start_time);if (diff kSlowDispatchLoggingThreshold) {RTC_LOG(LS_INFO) Message took diff ms to dispatch. Posted from: pmsg-posted_from.ToString();}
}
而我们的处理类继承 rtc::MessageHandler并实现了 OnMessage()函数就可以基于对应的MessageID类型处理不同的消息了 CVideoThread::OnMessage(rtc::Message* msg) {switch case:Message_id:handlerMessage();case VIDEO_INFO: //假如向线程中传入了 MessageData//在线程回调时会把这个消息体带出来方便我们处理if(msg-pdata){rtc::TypedMessageDataVideoMessageData* data static_castrtc::TypedMessageDataVideoMessageData*(msg-pdata);string message data-data();delete data;data nullptr;}default:break;
}
以上就是webrtc的线程模块了下一篇会介绍webrtc的 TaskQueue 任务队列
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/83819.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!