多程序打包:thread-loader 原始碼(6)

語言: CN / TW / HK

theme: fancy highlight: an-old-hope


持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的11天,點選檢視活動詳情

一、前情回顧

上一篇小作文接著 workerPool.run(data, cb) 內部通過呼叫建立 poolQueue 傳遞的 worker 方法即 this.distributeJob 方法處理 data 的位置繼續討論了:

  1. this.distributeJob 方法尋找最合適的 worker 或者建立 worker 處理 data,建立 worker 就來到了 PoolWorker 建構函式;

  2. PoolWorker 建構函式主要作用是通過 child_process.spawn 建立子程序執行 thread-loader/dist/worker.js 這個檔案處理 data,將 spawn 返回的子程序物件掛載到 PoolWorker 例項的 worker 屬性上,即 this.worker

  3. 在建立子程序的時候通過傳遞給 spawnoptions.stdio 自定義管道實現程序間通訊,這裡使用的自定義管道是 Stream 物件。這些自定義的管道可以通過 子程序物件.stdio[索引] 的方式訪問到,即 this.worker.stdio[索引],這裡的索引 0,1,2分別是 標準輸入、標準輸出、標準錯誤3及以後的是自定義管道。

本文接上文 PoolWorker 建構函式將自定義管道掛載到 PoolWorker 例項——this.readPipe/wirtePipe 掛載完成後的內容。

二、PoolWorker.prototype.listenStdOutAndErrFromWorker

  1. 方法位置:thread-loader/src/WorkerPool.js -> PoolWorker.prototype.listenStdOutAndErrFromWorker

  2. 方法引數:

    • 2.1 workerStdout:子程序的 stdout
    • 2.2 workerStderr:子程序的 stderr
  3. 方法作用:監聽子程序的 stdout/stderrdata 事件,把收到的輸入輸出到父程序的標準輸出、標準錯誤

```js class PoolWorker { constructor(options, onJobDone) { // 監聽子程序的 stdout 和 stderr 的 data 事件 // 把 stdout 和 stderr 輸出的內容輸出到父程序的 shell this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr); } }

listenStdOutAndErrFromWorker(workerStdout, workerStderr) { if (workerStdout) { workerStdout.on('data', this.writeToStdout); }

if (workerStderr) { workerStderr.on('data', this.writeToStderr); } }

// 寫入父程序的 stdout writeToStdout(data) { if (!this.disposed) { process.stdout.write(data); } }

// 寫入父程序的 stderr writeToStderr(data) { if (!this.disposed) { process.stderr.write(data); } } ```

三、PoolWorker.prototype.readNextMessage

在執行 PoolWorker 的建構函式中被呼叫;

  1. 方法位置:thread-loader/src/WorkerPool.js -> PoolWorker.prototype.readNextMessage

  2. 方法引數:暫無

  3. 方法作用:

    • 3.1 更新 this.state 值為 'read length' 表示開始讀取本次 message 的長度,這個是 buffer 長度;
    • 3.2 呼叫 this.readBuffer 方法先讀取長度,傳入 4,標識 4 個位元組,因為儲存 message 長度的是一個32位整數,從其回撥的 readIn32BE 方法呼叫也可佐證這一點。

```js class PoolWorker { constructor(options, onJobDone) { // 讀取訊息,訊息哪裡來的呢? // 很顯然是子程序拋過來的,至於丟擲訊息這是後話了 this.readNextMessage(); }

// 讀取訊息 readNextMessage() { this.state = 'read length';

// 讀取長度
this.readBuffer(4, (lengthReadError, lengthBuffer) => {
   // ... 讀取長度後的操作
});

});
} } ```

3.1 PoolWorker.prototype.readBuffer

  1. 方法位置:thread-loader/src/WorkerPool.js -> PoolWorker.prototype.readBuffer

  2. 方法引數:

    • 2.1 length: 要讀取的 buffer 長度
    • 2.2 callback: 取出完成後要執行的回撥函式
  3. 方法作用:科裡化 readBuffer 方法,繫結可讀流物件為 this.readPipe,從前文可知,這個 readPipe 是父子程序通訊的管道。事實上,readPipe 是一個可讀流物件,由 thread-loader 下的 worker.js 檔案在子程序中執行時建立。

js class PoolWorker { readBuffer(length, callback) { // 科裡化 readBuffer 繫結 readPipe readBuffer(this.readPipe, length, callback); } }

3.2 readBuffer 方法

  1. 方法位置:thread-loader/src/readBuffer.js -> function readBuffer

  2. 方法引數:

    • 2.1 pipe:資料來源管道物件,在 thread-loader 中就是用於父子程序通訊的 this.readPipe 這個管道
    • 2.2 length:本次要讀取的長度,是 buffer 長度;你有沒有想過,我怎麼知道要讀取多長?每次放回的結果長度肯定不一致的?記好這個問題,後面會有答案的;
    • 2.3 callback:讀取成功後要執行的回撥含糊
  3. 方法作用:下面的程式碼時經過簡化過得,結構很清晰

    • 3.1 判斷如果 length0,則直接分配一個 0 位元組的 buffer,然後呼叫 callback,因為不需要讀取;
    • 3.2 呼叫 readChunk 私有方法,這個方法內部實現就是給 pipe 繫結 data 事件,給可讀流繫結 data 事件會觸發快取中的資料轉移出來,這樣就能接收到快取區的資料了,把接收到的資料交給 onChunk 回撥處理

```js export default function readBuffer(pipe, length, callback) { if (length === 0) { callback(null, Buffer.alloc(0)); return; }

let remainingLength = length;

const buffers = [];

const readChunk = () => { const onChunk = () => { / 具體讀取 buffer 資料的邏輯 / }; pipe.on('data', onChunk); pipe.resume(); };

// 呼叫 readChunk readChunk(); } ```

3.3 onChunk 回撥

  1. 方法位置:thread-loader/src/readBuffer.js → function readBuffer → readChunk → onChunk

  2. 方法引數:arg,從可讀流物件得到的資料塊

  3. 方法作用:從 pipe 快取的 buffer 讀取指定長度的內容,如果超出長度把超過的內容退回 pipe 的快取區;詳細如下:

    • 3.1 設定 overflow,用以承載超過 readBuffer 指定的 length 長度的資料,以備後面退回緩衝區
    • 3.2 判斷 chunk.length > reamainingLengthremainingLmengthreadBuffer 接收到引數 lenght,表示要讀取多長的 buffer。如果 chunk.lenght 大於 remaingLength,說明超了,這個時候就直接讀取 remaingLength 長度,剩下的複製到到 overflow。否則說明讀取的不超過 remainingLenght,讀取然後扣除已經讀取的長度,重新計算剩餘可讀取長度。
    • 3.3 把本次讀取的 buffer 和之前的讀取的 buffer 拼接;之所以要和之前的拼接,是因為讀取 buffer 這個事兒可能無法一次性讀取夠,每次 onChunk 觸發都不再有之前的資料,所以要自己把前面已經讀取過的儲存好。
    • 3.4 當 remainingLength0,說明已經讀取夠了 readBuffer 指定的 lenght 長度,此時
      • 3.4.1 移除 pipedata 監聽器,這麼做可以讓 pipe 暫停轉移緩衝區的資料出來;
      • 3.4.2 呼叫 pipe.pause() 也是暫緩轉移緩衝區資料;
      • 3.4.3 如果有 overflow 說明有超過 length 的資料,呼叫 pipe.unshift 退回 pipe 的緩衝區;
      • 3.4.4 呼叫 readBuffercallback 並傳入長度為 lengthbuffer 資料;

```js // readBuffer 接收到指定長度, // 表示還可以再讀取多少長度的 buffer let reaminingLength = length; const buffers = []; const readChunk = () => { // ...

// 接收 pipe 緩衝的 buffer const onChunk = (arg) => { let chunk = arg;

// 設定 是否超出識別符號
let overflow;

// chunk.length > reaminingLnegth 說明超過指定長度了
if (chunk.length > remainingLength) {
  // 把超超出長度的 buffer 複製出來
  overflow = chunk.slice(remainingLength);

  // 複製 remaingingLength 長度的的 buffer
  chunk = chunk.slice(0, remainingLength);

  // 上面兩步已經複製 reaminingLength,說明第一次就讀取夠 readBuffer 指定的 length 長度
  remainingLength = 0;
} else {
  // 說明不夠 remainingLength
  remainingLength -= chunk.length;
}

buffers.push(chunk);

if (remainingLength === 0) {
  pipe.removeListener('data', onChunk);
  pipe.pause();

  if (overflow) {
    pipe.unshift(overflow);
  }

  callback(null, Buffer.concat(buffers, length));
}

};

pipe.on('data', onChunk); pipe.resume(); }; ```

3.4 this.readNextMessage 讀取長度後的回撥

前面我們分析了 readBuffer 的工作,它會從 readPipe 中讀取指定長度的 buffer 資料,然後呼叫回撥;現在我們看看在 this.readNextMessage 中呼叫 this.readBuffer 後的回撥中的工作;

  1. 更新 this.state 為 長度讀取成功 —— lenght read
  2. lengthBuffer.readInt32BE 從指定的 offset 處的 bufer 讀取有符號的大端序 32 位整數,說人話就是子程序寫入管道的資料總長度,告訴後面處理訊息的方法要讀取多少長度就能取到本次需要的資料;
  3. 更新狀態為 read message,表示已經知道子程序寫入的資料(message 所謂訊息,是為了父子通訊這個場景更應景兒的叫法,就是子程序跑完 loader 以後的資料)
  4. 再次呼叫 this.readBuffer 讀取資料,並且傳入上一次 readBuffer 得到的長度;最後在本次 readBuffer 回撥中處理訊息編碼、轉成 JSON,最後觸發 this.onWorkerMessage 方法處理訊息;

```js class PoolWorker {
// 讀取訊息 readNextMessage() { this.state = 'read length';

// 讀取長度
this.readBuffer(4, (lengthReadError, lengthBuffer) => {
  // 這個回撥就是 readBuffer 後要執行的,以忽略錯誤處理

  // 更新 this.state 為 length 讀取完成
  this.state = 'length read';

  // 獲取 length
  const length = lengthBuffer.readInt32BE(0);

  this.state = 'read message';
  this.readBuffer(length, (messageError, messageBuffer) => {
     // 處理讀取資料的回撥
     // 後文專門開篇討論
  });

});
} } ```

四、總結

本篇小作文詳細討論了 Pool.prototype.readNextMessage 的邏輯,它的核心就是先從程序通訊管道 readPipe 中通過 this.readBuffer 先讀取資料長度,讀取到長度之後再呼叫 this.readBuffer 讀取資料,最後交給處理資料的方法;

此外,我們還詳細介紹了 readBuffer 方法的工作原理:監聽 readPipedata 事件,讀取指定長度的 buffer;在程序通訊中,通訊的訊息由兩部分構成:資料長度 + 資料;這個設計很 nice,資料長度是個 32 位正數,已知 4 個位元組長度。

所以每次先讀取 4 個位元組,得到後面的資料長度,這樣就解決了每次讀取資料不一樣長的。不一樣長好辦啊,子程序會告訴我們資料長度,我們只需要獲取一下這個長度就可以了。