日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Node.js子線程調(diào)試和診斷指南

調(diào)試、診斷子線程最直接的方式就是像調(diào)試、診斷主線程一樣,但是無論是動(dòng)態(tài)開啟還是靜態(tài)開啟,子線程都不可避免地需要內(nèi)置一些相關(guān)的非業(yè)務(wù)代碼,本文介紹另外一種對(duì)子線程代碼無侵入的調(diào)試方式,另外也介紹一下通過子線程調(diào)試主線程的方式。

創(chuàng)新互聯(lián)建站專注于九江企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站建設(shè),商城網(wǎng)站建設(shè)。九江網(wǎng)站建設(shè)公司,為九江等地區(qū)提供建站服務(wù)。全流程按需網(wǎng)站設(shè)計(jì),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)建站專業(yè)和態(tài)度為您提供的服務(wù)

1.初始化子線程的Inspector

在Node.js啟動(dòng)子線程的時(shí)候,會(huì)初始化Inspector。

 
 
 
 
  1. env_->InitializeInspector(std::move(inspector_parent_handle_));

在分析InitializeInspector之前,我們先看一下inspector_parent_handle_。

 
 
 
 
  1. std::unique_ptr inspector_parent_handle_;

inspector_parent_handle_是一個(gè)ParentInspectorHandle對(duì)象,這個(gè)對(duì)象是子線程和主線程通信的橋梁。我們看一下他的初始化邏輯(在主線程里執(zhí)行)。

 
 
 
 
  1. inspector_parent_handle_ = env->inspector_agent()->GetParentHandle(thread_id_, url);

調(diào)用agent的GetParentHandle獲取一個(gè)ParentInspectorHandle對(duì)象。

 
 
 
 
  1. std::unique_ptr Agent::GetParentHandle(int thread_id, const std::string& url) {
  2.  return client_->getWorkerManager()->NewParentHandle(thread_id, url);
  3. }

內(nèi)部其實(shí)是通過client_->getWorkerManager()對(duì)象的NewParentHandle方法獲取ParentInspectorHandle對(duì)象,接下來我們看一下WorkerManager的NewParentHandle。

 
 
 
 
  1. std::unique_ptr WorkerManager::NewParentHandle(int thread_id, const std::string& url) {
  2.   bool wait = !delegates_waiting_on_start_.empty();
  3.   return std::make_unique(thread_id, url, thread_, wait);
  4. }
  5. ParentInspectorHandle::ParentInspectorHandle(
  6.     int id, const std::string& url,
  7.     std::shared_ptr parent_thread, 
  8.     bool wait_for_connect
  9. )
  10.     : id_(id), 
  11.       url_(url), 
  12.       parent_thread_(parent_thread),
  13.       wait_(wait_for_connect) {}

最終的架構(gòu)圖如下入所示。

分析完P(guān)arentInspectorHandle后繼續(xù)看一下env_->InitializeInspector(std::move(inspector_parent_handle_))的邏輯(在子線程里執(zhí)行)。

 
 
 
 
  1. int Environment::InitializeInspector(
  2.     std::unique_ptr parent_handle) {
  3.   std::string inspector_path;
  4.   inspector_path = parent_handle->url();
  5.   inspector_agent_->SetParentHandle(std::move(parent_handle));
  6.   inspector_agent_->Start(inspector_path,
  7.                           options_->debug_options(),
  8.                           inspector_host_port(),
  9.                           is_main_thread());
  10. }

首先把ParentInspectorHandle對(duì)象保存到agent中,然后調(diào)用agent的Start方法。

 
 
 
 
  1. bool Agent::Start(...) {
  2.     // 新建client對(duì)象
  3.    client_ = std::make_shared(parent_env_, is_main);
  4.    // 調(diào)用agent中保存的ParentInspectorHandle對(duì)象的WorkerStarted
  5.    parent_handle_->WorkerStarted(client_->getThreadHandle(), ...);
  6. }

Agent::Start創(chuàng)建了一個(gè)client對(duì)象,然后調(diào)用ParentInspectorHandle對(duì)象的WorkerStarted方法(剛才SetParentHandle的時(shí)候保存的),我們看一下這時(shí)候的架構(gòu)圖。

接著看parent_handle_->WorkerStarted。

 
 
 
 
  1. void ParentInspectorHandle::WorkerStarted(
  2.     std::shared_ptr worker_thread, bool waiting) {
  3.   std::unique_ptr request(
  4.       new WorkerStartedRequest(id_, url_, worker_thread, waiting));
  5.   parent_thread_->Post(std::move(request));
  6. }

WorkerStarted創(chuàng)建了一個(gè)WorkerStartedRequest請(qǐng)求,然后通過parent_thread_->Post提交,parent_thread_是MainThreadInterface對(duì)象。

 
 
 
 
  1. void MainThreadInterface::Post(std::unique_ptr request) {
  2.   Mutex::ScopedLock scoped_lock(requests_lock_);
  3.   // 之前是空則需要喚醒消費(fèi)者
  4.   bool needs_notify = requests_.empty();
  5.   // 消息入隊(duì)
  6.   requests_.push_back(std::move(request));
  7.   if (needs_notify) {
  8.        // 獲取當(dāng)前對(duì)象的一個(gè)弱引用
  9.        std::weak_ptr* interface_ptr = new std::weak_ptr(shared_from_this());
  10.       // 請(qǐng)求V8執(zhí)行RequestInterrupt入?yún)?duì)應(yīng)的回調(diào)
  11.       isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) {
  12.         // 把執(zhí)行時(shí)傳入的參數(shù)轉(zhuǎn)成MainThreadInterface
  13.         std::unique_ptr> interface_ptr {
  14.           static_cast*>(opaque) 
  15.         };
  16.         // 判斷對(duì)象是否還有效,是則調(diào)用DispatchMessages
  17.         if (auto iface = interface_ptr->lock()) iface->DispatchMessages();
  18.       }, static_cast(interface_ptr));
  19.   }
  20.   // 喚醒消費(fèi)者
  21.   incoming_message_cond_.Broadcast(scoped_lock);
  22. }

我們看看這時(shí)候的架構(gòu)圖。

接著看回調(diào)里執(zhí)行MainThreadInterface對(duì)象DispatchMessages方法的邏輯。

 
 
 
 
  1. void MainThreadInterface::DispatchMessages() {
  2.   // 遍歷請(qǐng)求隊(duì)列
  3.   requests_.swap(dispatching_message_queue_);
  4.   while (!dispatching_message_queue_.empty()) {
  5.     MessageQueue::value_type task;
  6.     std::swap(dispatching_message_queue_.front(), task);
  7.     dispatching_message_queue_.pop_front();
  8.     // 執(zhí)行任務(wù)函數(shù)
  9.     task->Call(this);
  10.   }
  11. }

task是WorkerStartedRequest對(duì)象,看一下Call方法的代碼。

 
 
 
 
  1. void Call(MainThreadInterface* thread) override {
  2.   auto manager = thread->inspector_agent()->GetWorkerManager();
  3.   manager->WorkerStarted(id_, info_, waiting_);
  4. }

接著調(diào)用agent的WorkerManager的WorkerStarted。

 
 
 
 
  1. void WorkerManager::WorkerStarted(int session_id,
  2.                                   const WorkerInfo& info,
  3.                                   bool waiting) {
  4.   children_.emplace(session_id, info);
  5.   for (const auto& delegate : delegates_) {
  6.     Report(delegate.second, info, waiting);
  7.   }
  8. }

WorkerStarted記錄了一個(gè)id和上下文,因?yàn)閐elegates_初始化的時(shí)候是空的,所以不會(huì)執(zhí)行。至此,子線程Inspector初始化的邏輯就分析完了,結(jié)構(gòu)圖如下。

我們發(fā)現(xiàn),和主線程不一樣,主線程會(huì)啟動(dòng)一個(gè)WebSocket服務(wù)器接收客戶端的連接請(qǐng)求,而子線程只是初始化了一些數(shù)據(jù)結(jié)構(gòu)。下面我們看一下基于這些數(shù)據(jù)結(jié)構(gòu),主線程是如何動(dòng)態(tài)開啟調(diào)試子線程的。

2.主線程開啟調(diào)試子線程的能力

我們可以以以下方式開啟對(duì)子線程的調(diào)試。

 
 
 
 
  1. const { Worker, workerData } = require('worker_threads');
  2. const { Session } = require('inspector');
  3. // 新建一個(gè)新的通信通道
  4. const session = new Session();
  5. session.connect();
  6. // 創(chuàng)建子線程
  7. const worker = new Worker('./httpServer.js', {workerData: {port: 80}}); 
  8. // 子線程啟動(dòng)成功后開啟調(diào)試子線程的能力
  9. worker.on('online', () => {
  10.     session.post("NodeWorker.enable",
  11.                  {waitForDebuggerOnStart: false},  
  12.                  (err) => {  
  13.                     err && console.log("NodeWorker.enable", err);
  14.                  });
  15.     });
  16. // 防止主線程退出
  17. setInterval(() => {}, 100000);

我們先來分析一下connect函數(shù)的邏輯。

 
 
 
 
  1. connect() {
  2.     this[connectionSymbol] = new Connection((message) => this[onMessageSymbol](message));
  3. }

新建了一個(gè)Connection對(duì)象并傳入一個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)在收到消息時(shí)被回調(diào)。Connection是C++層導(dǎo)出的對(duì)象,由模版類JSBindingsConnection實(shí)現(xiàn)。

 
 
 
 
  1. template 
  2. class JSBindingsConnection {}

我們看看導(dǎo)出的路邏輯。

 
 
 
 
  1. JSBindingsConnection::Bind(env, target);

接著看Bind。

 
 
 
 
  1. static void Bind(Environment* env, Local target) {
  2.     // class_name是Connection
  3.     Local class_name = ConnectionType::GetClassName(env);
  4.     Local tmpl = env->NewFunctionTemplate(JSBindingsConnection::New);
  5.     tmpl->InstanceTemplate()->SetInternalFieldCount(1);
  6.     tmpl->SetClassName(class_name);
  7.     tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
  8.     env->SetProtoMethod(tmpl, "dispatch", JSBindingsConnection::Dispatch);
  9.     env->SetProtoMethod(tmpl, "disconnect", JSBindingsConnection::Disconnect);
  10.     target->Set(env->context(),
  11.                 class_name,
  12.                 tmpl->GetFunction(env->context()).ToLocalChecked())
  13.         .ToChecked();
  14.   }
  15. 當(dāng)我們?cè)贘S層執(zhí)行new Connection的時(shí)候,就會(huì)執(zhí)行JSBindingsConnection::New。

     
     
     
     
    1. static void New(const FunctionCallbackInfo& info) {
    2.    Environment* env = Environment::GetCurrent(info);
    3.    Local callback = info[0].As();
    4.    new JSBindingsConnection(env, info.This(), callback);
    5. }

    我們看看新建一個(gè)JSBindingsConnection對(duì)象時(shí)的邏輯。

     
     
     
     
    1. JSBindingsConnection(Environment* env,
    2.                        Local wrap,
    3.                        Local callback)
    4.                        : AsyncWrap(env, wrap, PROVIDER_INSPECTORJSBINDING),
    5.                          callback_(env->isolate(), callback) {
    6.     Agent* inspector = env->inspector_agent();
    7.     session_ = LocalConnection::Connect(
    8.         inspector, std::make_unique(env, this)
    9.     );}static std::unique_ptr Connect(
    10.       Agent* inspector, 
    11.       std::unique_ptr delegate
    12. ) {
    13.     return inspector->Connect(std::move(delegate), false);
    14. }
    15. 最終是傳入了一個(gè)JSBindingsSessionDelegate對(duì)象調(diào)用Agent的Connect方法。

       
       
       
       
      1. std::unique_ptr Agent::Connect(
      2.     std::unique_ptr delegate,
      3.     bool prevent_shutdown) {
      4.   int session_id = client_->connectFrontend(std::move(delegate),
      5.                                             prevent_shutdown);
      6.   // JSBindingsConnection對(duì)象的session_字段指向的對(duì)象                                         
      7.   return std::unique_ptr(
      8.       new SameThreadInspectorSession(session_id, client_)
      9.   );
      10. }

      Agent的Connect方法繼續(xù)調(diào)用client_->connectFrontend。

       
       
       
       
      1. int connectFrontend(std::unique_ptr delegate,
      2.                       bool prevent_shutdown) {
      3.     int session_id = next_session_id_++;
      4.     channels_[session_id] = std::make_unique(env_,
      5.                                                           client_,
      6.                                                           getWorkerManager(),
      7.                                                           std::move(delegate),
      8.                                                           getThreadHandle(),
      9.                                                           prevent_shutdown);
      10.     return session_id;
      11. }

      connectFrontend新建了一個(gè)ChannelImpl對(duì)象,在新建ChannelImpl時(shí),會(huì)初始化子線程處理的邏輯。

       
       
       
       
      1. explicit ChannelImpl(Environment* env,
      2.                        const std::unique_ptr& inspector,
      3.                        std::shared_ptr worker_manager,
      4.                        std::unique_ptr delegate,
      5.                        std::shared_ptr main_thread_,
      6.                        bool prevent_shutdown)
      7.       : delegate_(std::move(delegate)), prevent_shutdown_(prevent_shutdown),
      8.         retaining_context_(false) {
      9.     session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView());
      10.     // Node.js拓展命令的處理分發(fā)器
      11.     node_dispatcher_ = std::make_unique(this);
      12.     // trace相關(guān)
      13.     tracing_agent_ = std::make_unique(env, main_thread_);
      14.     tracing_agent_->Wire(node_dispatcher_.get());
      15.     // 處理子線程相關(guān)
      16.     if (worker_manager) {
      17.       worker_agent_ = std::make_unique(worker_manager);
      18.       worker_agent_->Wire(node_dispatcher_.get());
      19.     }
      20.     // 處理runtime
      21.     runtime_agent_ = std::make_unique();
      22.     runtime_agent_->Wire(node_dispatcher_.get());
      23. }

      我們這里只關(guān)注處理子線程相關(guān)的邏輯。看一下 worker_agent_->Wire。

       
       
       
       
      1. void WorkerAgent::Wire(UberDispatcher* dispatcher) {
      2.   frontend_.reset(new NodeWorker::Frontend(dispatcher->channel()));
      3.   NodeWorker::Dispatcher::wire(dispatcher, this);
      4.   auto manager = manager_.lock();
      5.   workers_ = std::make_shared(frontend_, manager->MainThread());
      6. }

      這時(shí)候的架構(gòu)圖如下

      接著看一下NodeWorker::Dispatcher::wire(dispatcher, this)的邏輯。

       
       
       
       
      1. void Dispatcher::wire(UberDispatcher* uber, Backend* backend){
      2.     std::unique_ptr dispatcher(new DispatcherImpl(uber->channel(), backend));
      3.     uber->setupRedirects(dispatcher->redirects());
      4.     uber->registerBackend("NodeWorker", std::move(dispatcher));
      5. }

      首先新建了一個(gè)DispatcherImpl對(duì)象。

       
       
       
       
      1. DispatcherImpl(FrontendChannel* frontendChannel, Backend* backend)
      2.         : DispatcherBase(frontendChannel)
      3.         , m_backend(backend) {
      4.         m_dispatchMap["NodeWorker.sendMessageToWorker"] = &DispatcherImpl::sendMessageToWorker;
      5.         m_dispatchMap["NodeWorker.enable"] = &DispatcherImpl::enable;
      6.         m_dispatchMap["NodeWorker.disable"] = &DispatcherImpl::disable;
      7.         m_dispatchMap["NodeWorker.detach"] = &DispatcherImpl::detach;
      8. }

      除了初始化一些字段,另外了一個(gè)kv數(shù)據(jù)結(jié)構(gòu),這個(gè)是一個(gè)路由配置,后面我們會(huì)看到它的作用。新建完DispatcherImpl后又調(diào)用了uber->registerBackend("NodeWorker", std::move(dispatcher))注冊(cè)該對(duì)象。

       
       
       
       
      1. void UberDispatcher::registerBackend(const String& name, std::unique_ptr dispatcher){
      2.     m_dispatchers[name] = std::move(dispatcher);
      3. }

      這時(shí)候的架構(gòu)圖如下。

      我們看到這里其實(shí)是建立了一個(gè)路由體系,后面收到命令時(shí)就會(huì)根據(jù)這些路由配置進(jìn)行轉(zhuǎn)發(fā),類似Node.js Express框架路由機(jī)制。這時(shí)候可以通過session的post給主線程發(fā)送NodeWorker.enable命令來開啟子線程的調(diào)試。我們分析這個(gè)過程。

       
       
       
       
      1. post(method, params, callback) {
      2.     // 忽略參數(shù)處理
      3.     // 保存請(qǐng)求對(duì)應(yīng)的回調(diào)
      4.     if (callback) {
      5.       this[messageCallbacksSymbol].set(id, callback);
      6.     }
      7.     // 調(diào)用C++的dispatch
      8.     this[connectionSymbol].dispatch(JSONStringify(message));
      9. }

      this[connectionSymbol]對(duì)應(yīng)的是JSBindingsConnection對(duì)象。

       
       
       
       
      1. static void Dispatch(const FunctionCallbackInfo& info) {
      2.     Environment* env = Environment::GetCurrent(info);
      3.     JSBindingsConnection* session;
      4.     ASSIGN_OR_RETURN_UNWRAP(&session, info.Holder());
      5.     if (session->session_) {
      6.       session->session_->Dispatch(
      7.           ToProtocolString(env->isolate(), info[0])->string());
      8.     }
      9. }

      session_是一個(gè)SameThreadInspectorSession對(duì)象。

       
       
       
       
      1. void SameThreadInspectorSession::Dispatch(
      2.     const v8_inspector::StringView& message) {
      3.   auto client = client_.lock();
      4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) {
      5.     channels_[session_id]->dispatchProtocolMessage(message);
      6. }

      最終調(diào)用了ChannelImpl的dispatchProtocolMessage。

       
       
       
       
      1. void dispatchProtocolMessage(const StringView& message) {
      2.     std::string raw_message = protocol::StringUtil::StringViewToUtf8(message);
      3.     std::unique_ptr value =
      4.         protocol::DictionaryValue::cast(protocol::StringUtil::parseMessage(
      5.             raw_message, false));
      6.     int call_id;
      7.     std::string method;
      8.     // 解析命令
      9.     node_dispatcher_->parseCommand(value.get(), &call_id, &method);
      10.     // 判斷命令是V8內(nèi)置命令還是Node.js拓展的命令
      11.     if (v8_inspector::V8InspectorSession::canDispatchMethod(
      12.             Utf8ToStringView(method)->string())) {
      13.       session_->dispatchProtocolMessage(message);
      14.     } else {
      15.       node_dispatcher_->dispatch(call_id, method, std::move(value),
      16.                                  raw_message);
      17.     }
      18. }

      因?yàn)镹odeWorker.enable是Node.js拓展的命令,所以會(huì)走到else里面的邏輯。根據(jù)路由配置找到該命令對(duì)應(yīng)的處理邏輯(NodeWorker.enable以.切分,對(duì)應(yīng)兩級(jí)路由)。

       
       
       
       
      1. void UberDispatcher::dispatch(int callId, const String& in_method, std::unique_ptr parsedMessage, const ProtocolMessage& rawMessage){
      2.     // 找到一級(jí)路由配置
      3.     protocol::DispatcherBase* dispatcher = findDispatcher(method);
      4.     std::unique_ptr messageObject = DictionaryValue::cast(std::move(parsedMessage));
      5.     // 交給一級(jí)路由處理器處理
      6.     dispatcher->dispatch(callId, method, rawMessage, std::move(messageObject));
      7. }

      NodeWorker.enable對(duì)應(yīng)的路由處理器代碼如下

       
       
       
       
      1. void DispatcherImpl::dispatch(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr messageObject){
      2.     // 查找二級(jí)路由
      3.     std::unordered_map::iterator it = m_dispatchMap.find(method);
      4.     protocol::ErrorSupport errors;
      5.     // 找到處理函數(shù)
      6.     (this->*(it->second))(callId, method, message, std::move(messageObject), &errors);
      7. }

      dispatch繼續(xù)尋找命令對(duì)應(yīng)的處理函數(shù),最終找到NodeWorker.enable命令的處理函數(shù)為DispatcherImpl::enable。

       
       
       
       
      1. void DispatcherImpl::enable(...){
      2.     std::unique_ptr weak = weakPtr();
      3.     DispatchResponse response = m_backend->enable(...);
      4.     // 返回響應(yīng)給命令(類似請(qǐng)求/響應(yīng)模式)
      5.     weak->get()->sendResponse(callId, response);
      6. }

      根據(jù)架構(gòu)圖可以知道m(xù)_backend是WorkerAgent對(duì)象。

       
       
       
       
      1. DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) {
      2.   auto manager = manager_.lock();
      3.   std::unique_ptr delegate(new AgentWorkerInspectorDelegate(workers_));
      4.   event_handle_ = manager->SetAutoAttach(std::move(delegate));
      5.   return DispatchResponse::OK();
      6. }

      繼續(xù)調(diào)用WorkerManager的SetAutoAttach方法。

       
       
       
       
      1. std::unique_ptr WorkerManager::SetAutoAttach(
      2.     std::unique_ptr attach_delegate) {
      3.   int id = ++next_delegate_id_;
      4.   // 保存delegate
      5.   delegates_[id] = std::move(attach_delegate);
      6.   const auto& delegate = delegates_[id];
      7.   // 通知子線程
      8.   for (const auto& worker : children_) {
      9.     Report(delegate, worker.second, false);
      10.   }
      11.   ...
      12. }

      SetAutoAttach遍歷子線程。

       
       
       
       
      1. void Report(const std::unique_ptr& delegate,
      2.             const WorkerInfo& info, bool waiting) {
      3.   if (info.worker_thread)
      4.     delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread);
      5. }

      info是一個(gè)WorkerInfo對(duì)象,該對(duì)象是子線程初始化和主線程建立關(guān)系的數(shù)據(jù)結(jié)構(gòu)。delegate是AgentWorkerInspectorDelegate對(duì)象。

       
       
       
       
      1. void WorkerCreated(const std::string& title,
      2.                      const std::string& url,
      3.                      bool waiting,
      4.                      std::shared_ptr target) override {
      5.     workers_->WorkerCreated(title, url, waiting, target);
      6. }

      workers_是一個(gè)NodeWorkers對(duì)象。

       
       
       
       
      1. void NodeWorkers::WorkerCreated(const std::string& title,
      2.                                 const std::string& url,
      3.                                 bool waiting,
      4.                                 std::shared_ptr target) {
      5.   auto frontend = frontend_.lock();
      6.   std::string id = std::to_string(++next_target_id_);
      7.   // 處理數(shù)據(jù)通信的delegate
      8.   auto delegate = thread_->MakeDelegateThreadSafe(
      9.       std::unique_ptr(
      10.           new ParentInspectorSessionDelegate(id, shared_from_this())
      11.       )
      12.   );
      13.   // 建立和子線程V8 Inspector的通信通道
      14.   sessions_[id] = target->Connect(std::move(delegate), true);
      15.   frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting);
      16. }

      WorkerCreated建立了一條和子線程通信的通道,然后通知命令的發(fā)送方通道建立成功。這時(shí)候架構(gòu)圖如下。

      接著看attachedToWorker。

       
       
       
       
      1. void Frontend::attachedToWorker(const String& sessionId, std::unique_ptr workerInfo, bool waitingForDebugger){
      2.     std::unique_ptr messageData = AttachedToWorkerNotification::create()
      3.         .setSessionId(sessionId)
      4.         .setWorkerInfo(std::move(workerInfo))
      5.         .setWaitingForDebugger(waitingForDebugger)
      6.         .build();
      7.     // 觸發(fā)NodeWorker.attachedToWorker
      8.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.attachedToWorker", std::move(messageData)));
      9. }

      繼續(xù)看sendProtocolNotification

       
       
       
       
      1. void sendProtocolNotification(
      2.       std::unique_ptr message) override {
      3.     sendMessageToFrontend(message->serializeToJSON());
      4.  }
      5.  void sendMessageToFrontend(const StringView& message) {
      6.     delegate_->SendMessageToFrontend(message);
      7.  }

      這里的delegate_是一個(gè)JSBindingsSessionDelegate對(duì)象。

       
       
       
       
      1. void SendMessageToFrontend(const v8_inspector::StringView& message)
      2.         override {
      3.       Isolate* isolate = env_->isolate();
      4.       HandleScope handle_scope(isolate);
      5.       Context::Scope context_scope(env_->context());
      6.       MaybeLocal v8string = String::NewFromTwoByte(isolate,
      7.                                                            message.characters16(),
      8.                                                            NewStringType::kNormal, message.length()
      9.       );
      10.       Local argument = v8string.ToLocalChecked().As();
      11.       // 收到消息執(zhí)行回調(diào)
      12.       connection_->OnMessage(argument);
      13. }
      14. // 執(zhí)行JS層回調(diào)
      15. void OnMessage(Local value) {
      16.    MakeCallback(callback_.Get(env()->isolate()), 1, &value);
      17. }

      JS層回調(diào)邏輯如下。

       
       
       
       
      1. [onMessageSymbol](message) {
      2.     const parsed = JSONParse(message);
      3.     // 收到的消息如果是某個(gè)請(qǐng)求的響應(yīng),則有個(gè)id字段記錄了請(qǐng)求對(duì)應(yīng)的id,否則則觸發(fā)事件
      4.     if (parsed.id) {
      5.        const callback = this[messageCallbacksSymbol].get(parsed.id);
      6.        this[messageCallbacksSymbol].delete(parsed.id);
      7.        if (callback) {
      8.          callback(null, parsed.result);
      9.        }
      10.      } else {
      11.        this.emit(parsed.method, parsed);
      12.        this.emit('inspectorNotification', parsed);
      13.      }
      14.   }

      主線程拿到Worker Session對(duì)一個(gè)的id,后續(xù)就可以通過命令NodeWorker.sendMessageToWorker加上該id和子線程通信。大致原理如下,主線程通過自己的channel和子線程的channel進(jìn)行通信,從而達(dá)到控制子線程的目的。

      我們分析一下NodeWorker.sendMessageToWorker命令的邏輯,對(duì)應(yīng)處理函數(shù)為DispatcherImpl::sendMessageToWorker。

       
       
       
       
      1. void DispatcherImpl::sendMessageToWorker(...){
      2.     std::unique_ptr weak = weakPtr();
      3.     DispatchResponse response = m_backend->sendMessageToWorker(in_message, in_sessionId);
      4.     // 響應(yīng)
      5.     weak->get()->sendResponse(callId, response);
      6.     return;
      7. }

      繼續(xù)分析m_backend->sendMessageToWorker。

       
       
       
       
      1. DispatchResponse WorkerAgent::sendMessageToWorker(const String& message,
      2.                                                   const String& sessionId) {
      3.   workers_->Receive(sessionId, message);
      4.   return DispatchResponse::OK();
      5. }
      6. void NodeWorkers::Receive(const std::string& id, const std::string& message) {
      7.   auto it = sessions_.find(id);
      8.   it->second->Dispatch(Utf8ToStringView(message)->string());
      9. }

      sessions_對(duì)應(yīng)的是和子線程的通信的數(shù)據(jù)結(jié)構(gòu)CrossThreadInspectorSession。看一下該對(duì)象的Dispatch方法。

       
       
       
       
      1. void Dispatch(const StringView& message) override {
      2.     state_.Call(&MainThreadSessionState::Dispatch,
      3.                 StringBuffer::create(message));
      4. }

      再次調(diào)了MainThreadSessionState::Dispatch

       
       
       
       
      1. void Dispatch(std::unique_ptr message) {
      2.     session_->Dispatch(message->string());
      3. }

      session_是SameThreadInspectorSession對(duì)象。繼續(xù)看它的Dispatch方法。

       
       
       
       
      1. void SameThreadInspectorSession::Dispatch(
      2.     const v8_inspector::StringView& message) {
      3.   auto client = client_.lock();
      4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) {
      5.     channels_[session_id]->dispatchProtocolMessage(message);
      6. }

      通過層層調(diào)用,最終拿到了一個(gè)合子線程通信的channel,dispatchProtocolMessage方法剛才已經(jīng)分析過,該方法會(huì)根據(jù)命令做不同的處理,因?yàn)槲覀冞@里發(fā)送的是V8內(nèi)置的命令,所以會(huì)交給V8 Inspector處理。當(dāng)V8 Inspector處理完后,會(huì)通過ChannelImpl的sendResponse返回結(jié)果。

       
       
       
       
      1. void sendResponse(
      2.       int callId,
      3.       std::unique_ptr message) override {
      4.     sendMessageToFrontend(message->string());
      5. }
      6.  void sendMessageToFrontend(const StringView& message) {
      7.     delegate_->SendMessageToFrontend(message);
      8.  }

      這里的delegate_是ParentInspectorSessionDelegate對(duì)象。

       
       
       
       
      1. void SendMessageToFrontend(const v8_inspector::StringView& msg) override {
      2.   std::string message = protocol::StringUtil::StringViewToUtf8(msg);
      3.   workers_->Send(id_, message);
      4. }
      5. void NodeWorkers::Send(const std::string& id, const std::string& message) {
      6.   auto frontend = frontend_.lock();
      7.   if (frontend)
      8.     frontend->receivedMessageFromWorker(id, message);
      9. }
      10. void Frontend::receivedMessageFromWorker(const String& sessionId, const String& message){
      11.     std::unique_ptr messageData = ReceivedMessageFromWorkerNotification::create()
      12.         .setSessionId(sessionId)
      13.         .setMessage(message)
      14.         .build();
      15.     // 觸發(fā)NodeWorker.receivedMessageFromWorker       
      16.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.receivedMessageFromWorker", std::move(messageData)));
      17. }

      m_frontendChannel是主線程的ChannelImpl對(duì)象。

       
       
       
       
      1. void sendProtocolNotification(
      2.     std::unique_ptr message) override {
      3.     sendMessageToFrontend(message->serializeToJSON());
      4. }
      5. void sendMessageToFrontend(const StringView& message) {
      6.     delegate_->SendMessageToFrontend(message);
      7. }

      delegate_是C++層傳入的JSBindingsSessionDelegate對(duì)象。最終通過JSBindingsSessionDelegate對(duì)象回調(diào)JS層,之前已經(jīng)分析過就不再贅述。至此,主線程就具備了控制子線程的能力,但是控制方式有很多種。

      2.1 使用通用的V8命令

      通過下面代碼收集子線程的CPU Profile信息。

       
       
       
       
      1. const { Worker, workerData } = require('worker_threads');
      2. const { Session } = require('inspector');
      3. const session = new Session();
      4. session.connect();
      5. let id = 1;  新聞名稱:Node.js子線程調(diào)試和診斷指南
        當(dāng)前地址:http://www.dlmjj.cn/article/coesihi.html
      6. <tt id="sclzg"></tt>