Node.js 進階 - 多檔案 Stream 合併,序列和併發兩種模式實現

語言: CN / TW / HK
將多個檔案合併為一個檔案,常見的場景是類似於大檔案分片上傳,事先根據一定的檔案大小拆分為多個小檔案上傳到服務端,最後服務端在合併起來。

怎麼合併?一種簡單的辦法是使用 fs.readFile 讀取,fs.writeFile 追加寫入,這種方式是將檔案資料先讀入應用記憶體再寫入,不是很推薦,Node.js 本身提供了 Stream 模組可以更好的處理這種場景。

在 Stream 中合併檔案之前一個比較常用的 API 是 pipe,但是這個 API 對於錯誤處理不是很友好,一不小心還能搞出文件控制代碼記憶體洩漏問題。

本文先介紹 pipe 方法的使用及什麼情況下會遇到檔案控制代碼的記憶體洩漏問題,之後再分別介紹 Stream 合併的兩種實現模式。

pipe VS pipeline

pipe

建立一個可讀流 readable 和一個可寫流 writeable,通過管道 pipe 將可寫流繫結到可讀流,一個簡單的 Stream 操作就完成了。

const fs = require('fs');
const readable = fs.createReadStream('./test1.txt');
const writeable = fs.createWriteStream('./test2.txt');

readable.pipe(writeable);

pipe 方法的兩個引數:

  • destination:是一個可寫流物件,也就是一個數據寫入的目標物件,例如,上面我們建立的 writeable 就是一個可寫流物件

  • options:

    • end:讀取結束時終止寫入流,預設值是 true

readable.pipe(destination[, options])

預設情況下我們是不需要手動呼叫寫入流的 end 方法關閉的。

現在我們改一下, 設定 end 為 false 寫入的目標流將會一直處於開啟狀態, 此時就需要監聽可讀流的 end 事件,結束之後手動呼叫可寫流的 end 方法結束( 為什麼要這樣做?下文 Stream 序列合併會再用到這一特性 )。

// readable.pipe(writeable);

readable.pipe(writeable, {
end: false,
});
readable.on('end', function() {
writeable.end('結束');
});

還需要注意一點 如果可讀流期間發生什麼錯誤,則寫入的目標流將不會關閉 ,例如:process.stderr 和 process.stdout 可寫流在 Nodejs 程序退出前將永遠不會關閉,所以 需要監聽錯誤事件,手動關閉可寫流,防止記憶體洩漏

Linux 下一切皆檔案,為了測試,在建立可讀流時,你可以不建立 test1.txt 檔案,讓可讀流自動觸發 error 事件並且將 writeable 的 close 方法註釋掉,通過 linux 命令 ls -l /proc/${pid}/fd 檢視 error 和非 error 前後的檔案控制代碼變化。

readable.on('error', function(err) {
console.log('error', err);
// writeable.close();
});

console.log(process.pid); // 列印程序 ID
setInterval(function(){}, 5000) // 讓程式不中斷,程序不退出

以下為觸發 error 錯誤下 test2.txt 這個檔案 fd 將會一直開啟,除非程序退出,所以重要的事情再說一遍, 如果使用 pipe 一定要做好錯誤監聽手動關閉每個寫入流 ,以防止 “ 記憶體洩漏 ”。

...
l-wx------ 1 root root 64 Apr 10 15:47 19 -> /root/study/test2.txt
...

注意,Mac 下沒有 /proc 檔案,可通過 docker 測試。不想開兩個終端的,也可以在程式 setInterval 定時器函式裡使用 child_process 模組的 exec 函式執行 ls -l /proc/${process.pid}/fd 命令。

const { exec } = require('child_process');

setInterval(function(){
exec(`ls -l /proc/${process.pid}/fd`, (error, stdout, stderr) => {
console.log(`stdout: \n`, stdout);
})
}, 5000) // 讓程式不中斷,程序不退出

pipeline

Stream 模組的一個新 API pipeline 方法,添加於 Node.js v10.0,Promise 風格需要 Node.js  v15.0+ 支援。相比較於 pipe 方法增加了錯誤處理機制,當管道中的某個流發生錯誤,它會自動處理並釋放掉相應的資源。

try {
await pipeline(
readable,
writable
);
console.log('Pipeline succeeded.');
} catch (err) {
console.log('error', err);
}

序列模式 Stream 合併

使用 pipe 方法實現序列模式的流合併,根據前面講的,設定可讀流的 end 為 false 保持寫入流一直處於開啟狀態,直到所有的可讀流結束(待合併的檔案完成後),我們再將可寫流給關閉。

  • streamMerge 函式為入口函式

  • streamMergeRecursive 函式遞迴呼叫合併檔案

const fs = require('fs');
const path = require('path');

/**
* Stream 合併
* @param { String } sourceFileDirectory 原始檔目錄
* @param { String } targetFile 目標檔案
*/

function streamMerge(sourceFileDirectory, targetFile) {
const scripts = fs.readdirSync(path.resolve(__dirname, sourceFileDirectory)); // 獲取原始檔目錄下的所有檔案
const fileWriteStream = fs.createWriteStream(path.resolve(__dirname, targetFile)); // 建立一個可寫流

// fs.readdir 讀取出來的結果,根據具體的規則做下排序,防止因為順序不對導致最終合併之後的檔案無效。

return streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory);
}

/**
* Stream 合併的遞迴呼叫
* @param { Array } scripts
* @param { Stream } fileWriteStream
*/

function streamMergeRecursive(scripts=[], fileWriteStream, sourceFileDirectory) {
// 遞迴到尾部情況判斷
if (!scripts.length) {
return fileWriteStream.end("console.log('Stream 合併完成')"); // 最後關閉可寫流,防止記憶體洩漏
}

const currentFile = path.resolve(__dirname, sourceFileDirectory, scripts.shift());
const currentReadStream = fs.createReadStream(currentFile); // 獲取當前的可讀流

currentReadStream.pipe(fileWriteStream, { end: false });
currentReadStream.on('end', function() {
streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory);
});

currentReadStream.on('error', function(error) { // 監聽錯誤事件,關閉可寫流,防止記憶體洩漏
console.error(error);
fileWriteStream.close();
});
}

streamMerge('./files', './file.js');

併發模式 Stream 合併

流合併也是可以採用併發模式的,核心是通過可寫流的 start、end 屬性控制。

start 有點類似於資料庫查詢的 skip,在 計算時要求檔案分塊的下標必須是 0、1、2... 這樣的規則 ,這種方式可以不用關注每一個流分塊在檔案中的儲存順序,也可以將可讀流傳輸至可寫流的指定位置。

例如,有一個大檔案 dec47b76e3220432100a1155eff7f402(檔案 md5 後的 hash 值) 根據 chunkSize(1048576)拆分為 3 個小檔案。

/chunks
└── dec47b76e3220432100a1155eff7f402-1048576
├── dec47b76e3220432100a1155eff7f402-0
├── dec47b76e3220432100a1155eff7f402-1
└── dec47b76e3220432100a1155eff7f402-2

併發模式的 Stream 合併程式碼實現如下:

/**
* Stream concurrent merge
* @param {String} sourceFileDirectory
* @param {String} targetFile
* @param {Number} chunkSize
*/

export const streamConcurrentMerge = async (sourceFileDirectory, targetFile, chunkSize) => {
const filenames = await fs.readdir(sourceFileDirectory);

await Promise.all(filenames.map(filename => {
const index = filename.split('-').pop();
const start = index * chunkSize;
const end = (index + 1) * chunkSize;

return pipeline(
createReadStream(path.join(sourceFileDirectory, filename)),
createWriteStream(targetFile, {
start,
end,
})
);
}))
}

總結

使用 pipe 時錯誤處理是件需要注意的事情,特別是出現這種情況 readable.pipe(a).pipe(b).pipe(writable) 其中任何一個流關閉或出錯都會導致整個管道停止工作,這個時候就要銷燬所有的流,這種複雜的處理起來極其麻煩, 推薦使用 stream API pipeline 處理,或使用社群 NPM 庫 pump。

將多個檔案合併為一個檔案,使用流的方式有兩種:

  • 第一種是序列模式依次讀取每個檔案的內容,通過 pipe 方法寫入可寫流,直到最後一個檔案讀取完成關閉寫入流。

  • 另一種是併發模式,核心實現是利用寫入流的 start、end 屬性將可讀流傳輸至可寫流的指定位置,上面的實現還可以在優化,比如控制下併發的數量。

-   E N D   -

3 6 0 W 3 C E C M A T C 3 9 L e a d e r