多程序打包:thread-loader 原始碼(6)
theme: fancy highlight: an-old-hope
持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的11天,點選檢視活動詳情
一、前情回顧
上一篇小作文接著 workerPool.run(data, cb)
內部通過呼叫建立 poolQueue
傳遞的 worker
方法即 this.distributeJob
方法處理 data
的位置繼續討論了:
-
this.distributeJob
方法尋找最合適的worker
或者建立worker
處理data
,建立worker
就來到了PoolWorker
建構函式; -
PoolWorker
建構函式主要作用是通過child_process.spawn
建立子程序執行thread-loader/dist/worker.js
這個檔案處理data
,將spawn
返回的子程序物件掛載到PoolWorker
例項的worker
屬性上,即this.worker
。 -
在建立子程序的時候通過傳遞給
spawn
的options.stdio
自定義管道實現程序間通訊,這裡使用的自定義管道是Stream
物件。這些自定義的管道可以通過子程序物件.stdio[索引]
的方式訪問到,即this.worker.stdio[索引]
,這裡的索引0,1,2
分別是標準輸入、標準輸出、標準錯誤
,3
及以後的是自定義管道。
本文接上文 PoolWorker
建構函式將自定義管道掛載到 PoolWorker
例項——this.readPipe/wirtePipe
掛載完成後的內容。
二、PoolWorker.prototype.listenStdOutAndErrFromWorker
-
方法位置:
thread-loader/src/WorkerPool.js -> PoolWorker.prototype.listenStdOutAndErrFromWorker
-
方法引數:
- 2.1
workerStdout
:子程序的stdout
- 2.2
workerStderr
:子程序的stderr
- 2.1
-
方法作用:監聽子程序的
stdout/stderr
的data
事件,把收到的輸入輸出到父程序的標準輸出、標準錯誤
```js class PoolWorker { constructor(options, onJobDone) { // 監聽子程序的 stdout 和 stderr 的 data 事件 // 把 stdout 和 stderr 輸出的內容輸出到父程序的 shell this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr); } }
listenStdOutAndErrFromWorker(workerStdout, workerStderr) { if (workerStdout) { workerStdout.on('data', this.writeToStdout); }
if (workerStderr) { workerStderr.on('data', this.writeToStderr); } }
// 寫入父程序的 stdout writeToStdout(data) { if (!this.disposed) { process.stdout.write(data); } }
// 寫入父程序的 stderr writeToStderr(data) { if (!this.disposed) { process.stderr.write(data); } } ```
三、PoolWorker.prototype.readNextMessage
在執行 PoolWorker
的建構函式中被呼叫;
-
方法位置:
thread-loader/src/WorkerPool.js -> PoolWorker.prototype.readNextMessage
-
方法引數:暫無
-
方法作用:
- 3.1 更新
this.state
值為'read length'
表示開始讀取本次message
的長度,這個是buffer
長度; - 3.2 呼叫
this.readBuffer
方法先讀取長度,傳入4
,標識4
個位元組,因為儲存message
長度的是一個32
位整數,從其回撥的readIn32BE
方法呼叫也可佐證這一點。
- 3.1 更新
```js class PoolWorker { constructor(options, onJobDone) { // 讀取訊息,訊息哪裡來的呢? // 很顯然是子程序拋過來的,至於丟擲訊息這是後話了 this.readNextMessage(); }
// 讀取訊息 readNextMessage() { this.state = 'read length';
// 讀取長度
this.readBuffer(4, (lengthReadError, lengthBuffer) => {
// ... 讀取長度後的操作
});
});
}
}
```
3.1 PoolWorker.prototype.readBuffer
-
方法位置:
thread-loader/src/WorkerPool.js -> PoolWorker.prototype.readBuffer
-
方法引數:
- 2.1
length
: 要讀取的buffer
長度 - 2.2
callback
: 取出完成後要執行的回撥函式
- 2.1
-
方法作用:科裡化
readBuffer
方法,繫結可讀流物件為this.readPipe
,從前文可知,這個readPipe
是父子程序通訊的管道。事實上,readPipe
是一個可讀流物件,由thread-loader
下的worker.js
檔案在子程序中執行時建立。
js
class PoolWorker {
readBuffer(length, callback) {
// 科裡化 readBuffer 繫結 readPipe
readBuffer(this.readPipe, length, callback);
}
}
3.2 readBuffer 方法
-
方法位置:
thread-loader/src/readBuffer.js -> function readBuffer
-
方法引數:
- 2.1
pipe
:資料來源管道物件,在thread-loader
中就是用於父子程序通訊的this.readPipe
這個管道 - 2.2
length
:本次要讀取的長度,是buffer
長度;你有沒有想過,我怎麼知道要讀取多長?每次放回的結果長度肯定不一致的?記好這個問題,後面會有答案的; - 2.3
callback
:讀取成功後要執行的回撥含糊
- 2.1
-
方法作用:下面的程式碼時經過簡化過得,結構很清晰
- 3.1 判斷如果
length
為0
,則直接分配一個0
位元組的buffer
,然後呼叫callback
,因為不需要讀取; - 3.2 呼叫
readChunk
私有方法,這個方法內部實現就是給pipe
繫結data
事件,給可讀流繫結data
事件會觸發快取中的資料轉移出來,這樣就能接收到快取區的資料了,把接收到的資料交給onChunk
回撥處理
- 3.1 判斷如果
```js export default function readBuffer(pipe, length, callback) { if (length === 0) { callback(null, Buffer.alloc(0)); return; }
let remainingLength = length;
const buffers = [];
const readChunk = () => { const onChunk = () => { / 具體讀取 buffer 資料的邏輯 / }; pipe.on('data', onChunk); pipe.resume(); };
// 呼叫 readChunk readChunk(); } ```
3.3 onChunk 回撥
-
方法位置:
thread-loader/src/readBuffer.js → function readBuffer → readChunk → onChunk
-
方法引數:
arg
,從可讀流物件得到的資料塊 -
方法作用:從
pipe
快取的buffer
讀取指定長度的內容,如果超出長度把超過的內容退回pipe
的快取區;詳細如下:- 3.1 設定
overflow
,用以承載超過readBuffer
指定的length
長度的資料,以備後面退回緩衝區 - 3.2 判斷
chunk.length > reamainingLength
,remainingLmength
是readBuffer
接收到引數lenght
,表示要讀取多長的buffer
。如果chunk.lenght
大於remaingLength
,說明超了,這個時候就直接讀取remaingLength
長度,剩下的複製到到overflow
。否則說明讀取的不超過remainingLenght
,讀取然後扣除已經讀取的長度,重新計算剩餘可讀取長度。 - 3.3 把本次讀取的
buffer
和之前的讀取的buffer
拼接;之所以要和之前的拼接,是因為讀取 buffer 這個事兒可能無法一次性讀取夠,每次onChunk
觸發都不再有之前的資料,所以要自己把前面已經讀取過的儲存好。 - 3.4 當
remainingLength
為0
,說明已經讀取夠了readBuffer
指定的lenght
長度,此時- 3.4.1 移除
pipe
的data
監聽器,這麼做可以讓pipe
暫停轉移緩衝區的資料出來; - 3.4.2 呼叫
pipe.pause()
也是暫緩轉移緩衝區資料; - 3.4.3 如果有
overflow
說明有超過length
的資料,呼叫pipe.unshift
退回pipe
的緩衝區; - 3.4.4 呼叫
readBuffer
的callback
並傳入長度為length
的buffer
資料;
- 3.4.1 移除
- 3.1 設定
```js // readBuffer 接收到指定長度, // 表示還可以再讀取多少長度的 buffer let reaminingLength = length; const buffers = []; const readChunk = () => { // ...
// 接收 pipe 緩衝的 buffer const onChunk = (arg) => { let chunk = arg;
// 設定 是否超出識別符號
let overflow;
// chunk.length > reaminingLnegth 說明超過指定長度了
if (chunk.length > remainingLength) {
// 把超超出長度的 buffer 複製出來
overflow = chunk.slice(remainingLength);
// 複製 remaingingLength 長度的的 buffer
chunk = chunk.slice(0, remainingLength);
// 上面兩步已經複製 reaminingLength,說明第一次就讀取夠 readBuffer 指定的 length 長度
remainingLength = 0;
} else {
// 說明不夠 remainingLength
remainingLength -= chunk.length;
}
buffers.push(chunk);
if (remainingLength === 0) {
pipe.removeListener('data', onChunk);
pipe.pause();
if (overflow) {
pipe.unshift(overflow);
}
callback(null, Buffer.concat(buffers, length));
}
};
pipe.on('data', onChunk); pipe.resume(); }; ```
3.4 this.readNextMessage 讀取長度後的回撥
前面我們分析了 readBuffer
的工作,它會從 readPipe
中讀取指定長度的 buffer
資料,然後呼叫回撥;現在我們看看在 this.readNextMessage
中呼叫 this.readBuffer
後的回撥中的工作;
- 更新
this.state
為 長度讀取成功 ——lenght read
- lengthBuffer.readInt32BE 從指定的
offset
處的bufer
讀取有符號的大端序32
位整數,說人話就是子程序寫入管道的資料總長度,告訴後面處理訊息的方法要讀取多少長度就能取到本次需要的資料; - 更新狀態為
read message
,表示已經知道子程序寫入的資料(message
所謂訊息,是為了父子通訊這個場景更應景兒的叫法,就是子程序跑完loader
以後的資料) - 再次呼叫
this.readBuffer
讀取資料,並且傳入上一次readBuffer
得到的長度;最後在本次readBuffer
回撥中處理訊息編碼、轉成JSON
,最後觸發this.onWorkerMessage
方法處理訊息;
```js
class PoolWorker {
// 讀取訊息
readNextMessage() {
this.state = 'read length';
// 讀取長度
this.readBuffer(4, (lengthReadError, lengthBuffer) => {
// 這個回撥就是 readBuffer 後要執行的,以忽略錯誤處理
// 更新 this.state 為 length 讀取完成
this.state = 'length read';
// 獲取 length
const length = lengthBuffer.readInt32BE(0);
this.state = 'read message';
this.readBuffer(length, (messageError, messageBuffer) => {
// 處理讀取資料的回撥
// 後文專門開篇討論
});
});
}
}
```
四、總結
本篇小作文詳細討論了 Pool.prototype.readNextMessage
的邏輯,它的核心就是先從程序通訊管道 readPipe
中通過 this.readBuffer
先讀取資料長度,讀取到長度之後再呼叫 this.readBuffer
讀取資料,最後交給處理資料的方法;
此外,我們還詳細介紹了 readBuffer
方法的工作原理:監聽 readPipe
的 data
事件,讀取指定長度的 buffer
;在程序通訊中,通訊的訊息由兩部分構成:資料長度 + 資料
;這個設計很 nice,資料長度是個 32
位正數,已知 4
個位元組長度。
所以每次先讀取 4
個位元組,得到後面的資料長度,這樣就解決了每次讀取資料不一樣長的。不一樣長好辦啊,子程序會告訴我們資料長度,我們只需要獲取一下這個長度就可以了。