Alluxio 原始碼完整解析 | 你不知道的開源資料編排系統(下篇)
回顧
在《Alluxio-原始碼解析-上》主要講述了Alluxio本地環境搭建,原始碼專案結構,服務程序的啟動流程和服務間RPC呼叫。
本篇將在上篇的基礎上,繼續為大家講述Alluxio中重點類詳解,Alluxio中Block底層讀寫流程,Alluxio Client呼叫流程和 Alluxio內建的輕量級排程框架。
Part 1 重點類講述
1.1****Journaled
Journaled介面定義可被Journaled持久化維護的通用方法,通過JournalEntryIterable#getJournalEntryIterator獲取Journal元素遍歷資訊,該介面提供預設checkpoint方法。Journaled介面繼承Checkpointed、JournalEntryIterable,定義的方法包括:
getJournalEntryIterator:獲取Journal所有元素;
getCheckpointName:獲取checkpoint class類名稱;
writeToCheckpoint:持久化寫入所有狀態的checkpoint;
restoreFromCheckpoint:checkpoint恢復;
processJournalEntry:處理指定的Journal元素,Journal處理核心方法;
resetState:重置Journal狀態;
applyAndJournal:對Journal元素執行和應用Journal操作。
1.2****UnderFileSystem
Alluxio管理和適配資料在底層各個儲存系統執行操作,實現UnderFileSystem介面的底層儲存可以作為Alluxio的合法UFS。
1.2.1. 類圖
UnderFileSystem的類圖如下所示,主要由抽象類BaseUnderFileSystem實現,而BaseUnderFileSystem下主要分為兩大類:
ConsistentUnderFileSystem:具備一致性的UFS實現,主要包括:LocalUnderFileSystem、HdfsUnderFileSystem、CephFSUnderFileSystem等;
ObjectUnderFileSystem:物件儲存UFS實現,主要包括:S3AUnderFileSystem、COSUnderFileSystem、OSSUnderFileSystem等。
1.2.2. 介面方法
在UnderFileSystem中有兩類介面API:
儲存系統通用操作,如:建立/刪除檔案,檔案重新命名;
處理資料持久化最終一致性的操作(eventual consistency),如:解決當AlluxioMaster維護元資料成功時,但執行UFS操作失敗的問題。
1.2.2.1. 儲存系統操作
create:指定path路徑,在UFS中建立資料檔案(父目錄不存在會自動建立),可通過CreateOptions設定建立檔案的使用者組和ACL策略;
deleteDirectory:刪除指定目錄,可通過DeleteOptions設定刪除的策略和遍歷方式;
deleteFile:刪除指定檔案;
getDirectoryStatus:獲取UFS指定目錄狀態,需傳入已存在的目錄檔案;
getFileStatus:獲取UFS指定檔案狀態;
getStatus:獲取UFS狀態,可指定目錄或檔案;
isDirectory:判斷指定路徑在UFS是否是目錄;
open:開啟UFS上指定檔案,可通過OpenOptions設定檔案開啟引數;
renameDirectory:UFS上指定目錄重新命名;
renameFile:UFS上指定檔案重新命名;
exists:判斷指定的檔案或目錄是否存在;
getAclPair:獲取UFS的ACL策略;
getBlockSizeByte:獲取指定目錄下UFS的每個Block檔案大小,單位bytes;
getFileLocations:獲取指定路徑在UFS關聯的儲存Location列表;
getFingerprint:計算並獲取指定路徑的檔案標識(指紋),檔案標識(指紋)的計算必須是確定且不可變的;
getOperationMode:獲取底層UFS的操作模式,Alluxio的底層儲存可以由多種型別UFS組成,該方法用來確定底層UFS的操作模式,例子:底層UFS為:hdfs://ns1/,hdfs://ns2/,則返回結果:{hdfs://ns1/:NO_ACCESS,hdfs://ns2/:READ_WRITE};
getPhysicalStores:獲取所有UFS型別,包括資料結構和對應許可權
getSpace:通過制定SpaceType獲取UFS中指定路徑的儲存空間資訊,SpaceType包括:SPACE_TOTAL、SPACE_FREE、SPACE_USED;
getUnderFSType:獲取UFS型別,如hdfs;
isFile:判斷檔案檔案在UFS是否存在;
isObjectStorage:判斷UFS是否是物件儲存;
isSeekable:判斷UFS是否支援搜尋;
listStatus:指定UFS路徑下的檔案狀態列表,該列表不保證順序,可通過ListOptions設定是否支援遍歷;
mkdirs:在UFS上建立指定目錄,可通過MkdirsOptions設定目錄建立規則,如ACL和遞迴父目錄建立;
setAclEntries:指定路徑,設定UFS的ALC策略集合;
setMode:指定路徑,設定UFS ALC Mode,如0777;
setOwner:指定路徑,設定UFS ALC的user和group;
supportsFlush:判斷UFS是否支援檔案Flush;
supportsActiveSync:判斷UFS是否支援ActiveSync(訪問內部檔案共享),ActiveSync相關的介面包括:getActiveSyncInfo、startSync、stopSync、startActiveSyncPolling、stopActiveSyncPolling。
1.2.2.2. 最終一致性操作
createNonexistingFile:建立不存在的檔案,若檔案存在,則退出;
deleteExistingDirectory:刪除指定目錄;
deleteExistingFile:刪除指定檔案;
getExistingDirectoryStatus:獲取UFS指定目錄狀態;
getExistingFileStatus:獲取UFS指定檔案狀態;
getExistingStatus:獲取UFS狀態,可指定目錄或檔案;
isExistingDirectory:判斷指定路徑在UFS是否是目錄;
openExistingFile:開啟UFS上指定檔案,可通過OpenOptions設定檔案開啟引數;
renameRenamableDirectory:UFS上指定目錄重新命名;
renameRenamableFile:UFS上指定檔案重新命名。
1.2.2.3. 其他操作
cleanup:當資料檔案建立時沒有正常的成功結束或被拋棄處理,則對底層UFS清理;
connectFromMaster:指定AlluxioMaster主機地址,建立指定Master與UFS連線;
connectFromWorker:指定AlluxioWorker主機地址,建立指定Worker與UFS連線;
resolveUri:給定Alluxio基礎URI和路徑,返回拼裝後的Alluxio路徑。
1.3 UfsManager
Alluxio中對底層UFS(Under FileSystem)管理操作的通用統一介面類定義,定義的介面方法包括:
addMount:UFS掛載到Alluxio,該方法僅針對Alluxio處理,不對底層UFS操作;
removeMount:移除Alluxio中的UFS掛載;
get:根據mountId獲取掛載的UFS資訊;
getRoot:獲取Alluxio上掛載的根目錄資訊;
getJournal:獲取Journal的Location地址;
其中AbstractUfsManager抽象類對UFS管理介面進行基本實現。
1.3.1. UfsClient
維護底層UFS的Client連線資訊和其他相關UFS的描述資訊,基於UfsClient實現Alluxio對UnderFileSystem的操作。
1.4 BlockClient
BlockClient抽象類定義呼叫方對Block基本的讀寫操作,其類圖示意如下,主要包括:BlockWriter、BlockReader。
讀寫Block的定義的方法類:
1.5 DefaultFileSystemMaster
Master服務維護所有FileSystem(檔案系統)元資料變更的管理操作,DefaultFileSystemMaster內部基於InodeTree維護檔案系統結構,並將InodeTree持久化到日誌檔案(journal);除此之外,其內部維護多個管理操作,如:InodeLockManager、MasterUfsManager、MountTable等;
備註:DefaultFil1.5. DefaultFileSystemMastereSystemMaster的啟動start方法詳情前面所述內容。
1.5.1. 介面方法
FileSystemMaster介面定義master中針對FS的操作方法,DefaultFileSystemMaster繼承FileSystemMaster,其介面方法主要包括:
cleanupUfs:週期性清理底層UFS;
getFileId:基於Alluxio路徑URI 獲取檔案ID,若檔案不快取Alluxio,則呼叫UFS獲取;
getFileInfo:根據檔案ID獲取檔案詳情,該介面僅對內部服務開放,不對使用者直接開放;
getPersistenceState:根據檔案ID,獲取該檔案的持久化狀態;
listStatus:指定Alluxio路徑,獲取檔案狀態列表;
checkAccess:校驗指定Alluxio路徑的許可權;
checkConsistency:校驗指定Alluxio路徑的檔案資料一致性;
completeFile:關閉/結束指定Alluxio路徑,關閉後,則該檔案不可寫;
createFile:基於指定Alluxio檔案路徑,建立檔案FileInfo;
getNewBlockIdForFile:指定Alluxio檔案路徑,獲取下個待操作Block檔案的Block ID;
getMountPointInfoSummary:獲取Alluxio中mount(掛載)路徑的快照資訊;
getDisplayMountPointInfo:獲取Alluxio使用者展示的Mount資訊;
delete:刪除指定Alluxio路徑的檔案元資訊;
getFileBlockInfoList:獲取指定Alluxio路徑下的所有Block列表資訊;
getInAlluxioFiles:獲取Alluxio中所有的檔案列表路徑;
getInMemoryFiles:獲取Alluxio中所有快取在記憶體的檔案列表路徑;
createDirectory:建立Alluxio對應的目錄,並返回目錄ID;
rename:Alluxio中檔案重新命名操作的元資料變更;
free:指定Alluxio目錄下,釋放所有alluxio快取的block檔案資訊,支援目錄下遍歷的檔案釋放;
getPath:根據指定FileId獲取Alluxio URI路徑;
getPinIdList:獲取被固定的inode id列表;
getUfsAddress:獲取master所需的UFS地址;
getUfsInfo:根據掛載ID獲取對應UFS資訊;
getLostFiles:獲取worker節點丟失的檔案列表;
mount:核心操作,將UFS路徑掛載在Alluxio指定路徑;
unmount:取消指定Alluxio路徑上的UFS掛載;
updateMount:更新指定Alluxio路徑掛載資訊;
setAcl:設定Alluxio路徑ACL;
updateUfsMode:設定底層UFS Mode;
validateInodeBlocks:驗證inode block資訊是否具備完整性;
workerHeartbeat:指定worker ID,通知對應worker進行檔案的儲存持久化;
getWorkerInfoList:獲取所有worker節點資訊列表;
getTimeSeries:獲取alluxio master中元資料儲存的時間版本資訊;
1.6 DefaultBlockWorker
1.6.1. 介面
Worker Server針對Block的管理操作,實現介面類:BlockWorker,其介面方法主要包括:
getWorkerId:獲取worker id;
abortBlock:丟棄session中臨時建立的block檔案;
accessBlock:訪問指定session和block id下的block資訊,該方法可能會在block快取釋放被訪問;
commitBlock:提交block到Alluxio的管理空間,待提交的block必須是臨時的,當block提交成功之前,block是不支援讀寫訪問;
commitBlockInUfs:將block提交到UFS持久化;
createBlock:在Alluxio管理空間建立block,基於BlockWriter類可對block進行寫操作,在block commit提交之前都是臨時的;
getTempBlockMeta:獲取臨時block元資料;
createBlockWriter:基於session和block id建立BlockWriter,用於block的寫操作;
getReport:獲取worker與master週期性心跳的報告;
getStoreMeta:獲取整個block儲存的元資料資訊,包括block中每個儲存目錄對映和每層儲存的容量情況;
getStoreMetaFull:與getStoreMeta相似,但包括完整的blockId列表,獲取代價更高;
getVolatileBlockMeta:根據指定blockId獲取block元資料資訊;
lockBlock:對block進行加鎖操作;
moveBlock:將block從當前儲存Location移動到目標Location;當前僅支援分層儲存移動;
moveBlockToMedium:block移動並指定對應的儲存介質型別(MediumType);
createBlockReader:建立BlockReader進行Block讀操作,可讀取Alluxio Block和 UFS Block;
createUfsBlockReader:建立BlockReader進行UFS Block讀操作;
removeBlock:從Allxuio管理空間移除Block;
requestSpace:為指定block獲取儲存空間,該block必須為臨時block;
unlockBlock:對block去除鎖操作;
asyncCache:提交非同步快取請求進行非同步的快取管理;
updatePinList:更新底層block儲存佔用的pin列表;
getFileInfo:基於指定file id獲取檔案資訊。
1.6.2. TieredBlockStore
BlockStore定義block的儲存介面,用於管理本地block儲存,其介面核心目的:具體實現BlockWorker中定義的方法類,介面如下:
TieredBlockStore是BlockStore的實現類,實現了Alluxio中核心功能點:分層儲存,使得對應的儲存物件可基於block形式進行分層儲存管理,並對外暴露提供API進行block管理。TieredBlockStore中內建分配演算法確定新block的存取和舊block的釋放,基於BlockMetadataManager維護分層儲存狀態、block讀寫鎖管理等元資料資訊。
TieredBlockStore是執行緒安全的,所有基於block級別的操作都需要呼叫BlockLockManager來獲取對應的讀寫鎖,保證該block下的元資料操作和I/O操作是執行緒安全的。任何block的元資料操作都需要基於BlockMetadataManager來獲取元資料的ReentrantReadWriteLock 讀寫鎖。
Allocator介面定義Alluxio中資料管理的分配策略,介面方法:allocateBlockWithView,目前內部有三種實現類:
RoundRobinAllocator:基於round-robin輪訓分配,預設從最高層開始分配,當最高層儲存空間不足則會到下一層,該分配策略不支援指定儲存具體的分層。
MaxFreeAllocator:分配到儲存中最大剩餘空間,當沒有指定具體儲存分層,預設從最高層開始分配;
GreedyAllocator:返回滿足儲存block大小的第一層儲存空間,是儲存分配的示例類;
其中BlockStoreLocation定義儲存block的location地址和分層資訊,描述了三個儲存維度:儲存層別名、對應儲存層目錄地址,儲存層介質資訊。
1.6.2.1. createBlock
當存在可用空間(space)時,基於block分配演算法建立臨時block;特別的:建立block不會觸發其他block的銷燬釋放,通過BlockMetadataAllocatorView 獲取只讀的Block元資料資訊,為Allocator排程提供資料來源,Allocator分配排程後返回StorageDirView物件並建立TempBlockMeta 並通過BlockMetadataManager管理。儲存分配後的元資料會基於createBlockFile方法持久化到Block元檔案。
Allocator介面定義Alluxio中資料管理的分配策略,介面方法:allocateBlockWithView,目前內部有三種實現類:
RoundRobinAllocator:基於round-robin輪詢分配,預設從最高層開始分配,當最高層儲存空間不足則會到下一層,該分配策略不支援指定儲存具體的分層。
MaxFreeAllocator:分配到儲存中最大剩餘空間,當沒有指定具體儲存分層,預設從最高層開始分配;
GreedyAllocator:返回滿足儲存block大小的第一層儲存空間,是儲存分配的示例類;
其中BlockStoreLocation定義儲存block的location地址和分層資訊,描述了三個儲存維度:儲存層別名、對應儲存層目錄地址,儲存層介質資訊。
1.6.2.2. freeSpace
同步方法執行Block快取儲存空間執行立刻刪除釋放,當所有儲存分層的空間釋放操作結束後才能支援新Block建立。根據BlockMetadataEvictorView 獲取Block儲存中可移除的Block資訊。判斷當前快取儲存中是否滿足最小連續空間和最小可用空間,若同時滿足,則不進行後續空間清理操作;若不滿足,則遍歷Block資訊,判斷是否可清理,若可以清理,則刪除對應的Block檔案及元資料,通過BlockStoreEventListener事件監聽器同步Block釋放操作。
BlockStoreEventListener 監聽BlockStore中元資料變化成功結束的觸發事件,主要包括的介面方法類:
onAccessBlock:訪問Block 事件觸發;
onAbortBlock:清理和釋放臨時Block 事件觸發;
onCommitBlock:提交臨時Block並關聯Block的儲存資訊BlockStoreLocation 事件觸發;
onMoveBlockByClient:基於Client移動Block的BlockStoreLocation 事件觸發;
onMoveBlockByWorker:基於Worker移動Block的BlockStoreLocation 事件觸發;
onRemoveBlockByClient:基於Client移除並釋放Block的BlockStoreLocation 事件觸發;
onRemoveBlock:移除並釋放Block 事件觸發;
onBlockLost:Block丟失 事件觸發;
onStorageLost:儲存目錄丟失 事件觸發。
1.7 PlanDefinition
Alluxio中內建輕量級角度系統的Job執行計劃定義,有兩個核心部分,1. PlanDefinition#selectExecutors:該方法在Master節點呼叫,用於選擇執行任務的AlluxioJobWorker,2.PlanDefinition#runTask:在JobWorker中執行指定作業計劃。PlanDefinition 主要包括的作業定義實現有:
MoveDefinition:在FileSystemMaster校驗層級上觸發Block的移動操作;
ReplicateDefinition:在FileSystemMaster校驗層級上觸發Block的複製操作;
EvictDefinition:在FileSystemMaster校驗層級上觸發Block釋放操作;
PersistDefinition:將Alluxio Block快取儲存持久化到底層UFS;
CompactDefinition:在指定目錄下降結構化表的資料檔案進行壓縮;
MigrateDefinition:Block移動,源和目標Block可以掛載在不同的UFS節點;
LoadDefinition:實現簡單的Block檔案的Load操作。
1.7.1. TaskExecutorManager
管理JobWorker Task執行器,真正的執行任務通過執行緒池呼叫TaskExecutor#run,而TaskExecutor#run底層通過PlanDefinition#runTask 實現;同時TaskExecutorManager內部也管理Task的執行容量和Task生命週期管理,如:獲取執行的執行緒池,對任務執行限流/解除限流,任務啟停。
Part 2 Block讀寫操作
2.1 讀操作
BlockWorker RPC服務提供的客戶端的讀操作,大致流程如下:
BlockWorkerClientServiceHandler.readBlock方法定義Block讀取,預設建立請求引數StreamObserverresponseObserver 建立 CallStreamObserver;若支援零拷貝,則使用DataMessageServerStreamObserver
基於CallStreamObserver 建立BlockReadHandler,並呼叫BlockReadHandler#onReady 開啟資料讀取,基於執行緒池提交建立DataReader執行緒執行;
DataReader是Alluxio用於I/O資料讀取的執行緒類,封裝了核心的Alluxio讀操作邏輯,(1).獲取Alluxio資料輸入流DataBuffer;(2)呼叫CallStreamObserver.onNext觸發和監聽資料流讀取;
DataReader獲取DataBuffer是整個讀取處理的核心邏輯,判斷資料讀取來源:Local、UFS,是否進行Block移動實現短路讀;
-
建立開啟Block,若請求需要加速(promote=true)則操作BlockWorker.moveBlock,將Block移動到儲存更高層;
-
呼叫DefaultBlockWorker#createBlockReader 建立BlockReader,判斷本地Worker是否可以直接訪問,若支援則返回LocalFileBlockReader;若為UFS中,則呼叫UnderFileSystemBlockReader;
-
呼叫BlockReader.transferTo 讀取資料,並將I/O封裝為NettyDataBuffer返回。
2.1.1. UnderFileSystemBlockReader
UnderFileSystemBlockReader 類實現直接從UFS讀取並將讀取的資訊快取到讀取的Worker Block中,大致流程如下:
UfsInputStreamCache.acquire 根據ufs、路徑、blockId獲取輸入流InputStream,若InputStream在快取中直接獲取,若不存在,則根據ufs.openExistingFile 獲取底層UFS的檔案輸入流InputStream;
獲取並更新BlockWriter,判斷是否存在有對應Block存在,不存在則呼叫BlockStore.createBlock新建臨時Block,並返回對應BlockWriter;
根據第一步驟獲取的輸入流InputStream和引數offset讀取檔案,讀取的資料:(1).通過BlockWriter寫入Block快取對應Worker;(2).返回呼叫方讀取資訊。
備註:
LocalFileBlockReader:基於FileChannel.map方法的I/O操作讀取文字檔案資訊
RemoteBlockReader:基於遠端的Worker(非本地Worker)讀取,暫不支援;
DelegatingBlockReader根據不同的使用場景,判斷和選擇使用的BlockReader實現類。
2.1.2. ShortCircuitBlockReadHandler
ShortCircuitBlockReadHandler類是RPC服務實現提供短路讀能力,首先Grpc的StreamObserver(觀察者模式),一次onNext呼叫說明一次訊息讀取,大致的執行流程:
根據OpenLocalBlockRequest獲取是否進行加速讀取,若加速(promote=true)則呼叫BlockWorker.moveBlock將儲存移動更高層儲存分層;
呼叫BlockWorker.lockBlock 獲取Block的讀寫操作鎖,最後BlockWorker.accessBlock獲取訪問Block
2.2 寫操作
BlockWorker RPC服務提供的客戶端的寫操作,大致流程如下:
BlockWorkerClientServiceHandler.writeBlock方法定義Block寫入,預設建立請求引數StreamObserverresponseObserver 建立 CallStreamObserver;若支援零拷貝,則使用 BlockWorkerClientServiceHandler;
基於CallStreamObserver 建立DelegationWriteHandler,並呼叫DelegationWriteHandler#onCancel關閉資料寫操作;呼叫onNext方法進行資料流監聽寫操作;
DelegationWriteHandler 根據請求Command型別獲取對應的AbstractWriteHandler 實現類:
-
ALLUXIO_BLOCK:BlockWriteHandler,資料僅寫入Alluxio Block,基於BlockWriter實現寫操作;
-
UFS_FILE:UfsFileWriteHandler,資料僅寫入UFS,基於UFS Client建立目錄檔案並進行I/O操作;
-
UFS_FALLBACK_BLOCK:UfsFallbackBlockWriteHandler,先基於BlockWriteHandler寫入Alluxio Block再寫入UFS;
AbstractWriteHandler 抽象類關係如下:
2.2.1. LocalFileBlockWriter
基於本地Worker寫入Block檔案資訊,呼叫FileChannel.map
2.2.2. ShortCircuitBlockWriteHandler
ShortCircuitBlockWriterHandler實現短路讀的建立本地Block能力,基於onNext呼叫,大致執行流程:
若僅申請空間資源,則基於BlockWorker.requestSpace 獲取Block建立的請求空間資源;
若需建立臨時Block,則呼叫BlockWorker.createBlock建立Block並返回對應Block路徑。
Part 3 Catalog管理
AlluxioCatalog進行Alluxio中Catalog管理物件,封裝和維護了Alluxio中註冊的DB資訊及各個DB下的Table等元資料資訊,其基本的方法操作如下,包括:獲取資料庫db資訊,db元資料同步,db繫結/解綁等操作。
attachDatabase:將繫結的db元資料資訊維護在記憶體中並同步持久化到Journal中;
syncDatabase:會基於底層udb獲取最新元資料database資訊,如Hive則呼叫HMS客戶端介面方法IMetaStoreClient#getDatabase獲取資料庫資訊。
Part 4 Client操作
4.1 Client
Client介面抽象定義Alluxio中Client操作,其繼承和實現類如下所示,封裝了對接各個元件的RPC介面:
FileSystemMasterClient:封裝 FileSystemMasterClientServiceHandler 相關RPC呼叫,進行元資料管理操作
BlockMasterClient:封裝 BlockMasterClientServiceHandler相關RPC呼叫,進行Block管理操作
TableMasterClient:封裝 TableMasterClientServiceHandler 相關RPC呼叫,進行Alluxio Table Catalog管理操作
MetaMasterClient:封裝 MetaMasterClientServiceHandler 相關RPC呼叫
MetaMasterConfigClient:封裝 MetaMasterConfigurationServiceHandler 相關RPC呼叫
JobMasterClient:封裝JobMasterClientServiceHandler 相關RPC呼叫,進行Alluxio Job的呼叫操作;
4.1.1. FileSystem
Client中定義的檔案系統操作介面類,用於元資料管理和資料管理,使用者可根據其實現類BaseFileSystem 擴充套件Client檔案操作行為。
FileSystem 中定義的介面方法主要包括以下幾類:
checkAccess:檢查指定路徑許可權;
createDirectory:基於AlluxioURI 建立檔案目錄;
createFile:基於AlluxioURI 建立資料檔案;
delete:基於AlluxioURI 刪除指定檔案/目錄;
exists:基於AlluxioURI判斷指定檔案/目錄是否存在;
free:基於AlluxioURI 釋放Alluxio空間,但不刪除UFS資料檔案了;
listStatus:列出AlluxioURI檔案/目錄資訊;
mount/updateMount/unmount:掛載/更新/取消掛載指定AlluxioURI目錄;
openFile:開啟並讀取AlluxioURI檔案輸入流;
persist:將Alluxio中快取的資料非同步持久化底層UFS;
rename:Alluxio檔案重新命名。
4.1.2. FileSystemContext
維護Alluxio基於Client進行檔案系統操作的上下文資訊,通常的,一個Client JVM程序會使用同個FileSystem連線Alluxio,因此Client物件會在不同執行緒中共享。FileSystemContext 只有當用戶需要個性化配置和認證時才被建立,執行緒共享的Client會針對FileSystemContext維護獨立的執行緒空間,FileSystemContext 執行緒不共享(執行緒安全)會增加Client連線的資源使用,因此當用戶停止Alluxio操作後,需要關閉FileSystemContext釋放資源。
4.1.3. FileInStream/FileOutStream
Client中定義基於Alluxio檔案操作的輸入/輸出流,如下所示:
輸出流:AlluxioFileOutStream,Alluxio輸出流寫入,底層操作BlockOutStream
輸入流:AlluxioFileInStream:Alluxio輸入流讀取,封裝了本地/遠端節點資料讀取,或者直接基於底層UFS;底層操作BlockInStream,LocalCacheFileInStream,AlluxioHdfsInputStream
4.2 AbstractShell
Client的功能可以通過Shell對外提供操作,AbstractShell抽象類定義Alluxio中Shell命令操作,其繼承子類包括:
FileSystemShell:Alluxio Shell檔案操作入口類;
FileSystemAdminShell:Alluxio檔案系統管理操作;
CollectInfo:Alluxio中從所有Woker節點採集資訊命令;
TableShell:Alluxio表管理操作;
JobShell:Alluxio執行job管理操作。
4.2.1. CatCommand
以CatCommand為例,簡述Alluxio Client進行檔案讀取的大致流程如:
FileSystemShell接收shell命令,執行"cat"開啟檔案操作,呼叫CatCommand.run命令,shell命令支援正則和多目錄,對每個指定目錄執行自定義實現的runPlainPath操作;
CatCommand#runPlainPath 方法通過getStatus判斷檔案型別,若為目錄則退出,若為檔案則基於FileSystem開啟檔案獲取客戶端輸入流物件FileInStream(AlluxioFileInStream);
基於AlluxioFileInStream#read讀取檔案內容,URIStatus維護Alluxio中目錄和檔案元資料快照資訊,基於URIStatus獲取指定Alluxio檔案對應Block資訊,通過Client AlluxioBlockStore中維護的Block資訊獲取BlockInStream(Block輸入流);
基於BlockInStream呼叫輸入流讀取操作,底層基於Block的資料讀取介面DataReader實現,基於DataReader讀取Block詳情下述的Block讀操作。
4.2.2. TouchCommand
以TouchCommand為例,簡述Alluxio Client進行檔案寫入的大致流程如:
FileSystemShell接收shell命令,執行"touch"開啟檔案操作,呼叫TouchCommand.run命令,shell命令支援正則和多目錄,對每個指定目錄執行自定義實現的runPlainPath操作;
TouchCommand#runPlainPath 方法呼叫FileSystem.createFile 建立檔案並在結束後關閉該連線;
FileSystem.createFile的方法詳解如下:
-
基於FileSystemMasterClient獲取FileSystemMasterClientServiceHandler 遠端的RPC連線資訊;
-
基於FileSystemMasterClient 呼叫RPC介面建立資料檔案(createFile),將新建Alluxio檔案元資料資訊同步Alluxio Master;
-
FileSystem新建Client端的Alluxio檔案輸出流物件:AlluxioFileOutStream,其底層呼叫Block的DataWriter物件進行檔案處理;
-
輸出流完成後,執行AlluxioFileOutStream#close方法,呼叫FileSystemMasterClient#completeFile 判斷是否已執行完成,最終基於RPC介面實現completeFile;
Part 5 輕量級排程
Alluxio內部基於AlluxioJobMaster和AlluxioJobWoker實現輕量級內建的Alluxio操作排程,Master負責作業的排程管理,而Worker真正執行作業操作。
5.1排程管理
由前文AlluxioJobMaster啟動流程可知,AlluxioJobMaster在啟動時會觸發JobMaster Server啟動,JobMaster內部維護執行計劃(plan)的管理追蹤器:PlanTracker,用於建立、移除、訪問任務作業集合,每個作業都有對應的PlanCoordinator用於分散式作業執行協調。外部服務可通過HTTP和RPC方式呼叫JobMaster.run 方法根據作業配置(JobConfig)啟動並進行作業排程(同步/執行緒安全的)。JobConfig 定義作業配置介面,分為兩類:PlanConfig(單作業執行)、WorkflowConfig(一組作業流執行)。
JobMaster中作業排程管理的大致流程如下:
外部介面可呼叫JobMaster.run方法觸發作業執行,以Plan作業型別為例,呼叫PlanTracker執行run方法;
PlanTracker先校驗並移除已完成的作業,並基於PlanCoordinator建立新的作業例項並啟動該作業例項;
PlanCoordinator作業啟動流程:
-
基於JobConfig獲取對應的PlanDefinition;
-
根據可用的Worker列表和PlanDefinition,呼叫selectExecutors方法獲取待執行作業Worker列表;
-
呼叫CommandManager提交作業,將作業及待執行作業worker列表資訊維護在記憶體佇列中;
最後,Job Master和Job Worker節點通過RPC心跳檢測,下發具體的作業資訊給Worker執行。
5.2 作業執行
由前文AlluxioJobWorker啟動流程可知,AlluxioJobWorker啟動時會觸發心跳檢測執行緒CommandHandlingExecutor,對接收到的作業執行排程處理,每個作業啟動一個執行緒執行,作業執行大致流程如下:
CommandHandlingExecutor執行緒啟動與JobMaster進行心跳檢測,基於JobMasterClient.heartbeat方法獲取所有的待執行作業列表;
遍歷待執行作業列表,從執行緒池呼叫CommandHandler.run執行緒類執行作業排程,包括的作業型別:啟動、取消、註冊作業;
CommandHandler啟動作業會呼叫TaskExecutorManager 執行作業,以Future執行TaskExecutor 進行執行緒級別作業排程;
TaskExecutor真正執行作業排程:
-
對應作業引數進行反序列化操作;
-
根據PlanDefinitionRegistry 獲取執行Job的PlanDefinition並調動runTask執行作業;
以PersistDefinition為例,大致說明Job Executor操作,將Alluxio Block儲存持久化到底層UFS:
獲取Alluxio的資料儲存URI,讀取對應的資料輸入流in;
獲取指定的UFS目標路徑,根據UfsClient判斷該路徑是否存在,若不存在則建立,並基於UnderFileSystem建立輸出流out;
根據I/O操作工具類,將資料從資料流拷貝輸出流,持久化到UFS。
想要獲取更多有趣有料的【活動資訊】【技術文章】【大咖觀點】,請關注[Alluxio智庫]:
- Alluxio跨叢集同步機制的設計與實現
- 如何借力Alluxio推動大資料產品效能提升與成本優化?
- 2023年五大趨勢預測 | 大資料分析、人工智慧和雲產業展望
- 如何用Alluxio加速雲上深度學習訓練?
- 【螞蟻】Alluxio在螞蟻集團大規模訓練中的應用
- 從博士論文到被各大廠應用,Alluxio 如何走過 7 年創業路
- Presto on Alluxio By Alluxio SDS 單節點搭建
- Alluxio Local Cache 監控指南 Alluxio Alluxio
- 幫助 Meta 解決 Presto 中的資料孤島問題
- B站基於Iceberg Alluxio助力湖倉一體專案落地實踐
- 【聯通】資料編排技術在聯通的應用
- 【聯通】資料編排技術在聯通的應用
- Alluxio 原始碼完整解析 | 你不知道的開源資料編排系統(下篇)
- Alluxio 原始碼完整解析 | 你不知道的開源資料編排系統 (上篇)
- Meta(Facebook): 基於Alluxio Shadow Cache優化Presto架構決策
- Apache頂級專案Ranger和Alluxio的最佳實踐(附教程)
- 金山雲團隊分享 | 5000字讀懂Presto如何與Alluxio搭配
- 2min速覽:從設計、實現和優化角度淺談Alluxio元資料同步
- 華能 Alluxio | 數字化浪潮下跨地域資料聯邦訪問與分析
- 什麼是一致性雜湊?可以應用在哪些場景?