[Spark精進]必須掌握的4個RDD運算元之flatMap運算元

語言: CN / TW / HK

小知識,大挑戰!本文正在參與“程式設計師必備小知識”創作活動。

返回第二章

第三個flatMap:從元素到集合、再從集合到元素

flatMap 其實和 map 與 mapPartitions 運算元類似,在功能上,與 map 和 mapPartitions 一樣,flatMap 也是用來做資料對映的,在實現上,對於給定對映函式 f,flatMap(f) 以元素為粒度,對 RDD 進行資料轉換。不過,與前兩者相比,flatMap 的對映函式 f 有著顯著的不同。對於 map 和 mapPartitions 來說,其對映函式 f 的型別,都是(元素) => (元素),即元素到元素。而 flatMap 對映函式 f 的型別,是(元素) => (集合),即元素到集合(如陣列、列表等)。因此,flatMap 的對映過程在邏輯上分為兩步:

  • 以元素為單位,建立集合;
  • 去掉集合“外包裝”,提取集合元素。

這麼說比較抽象,我們還是來舉例說明。假設,我們再次改變 Word Count 的計算邏輯,由原來統計單詞的計數,改為統計相鄰單詞共現的次數,如下圖所示:

在這裡插入圖片描述

對於這樣的計算邏輯,我們該如何使用 flatMap 進行實現呢?這裡我們先給出程式碼實現,然後再分階段地分析 flatMap 的對映過程:

java // 讀取檔案內容 val lineRDD: RDD[String] = _ // 請參考第一講獲取完整程式碼 // 以行為單位提取相鄰單詞 val wordPairRDD: RDD[String] = lineRDD.flatMap( line => { // 將行轉換為單詞陣列 val words: Array[String] = line.split(" ") // 將單個單詞陣列,轉換為相鄰單詞陣列 for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1) }) 在上面的程式碼中,我們採用匿名函式的形式,來提供對映函式 f。這裡 f 的形參是 String 型別的 line,也就是原始檔中的一行文字,而 f 的返回型別是 Array[String],也就是 String 型別的陣列。在對映函式 f 的函式體中,我們先用 split 語句把 line 轉化為單詞陣列,然後再用 for 迴圈結合 yield 語句,依次把單個的單詞,轉化為相鄰單詞詞對。注意,for 迴圈返回的依然是陣列,也即型別為 Array[String]的詞對陣列。由此可見,函式 f 的型別是(String) => (Array[String]),也就是剛剛說的第一步,從元素到集合。

但如果我們去觀察轉換前後的兩個 RDD,也就是 lineRDD 和 wordPairRDD,會發現它們的型別都是 RDD[String],換句話說,它們的元素型別都是 String。回顧 map 與 mapPartitions 這兩個運算元,我們會發現,轉換前後 RDD 的元素型別,與對映函式 f 的型別是一致的。但在 flatMap 這裡,卻出現了 RDD 元素型別與函式型別不一致的情況。這是怎麼回事呢?

其實呢,這正是 flatMap 的“奧妙”所在,為了讓你直觀地理解 flatMap 的對映過程,我畫了一張示意圖,如下所示:

在這裡插入圖片描述 不難發現,對映函式 f 的計算過程,對應著圖中的步驟 1 與步驟 2,每行文字都被轉化為包含相鄰詞對的陣列。緊接著,flatMap 去掉每個陣列的“外包裝”,提取出陣列中型別為 String 的詞對元素,然後以詞對為單位,構建新的資料分割槽,如圖中步驟 3 所示。 這就是 flatMap 對映過程的第二步:去掉集合“外包裝”,提取集合元素。 得到包含詞對元素的 wordPairRDD 之後,我們就可以沿用 Word Count 的後續邏輯,去計算相鄰詞彙的共現次數。你不妨結合文稿中的程式碼與第一講中 Word Count 的程式碼,去實現完整版的“相鄰詞彙計數統計”。

點選跳轉到下一講