拼多多大資料開發 1 面知識點總結(配影片~超詳細)

語言: CN / TW / HK

點選上方公眾號進入   3分鐘秒懂大資料  主頁

然後點選右上角  “設為標星”  

比別人更快接收硬核文章

大家好,我是土哥。

今天給大家分享一波這十天錄的影片的知識點,同時這十個知識點被 拼多多、網易 的面試官 都考察過,出現的頻率都很高,所以,沒看完影片的小夥伴,可以看看文字,一塊學習學習~

1 Spark 的任務提交流程你熟悉嗎?

當面試官問你,Spark 的任務提交流程你熟悉嗎?

你應該這樣回答他:

當在命令列執行 spark -submit --master  xxx.jar 的命令後,會執行以下操作:

1、客戶端向資源管理器 Master 傳送註冊和申請資源的請求。Master 主要負責任務資源的分配,是 Spark 叢集的老大。

2、Master 收到申請資源的請求後,會向指定的 Worker 傳送請求,然後 worker 開啟對應的 executor 程序(計算資源)。

3、executor 程序會向 Driver 端傳送註冊請求,然後申請要計算的 Task。

4、在 Driver 端內部,會執行一些操作,最終會通過 TaskScheduler 提交 Task 到 executor 程序中執行。具體的細節如下:

(1) Driver 端會執行客戶端程式中的 main 方法;

(2)在 main 方法中構建了 SparkContext 上下文物件,該物件是所有 Spark 程式執行的入口,在構建 SparkContext 物件的內部,也構建了兩個物件,分別是 DAGScheduler 和 TaskScheduler。

(3)因為在使用者程式碼程式中,RDD 運算元會涉及大量的轉換操作,然後通過一個動作(action)操作,觸發任務真正的執行, 在這裡會按照 RDD 和 RDD 之間的依賴關係,首先 生成一張 DAG 有向無環圖。圖的方向就是 RDD 運算元的操作順序,最終會將 DAG 有向無環圖傳送給 DAGScheduler 物件。

(4)DAGScheduler 獲取到 DAG 有向無環圖之後, 按照寬依賴,進行 stage 劃分,由於RDD運算元中包含大量的寬依賴,所以會劃分出多個 stage,每一個 stage 內部有很多可以並行執行的 task 執行緒,然後把這些並行執行的 task 執行緒封裝在一個 taskSet 集合中,最後將 多個 taskSet 集合傳送給 TaskScheduler 物件。

(5)TaskScheduler 物件獲取得到這些 taskSet 集合之後,按照 多個 stage 之間的依賴關係,前面的 stage 中的 task 先執行,後面 stage 中的 task 後執行。然後 TaskScheduler 物件依次遍歷每個 taskSet 集合,獲取每一個 task ,最後把每一個 Task 依次提交到 worker 節點上的 Executor 程序中執行。

5 當所有的 Task 任務在Executor 程序中依次執行完成後,Driver 端會向 Master 傳送一個登出請求。

6、Master 接收到請求後,然後通知對應的 worker 節點關閉 executor 程序,最後 worker 節點上的的計算資源得到釋放。

2 Spark 資料傾斜的解決方案有哪些?

首先,產生資料傾斜的主要原因 是在 shuffle 過程中,由於不同 的 key 對應的資料量不同,從而導致不同的 task 所分配的資料量不均勻所產生的。

所以要解決 Spark 的資料傾斜問題,可以從以下幾方面著手處理:

1、 提高 shuffle 操作的並行度。該辦法簡單粗暴,直接增加 shuffle 讀 task 的數量,比如設定 reduceByKey(1000) ,一般預設200

優點:有效緩解資料傾斜。缺點:無法徹底根除問題。

2、使用隨機數字首和擴容 RDD 進行 join 操作。對大量相同的 key 通過附加隨機字首變成不一樣的 key,然後將這些處理後的“不同key”分散到多個task中去處理。

優點:對 join 型別的資料傾斜大都可以處理,缺點:對記憶體要求很高。

3、將 reduce join 轉為 map join。適用於兩張表 join 時,一張表資料量比較小的情況。

通過將小表 全量資料 進行廣播,然後 通過 map 運算元來實現與 join 同樣的效果,也就是 map join,此時就不會發生 shuffle 操作,也就不會發生資料傾斜。

優點:不會發生 shuffle ,缺點:只適用於 大表 + 小表。

4、過濾少數導致資料傾斜的 key。 該辦法的前提條件是:少數幾個資料量特別多的 key 對任務的執行影響不大,可以直接通過 where 子句過濾掉。

優點:實現簡單,完全解決掉資料傾斜。缺點:受限於特定場景。

5、使用 Hive ETL 預處理資料。如果導致資料傾斜的是Hive 發生的,直接對Hive 進行預處理操作,從源頭規避掉資料傾斜問題。

優點:提升 Spark 作業效能。缺點:Hive ETL 也會發生資料傾斜,治標不治本。

6、兩階段聚合方式。適合  reduceByKey ,group by 分組等場景。通過先區域性聚合,再全域性聚合等方式實現解決資料傾斜等。

優點:顯著提升Spark 作業效能。缺點:侷限於聚合類shuffle 操作。

3 Spark 任務排程的方式有哪些?

在 Spark 中,任務排程的方式包含 Stage 級的排程和 Task 級的排程。

首先 在建立 SparkContext 上下文物件時,會創建出 DAGScheduler  和 TaskScheduler 。

其中 DAGScheduler 負責 Stage 級的排程,主要是將 job 根據寬依賴切分成若干 Stages,並將每個 Stage 打包成 TaskSet 交給 TaskScheduler 排程。

TaskScheduler 負責 Task 級的排程,將 DAGScheduler 傳送過來的 TaskSet 按照指定的排程 策略分發到 Executor 上執行。

TaskScheduler 支援兩種排程策略,一種是 FIFO,也是預設的先進先出排程策略,另一種是 FAIR,公平策略。

4 在 Flink 中,反壓有哪些危害呢?

反壓如果不能得到正確的處理,可能會影響到 checkpoint 時長和 state 大小,甚至 可能會導致資源耗盡甚至系統崩潰。

1)影響 checkpoint 時長:barrier 不會越過普通資料,資料處理被阻塞也會導致 checkpoint barrier 流經整個資料管道的時長變長,導致 checkpoint 總體時間(End to End Duration)變長。

2)影響 state 大小:barrier 對齊時,接受到較快的輸入管道的 barrier 後,它後面數 據會被快取起來但不處理,直到較慢的輸入管道的 barrier 也到達,這些被快取的資料會 被放到 state 裡面,導致 checkpoint 變大。

這兩個影響對於生產環境的作業來說是十分危險的,因為 checkpoint 是保證資料一 致性的關鍵,checkpoint 時間變長有可能導致 checkpoint 超時失敗,而 state 大小同樣可能拖慢 checkpoint 甚至導致 OOM (使用 Heap-based StateBackend)或者實體記憶體使用超出容器資源(使用 RocksDBStateBackend)的穩定性問題。

5 Flink  SQL 支援哪些 Join 操作呢?

Flink SQL 支援的 Join 操作主要包括以下 3 大類:

1、流表與流表的 Join 2    流表與維表的 Join 3、動態表字段的列轉行等。

1 流表與流表的 Join 包含  Regular Join 、 Interval Join、Temporal Join。

其中 Regular Join 主要是用於 兩條流之間的 操作,可以是 inner join 、或者 out join ,full join 等,適用場景如計算點選率等。缺點 會產生回撤流。

其次 Interval Join  主要是計算兩條流在一段時間區間內的 Join,可以讓一條流去 Join 另一條流中前後一段時間內的資料。

而 Temporal Join 主要是用於 快照 join ,包括事件時間,處理時間的 Temporal Join,類似於離線中的快照 Join,可以用於匯率計算的場景。

2 流表與維表的 Join ,如 Lookup Join , Lookup Join 主要用於 流與外部維表的 Join 操作,因為一般的使用者畫像資料會儲存在Mysql、Hbase 或者 Redis 中,當用戶日誌流過來後,需要實時查詢資料,就需要用到 LookUp join 操作。

3 動態表字段的列轉行。包含 Table Function 和 Array Expansion。

其中  Array Expansion 是將表中 ARRAY 型別欄位(列)壓平,轉為多行,適用於將一行轉多行的操作。

而 Table Function 和  Array Expansion 功能類似,但本質是一個 UDTF 函式,即使用者可以自定義UDF 等實現邏輯處理。

6、Flink SQL 應用提交流程

(1) 呼叫 parse() 方法,將 sql 轉為 未經校驗的 AST 抽象語法樹(sqlNode) ,在解析階段主要用到詞法解析和語法解析。

詞法解析將 sql 語句轉為一組 token,語法解析對 token 進行遞迴下降語法分析。

(2)呼叫 validate() 方法,將 AST 抽象語法樹轉為經過校驗的抽象語法樹(SqlNode).在校驗階段主要校驗 兩部分:

校驗表名、欄位名、函式名是否正確,

校驗特殊的型別是否正確,如 join 操作、groupby 是否有巢狀等。

(3)呼叫 rel() 方法,將 抽象語法樹 SqlNode 轉為 關係代數樹 RelNode(關係表示式) 和 RexNode(行表示式) ,在這個步驟中,DDL 不執行 rel 方法,因為 DDL 實際是對元資料的修改,不涉及複雜查詢。

(4)呼叫 convert()方法,將 RelNode 轉為 operation ,operation 包含多種型別,但最終都會生成根節點 ModifyOperation。

在 Flink 內部的 operation 之後,會呼叫 translate 方法將 operation 轉為 transformation。在這中間也經歷了四大步驟:

(1) 呼叫 translateToRel() 方法 先將 ModifyOperation 轉換成 Calcite RelNode 邏輯計劃樹,再對應轉換成 FlinkLogicalRel( RelNode 邏輯計劃樹);

(2) 呼叫 optimize() 方法 將 FlinkLogicalRel 優化成 FlinkPhysicalRel。在這中間的優化規則包含 基於規則優化 RBO 和 基於代價優化 CBO 。

(3) 呼叫 TranslateToExecNodeGraph() 方法 將物理計劃轉為 execGraph。

(4) 呼叫 TranslateToPlan() 方法 將 execGraph 轉為 transformation。

7  kafka 是如何保證對應型別資料被寫到相同分割槽的?

主要是通過 訊息鍵 和 分割槽器 來實現,分割槽器為鍵生成一個 offset,

然後使用 offset 對主題分割槽進行取模,為訊息選取分割槽,這樣就可以保證包含同一個鍵的訊息會被寫到同一個分割槽上。

如果 ProducerRecord 沒有指定分割槽,且訊息的 key 不為空,則使用 Hash 演算法來計算分割槽分配。

如果 ProducerRecord 沒有指定分割槽,且訊息的 key 也是空,則用 輪詢 的方式選擇一個分割槽。

8 Kafka 的檔案儲存機制瞭解嗎?

在 Kafka 中,一個 Topic 會被分割成多個 Partition 分割槽,當用戶檢視建立的一個 Partition 時,可以看到裡面包含 3 個檔案,分別為 log 檔案、index 檔案,以及 timeindex 檔案。

這三個檔案中儲存的都是二進位制格式的資料,其中 log 檔案儲存的是 BatchRecords 訊息內容,而 index 和 timeindex 分別儲存的是一些索引資訊。

這三個檔案共同組成一個 Segment,而檔名中的(0)表示的是一個 Segment 的起始 Offset。

Kafka 會根據 log.segment.bytes 的配置來決定單個 Segment 檔案(log)的大小,當寫入資料達到這個大小時就會建立一個新的 Segment。

在 三個檔案中:timeindex 檔案包含兩個欄位,分別為:timestamp 和 offset , i ndex 檔案 包含兩個欄位,分別為: offset  和 position,  log 檔案包含多個欄位,其中最重要的就是 records 欄位。

根據這三個檔案,就可以基於 offset 找到對應的 Message。

9 Kafka 的 訊息確認機制瞭解嗎?

為保證 producer 傳送的資料,能可靠的達到指定的 topic ,Producer 提供了訊息確認機制也就是(ack 機制)。生產者往 Broker 的 topic 中傳送訊息時,通過配置 ack 的值來決定訊息傳送成功與否。

當 acks = 0:producer 不會等待任何來自 broker 的響應。

特點:低延遲,高吞吐,資料可能會丟失。

如果當中出現問題,導致 broker 沒有收到訊息,那麼 producer 無從得知,會造成訊息丟失。

當 acks = 1(預設值)時,只要叢集中 partition 的 Leader 節點收到訊息,生產者就會收到一個來自伺服器的成功響應。

這種情況,如果在 follower 同步之前,leader 出現故障,將會丟失資料。

當 acks = -1時,只有當所有參與複製的節點全部都收到訊息時,生產者才會收到一個來自伺服器的成功響應。

這種模式是最安全的,可以保證不止一個伺服器收到訊息,就算有伺服器發生崩潰,整個叢集依然可以執行。

根據實際的應用場景,選擇設定不同的 acks,以此保證資料的可靠性。

另外,Producer 傳送訊息還可以選擇同步或非同步模式,如果設定成非同步,雖然會極大的提高訊息傳送的效能,但是這樣會增加丟失資料的風險。

如果需要確保訊息的可靠性,必須將 producer.type 設定為 sync

10 你能談一下 yarn 的基礎架構及排程流程嗎?

yarn 的基礎架構主要包含 3 大元件,分別為 ResourceManager、ApplicationMaster、NodeManager.

其中:

ResourceManager 是一個全域性的資源管理器,負責整個系統的資源管理和分配,主要包括兩個元件,即排程器(Scheduler)和應用程式管理器(Applications Manager)。

ApplicationMaster:ApplicationMaster 是 Resource Manager 根據 接收使用者提交的作業,按照作業的上下文資訊等 分配出一個 container 資源,然後 通知 NodeManager 為使用者作業創建出一個 ApplicationMaster。

NodeManager:NodeManager 管理 YARN 叢集中的每個節點,對節點進行資源監控和健康狀態管理。

yarn 的排程流程簡單總結如下:

1、客戶端提交應用程式給 ResourceManager ResouceManager 收到請求後,將分配 Container 資源,並通知對應的 NodeManager 啟動一個 ApplicationMaster。

2、applicationMaster 來執行和管理 container 裡面的任務,其中 container 會通過心跳機制向 applicationMaster 來發送執行資訊。

3、任務完成之後,application 向 ResourceManager 報告,任務完成,container 進行資源釋放。

拼多多一面考察的演算法

舉例:

(1)輸入:1,3,6,2,7,5    輸出:4

含義:當去除2時,最大遞增子序列長度為1,3,6,7,所以長度為 4

(1)輸入:1,3,8,2,6,5,7,9,3,11,1,2,3,4,3,5,6,7,8   輸出:8

含義:當去除數字 3 時,最大長度為1,2,3,4,5,6,7,8 所以長度為8

解題思路:

該題的做法就是,定義最大遞增長度初始值 max = 0,同時定義 累加器初始值 count = 1, 並定義開關初始值 flag = 0, 因為題目要求最多隻能去除一個數字,所以讓 flag 剛開始為0,當遇到要去除的數字時,flag ++,然後判斷 flag <=1 裡面的邏輯即可。

程式碼如下:

    public static int test(int[] nums){
if(nums.length==0||nums==null){
return 0;
}
// 累加器初始值為 1
int count = 1;
// 定義最大遞增序列初始值為 1
int max = 1;
// 定義開關 flag 初始值為 0
int flag = 0;

for(int i=1;i<nums.length;i++){
if(flag<=1){
if(nums[i] - nums[i-1] >0){
count++;
}else {
if (i==nums.length-1) {
break;
}
if(nums[i+1]-nums[i-1]>0&&nums[i] - nums[i-1]<=0){
flag++;
}else {
count = 1;
}
}
}else{
flag = 0;
count = 1;
}
max = Math.max(count,max);
}
return max;
}

以上就是 10 個影片的全部內容,想看影片教程的,可以關注土哥抖音號,每天都會分享一道知識點,堅持打卡 100 天~

之前看到許多小夥伴的留言,建議影片多新增一些圖片,後面影片會不斷優化,希望能幫助大家更好的學習。 覺得對你有幫助,請關注點贊、轉發、在看 三連走起~