WebRTC程式碼學習之執行緒管理

語言: CN / TW / HK

目的

  • 通過原始碼熟悉整個WebRTC流程
  • 通過原始碼學習WebRTC中涉及到的網路、音視訊技術
  • 閱讀和學習優秀的C++程式碼(WebRTC),從中學習,提煉出實用的技巧、思想、程式碼

WebRTC的執行緒管理

為什麼是從執行緒開始切入整個WebRTC原始碼?相信只要對WebRTC有一定的瞭解的都清楚WebRTC內部有著自己的一套執行緒管理機制,WebRTC通過這套執行緒管理機制,非常簡單就達到了多執行緒安全編碼的目的,並且給每個執行緒劃分屬於自己的職責,方便後續維護、閱讀程式碼 (當然,WebRTC的執行緒管理和Chromium、Flutter都非常相似),如果你不瞭解WebRTC的這套執行緒管理機制,閱讀WebRTC程式碼會很懵逼,又因為執行緒管理並不會涉及到一些專業性知識,非常適合作為切入WebRTC原始碼的起點。

WebRTC程式碼邏輯主要通過三個執行緒管理(這裡不介紹一些編解碼執行緒):

  • network_thread: 網路執行緒,一切涉及到耗時的網路操作都在這個執行緒處理
  • worker_thread: 工作者執行緒,主要負責邏輯處理,比如一些初始化程式碼,還有比如在網路執行緒接收到資料然後會傳遞給工作者執行緒進行一些資料處理然後傳給解碼器執行緒
  • signal_thread: 信令執行緒,信令執行緒通常都是工作在PeerConnect層的,也就是我們絕大部分者呼叫的API都必須在信令執行緒,比如AddCandidate、CreateOffer等,WebRTC為了讓絕大部分API都執行在信令執行緒,還專門做了一層Proxy層,強制將API的呼叫分配到信令執行緒(後面如果有機會,可以分析以下WebRTC的Proxy層實現原理)

image.png

WebRTC執行緒之間的任務投遞

WebRTC執行緒之間的任務(這裡的任務主要指的是函式)投遞主要有兩種方式

  • 同步Invoke機制,通過這個機制可以將任務指定到某個執行緒執行,呼叫Invoke API的執行緒將會同步等待任務執行完成
  • 非同步Post機制,通過這個機制也可以將任務指定到某個執行緒執行,但是呼叫PostTask API的執行緒不會同步等待

Invoke機制,程式碼如下: c++ // 比如NeedsIceRestart函式是在工作者執行緒被呼叫,那麼network_thread()->Invoke將會將 // lambda匿名函式從工作者執行緒派遣到網路執行緒,並等待執行完成 bool PeerConnection::NeedsIceRestart(const std::string& content_name) const { return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] { RTC_DCHECK_RUN_ON(network_thread()); return transport_controller_->NeedsIceRestart(content_name); }); }

PostTask機制,程式碼如下: c++ // 同Invoke機制不同的是,呼叫完PostTask之後不用等待任務執行完成 void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) { network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() { endpoint->Enable(); UpdateNetworksOnce(); }); }

WebRTC執行緒實現細節分析 - Thread

注意:原始碼版本 M92

執行緒啟動流程

先從WebRTC的信令、工作者、網路執行緒的建立開始

file://src/pc_connection_context.cc:81 ```c++ ConnectionContext::ConnectionContext( PeerConnectionFactoryDependencies* dependencies) : network_thread_(MaybeStartThread(dependencies->network_thread, "pc_network_thread", true, owned_network_thread_)), worker_thread_(MaybeStartThread(dependencies->worker_thread, "pc_worker_thread", false, owned_worker_thread_)), signaling_thread_(MaybeWrapThread(dependencies->signaling_thread, wraps_current_thread_)) {

} ``` 通過MabeStartThread函式初始化了工作者、網路執行緒,信令執行緒比較特殊一點,是由於信令執行緒可以直接託管程序中的主執行緒(準確來說應該是當前呼叫執行緒),所以呼叫的函式是MaybeWrapThread

MaybeStartThread

file://src/pc_connection_context.cc:27 c++ rtc::Thread* MaybeStartThread(rtc::Thread* old_thread, const std::string& thread_name, bool with_socket_server, std::unique_ptr<rtc::Thread>& thread_holder) { if (old_thread) { return old_thread; } if (with_socket_server) { thread_holder = rtc::Thread::CreateWithSocketServer(); } else { thread_holder = rtc::Thread::Create(); } thread_holder->SetName(thread_name, nullptr); thread_holder->Start(); return thread_holder.get(); } 暫時忽略with_socket_server,後面會說明CreateWithSocketServer,MaybeStartThread整體流程

  1. old_thread如果不為空直接返回,由於WebRTC的這三個執行緒都是可以由外部自定義的,所以如果外部有傳入自定義執行緒,後續執行緒建立操作將不會進行
  2. 呼叫rtc::Thread::Create
  3. 呼叫rtc::Thread::SetName
  4. 呼叫rtc::Thread::Start
  5. 執行緒啟動完成

MaybeWrapThread

file://src/pc_connection_context.cc:44 c++ rtc::Thread* MaybeWrapThread(rtc::Thread* signaling_thread, bool& wraps_current_thread) { wraps_current_thread = false; if (signaling_thread) { return signaling_thread; } auto this_thread = rtc::Thread::Current(); if (!this_thread) { // If this thread isn't already wrapped by an rtc::Thread, create a // wrapper and own it in this class. this_thread = rtc::ThreadManager::Instance()->WrapCurrentThread(); wraps_current_thread = true; } return this_thread; } 如果外部沒有傳入signaling_thread,內部將會獲取當前執行緒作為signaling_thread

rtc::Thread::Start流程

  1. 呼叫ThreadManager::Instance() 初始化ThreadManager物件
  2. windows上呼叫CreateThread,linux呼叫pthread_create建立執行緒
  3. 進入執行緒處理函式Thread::PreRun
  4. 呼叫Thread::Run函式
  5. Thread::Run函式呼叫ProcessMessage函式

ProcessMessage

file://src/rtc_base/thread.cc:1132

```c++ bool Thread::ProcessMessages(int cmsLoop) { //... int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); int cmsNext = cmsLoop;

while (true) {

if defined(WEBRTC_MAC)

ScopedAutoReleasePool pool;

endif

Message msg;
if (!Get(&msg, cmsNext))
  return !IsQuitting();
Dispatch(&msg);

if (cmsLoop != kForever) {
  cmsNext = static_cast<int>(TimeUntil(msEnd));
  if (cmsNext < 0)
    return true;
}

} } ``` 主要邏輯如下:函式通過一個while迴圈處理訊息,每次迴圈都會通過Get獲取一個可用的Message,然後呼叫Dispatch派遣獲取到的Message,兩個主要函式Dispatch、Get。到這裡整個WebRTC執行緒的初始化和啟動流程就介紹完了

訊息獲取、派遣、投遞分析

上面的ProcessMessages,可以把它當成一個訊息迴圈,迴圈中每次都會通過Get函式去獲取訊息

Get (訊息獲取)

file://src/rtc_base/thread.cc:472

```c++ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { // ......

// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

int64_t cmsTotal = cmsWait; int64_t cmsElapsed = 0; int64_t msStart = TimeMillis(); int64_t msCurrent = msStart; while (true) { // Check for posted events int64_t cmsDelayNext = kForever; bool first_pass = true; while (true) { // All queue operations need to be locked, but nothing else in this loop // (specifically handling disposed message) can happen inside the crit. // Otherwise, disposed MessageHandlers will cause deadlocks. { CritScope cs(&crit_); // On the first pass, check for delayed messages that have been // triggered and calculate the next trigger time. if (first_pass) { first_pass = false; while (!delayed_messages_.empty()) { if (msCurrent < delayed_messages_.top().run_time_ms_) { cmsDelayNext = TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); break; } messages_.push_back(delayed_messages_.top().msg_); delayed_messages_.pop(); } } // Pull a message off the message queue, if available. if (messages_.empty()) { break; } else { *pmsg = messages_.front(); messages_.pop_front(); } } // crit_ is released here.

  // If this was a dispose message, delete it and skip it.
  if (MQID_DISPOSE == pmsg->message_id) {
    RTC_DCHECK(nullptr == pmsg->phandler);
    delete pmsg->pdata;
    *pmsg = Message();
    continue;
  }
  return true;
}

if (IsQuitting())
  break;

// Which is shorter, the delay wait or the asked wait?

int64_t cmsNext;
if (cmsWait == kForever) {
  cmsNext = cmsDelayNext;
} else {
  cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
  if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
    cmsNext = cmsDelayNext;
}

{
  // Wait and multiplex in the meantime
  if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
    return false;
}

// If the specified timeout expired, return

msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
  if (cmsElapsed >= cmsWait)
    return false;
}

} return false; } ``` 核心是通過一個迴圈來獲取一個有效的訊息,迴圈會在Get成功、失敗或者外部呼叫了Stop停止了執行緒時結束。

訊息的獲取機制

  • 嘗試獲取延遲訊息,延遲訊息列表使用優先順序列隊儲存,如果延遲訊息到達執行時間,延遲訊息將會從訊息訊息優先順序隊列出列,並將延遲訊息加入可執行訊息佇列
  • 判斷可執行訊息佇列是否存在訊息,如果存在從佇列頭部取出一個訊息返回給外部
  • 如果可執行訊息佇列為空,進行Wait操作,等待訊息到來觸發WakeUp,這裡的Wait和WakeUp使用的是SocketServer物件,後面專門分析SocketServer的Wait和wakeUp原理

可能在一開始看程式碼會對獲取可用延遲訊息產生疑問,為什麼只判斷延遲訊息佇列的第一個元素的執行時間有沒有到達,難道佇列後面的訊息不會有比這個頂部訊息的執行時間更小的嗎? c++ while (!delayed_messages_.empty()) { if (msCurrent < delayed_messages_.top().run_time_ms_) { cmsDelayNext = TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); break; } messages_.push_back(delayed_messages_.top().msg_); delayed_messages_.pop(); } 進一步檢視delayed_messages_的定義PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); ```c++ // DelayedMessage goes into a priority queue, sorted by trigger time. Messages // with the same trigger time are processed in num_ (FIFO) order. class DelayedMessage { public: DelayedMessage(int64_t delay, int64_t run_time_ms, uint32_t num, const Message& msg) : delay_ms_(delay), run_time_ms_(run_time_ms), message_number_(num), msg_(msg) {}

bool operator<(const DelayedMessage& dmsg) const {
  return (dmsg.run_time_ms_ < run_time_ms_) ||
         ((dmsg.run_time_ms_ == run_time_ms_) &&
          (dmsg.message_number_ < message_number_));
}

int64_t delay_ms_;  // for debugging
int64_t run_time_ms_;
// Monotonicaly incrementing number used for ordering of messages
// targeted to execute at the same time.
uint32_t message_number_;
Message msg_;

};

class PriorityQueue : public std::priority_queue { public: container_type& container() { return c; } void reheap() { make_heap(c.begin(), c.end(), comp); } }; ``` 延遲訊息佇列其實就是一個大項堆的優先順序訊息佇列,也就是使用降序排序,DelayedMessage的大小比較是通過run_time_ms_引數,如果run_time_ms_越小其實DelayedMessage越大,如果run_time_ms_ 相等就使用message_number來比較,通俗說就是延遲時間越小在佇列中越靠前。

Message介紹

在介紹訊息派遣處理之前需要先弄清楚Message

file://src/rtc_base/thread_message.h

c++ struct Message { Message() : phandler(nullptr), message_id(0), pdata(nullptr) {} inline bool Match(MessageHandler* handler, uint32_t id) const { return (handler == nullptr || handler == phandler) && (id == MQID_ANY || id == message_id); } Location posted_from; MessageHandler* phandler; uint32_t message_id; MessageData* pdata; }; 主要看兩個資料phander和pdata,對應類如下 ```c++ class RTC_EXPORT MessageHandler { public: virtual ~MessageHandler() {} virtual void OnMessage(Message* msg) = 0; };

class MessageData { public: MessageData() {} virtual ~MessageData() {} }; ```

兩個虛基類,MesageData用來儲存訊息的內容,MesageHandler用來處理訊息,使用者可以自定義屬於自己的MessageHanlder和MessageData,比如我們自定義一個自己的MessageData如下: ```c++ // 定義了一個自己的MyMessageTask,其中儲存了一個function,並且對外提供了一個Run方法 template class MyMessageTask final : public MessageData { public: explicit MessageWithFunctor(FunctorT&& functor) : functor_(std::forward(functor)) {} void Run() { functor_(); }

private: ~MessageWithFunctor() override {}

typename std::remove_reference::type functor_; }; 在自己定義一個MessageHandler用來處理訊息c++ // OnMessage函式會在派遣訊息的時候被呼叫,裡面的msg存放著一個MessageData物件,這個MessageData物件就是我們自定義的MyMessageTask,獲取到這個物件直接呼叫我們剛剛寫好的Run函式執行。 class MyMessageHandlerWithTask : public MessageHandler { public: void OnMessage(Message msg) overrider { static_cast(msg->pdata)->Run(); delete msg->pdata; } } 上面我們定義了一個handler和data,主要用來在收到派遣過來的訊息時通過handler處理訊息,來看看如何使用我們自定義的handler和data吧 // Thread::Post原型 virtual void Post(const Location& posted_from, MessageHandler phandler, uint32_t id = 0, MessageData pdata = nullptr, bool time_sensitive = false); // 注意看Post函式裡面有需要我們傳入MessageHandler和MessageData,我們只需要將自定義 // 的MessageHandler和MessageData傳入即可 static MyMessageHandlerWithTask myhandler = new MyMessageHandlerWithTask; MyMessageTask* mytask = new MyMessageTask( {int c = a+b;}); Post(FROME_HERE, myhandler, 0, mytask); ``` 執行完上面的Post,MyMessageTask裡面的匿名函式將被執行

Dispatch (訊息派遣)

介紹完Message,就可以看看Dispatch是如何將訊息派遣到MessageHandler去處理的

file://src/rtc_base/thread.cc

c++ void Thread::Dispatch(Message* pmsg) { TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file", pmsg->posted_from.file_name(), "src_func", pmsg->posted_from.function_name()); RTC_DCHECK_RUN_ON(this); int64_t start_time = TimeMillis(); pmsg->phandler->OnMessage(pmsg); int64_t end_time = TimeMillis(); int64_t diff = TimeDiff(end_time, start_time); if (diff >= dispatch_warning_ms_) { RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff << "ms to dispatch. Posted from: " << pmsg->posted_from.ToString(); // To avoid log spew, move the warning limit to only give warning // for delays that are larger than the one observed. dispatch_warning_ms_ = diff + 1; } }

Dispatch函式非常簡單,抓住重點就是呼叫了傳入的Message的OnMessage,將訊息傳遞給MessageHandler去處理

訊息的投遞

前面有看了訊息獲取的實現原理,如果沒有訊息將會呼叫Wait進行等待,既然有Wait,那麼肯定就有地方觸發WaitUp,沒錯,就是在外部投遞訊息的時候會觸發WaitUp, 在 WebRTC執行緒之間的任務投遞中有介紹了兩種方式,一種同步Invoke,一種非同步Post

file://src/rtc_base/thread.h:449

c++ template <class FunctorT> void PostTask(const Location& posted_from, FunctorT&& functor) { Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0, new rtc_thread_internal::MessageWithFunctor<FunctorT>( std::forward<FunctorT>(functor))); } PostTask核心還是呼叫了Post函式,並且傳入了屬於自己的MessageData和MessageHandler

file://src/rtc_base/thread.cc:563 ```c++ void Thread::Post(const Location& posted_from, MessageHandler phandler, uint32_t id, MessageData pdata, bool time_sensitive) { RTC_DCHECK(!time_sensitive); if (IsQuitting()) { delete pdata; return; }

// Keep thread safe // Add the message to the end of the queue // Signal for the multiplexer to return

{ CritScope cs(&crit_); Message msg; msg.posted_from = posted_from; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; messages_.push_back(msg); } WakeUpSocketServer(); }

void Thread::WakeUpSocketServer() { ss_->WakeUp(); } ```

Post函式實現非常簡單清晰,構造一個Message新增到佇列,然後呼叫ss_->WakeUp()喚醒Wait,ss_是一個SocketServer物件,後面在分析, 先看同步Invoke

file://src/rtc_base/thread.h:388

```c++ template < class ReturnT, typename = typename std::enable_if<!std::is_void::value>::type> ReturnT Invoke(const Location& posted_from, FunctionView functor) { ReturnT result; InvokeInternal(posted_from, [functor, &result] { result = functor(); }); return result; }

template < class ReturnT, typename = typename std::enable_if::value>::type> void Invoke(const Location& posted_from, FunctionView functor) { InvokeInternal(posted_from, functor); } ``` 兩個過載函式一個有返回結果,一個沒有,內部都呼叫InvokeInternal完成,InvokeInternal緊接著呼叫了Send函式

file://src/rtc_base/thread.cc:914

```c++ void Thread::Send(const Location& posted_from, MessageHandler phandler, uint32_t id, MessageData pdata) { RTC_DCHECK(!IsQuitting()); if (IsQuitting()) return;

// Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. Message msg; msg.posted_from = posted_from; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; if (IsCurrent()) {

if RTC_DCHECK_IS_ON

RTC_DCHECK_RUN_ON(this);
could_be_blocking_call_count_++;

endif

msg.phandler->OnMessage(&msg);
return;

}

AssertBlockingIsAllowedOnCurrentThread();

Thread* current_thread = Thread::Current();

if RTC_DCHECK_IS_ON

if (current_thread) { RTC_DCHECK_RUN_ON(current_thread); current_thread->blocking_call_count_++; RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, this); }

endif

// Perhaps down the line we can get rid of this workaround and always require // current_thread to be valid when Send() is called. std::unique_ptr done_event; if (!current_thread) done_event.reset(new rtc::Event());

bool ready = false; PostTask(webrtc::ToQueuedTask( &msg mutable { msg.phandler->OnMessage(&msg); }, [this, &ready, current_thread, done = done_event.get()] { if (current_thread) { CritScope cs(&crit_); ready = true; current_thread->socketserver()->WakeUp(); } else { done->Set(); } }));

if (current_thread) { bool waited = false; crit_.Enter(); while (!ready) { crit_.Leave(); current_thread->socketserver()->Wait(kForever, false); waited = true; crit_.Enter(); } crit_.Leave();

// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send.  Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B.  While processing
// the message, thread B Posts a message to A.  We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.

if (waited) {
  current_thread->socketserver()->WakeUp();
}

} else { done_event->Wait(rtc::Event::kForever); } } ```

Send函式的程式碼比較多,不過整體思路還是很清晰

  • 如果呼叫Send的執行緒就是Send所擁有的當前執行緒,直接執行Message中的OnMessage,不需要任務派遣
  • 不在同一個執行緒,呼叫PostTask將訊息傳遞對應執行緒,這裡讀者可能會有一個疑問這個PostTask中的任務被派遣到什麼執行緒了,如果你有一個Thread物件workerThread,你現在再main執行緒中呼叫workerThread.PostTask,這個任務將會被投遞到你建立的Thread物件管理的的執行緒中,也就是workerThread中。
  • 任務被PostTask到對應執行緒中之後,存在兩種情況,再函式執行之前或者之後,執行緒已經釋放
  • 如果執行緒已經釋放,僅僅等待一個函式執行完成的Event訊號
  • 執行緒還存在,等待訊息執行完成,執行完成之後再呼叫一次WakeUp,註釋中也非常詳細的解釋了為什麼需要再執行完成之後再呼叫一次WakeUp,原因就是再while(!ready) {... current_thread->socketserver()->Wait()}中可能會消費掉一些外部觸發的WakeUp事件,如果在執行完成之後不呼叫一次WakeUp可能導致外部新Post的訊息無法被即時消費

訊息投遞、派遣、獲取狀態轉移圖

Screen Shot 2022-06-30 at 21.35.19.png

為了更加清楚的瞭解WebRTC的訊息投遞、派遣、獲取機制,我自己定義了4種狀態,方便理解

  • Idel狀態:通過呼叫Start,並且還沒有呼叫Get函式前
  • Wait狀態:通過呼叫Get函式,將Idel狀態轉換成Wait狀態
  • Ready狀態:通過呼叫Post狀態從而觸發Waitup,將Wait狀態轉換成Ready狀態
  • Running狀態:通過呼叫Dispatch進行訊息的處理,轉換成Running狀態

Current實現機制

提出疑問點:如果我想要在程式碼任意位置獲取當前執行緒的Thread物件,要怎麼做?單例?

看看WebRTC Thread的Current函式原型:

c++ class Thread { public: //...... static Thread* Current(); } 當我們線上程A呼叫Thread::Current將會獲得一個執行緒A的Thread物件,線上程B呼叫Thread::Current將會獲取一個執行緒B的Thread物件, 來看看內部實現

```c++ // static Thread Thread::Current() { ThreadManager manager = ThreadManager::Instance(); Thread* thread = manager->CurrentThread();

ifndef NO_MAIN_THREAD_WRAPPING

// Only autowrap the thread which instantiated the ThreadManager. if (!thread && manager->IsMainThread()) { thread = new Thread(CreateDefaultSocketServer()); thread->WrapCurrentWithThreadManager(manager, true); }

endif

return thread; } ```

核心實現都在ThreadManager中,ThreadManager是針對WebRTC Thread提供的一個管理類,裡面會存放所有外部建立的Thread

c++ Thread* ThreadManager::CurrentThread() { return static_cast<Thread*>(TlsGetValue(key_)); }

ThreadManager::CurrentThread實現很簡單,通過TlsGetValue獲取了私有變數key_,那這個key_肯定有Set操作,沒錯,這個key_的Set操作,是在Thread的建構函式中進行的 Thraed() -> DoInit() -> ThreadManager::SetCurrentThread -> ThreadManager::SetCurrentThreadInternal

c++ void ThreadManager::SetCurrentThreadInternal(Thread* thread) { TlsSetValue(key_, thread); }

TlsSetValue和TlsGetValue是什麼意思? 這裡涉及到了一個知識點,也就是TLS

TLS介紹

TLS全稱是Thread Local Storage 執行緒區域性變數或者執行緒私有變數,私有的意思是每個執行緒都將獨自擁有這個變數

  • 在Windows中採用TlsAlloc獲取程序中一個未使用的TLS slot index,使用TlsSetValue進行值的設定,TlsGetValue進行值的獲取
  • 在linux中採用pthread_key_create、pthread_getspecific、pthread_setspecific對TLS進行操作
  • C++11中採用thread_local

詳細連結:

https://www.notion.so/TLS-78870a02d5c94fce8159c252f64b0125#2f95630986d444d8ae2685652c67eb30

https://www.notion.so/TLS-78870a02d5c94fce8159c252f64b0125#48c012ecbd064a2ab4b227a14b5a8ca3

迴歸Current函式實現,它就是藉助了TLS技術得以實現在不同執行緒儲存屬於自己的私有變數(這個私有變數就是Thread),然後再對應執行緒呼叫Current獲取到的Thread也就是當前執行緒的了

WebRTC執行緒Proxy機制

前面有提到,WebRTC對外暴露的API比如PeerConnectionInterface在內部都一層代理機制,來確保每一個API呼叫在正確的執行緒,先看PeerConnectiontProxy

file://src/api/peer_connection_proxy.h

```c++ BEGIN_PROXY_MAP(PeerConnection) PROXY_PRIMARY_THREAD_DESTRUCTOR() PROXY_METHOD0(rtc::scoped_refptr, local_streams) PROXY_METHOD0(rtc::scoped_refptr, remote_streams) PROXY_METHOD1(bool, AddStream, MediaStreamInterface) PROXY_METHOD1(void, RemoveStream, MediaStreamInterface) PROXY_METHOD2(RTCErrorOr>, AddTrack, rtc::scoped_refptr, const std::vector&) // ......

// This method will be invoked on the network thread. See // PeerConnectionFactory::CreatePeerConnectionOrError for more details. PROXY_SECONDARY_METHOD1(rtc::scoped_refptr, LookupDtlsTransportByMid, const std::string&) // This method will be invoked on the network thread. See // PeerConnectionFactory::CreatePeerConnectionOrError for more details. PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr, GetSctpTransport)
```

上面的一堆巨集,會生成一個PeerConnectionProxyWithInternal類,我們主要看三個巨集 BEGIN_PROXY_MAP、PROXY_METHOD0、PROXY_SECONDARY_METHOD1

BEGIN_PROXY_MAP

```c++ #define BEGIN_PROXY_MAP(c) \ PROXY_MAP_BOILERPLATE(c) \ SECONDARY_PROXY_MAP_BOILERPLATE(c) \ REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \ public: \ static rtc::scoped_refptr Create( \ rtc::Thread primary_thread, rtc::Thread secondary_thread, \ INTERNAL_CLASS* c) { \ return rtc::make_ref_counted(primary_thread, \ secondary_thread, c); \ }

// Helper macros to reduce code duplication. #define PROXY_MAP_BOILERPLATE(c) \ template \ class c##ProxyWithInternal; \ typedef c##ProxyWithInternal c##Proxy; \ template \ class c##ProxyWithInternal : public c##Interface { \ protected: \ typedef c##Interface C; \ \ public: \ const INTERNAL_CLASS internal() const { return c_; } \ INTERNAL_CLASS internal() { return c_; } ```

看重點, 第一typedef c##ProxyWithInternal c##Proxy;, 也就是外部使用的類名採用PeerConnectionProxy, c##ProxyWithInternal: public c##Interface,也就是繼承自PeerConnectionInterface類,也就是我們在外部拿到的PeerConnect指標物件,其實是PeerConnectionProxyWithInternal物件, 重點2 , Create函式,這個Create函式會在什麼時候呼叫,並且primary_thread和secondary_thread分別對應著什麼執行緒,看下面程式碼

```c++ RTCErrorOr> PeerConnectionFactory::CreatePeerConnectionOrError( const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies) {

rtc::scoped_refptr result_proxy = PeerConnectionProxy::Create(signaling_thread(), network_thread(), result.MoveValue()); return result_proxy; } ``` 通過上面的程式碼可以確定,在PeerConnectionProxy類中primary_thread對應的就是signaling_thread,secondary_thread執行緒就是network_thread執行緒

PROXY_METHOD0

c++ #define PROXY_METHOD0(r, method) \ r method() override { \ MethodCall<C, r> call(c_, &C::method); \ return call.Marshal(RTC_FROM_HERE, primary_thread_); \ } 建立MethodCall類,並呼叫Marshal,注意呼叫Marshal傳入的引數primary_thread_ ,在PeerConnectionProxy中也就是,signaling_thread

PROXY_SECONDARY_METHOD1

c++ #define PROXY_SECONDARY_METHOD1(r, method, t1) \ r method(t1 a1) override { \ MethodCall<C, r, t1> call(c_, &C::method, std::move(a1)); \ return call.Marshal(RTC_FROM_HERE, secondary_thread_); \ } 與PROXY_METHOD不同的是在呼叫Marshal時傳入的是secondary_thread_,在PeerConnectionProxy也就是network_thread

MethodCall

```c++ template class MethodCall : public QueuedTask { public: typedef R (C::Method)(Args...); MethodCall(C c, Method m, Args&&... args) : c_(c), m_(m), args_(std::forward_as_tuple(std::forward(args)...)) {}

R Marshal(const rtc::Location& posted_from, rtc::Thread* t) { if (t->IsCurrent()) { Invoke(std::index_sequence_for()); } else { t->PostTask(std::unique_ptr(this)); event_.Wait(rtc::Event::kForever); } return r_.moved_result(); }

private: bool Run() override { Invoke(std::index_sequence_for()); event_.Set(); return false; }

template void Invoke(std::index_sequence) { r_.Invoke(c_, m_, std::move(std::get(args_))...); }

C* c_; Method m_; ReturnType r_; std::tuple args_; rtc::Event event_; }; ```

主要看Marshal函式,如果是在當前執行緒直接呼叫Invoke,否則呼叫PostTask將任務投遞到指定執行緒,並等待執行完成. 關於std::tuple 的使用可以檢視官方文件,上面的程式碼用到了兩個C++14的新特性 std::index_sequence_for和 std::get 來輔助tuple的使用