Hadoop3.x 三大元件詳解

語言: CN / TW / HK

Hadoop

Hadoop適合海量資料分散式儲存和分散式計算

執行使用者使用簡單的程式設計模型實現跨機器叢集對海量資料進行分散式計算處理

1. 概述

1.1 簡介

Hadoop核心元件

  • HDFS (分散式檔案 儲存 系統):解決海量資料儲存
  • YARN(叢集 資源管理 和任務排程框架):解決資源任務排程
  • MapReduce(分散式 計算 框架):解決海量資料計算

Hadoop發展簡史

  • Hadoop起源於Apache Lucen子專案:Nutch

    Nutch的設計目標是構建一個大型的全網搜尋引擎

    問題:如何解決數十億網頁的儲存和索引問題

  • 三篇論文Google

    • The Google file system 谷歌分散式檔案系統 GFS
    • MapReduce: Simplified Data Processing on Large Clusters 谷歌分散式計算框架
    • Bigtable: A Distributed Storage System for Structured Data 谷歌結構化資料儲存系統

Hadoop現狀

  • HDFS處在生態圈底層與核心地位
  • YARN支撐各種計算引擎執行
  • MapReduce企業一線幾乎不再直接使用,很多軟體的底層依舊使用MapReduce引擎

Hadoop特性優點

  • 擴容能力(scalability):節點數量靈活變化
  • 成本低(Economical):允許通過部署普通廉價的機器組成叢集
  • 效率高(efficiency):併發資料,可以在節點之間動態並行的移動資料
  • 可靠性(reliability):自動維護資料的多份複製,並且在任務失敗後能自動重新部署計算任務

發行版本

  • 開源社群版:Apache開源社群髮型
    • 更新快
    • 相容穩定性不好
  • 商業發行版
    • 基於Apache開源協議
    • 穩定相容好
    • 收費,版本更新慢

架構變遷

  • Hadoop 1.0

    • HDFS

    • MapReduce

  • Hadoop 2.0

    • HDFS

    • MapReduce

    • YARN

  • Hadoop 3.0

    著重於效能優化

    • 精簡核心,類路徑隔離、shell指令碼重構
    • EC糾刪碼,多NameNode支援
    • 任務本地化優化、記憶體引數自動推斷

1.2 安裝部署

Hadoop叢集整體概述

  • Hadoop叢集包括兩個:HDFS叢集、YARN叢集
  • 兩個叢集邏輯上分離、通常物理在一起
    • 叢集互相之間沒有依賴、互不影響
    • 程序部署在同一機器上
  • 兩個叢集都是標準的主從架構叢集

HDFS叢集

  • NameNode
  • DataNode
  • SecondaryNameNode

Yarn叢集

  • ResourceManager
  • NodeManager

安裝Hadoop

Hadoop安裝包結構

叢集安裝部署

  • 偽分散式叢集安裝:一臺機器

    具體安裝檢視官方文件 英文

  • 分散式叢集安裝:三臺機器

    具體安裝檢視官方文件 英文

1.3 啟動&關閉

  1. 要啟動hadoop叢集,首先要格式化HDFS

    $HADOOP_HOME/bin/hdfs namenode -format
  2. 啟動hdfs

    方法一:在 主節點 上啟動namenode,在 每一個從節點 上啟動datanode

    # node1
    $HADOOP_HOME/bin/hdfs --daemon start namenode
    
    # node2 node3...
    $HADOOP_HOME/bin/hdfs --daemon start datanode
    
    # 關閉
    $HADOOP_HOME/bin/hdfs --daemon stop namenode
    $HADOOP_HOME/bin/hdfs --daemon stop datanode

    方法二:如果配置了 etc/hadoop/workers 且所有的節點都配置了ssh免密登陸,在任意一個節點上都可以啟動,執行一次即可

    $HADOOP_HOME/sbin/start-dfs.sh
    
    # 關閉
    $HADOOP_HOME/sbin/stop-dfs.sh
  3. 啟動YARN

    方法一:啟動ResourceManager,在主角色的節點上執行,啟動NodeManager,在 每一個從角色 上執行

    # node1
    $HADOOP_HOME/bin/yarn --daemon start resourcemanager
    
    # node2 ...
    $HADOOP_HOME/bin/yarn --daemon start nodemanager
    
    #關閉
    $HADOOP_HOME/bin/yarn --daemon stop resourcemanager
    $HADOOP_HOME/bin/yarn --daemon stop nodemanager

    方法二:如果配置了 etc/hadoop/workers 且所有的節點都配置了ssh免密登陸,在任意一個節點上執行都可以啟動

    $HADOOP_HOME/sbin/start-yarn.sh
    
    #關閉
    $HADOOP_HOME/sbin/stop-yarn.sh
  4. 也可以使用一鍵執行的指令碼開啟yarn和hdfs

    $HADOOP_HOME/sbin/start-all.sh
    
    $HADOOP_HOME/sbin/stop-all.sh
  5. 開啟日誌伺服器(可選)

    開啟之前需要開啟日誌聚合功能,需要修改 bin/yarn-site.xml ,新增如下內容

    要根據自己的配置,修改伺服器地址

    <!-- 開啟日誌聚集 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- 設定yarn歷史伺服器地址 -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://node1:19888/jobhistory/logs</value>
    </property>
    <!-- 儲存的時間7天 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>

    啟動日誌伺服器

    $HADOOP_HOME/bin/mapred --daemon start historyserver
    
    #關閉
    $HADOOP_HOME/bin/mapred --daemon stop historyserver

2. HDFS

Hadoop Distributed File System, Hadoop的分散式檔案系統

2.1 概述

  • HDFS主要是解決大資料如何儲存問題的。分散式意味著是HDFS是橫跨在多臺計算機上的儲存系統。

  • HDFS是一種能夠在普通硬體上執行的分散式檔案系統,它是高度容錯的,適應於具有大資料集的應用程式,它非 常適於儲存大型資料 (比如 TB 和 PB)。

  • HDFS使用多臺計算機儲存檔案, 並且提供統一的訪問介面, 像是訪問一個普通檔案系統一樣使用分散式檔案系統

特點

  1. 分散式儲存
  2. 元資料記錄
  3. 分塊儲存
  4. 副本備份

設計目標

  • 硬體故障(Hardware Failure)是常態, HDFS可能有成百上千的伺服器組成,每一個元件都有可能出現故障。因此 故障檢測和自動快速恢復 是HDFS的核心架構目標。
  • HDFS上的應用主要是以流式讀取資料(Streaming Data Access)。HDFS被設計成用於 批處理 ,而不是使用者互動式的。相較於資料訪問的反應時間,更 注重資料訪問的高吞吐量
  • 典型的HDFS檔案大小是GB到TB的級別。所以,HDFS被調整成 支援大檔案(Large Data Sets) 。它應該提供很 高的聚合資料頻寬,一個叢集中支援數百個節點,一個叢集中還應該支援千萬級別的檔案。
  • 大部分HDFS應用對檔案要求的是 write-one-read-many 訪問模型。一個檔案一旦 建立、寫入、關閉之後就不需要修改 了。這一假設簡化了資料一致性問題,使高吞吐量的資料訪問成為可能。
  • 移動計算的代價比之移動資料的代價低 。一個應用請求的計算,離它操作的資料越近就越高效。將計算移動到資料 附近,比之將資料移動到應用所在顯然更好。
  • HDFS被設計為可從一個 平臺輕鬆移植 到另一個平臺。這有助於將HDFS廣泛用作大量應用程式的首選平臺

應用場景

主要特性

  • 主從架構

    • HDFS叢集是標準的master/slave主從架構叢集。
    • 一般一個HDFS叢集是有一個Namenode和一定數目的Datanode組成。
    • Namenode是HDFS主節點,Datanode是HDFS從節點,兩種角色各司其職,共同協調完成分散式的檔案儲存服 務。
    • 官方架構圖中是一主五從模式,其中五個從角色位於兩個機架(Rack)的不同伺服器上。
  • 分塊儲存

    • HDFS中的檔案在物理上是分塊儲存(block)的,預設大小是128M(134217728),不足128M則本身就是一塊 。
    • 塊的大小可以通過配置引數來規定,引數位於 hdfs-default.xml 中: dfs.blocksize
  • 副本機制

    dfs.replication
    
  • 元資料記錄

    在HDFS中,Namenode管理的元資料具有兩種型別:

    • 檔案自身屬性資訊 檔名稱、許可權,修改時間,檔案大小,複製因子,資料塊大小。

    • 檔案塊位置對映資訊 記錄檔案塊和DataNode之間的對映資訊,即哪個塊位於哪個節點上。

  • 抽象統一的目錄樹結構(namespace)

    • HDFS支援傳統的 層次型檔案組織結構 。使用者可以建立目錄,然後將檔案儲存在這些目錄裡。檔案系統名字空間的 層次結構和大多數現有的檔案系統類似:使用者可以建立、刪除、移動或重新命名檔案。

    • Namenode負責維護檔案系統的namespace名稱空間,任何對檔案系統名稱空間或屬性的修改都將被Namenode 記錄下來。

    • HDFS會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問檔案,形如: hdfs://namenode:port/dira/dir-b/dir-c/file.data

  • 資料庫儲存

    • 檔案的各個block的具體 儲存管理由DataNode節點承擔
    • 每一個block都可以在多個DataNode上儲存。

2.2 HDFS Shell操作

簡介

  • 命令列介面(英語: command-line interface ,縮寫: CLI ),是指使用者通過鍵盤輸入指令,計算機接收到指令後 ,予以執行一種人際互動方式。
  • Hadoop提供了檔案系統的shell命令列客戶端: hadoop fs [generic options]

檔案系統協議

  • HDFS Shell CLI 支援操作多種檔案系統,包括本地檔案系統 (file:///) 、分散式檔案系統 (hdfs://nn:8020)
  • 具體操作的是什麼檔案系統取決於命令中檔案路徑URL中的字首協議。
  • 如果沒有指定字首,則將會讀取環境變數中的 fs.defaultFS 屬性,以該屬性值作為預設檔案系統。

區別

hadoop dfs
hdfs dfs
hadoop fs

目前版本來看,官方最終推薦使用的是hadoop fs。當然hdfs dfs在市面上的使用也比較多。

引數說明

  • HDFS檔案系統的操作命令很多和Linux類似,因此學習成本相對較低。
  • 可以通過hadoop fs -help命令來檢視每個命令的詳細用法。

操作命令

hadoop fs -xxx [x] <path> ...
  • ls :查詢指定路徑資訊

    hadoop fs -ls [-h] [-R] [<path> ...]
    [-h]
    [-R] 
    
  • put :從本地上傳檔案

    hadoop fs -put [-f] [-p] <localsrc> ... <dst>
    [-f]
    [-p]
    localsrc
    dst
    
  • get :下載檔案到本地

    hadoop fs -get [-f] [-p] <src> ... <localdst>
    localdst
    -f
    -p
    
  • cat:檢視HDFS檔案內容

    hadoop fs -cat <src> ...
  • cp

    hadoop fs -cp [-f] <src> ... <dst>
  • mkdir :建立資料夾

    • [-p] :遞迴建立資料夾
  • rm [-r] :刪除檔案/資料夾

  • apped :追加檔案

    hadoop fs -appendToFile <localsrc> ... <dst>
    • 將所有給定本地檔案的內容追加到給定dst檔案
    • dst如果檔案不存在,將建立該檔案
    • 如果 <localSrc>- ,則輸入為從標準輸入中讀取
    • 適合小檔案合併
    #追加內容到檔案尾部 appendToFile
    [[email protected] ~]# echo 1 >> 1.txt
    [[email protected] ~]# echo 2 >> 2.txt
    [[email protected] ~]# echo 3 >> 3.txt
    [[email protected] ~]# hadoop fs -put 1.txt /
    [[email protected] ~]# hadoop fs -cat /1.txt
    1
    [[email protected] ~]# hadoop fs -appendToFile 2.txt 3.txt /1.txt
    [[email protected] ~]# hadoop fs -cat /1.txt
    1
    2
    3

命令官方指導文件

示例

# 完整命令
bin/hadoop fs -xxx scheme://authority/path

# 顯示檔案
hadoop fs -ls /
# 上傳檔案
hadoop fs -put readme.txt /
# 顯示檔案
hadoop fs -ls /                      
# Found 1 items
# -rw-r--r--   1 root supergroup          0 2022-04-18 21:37 /readme.txt
# 檢視檔案內容
hadoop fs -cat /readme.txt
# 下載檔案到本地
hadoop fs -get /readme.txt read.txt

# 建立資料夾(-p遞迴建立目錄)
hadoop fs -mkdir /test 
hadoop fs -mkdir -p /test2/cur/

統計檔案數量

hadoop fs -ls / | grep / | wc -l

統計檔案大小

hadoop fs -ls / | grep / | awk '{print $8,$5}' 
/readme.txt 0
/test 0

2.3 節點概述

架構圖

NameNode

主角色

  • NameNode是Hadoop分散式檔案系統的核心,架構中的主角色。
  • NameNode維護和管理檔案系統元資料,包括名稱空間目錄樹結構、檔案和塊的位置資訊、訪問許可權等資訊。
  • 基於此, NameNode成為了訪問HDFS的唯一入口
  • NameNode內部通過 記憶體和磁碟檔案 兩種方式管理元資料。
  • 其中磁碟上的元資料檔案包括
    Fsimage
    edits log(Journal)
    

DataNode

從角色

  • DataNode是Hadoop HDFS中的從角色,負責 具體的資料塊儲存
  • DataNode的數量決定了HDFS叢集的整體資料儲存能力。通過和NameNode配合維護著資料塊。
  • 多副本機制:預設為3

SecondaryNameNode

  • Secondary NameNode充當NameNode的輔助節點,但不能替代NameNode。
  • 主要是幫助主角色 進行元資料檔案的合併動作 。可以通俗的理解為主角色的“祕書
  • 負責定期的把edits檔案中的內容合併到fsimage中,合併操作稱為checkpoint,在合併的時候會對edits中的內容進行轉換,生成新的內容儲存到fsimage檔案中
  • SecondaryNameNode程序並不是必須的。

NameNode職責

  • NameNode 僅儲存HDFS的元資料 :檔案系統中所有檔案的目錄樹,並跟蹤整個叢集中的檔案,不儲存實際資料。
  • NameNode知道HDFS中 任何給定檔案的塊列表及其位置 。使用此資訊NameNode知道如何從塊中構建檔案。
  • NameNode 不持久化儲存每個檔案中各個塊所在的datanode的位置資訊 ,這些資訊會在系統啟動時從DataNode重建。
  • NameNode是Hadoop叢集中的 單點故障
  • NameNode所在機器通常會配置有 大量記憶體(RAM)

DataNode職責

  • DataNode 負責最終資料塊block的儲存 。是叢集的從角色,也稱為Slave。
  • DataNode啟動時,會將自己 註冊到NameNode並彙報自己負責持有的塊列表
  • 當某個DataNode關閉時,不會影響資料的可用性。 NameNode將安排由其他DataNode管理的塊進行副本複製 。
  • DataNode所在機器通常配置有 大量的硬碟空間 ,因為實際資料儲存在DataNode中。

2.4 HDFS寫資料流程

寫資料完整流程

pipeline管道

  • Pipeline,中文翻譯為管道。這是HDFS在上傳檔案寫資料過程中採用的一種資料傳輸方式。
  • 客戶端將資料塊寫入第一個資料節點,第一個資料節點儲存資料之後再將塊複製到第二個資料節點,後者儲存後將 其複製到第三個資料節點。
  • 為什麼DataNode之間採用pipeline線性傳輸,而不是一次給三個DataNode拓撲式傳輸呢?
    • 因為資料以管道的方式, 順序的沿著一個方向傳輸,這樣能夠充分利用每個機器的頻寬,避免網路瓶頸和高延遲時 的連線,最小化推送所有資料的延時
    • 線上性推送模式下,每臺機器所有的出口寬頻都用於以最快的速度傳輸資料,而不是在多個接受者之間分配寬頻。

ACK應答響應

  • ACK (Acknowledge character)即是確認字元,在資料通訊中,接收方發給傳送方的一種傳輸類控制字元。表示 發來的資料已確認接收無誤。
  • 在HDFS pipeline管道傳輸資料的過程中,傳輸的反方向會進行ACK校驗,確保資料傳輸安全。

預設3副本儲存策略

  • 預設副本儲存策略是由 BlockPlacementPolicyDefault 指定。
  • 第一塊副本:優先客戶端本地,否則隨機
  • 第二塊副本:不同於第一塊副本的不同機架。
  • 第三塊副本:第二塊副本相同機架不同機器。

實際流程

  1. HDFS客戶端建立物件例項 DistributedFileSystem , 該物件中封裝了與HDFS檔案系統操作的相關方法。

  2. 呼叫 DistributedFileSystem 物件的 create() 方法,通過 RPC請求 NameNode建立檔案。

    • NameNode執行各種檢查判斷:目標檔案是否存在、父目錄是否存在、客戶端是否具有建立該檔案的許可權。檢查通過 ,NameNode就會為本次請求記下一條記錄,返回 FSDataOutputStream 輸出流 物件給客戶端用於寫資料。
  3. 客戶端通過FSDataOutputStream輸出流開始寫入資料。

  4. 客戶端寫入資料時,將資料分成一個個資料包(packet 預設64k), 內部元件DataStreamer請求NameNode挑 選出適合儲存資料副本的一組DataNode地址,預設是3副本儲存

    • DataStreamer將資料包流式傳輸到pipeline的第一個DataNode,該DataNode儲存資料包並將它傳送到pipeline的第二個DataNode。同樣,第二個DataNode儲存資料包並且傳送給第三個(也是最後一個)DataNode
  5. 傳輸的反方向上,會通過ACK機制校驗資料包傳輸是否成功

  6. 客戶端完成資料寫入後,在 FSDataOutputStream 輸出流上呼叫 close() 方法關閉

  7. DistributedFileSystem 聯絡NameNode告知其檔案寫入完成,等待NameNode確認

    因為NameNode已經知道檔案由哪些塊組成(DataStream請求分配資料塊),因此僅需等待最小複製塊即可成功返回 。 最小複製是由引數 dfs.namenode.replication.min 指定,預設是1.

2.5 HDFS讀資料流程

流程圖

  1. HDFS客戶端建立物件例項 DistributedFileSystem , 呼叫該物件的open()方法來開啟希望讀取的檔案

  2. DistributedFileSystem使用RPC呼叫namenode來確定檔案中 前幾個塊的塊位置(分批次讀取)資訊 。 對於每個塊,namenode返回具有該塊所有副本的datanode位置地址列表,並且該地址列表是排序好的,與客戶端的網路拓撲距離近的排序靠前

  3. DistributedFileSystem將FSDataInputStream輸入流返回到客戶端以供其讀取資料

  4. 客戶端在FSDataInputStream輸入流上呼叫read()方法。

    然後,已儲存DataNode地址的InputStream連線到檔案 中第一個塊的最近的DataNode。資料從DataNode流回客戶端,結果客戶端可以在流上重複呼叫read()

  5. 當該塊結束時,FSDataInputStream將關閉與DataNode的連線,然後尋找下一個block塊的最佳datanode位置。 這些操作對使用者來說是透明的。所以使用者感覺起來它一直在讀取一個連續的流。

    客戶端從流中讀取資料時,也會根據需要詢問NameNode來 檢索下一批資料塊的DataNode位置資訊

  6. 一旦客戶端完成讀取,就對FSDataInputStream呼叫close()方法

2.6 HDFS的高可用和高擴充套件

High Available
Federation

高可用(High Available)

HDFS的HA,指的是在一個叢集中存在多個NameNode,分別執行在獨立的物理節點上。在任何時間點,只有一個NameNode是處於Active狀態,其它的是處於Standby狀態。 Active NameNode(簡寫為Active NN)負責所有的客戶端的操作,而Standby NameNode(簡寫為Standby NN)用來同步Active NameNode的狀態資訊,以提供快速的故障恢復能力。

為了保證Active NN與Standby NN節點狀態同步,即元資料保持一致。除了DataNode需要向這些NameNode傳送block位置資訊外,還構建了一組獨立的守護程序”JournalNodes”(簡寫為JN),用來同步Edits資訊。當Active NN執行任何有關名稱空間的修改,它需要持久化到一半以上的JNs上。而Standby NN負責觀察JNs的變化,讀取從Active NN傳送過來的Edits資訊,並更新自己內部的名稱空間。一旦Active NN遇到錯誤,Standby NN需要保證從JNs中讀出了全部的Edits,然後切換成Active狀態,如果有多個Standby NN,還會涉及到選主的操作,選擇一個切換為Active 狀態。

需要注意一點,為了保證Active NN與Standby NN節點狀態同步,即元資料保持一致

這裡的元資料包含兩塊,一個是靜態的,一個是動態的

靜態的是fsimage和edits,其實fsimage是由edits檔案合併生成的,所以只需要保證edits檔案內容的一致性。這個就是需要保證多個NameNode中edits檔案內容的事務性同步。這塊的工作是由JournalNodes叢集進行同步的

動態資料是指block和DataNode節點的資訊,這個如何保證呢? 當DataNode啟動的時候,上報資料資訊的時候需要向每個NameNode都上報一份。 這樣就可以保證多個NameNode的元資料資訊都一樣了,當一個NameNode down掉以後,立刻從Standby NN中選擇一個進行接管,沒有影響,因為每個NameNode 的元資料時刻都是同步的。

注意:使用HA的時候,不能啟動SecondaryNameNode,會出錯。 之前是SecondaryNameNode負責合併edits到fsimage檔案 那麼現在這個工作被standby NN負責了。

NameNode 切換可以自動切換,也可以手工切換,如果想要實現自動切換,需要使用到zookeeper叢集。

使用zookeeper叢集自動切換的原理是這樣的

當多個NameNode 啟動的時候會向zookeeper中註冊一個臨時節點,當NameNode掛掉的時候,這個臨時節點也就消失了,這屬於zookeeper的特性,這個時候,zookeeper就會有一個watcher監視器監視到,就知道這個節點down掉了,然後會選擇一個節點轉為Active,把down掉的節點轉為Standby

高擴充套件(Federation)

HDFS Federation可以解決單一名稱空間存在的問題,使用多個NameNode,每個NameNode負責一個命令空間

這種設計可提供以下特性:

  1. HDFS叢集擴充套件性 :多個NameNode分管一部分目錄,使得一個叢集可以擴充套件到更多節點,不再因記憶體的限制制約檔案儲存數目。
  2. 效能更高效 :多個NameNode管理不同的資料,且同時對外提供服務,將為使用者提供更高的讀寫吞吐率。
  3. 良好的隔離性 :使用者可根據需要將不同業務資料交由不同NameNode管理,這樣不同業務之間影響很小。

如果真用到了Federation,一般也會和前面我們講的HA結合起來使用,來看這個圖

3. MapReduce

3.1 分治思想

  • MapReduce的思想核心是“ 先分再合,分而治之 ”。
  • 所謂“分而治之”就是 把一個複雜的問題,按照一定的“分解”方法分為等價的規模較小的若干部分,然後逐個解 決,分別找出各部分的結果,然後把各部分的結果組成整個問題的最終結果
  • Map 表示第一階段,負責“ 拆分 ”:即把複雜的任務 分解為若干個“簡單的子任務”來並行處理 。可以進行拆分的 前提是這些小任務可以平行計算,彼此間幾乎 沒有依賴關係
  • Reduce 表示第二階段,負責“ 合併 ”:即對map階段的結果進行全域性彙總。
  • 這兩個階段合起來正是MapReduce思想的體現。

3.2 設計構思

1. 如何對付大資料處理場景

  • 對相互間不具有計算依賴關係的大資料計算任務,實現並行最自然的辦法就是 採取MapReduce分而治之 的策略。
  • 首先Map階段進行拆分,把大資料拆分成若干份小資料,多個程式同時平行計算產生中間結果;然後是Reduce聚 合階段,通過程式對並行的結果進行最終的彙總計算,得出最終的結果。
  • 不可拆分的計算任務或相互間有依賴關係的資料無法進行平行計算!

2. 構建抽象程式設計模型

  • MapReduce借鑑了 函式式 語言中的思想,用 MapReduce 兩個函式提供了高層的並行程式設計抽象模型。

  • map: 對一組資料元素進行某種重複式的處理

  • reduce : 對Map的中間結果進行某種進一步的結果整理
  • MapReduce中定義瞭如下的Map和Reduce兩個抽象的程式設計介面,由使用者去程式設計實現:

    map: (k1; v1) → (k2; v2) 
    reduce: (k2; [v2]) → (k3; v3)

    通過以上兩個程式設計介面,大家可以看出MapReduce處理的資料型別是 鍵值對

3. 統一架構、隱藏底層細節

  • 如何提供統一的計算框架,如果沒有統一封裝底層細節,那麼程式設計師則需要考慮諸如資料儲存、劃分、分發、結果 收集、錯誤恢復等諸多細節;為此,MapReduce設計並提供了統一的計算框架,為程式設計師隱藏了絕大多數系統層 面的處理細節。
  • MapReduce最大的亮點在於通過抽象模型和計算框架把需要 做什麼(what need to do) 與具體 怎麼做(how to do) 分開了,為程式設計師提供一個抽象和高層的程式設計介面和框架。 (業務和底層技術分開)
  • 程式設計師僅需要關心其應用層的具體計算問題,僅需編寫少量的處理應用本身計算問題的業務程式程式碼。
  • 至於如何具體完成這個平行計算任務所相關的諸多系統層細節被隱藏起來,交給計算框架去處理:從分佈程式碼的執行,到大到數千小到單個節點叢集的自動排程使用。

3.3 介紹

分散式計算概念

  • 分散式計算 是一種計算方法,和 集中式計算 是相對的。
  • 隨著計算技術的發展,有些應用需要非常巨大的計算能力才能完成,如果採用集中式計算,需要耗費相當長的時間 來完成。
  • 分散式計算 將該應用分解成許多小的部分,分配給多臺計算機進行處理 。這樣可以節約整體計算時間,大大提高計算效率。

MapReduce

  • Hadoop MapReduce是一個 分散式計算框架 ,用於輕鬆編寫分散式應用程式,這些應用程式以可靠,容錯的方式 並行處理大型硬體叢集(數千個節點)上的大量資料(多TB資料集)。
  • MapReduce是一種面向海量資料處理的一種指導思想,也是一種用於對大規模資料進行分散式計算的程式設計模型。

產生背景

  • MapReduce最早由Google於2004年在一篇名為《MapReduce:Simplified Data Processingon Large Clusters 》的論文中提出。
  • 論文中谷歌把分散式資料處理的過程拆分為Map和Reduce兩個操作函式(受到函數語言程式設計語言的啟發),隨後被 Apache Hadoop參考並作為開源版本提供支援,叫做Hadoop MapReduce。
  • 它的出現解決了人們在最初面臨海量資料束手無策的問題,同時它還是 易於使用和高度可擴充套件 的,使得開發者無需 關係分散式系統底層的複雜性即可很容易的編寫分散式資料處理程式,並在成千上萬臺普通的商用伺服器中執行。

特點

  • 易於程式設計

    MapReduce框架提供了用於二次開發的介面;簡單地實現一些介面,就可以完成一個分散式程式。任務計算交給計算 框架去處理,將分散式程式部署到hadoop叢集上執行,叢集節點可以擴充套件到成百上千個等。

  • 良好的擴充套件性

    當計算機資源不能得到滿足的時候,可以通過增加機器來擴充套件它的計算能力。基於MapReduce的分散式計算得特點可 以隨節點數目增長保持近似於線性的增長,這個特點是MapReduce處理海量資料的關鍵,通過將計算節點增至幾百或 者幾千可以很容易地處理數百TB甚至PB級別的離線資料。

  • 高容錯性

    Hadoop叢集是分散式搭建和部署得,任何單一機器節點宕機了,它可以把上面的計算任務轉移到另一個節點上執行, 不影響整個作業任務得完成,過程完全是由Hadoop內部完成的。

  • 適合海量資料的離線處理

    可以處理GB、TB和PB級別得資料量

侷限性

MapReduce雖然有很多的優勢,也有相對得侷限性,侷限性不代表不能做,而是在有些場景下實現的效果比較差,並 不適合用MapReduce來處理,主要表現在以下結果方面:

  • 實時計算效能差

    MapReduce主要應用於離線作業,無法作到秒級或者是亞秒級得資料響應。

  • 不能進行流式計算

    流式計算特點是資料是源源不斷得計算,並且資料是動態的;而MapReduce作為一個離線計算框架,主要是針對靜態 資料集得,資料是不能動態變化得。

例項程序

一個完整的MapReduce程式在分散式執行時有三類

  • MRAppMaster :負責整個MR程式的過程排程及狀態協調
  • MapTask :負責map階段的整個資料處理流程
  • ReduceTask :負責reduce階段的整個資料處理流程

階段組成

  • 一個MapReduce程式設計模型中 只能包含一個Map階段和一個Reduce階段,或者只有Map階段
  • 不能有諸如多個map階段、多個reduce階段的情景出現
  • 如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式序列執行

資料型別

  • 注意:整個MapReduce程式中,資料都是以 kv鍵值對的形式流轉
  • 在實際程式設計解決各種業務問題中,需要考慮每個階段的輸入輸出kv分別是什麼
  • MapReduce內建了很多預設屬性,比如排序、分組等,都和資料的k有關,所以說kv的型別資料確定及其重要的

3.3 官方示例

  • 一個最終完整版本的MR程式需要 使用者編寫的程式碼和Hadoop自己實現的程式碼 整合在一起才可以
  • 其中使用者負責map、reduce兩個階段的業務問題,Hadoop負責底層所有的技術問題
  • 由於MapReduce計算引擎天生的弊端(慢),所以在 企業中工作很少涉及到MapReduce直接程式設計 ,但是某些軟體的背後還依賴MapReduce引擎
  • 可以通過官方提供的示例來感受MapReduce及其內部執行流程,因為後續的新的計算引擎比如Spark,當中就有 MapReduce深深的影子存在。

示例說明

/XXX/hadoop-XXX/share/hadoop/mapreduce/
hadoop-mapreduce-examples-3.3.0.jar
[hadoop jar|yarn jar] hadoop-mapreduce-examples-XXX.jar args… 

1. 評估圓周率的值

蒙特卡洛方法

  • 執行MapReduce程式評估一下圓周率的值,執行中可以去YARN頁面上觀察程式的執行的情況。
  • 第一個引數:pi表示MapReduce程式執行圓周率計算任務
  • 第二個引數:用於指定map階段執行的任務task次數,併發度,這裡是10
  • 第三個引數:用於指定每個map任務取樣的個數,這裡是50。
/opt/hadoop-3.3.0/share/hadoop/mapreduce# hadoop jar hadoop-mapreduce-examples-3.3.0.jar pi 10 50

2. WordCount單詞詞頻統計

WordCount中文叫做單詞統計、詞頻統計,指的是統計指定檔案中,每個單詞出現的總次數

實現思路

  • map階段的核心:把 輸入的資料經過切割,全部標記1 ,因此輸出就是<單詞,1>

  • shuffle階段核心: 經過MR程式內部自帶預設的排序分組等功能,把key相同的單詞會作為一組資料構成新的kv對

  • reduce階段核心:處理shuffle完的一組資料,該組資料就是該單詞所有的鍵值對。 對所有的1進行累加求和,就是 單詞的總次數

程式提交

  • 自己隨便寫個文字檔案 1.txt 到HDFS檔案系統的 /input 目錄下,如果沒有這個目錄,使用shell建立
    hadoop fs -mkdir /input
    hadoop fs -put 1.txt /input 
    
  • 準備好之後,執行官方MapReduce例項,對上述檔案進行單詞次數統計
    • 第一個引數:wordcount表示執行單詞統計任務
    • 第二個引數:指定輸入檔案的路徑
    • 第三個引數:指定輸出結果的路徑(該路徑不能已存在)
hadoop jar hadoop-mapreduce-examples-3.3.0.jar wordcount /input /output

3.4 MapReduce執行流程

流程圖

Map執行過程

  • 第一階段:把輸入目錄下檔案按照一定的標準逐個進行 邏輯切片 ,形成切片規劃。

    預設 Split size = Block size(128M) ,每一個切片由一個MapTask處理。 (getSplits)

  • 第二階段:對切片中的資料按照一定的規則讀取解析返回鍵值對

    預設是 按行讀取資料 。key是每一行的起始位置偏移量,value是本行的文字內容。 (TextInputFormat)

  • 第三階段:呼叫Mapper類中的 map方法處理資料

    每讀取解析出來的一個 <key, value> ,呼叫一次map方法

  • 第四階段:按照一定的規則對Map輸出的鍵值對進行 分割槽partition 。預設不分割槽,因為只有一個reducetask。 分割槽的數量就是reducetask執行的數量。 (分割槽方法預設是hash求餘法)

  • 第五階段:Map輸出資料寫入 記憶體緩衝區 ,達到比例溢位到磁碟上。 溢位spill 的時候根據key進行 排序sort 。 預設根據key字典序排序。

  • 第六階段:對所有溢位檔案進行最終的 merge合併 ,成為一個檔案。

Reduce執行過程

  • 第一階段:ReduceTask會主動從MapTask 複製拉取 屬於需要自己處理的資料。
  • 第二階段:把拉取來資料,全部進行 合併merge ,即把分散的資料合併成一個大的資料。再對合並後的資料 排序
  • 第三階段:對排序後的鍵值對 呼叫reduce方法鍵相等 的鍵值對呼叫一次reduce方法。最後把這些輸出的鍵值對寫入到HDFS檔案中。

shuffle

  • Shuffle的本意是洗牌、混洗的意思,把一組有規則的資料儘量打亂成無規則的資料。
  • 而在MapReduce中,Shuffle更像是洗牌的逆過程,指的是 將map端的無規則輸出按指定的規則“打亂”成具有一 定規則的資料,以便reduce端接收處理。
  • 一般把從 Map產生輸出開始到Reduce取得資料作為輸入之前的過程稱作shuffle。

Map端Shuffle

一個map最後只會產生一個檔案

  • Collect階段:將MapTask的結果收集輸出到預設大小為100M的環形緩衝區,儲存之前會對key進行分割槽的計算, 預設Hash分割槽(分割槽數量為reducetask的數量,對每個key的hash值對reducetask數求餘映射到某個reducetask,key值相同的會對映到相同的reducetask)。
  • Spill階段:當記憶體中的資料量達到一定的閥值(預設80%)的時候,就會將資料寫入本地磁碟,在將資料寫入磁碟之前需要對資料進行一次排序的操作,如果配置了combiner,還會將有相同分割槽號和key的資料進行排序。 (示例圖上顯示有三個分割槽,即三個reducetask)
  • Merge階段:把所有溢位的臨時檔案進行一次合併操作,以確保一個MapTask最終只產生一箇中間資料檔案。(三個分割槽會分別給三個reducetask,不同map中的相同分割槽會到同一個reducetask上合併)

Reducer端Shuffle

  • Copy階段: ReduceTask啟動Fetcher執行緒到已經完成MapTask的節點上覆制一份屬於自己的資料。
  • Merge階段:在ReduceTask遠端複製資料的同時,會在後臺開啟兩個執行緒對記憶體到本地的資料檔案進行合併操作 。
  • Sort階段:在對資料進行合併的同時,會進行排序操作,由於MapTask階段已經對資料進行了區域性的排序, ReduceTask只需保證Copy的資料的最終整體有效性即可。

Shuffle機制弊端

  • Shuffle是MapReduce程式的核心與精髓,是MapReduce的靈魂所在。
  • Shuffle也是MapReduce被詬病最多的地方所在。MapReduce相比較於Spark、Flink計算引擎慢的原因,跟 Shuffle機制有很大的關係。
  • Shuffle中 頻繁涉及到資料在記憶體、磁碟之間的多次往復

3.5 任務日誌檢視

需要開啟YARN的日誌聚合功能,把散落在NodeManager節點上的日誌統一收集管理,方便檢視日誌

bin\mapred --daemion start historyserver
yarn logs -applicationId <ID>

3.6 中止任務

在命令列中ctrl+c無法停止程式,因為程式已經提交到Hadoop叢集執行 了

  • yarn application -kill <ID>

4. YARN

4.1 介紹

  • Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的Hadoop資源管 理器。
  • YARN是一個 通用資源管理系統和排程平臺 ,可為上層應用提供統一的資源管理和排程。
  • 它的引入為叢集在利用率、資源統一管理和資料共享等方面帶來了巨大好處。

功能

  • 資源管理系統 :叢集的硬體資源,和程式執行相關,比如 記憶體、CPU 等。
  • 排程平臺 :多個程式同時申請計算 資源如何分配 ,排程的規則(演算法)。
  • 通用 :不僅僅支援MapReduce程式,理論上 支援各種計算程式 。YARN不關心你幹什麼,只關心你要資源,在有的情況下給你,用完之後還我。

概述

  • 可以把Hadoop YARN理解為相當於一個 分散式的作業系統平臺 ,而MapReduce等計算程式則相當於運行於操作 系統之上的應用程式, YARN為這些程式提供運算所需的資源 (記憶體、CPU等)。
  • Hadoop能有今天這個地位,YARN可以說是功不可沒。因為有了YARN ,更多計算框架可以接入到 HDFS中,而 不單單是 MapReduce, 正是因為YARN的包容,使得其他計算框架能專注於計算效能的提升
  • HDFS可能不是最優秀的大資料儲存系統,但卻是應用最廣泛的大資料儲存系統, YARN功不可沒。
  • yarn.nodemanager.resource.memory-mb :單節點可分配的實體記憶體總量,預設是8MB*1024,即8G
  • yarn.nodemanager.resource.cpu-vcores :單節點可分配的虛擬CPU個數預設是8

4.2 架構元件

架構圖

client
container 容器(資源的抽象):容器之間邏輯上隔離的

YARN三大元件

  • ResourceManager(RM)
    • YARN叢集中的主角色,決定系統中所有應用程式之間 資源分配的最終許可權,即最終仲裁者
    • 接收使用者的作業提交,並通過NM分配、管理各個機器上的計算資源。
  • NodeManager(NM)
    • YARN中的從角色,一臺機器上一個,負責 管理本機器上的計算資源
    • 根據RM命令,啟動Container容器、監視容器的資源使用情況。並且向RM主角色彙報資源使用情況
  • ApplicationMaster (App Mstr) (AM)
    • 使用者提交的每個應用程式均包含一個AM。
    • 應用程式內的“老大” ,負責程式內部各階段的資源申請,監督程式的執行情況。
    • 管理程式的進行

4.3 程式提交互動流程

核心互動流程

  • MR作業提交 Client-->RM
  • 資源的申請 MrAppMaster-->RM
  • MR作業狀態彙報 Container(Map|Reduce Task)-->Container(MrAppMaster)
  • 節點的狀態彙報 NM-->RM

整體概述

當用戶向 YARN 中提交一個應用程式後, YARN將分兩個階段執行該應用程式 。

  • 第一個階段是 客戶端申請資源啟動執行本次程式的ApplicationMaster
  • 第二個階段是 由ApplicationMaster根據本次程式內部具體情況,為它申請資源,並監控它的整個執行過程 ,直 到執行完成。

MR提交YARN互動流程

  • 第1步:使用者通過客戶端向YARN中ResourceManager提交應用程式(比如hadoop jar提交MR程式)
  • 第2步:ResourceManager為該應用程式分配第一個Container(容器),並與對應的NodeManager通訊,要求它在這個Container中啟動這個應用程式的 ApplicationMaster
  • 第3步:ApplicationMaster啟動成功之後, 首先向ResourceManager註冊並保持通訊 ,這樣使用者可以直接通過 ResourceManager檢視應用程式的執行狀態(處理了百分之幾)
  • 第4步:AM為本次程式內部的各個Task任務 向RM申請資源 ,並監控它的執行狀態
  • 第5步:一旦 ApplicationMaster 申請到資源後,便與對應的 NodeManager 通訊 ,要求它啟動任務。
  • 第6步:NodeManager 為任務設定好執行環境後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務
  • 第7步:各個任務通過某個 RPC 協議向 ApplicationMaster 彙報自己的狀態和進度 ,以讓 ApplicationMaster 隨時掌握各個任務的執行狀態,從而可以在任務失敗時重新啟動任務。在應用程式執行過程中,使用者可隨時通過 RPC 向 ApplicationMaster 查詢應用程式的當前執行狀態
  • 第8步:應用程式執行完成後,ApplicationMaster 向 ResourceManager 登出並關閉自己

4.4 排程器Scheduler

  • 在理想情況下,應用程式提出的請求將立即得到YARN批准。但是實際中, 資源是有限的 ,並且在繁忙的群集上, 應用程式通常將需要等待其某些請求得到滿足。YARN排程程式的工作是 根據一些定義的策略為應用程式分配資源

  • 在YARN中,負責給應用分配資源的就是 Scheduler ,它是ResourceManager的核心元件之一。

    Scheduler完全專用於排程作業,它無法跟蹤應用程式的狀態。

  • 一般而言,排程是一個難題,並且沒有一個“最佳”策略,為此,YARN提供了多種排程器和可配置的策略供選擇

排程器策略

  • FIFO Scheduler(先進先出排程器)
  • Capacity Scheduler(容量排程器)(Apache版本YARN預設使用Capacity Scheduler)
  • Fair Scheduler(公平排程器)

FIFO

  • FIFO Scheduler是Hadoop1.x中JobTracker原有的排程器實現,此排程器在YARN中保留了下來。
  • FIFO Scheduler是一個先進先出的思想,即 先提交的應用先執行 。排程工作不考慮優先順序和範圍,適用於負載較低的小規模叢集。當使用大型共享叢集時,它的效率較低且會導致一些問題。
  • FIFO Scheduler擁有一個控制全域性的佇列queue,預設queue名稱為default,該排程器會獲取當前叢集上所有的 資源資訊作用於這個全域性的queue

優勢

  • 無需配置、先到先得、易於執行

劣勢

  • 任務的優先順序不會變高,因此高優先順序的作業需要等待
  • 不適合共享叢集

Capacity

FIFO Schedule的多佇列版本

  • Capacity Scheduler容量排程是Apache Hadoop3.x預設排程策略。該策略允許 多個組織共享整個叢集資源 ,每個 組織可以獲得叢集的一部分計算能力。 通過為每個組織分配專門的佇列,然後再為每個佇列分配一定的叢集資源 , 這樣整個叢集就可以通過設定多個佇列的方式給多個組織提供服務了。
  • Capacity可以理解成一個個的資源佇列,這個資源佇列是使用者自己去分配的。佇列內部又可以垂直劃分,這樣一個 組織內部的多個成員就可以共享這個佇列資源了,在一個佇列內部,資源的排程是採用的是先進先出(FIFO)策略

資源佇列劃分

  • Capacity Scheduler排程器以佇列為單位劃分資源。簡單通俗點來說,就是一個個佇列有獨立的資源,佇列的結構 和資源是可以進行配置的

優勢

  • 層次化的佇列設計 (Hierarchical Queues)
    • 層次化的管理,可以更容易、更合理分配和限制資源的使用
  • 容量保證 (Capacity Guarantees)
    • 每個佇列上都可以設定一個資源的佔比,保證每個佇列都不會佔用整個叢集的資源
  • 安全 (Security)
    • 每個佇列有嚴格的訪問控制。使用者只能向自己的佇列裡面提交任務,而且不能修改或者訪問其他佇列的任務
  • 彈性分配 (Elasticity)
    • 空閒的資源可以被分配給任何佇列。 當多個隊列出現爭用的時候,則會按照權重比例進行平衡

Fair

  • Fair Scheduler叫做 公平排程 ,提供了YARN應用程式 公平地共享大型叢集中資源 的另一種方式。使所有應用在平均情況下隨著時間的流逝可以獲得相等的資源份額。
  • Fair Scheduler設計目標是為所有的應用分配公平的資源(對公平的定義通過引數來設定)。
  • 公平排程可以在多個佇列間工作,允許資源共享和搶佔

如何理解公平共享

  • 有兩個使用者A和B,每個使用者都有自己的佇列。
  • A啟動一個作業,由於沒有B的需求,它分配了叢集所有可用的資源。
  • 然後B在A的作業仍在執行時啟動了一個作業,經過一段時間,A,B各自作業都使用了一半的資源。
  • 現在,如果B使用者在其他作業仍在執行時開始第二個作業,它將與B的另一個作業共享其資源,因此B的每個作業將擁有資源的四分之一,而A的繼續將擁有一半的資源。結果是資源在使用者之間公平地共享。

優勢

  • 分層佇列 :佇列可以按層次結構排列以劃分資源,並可以配置權重以按特定比例共享叢集。
  • 基於使用者或組的佇列對映 :可以根據提交任務的使用者名稱或組來分配佇列。如果任務指定了一個佇列,則在該佇列中提交任務。
  • 資源搶佔 :根據應用的配置,搶佔和分配資源可以是友好的或是強制的。預設不啟用資源搶佔。
  • 保證最小配額 :可以設定佇列最小資源,允許將保證的最小份額分配給佇列,保證使用者可以啟動任務。當佇列不能滿足最小資源時,可以從其它佇列搶佔。當佇列資源使用不完時,可以給其它佇列使用。這對於確保某些使用者、組或生產應用始終獲得足夠的資源。
  • 允許資源共享 :即當一個應用執行時,如果其它佇列沒有任務執行,則可以使用其它佇列,當其它佇列有應用需要資源 時再將佔用的佇列釋放出來。所有的應用都從資源佇列中分配資源。
  • 預設不限制每個佇列和使用者可以同時執行應用的數量 。可以配置來限制佇列和使用者並行執行的應用數量。限制並行 執行應用數量不會導致任務提交失敗,超出的應用會在佇列中等待。

4.5 多資源佇列使用

修改hadoop檔案中 etc/hadoop/capacity-scheduler.xml

下面增加了兩個佇列online和offline,將以下內容新增進去,而不是覆蓋。

<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,online,offline</value>
    <description>佇列列表,多個佇列之間使用逗號分割</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>70</value>
    <description>default佇列70%</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.online.capacity</name>
    <value>10</value>
    <description>online佇列10%</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.offline.capacity</name>
    <value>20</value>
    <description>offline佇列20%</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>70</value>
    <description>Default佇列可使用的資源上限.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
    <value>10</value>
    <description>online佇列可使用的資源上限.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
    <value>20</value>
    <description>offline佇列可使用的資源上限.</description>
  </property>

5. 序列化機制

為了提高磁碟IO效能,Hadoop棄用了java中的序列化,自己編寫了writable實現類

注意:

  • Text等價於java.lang.String的writable,針對utf-8序列
  • NullWritable是單例,獲取例項使用NullWritable.get()

Hadoop序列化機制的特點

  • 緊湊:高效使用儲存空間
  • 快速:讀寫資料的額外開銷小
  • 可擴充套件:可透明地讀取老格式的資料
  • 互操作:支援多語言的互動

Java序列化的不足

  • 不精簡,附加資訊多,不太適合隨機訪問
  • 儲存空間大,遞迴地輸出類的超類描述知道不再有超類

6. InputFormat

原始碼

  1. getSplits : 對檔案進行分割槽
  2. createRecordReader : 將 InputSplit 中的資料解析成Record,即 <k1, v1>
public abstract class InputFormat<K, V> {

  /** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i><input-file-path, start, offset></i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

僅對FileInputFormat原始碼進行分析

6.1 InputSplit

原始碼註解(Hadoop3.3.0)

/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();

    /*
       getFormatMinSplitSize() = 1
       getMinSplitSize(job) = 0
       minSize = 1
     */
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    /*
        沒有預設值
        getMaxSplitSize(job) = Long.MAX_VALUE
        所以maxSize等於Long的最大值
     */
    long maxSize = getMaxSplitSize(job);

    // generate splits
    // 建立List,總部內儲存生成的InputSplit
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 獲取輸入檔案列表
    List<FileStatus> files = listStatus(job);

    // ignoreDirs = false
    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);

    // 迭代輸入檔案列表
    for (FileStatus file: files) {
      // 是否忽略子目錄,預設不忽略
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      // 獲取 檔案/目錄 路徑
      Path path = file.getPath();
      // 獲取 檔案/目錄 長度
      long length = file.getLen();
      if (length != 0) {
        // 儲存檔案的Block塊所在的位置
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // 判斷檔案是否支援切割,預設為true
        if (isSplitable(job, path)) {

          // 獲取檔案的Block大小,預設128M
          long blockSize = file.getBlockSize();

          // 計算split的大小
          /*
            內部使用的公式是: Math.max(minSize, Math.min(maxSize, blockSize))
                           Math.max(1, Math.min(Long.MAX_VALUE, 128))
            預設情況下split的大小和Block size相等
           */
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          // 還需要處理的檔案剩餘位元組大小,其實就是這個檔案的原始大小
          long bytesRemaining = length;

          /*
            SPLIT_SLOP = 1.1
            只要剩餘檔案大於1.1倍的分割槽size就繼續切割
           */
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // 獲取block的索引
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            /*
              組裝InputSplit
              path: 路徑
              length-bytesRemaining 起始位置
              splitSize 大小
              blkLocations[blkIndex].getHosts() 和 blkLocations[blkIndex].getCachedHosts() 所在的主機

             */
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          // 不支援切割
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          // 整個作為一個InputSplit
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}

6.2 RecordReader

  • 每一個 InputSplit 都有一個RecordReader,作用是把InputSplit中的資料解析成Record,即 <k1, v1>

行閱讀器的初始化方法原始碼

// 初始化方法
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    // 獲取傳過來的InputSplit,將InputSplit轉換成子類FileSplit
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    // MAX_LINE_LENGTH對應的引數預設沒有配置,所以會取Integer.MAX_VALUE
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    // 獲取InputSplit的起始位置
    start = split.getStart();
    // 獲取InputSplit的結束位置
    end = start + split.getLength();
    // 獲取InputSplit的路徑
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    // 開啟檔案,並跳到InputSplit的起始位置
    final FutureDataInputStreamBuilder builder =
        file.getFileSystem(job).openFile(file);
    FutureIOSupport.propagateOptions(builder, job,
        MRJobConfig.INPUT_FILE_OPTION_PREFIX,
        MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
    fileIn = FutureIOSupport.awaitFuture(builder.build());

    // 獲取檔案的壓縮資訊
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    // 如果檔案是壓縮檔案,則執行if中的語句
    if (null!=codec) {
      //... 省略程式碼 
    } else {
      // 跳轉到檔案的起始位置
      fileIn.seek(start);
      // 針對未壓縮檔案,建立一個閱讀器讀取一行行的資料
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    /*
      注意:如果這個InputSplit不是第一個InputSplit,我們將會丟棄讀取出來的第一行
      因為我們總是通過next方法多讀取一行
      因此,如果一行資料被拆分到了兩個InputSplit中,不會產生問題。
     */
    // 如果start不等於0,表示不是第一個inputsplit,所以把start的值重置為第二行的起始位置
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

7. MR效能優化

7.1 小檔案問題

  • Hadoop的HDFS和MR框架是針對大資料檔案來設計的,在小檔案的處理上不但效率低下,而且十分消耗記憶體資源
  • HDFS提供了兩種型別的容器,SequenceFile和MapFile

SequenceFile

<key, value>

程式碼實現

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;


/*
    small files
 */
public class SmallFileSeq {

    public static void main(String[] args) throws Exception {
        write("/root/smallfiles", "/seqFile");
        read("/seqFile");
    }

    /**
     * 生成SequenceFile檔案
     * @param inputDir 本地檔案
     * @param outputFile hdfs檔案
     * @throws Exception
     */
    private static void write(String inputDir,String outputFile) throws Exception {
        // 建立一個配置物件
        Configuration conf = new Configuration();
        // 指定hdfs路徑
        conf.set("fs.defaultFS", "hdfs://node1:8020");

        // 刪除輸出檔案
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(outputFile), true);

        // 三個元素:輸出路徑、key的型別、value的型別
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)
        };

        // 建立一個writer例項
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);

        // 指定需要壓縮的檔案的目錄
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            // 獲取目錄中的檔案
            File[] files = inputDirPath.listFiles();

            assert files != null;
            for (File file : files) {
                // 獲取檔案的全部記憶體
                String content = FileUtils.readFileToString(file, "UTF-8");
                // 獲取檔名
                String fileName = file.getName();
                Text key = new Text(fileName);
                Text value = new Text(content);
                // 寫入資料
                writer.append(key, value);
            }
        }
        writer.close();
    }

    /**
     * 讀取SequenceFile檔案
     * @param inputFile
     * @throws Exception
     */
    private static void read(String inputFile) throws Exception {
        // 建立一個配置物件
        Configuration conf = new Configuration();
        // 指定hdfs路徑
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        // 建立閱讀器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));

        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println("檔名:" + key.toString() + ",");
            System.out.println("檔案內容:\n" + value.toString() + ".");
        }
        reader.close();
    }
}

MapFile

  • MapFile是排序後的SequenceFile,MapFile由兩部分組成,分別是index和data
  • index作為檔案的資料索引,主要記錄了每個Record的key值,以及該Record在檔案中的偏移位置
  • 在MapFile被訪問的時候,索引檔案會被載入到記憶體,通過索引對映關係可迅速定位到指定Record所在檔案位置

SequenceFile檔案是用來儲存key-value資料的,但它並不保證這些儲存的key-value是有序的,而MapFile檔案則可以看做是儲存有序key-value的SequenceFile檔案。MapFile檔案保證key-value的有序(基於key)是通過每一次寫入key-value時的檢查機制,這種檢查機制其實很簡單,就是保證當前正要寫入的key-value與上一個剛寫入的key-value符合設定的順序,但是,這種有序是由使用者來保證的,一旦寫入的key-value不符合key的非遞減順序,則會直接報錯而不是自動的去對輸入的key-value排序

程式碼例項

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;


/*
    small files
 */
public class SmallFilemap {

    public static void main(String[] args) throws Exception {
        write("/root/smallfiles", "/mapFile");
        read("/mapFile");
    }

    /**
     * 生成MapFile檔案
     * @param inputDir 本地目錄
     * @param outputDir hdfs目錄
     * @throws Exception
     */
    private static void write(String inputDir,String outputDir) throws Exception {
        // 建立一個配置物件
        Configuration conf = new Configuration();
        // 指定hdfs路徑
        conf.set("fs.defaultFS", "hdfs://node1:8020");

        // 刪除輸出檔案
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(outputDir), true);

        // 兩個元素:key的型別、value的型別
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                MapFile.Writer.keyClass(Text.class),
                MapFile.Writer.valueClass(Text.class)
        };

        // 建立一個writer例項
        MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);

        // 指定需要壓縮的檔案的目錄
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            // 獲取目錄中的檔案
            File[] files = inputDirPath.listFiles();

            for (File file : files) {
                // 獲取檔案的全部記憶體
                String content = FileUtils.readFileToString(file, "UTF-8");
                // 獲取檔名
                String fileName = file.getName();
                Text key = new Text(fileName);
                Text value = new Text(content);
                // 寫入資料
                writer.append(key, value);
            }
        }
        writer.close();
    }

    /**
     * 讀取MapFile檔案
     * @param inputDir MapFile檔案路徑
     * @throws Exception
     */
    private static void read(String inputDir)throws Exception{
        // 建立一個配置物件
        Configuration conf = new Configuration();
        // 指定hdfs路徑
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        //建立閱讀器
        MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
        Text key = new Text();
        Text value = new Text();
        //迴圈讀取資料
        while(reader.next(key,value)){
            //輸出檔名稱
            System.out.print("檔名:"+key.toString()+",");
            //輸出檔案內容
            System.out.println("檔案內容:"+value.toString()+"");
        }
        reader.close();
    }
}

7.2 資料傾斜問題

  • MapReduce程式執行時,Reduce節點大部分執行完畢,但是有一個或者幾個Reduce節點執行很慢,導致整個程式處理時間變得很長,具體表現為:Reduce階段卡著不動
    • 示例:利用hash分割槽方法,如果某個key值特別多,那麼會導致這個key值對應的Reducetask執行量很大,而其他的task則很快執行完畢。

解決方法

  1. 增加reduce個數(但不一定有用)
  2. 將傾斜資料打散

8. 參考資料