阿里雲 Serverless 非同步任務處理系統在資料分析領域的應用

語言: CN / TW / HK

非同步任務處理系統中的資料分析

Cloud Native

資料處理、機器學習訓練、資料統計分析是最為常見的一類離線任務。這類任務往往都是經過了一系列的預處理後,由上游統一發送到任務平臺進行批量訓練及分析。在處理語言方面,Python 由於其所提供的豐富的資料處理庫,成為了資料領域最為常用的語言之一。函式計算原生支援 Python runtime,並支援快捷的引入第三方庫,使得使用函式計算非同步任務進行處理變得極為方便。

資料分析場景常見訴求

資料分析場景往往具有執行時間長、併發量大的特點。在離線場景中,往往會定時觸發一批大量的資料進行集中處理。由於這種觸發特性,業務方往往會對資源利用率(成本)具有較高的要求,期望能夠滿足效率的同時,儘量降低成本。具體歸納如下:

  1. 程式開發便捷,對於第三方包及自定義依賴友好;

  2. 支援長時執行。能夠檢視執行過程中的任務狀態,或登入機器進行操作。如果出現數據錯誤支援手動停止任務;

  3. 資源利用率高,成本最優。

以上訴求非常適合使用函式計算非同步任務。

典型案例 - 資料庫自治服務

  • 業務基本情況

阿里雲集團內部的資料庫巡檢平臺主要用於對 sql 語句的慢查詢、日誌等進行優化分析。整個平臺任務分為離線訓練及線上分析兩類主要任務,其中線上分析業務的的計算規模達到了上萬核,離線業務的每日執行時長也在數萬核小時。由於線上分析、離線訓練時間上的不確定性,很難提高叢集整體資源利用率,並且在業務高峰來時需要極大的彈性算力支援。使用函式計算後,整個業務的架構圖如下:

  • 業務痛點及架構演進

資料庫巡檢平臺負責阿里巴巴全網各 Region 的資料庫 SQL 優化及分析工作。Mysql 資料來源於各 Region 的各個叢集,並統一在 Region 維度進行一次預聚合及儲存。在進行分析時,由於需要跨 region 的聚合及統計,巡檢平臺首先嚐試在內網搭建大型 Flink 叢集進行統計分析工作。但是在實際使用中,遇到了如下問題:

  1. 資料處理演算法迭代繁瑣。主要體現在演算法的部署、測試及釋出上。Flink 的 Runtime 能力極大限制了釋出週期;

  2. 對於常見的及一些自定義的第三方庫,Flink 支援不是很好。演算法所依賴的一些機器學習、統計的庫在 Flink 官方 Python runtime 中要麼沒有,要麼版本老舊,使用不便,無法滿足要求;

  3. 走 Flink 轉發鏈路較長,Flink 排查問題困難;

  4. 峰值時彈性速度及資源均較難滿足要求。並且整體成本非常高。

在瞭解了函式計算後,針對 Flink 計算部分進行了演算法任務的遷移工作,將核心訓練及統計演算法遷移至函式計算。通過使用函式計算非同步任務所提供的相關能力,整個開發、運維及成本得到了極大的提升。

  • 遷移函式計算架構後的效果

  1. 遷移函式計算後,系統能夠完整承接峰值流量,快速完成每日分析及訓練任務;

  2. 函式計算豐富的 Runtime 能力支援了業務的快速迭代;

  3. 計算上相同的核數成本變為了原來 Flink 的 1/3。

函式計算非同步任務非常適用於這類資料處理任務。函式計算在降低運算資源的成本同時,能夠將您從繁雜的平臺運維工作中解放出來,專注於演算法開發及優化。

函式計算非同步任務最佳實踐-Kafka ETL

Cloud Native

ETL 是資料處理中較為常見的任務。原始資料或存在於 Kafka 中,或存在於 DB 中,因為業務需要對資料進行處理後轉儲到其他儲存介質(或存回原來的任務佇列)。這類業務也屬於明顯的任務場景。如果您採用了雲上的中介軟體服務(如雲上的 Kafka),您就可以利用函式計算強大的觸發器整合生態便捷的整合 Kafka,而無需關注諸如 Kafka Connector 的部署、錯誤處理等與業務無關的操作。

ETL 任務場景的需求

一個 ETL 任務往往包含 Source、Sink 及處理單元三個部分,因此 ETL 任務除了對算力的要求外,還需要任務系統具有極強的上下游連線生態。除此之外,由於資料處理的準確性要求,需要任務處理系統能夠提供任務去重、Exactly Once 的操作語義。並且,對於處理失敗的訊息,需要能夠進行補償(如重試、死信佇列)的能力。總結如下:

  1. 任務的準確執行:

  1. 任務的上下游:

  1. 運算元能力的要求:

Serverless Task 對 ETL 任務的支援

函式計算支援的 Destinationg 功能可以很好的支援 ETL 任務對於便捷連線上下游、任務準確執行的相關訴求。函式計算豐富的 Runtime 支援也使得對於資料處理的任務變得極為靈活。在 Kafka ETL 任務處理場景中,我們主要用到的 Serverless Task 能力如下:

  1. 非同步目標配置功能:

  1. 靈活的運算元及第三方庫支援:

Kafka ETL 任務處理示例

我們以簡單的 ETL 任務處理為例,資料來源來自 Kafka,經過函式計算處理後,將任務執行結果及上下游資訊推送至訊息服務 MNS。見 函式計算部分專案原始碼 [ 1]

  • Kafka 資源準備

1. 進入 Kafka 控制檯,點選購買例項,之後部署。等待例項部署完成;

2. 進入建立好的例項中,建立一個測試用 Topic。

  • 目標資源準備(MNS)

進入 MNS 控制檯,分別建立兩個佇列:

  1. dead-letter-queue:作為死信佇列使用。當訊息處理失敗後,執行的上下文資訊將投遞到這裡;

  2. fc-etl-processed-message:作為任務成功執行後的推送目標。

建立完成後,如下圖所示:

  • 部署  

  1. 下載安裝 Serverless Devs:

npm install @serverless-devs/s
詳細文件可以參考Serverless Devs 安裝文件[2]
  1. 配置金鑰資訊:

s config add

詳細文件可以參考 阿里雲金鑰配置文件 [ 3]

  1. 進入專案,修改 s.yaml 檔案中的目標 ARN 為上述建立後的 MNS 佇列 ARN,並修改服務角色為已存在的角色;

  2. 部署:s deploy -t s.yaml

  • 配置 ETL 任務

1. 進入 kafka 控制檯 - connector 任務列表標籤頁,點選建立 Connector;

2. 在配置完基本資訊、源的 Topic 後,配置目標服務。在這裡面我們選擇函式計算作為目標:

您可以根據業務需求配置傳送批大小及重試次數。至此,我們已完成任務的基本配置。 注意:這裡面的傳送模式請選擇“非同步”模式。

進入到函式計算非同步配置頁面,我們可以看到目前的配置如下:

  • 測試 ETL 任務

1.  進入 kafka 控制檯 - connector 任務列表標籤頁,點選測試; 填完訊息內容後,點擊發送:

2. 傳送多條訊息後,進入到函式控制檯。我們可以看到有多條訊息在執行中。此時我們選擇使用停止任務的方式來模擬一次任務執行失敗:

3. 進入到訊息服務 MNS 控制檯中,我們可以看到兩個先前建立的佇列中均有一條可用訊息,分別代表一次執行和失敗的任務內容:

4. 進入到佇列詳情中,我們可以看到兩條訊息內容。以成功的訊息內容為例:

{
"timestamp":1646826806389,
"requestContext":{
"requestId":"919889e7-60ff-408f-a0c7-627bbff88456",
"functionArn":"acs:fc:::services/fc-etl-job.LATEST/functions/fc-job-function",
"condition":"",
"approximateInvokeCount":1
},
"requestPayload":"[{\"key\":\"k1\",\"offset\":1,\"overflowFlag\":false,\"partition\":5,\"timestamp\":1646826803356,\"topic\":\"connector-demo\",\"value\":\"k1\",\"valueSize\":4}]",
"responseContext":{
"statusCode":200,
"functionError":""
},
"responsePayload":"[\n {\n \"key\": \"k1\",\n \"offset\": 1,\n \"overflowFlag\": false,\n \"partition\": 5,\n \"timestamp\": 1646826803356,\n \"topic\": \"connector-demo\",\n \"value\": \"k1\",\n \"valueSize\": 4\n }\n]"
}

在這裡面,我們可以看到 "responsePayload" 這一個 Key 中有函式返回的原始內容。一般情況下我們會將資料處理的結果作為 response 返回,所以在後續的處理中,可以通過讀取 "responsePayload" 來獲取處理後的結果。

"requestPayload" 這一個 Key 中是 Kafka 觸發函式計算的原始內容,通過讀取這條資料中的內容,便可以獲取原始資料。

函式計算非同步任務最佳實踐-音影片處理

Cloud Native

隨著計算機技術和網路的發展,影片點播技術因其良好的人機互動性和流媒體傳輸技術倍受教育、娛樂等行業的青睞。當前雲端計算平臺廠商的產品線不斷成熟完善,如果想要搭建影片點播類應用,直接上雲會掃清硬體採購、技術等各種障礙。以阿里云為例,典型的解決方案如下:

在該解決方案中,物件儲存 OSS 可以支援海量影片儲存,採集上傳的影片被轉碼以適配各種終端、CDN 加速終端裝置播放影片的速度。此外還有一些 內容安全 [ 4] 審查需求,例如鑑黃、鑑恐等。

音影片是典型的長時處理場景,非常適合使用函式計算任務。

音影片處理的需求

在影片點播解決方案中,影片轉碼是最消耗計算力的一個子系統,雖然您可以使用雲上專門的轉碼服務,但在某些場景下,您仍會選擇自己搭建轉碼服務,例如:

  • 需要更彈性的影片處理服務。例如,已經在虛擬機器或容器平臺上基於 FFmpeg 部署了一套影片處理服務,但想在此基礎上 提升資源利用率,實現具有明顯波峰波谷、流量突增情況下的快彈及穩定性;

  • 需要批量快速處理多個超大的影片 。例如,每週五定時產生幾百個 4 GB 以上 1080P 的大影片,每個任務可能執行時長達數小時;

  • 對影片處理任務希望實時 掌握進度 ;並在一些出現錯誤的情況下需要登入例項排查問題甚至 停止執行中的任務 避免資源消耗。

Serverless Task 對音影片場景的支援

上述訴求是典型的任務場景。而由於這類任務往往具有波峰波谷的特性,如何進行計算資源的運維,並儘可能的降低其成本,這部分的工作量甚至比實際影片處理業務的工作量還要大。Serverless Task 這一產品形態就是為了解決這類場景而誕生的,通過 Serverless Task,您可以快速構建高彈性、高可用、低成本免運維的影片處理平臺。

在這個場景中,我們會用到的 Serverless Task 的主要能力如下:

  1. 免運維 & 低成本:計算資源隨用隨彈,不使用不付費;

  2. 長時執行任務負載友好:單個例項最長支援 24h 的執行時長;

  3. 任務去重:支援觸發端的錯誤補償。對於單一任務,Serverless Task 能夠做到自動去重的能力,執行更可靠;

  4. 任務可觀測:所有執行中、執行成功、執行失敗的任務可追溯,可查詢;支援任務的執行歷史資料查詢、任務日誌查詢;

  5. 任務可操作:您可以停止、重試任務;

  6. 敏捷開發 & 測試:官方支援 S 工具進行自動化一鍵部署;支援登入執行中函式例項的能力,您可以直接登入例項除錯 ffmpeg 等第三方程式,所見即所得。

Serverless -FFmpeg 影片轉碼

專案原始碼 [ 5] 見文末

  • 部署

  1. 下載安裝 Serverless Devs:

npm install @serverless-devs/s
詳細文件可以參考Serverless Devs 安裝文件[2]
  1. 配置金鑰資訊:

s config add

詳細文件可以參考 阿里雲金鑰配置文件 [ 3 ]

  1. 初始化專案:s init video-transcode -d video-transcode

  2. 進入專案並部署:cd video-transcode && s deploy

  • 呼叫函式

  1. 發起 5 次非同步任務函式呼叫

$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"mov"}' --invocation-type async   --stateful-async-invocation-id my1-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: bf7d7745-886b-42fc-af21-ba87d98e1b1c


$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"mov"}' --invocation-type async --stateful-async-invocation-id my2-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: edb06071-ca26-4580-b0af-3959344cf5c3


$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"flv"}' --invocation-type async --stateful-async-invocation-id my3-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: 41101e41-3c0a-497a-b63c-35d510aef6fb


$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"avi"}' --invocation-type async --stateful-async-invocation-id my4-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: ff48cc04-c61b-4cd3-ae1b-1aaaa1f6c2b2


$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"m3u8"}' --invocation-type async --stateful-async-invocation-id my5-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: d4b02745-420c-4c9e-bc05-75cbdd2d010f

2. 登入  FC 控制檯 [ 6 ]

可以清晰看出每一次轉碼任務的執行情況:

    • A 影片是什麼時候開始轉碼的, 什麼時候轉碼結束

    • B 影片轉碼任務不太符合預期, 我中途可以點選停止呼叫

    • 通過呼叫狀態過濾和時間視窗過濾,我可以知道現在有多少個任務正在執行, 歷史完成情況是怎麼樣的

    • 可以追溯每次轉碼任務執行日誌和觸發payload

    • 當您的轉碼函式有異常時候, 會觸發 dest-fail 函式的執行,您在這個函式可以新增您自定義的邏輯, 比如報警

轉碼完畢後, 您也可以登入 OSS 控制檯到指定的輸出目錄檢視轉碼後的影片。

在本地使用該專案時,不僅可以部署,還可以進行更多的操作,例如檢視日誌,檢視指標,進行多種模式的除錯等,這些操作詳情可以參考 函式計算元件命令文件 [ 7]

參考連結:

[1]  函式計算部分專案原始碼:

http://github.com/awesome-fc/Stateful-Async-Invocation

[2] Serverless Devs 安裝文件:

http://github.com/Serverless-Devs/Serverless- Devs/blob/master/docs/zh/install.md

[3] 阿里雲金鑰配置文件:

http://github.com/devsapp/fc/blob/main/docs/zh/config.md

[4] 內容安全:

http://help.aliyun.com/product/28415.html

[5] 專案原始碼:

http://github.com/devsapp/start-ffmpeg/tree/master/transcode/src

[6] FC 控制檯:

http://fcnext.console.aliyun.com/overview

[7] 函式計算元件命令文件:

http://github.com/devsapp/fc#%E6%96%87%E6%A1%A3%E7%9B%B8%E5%85%B3

詳解非同步任務專題往期文章推薦

非同步任務處理系統,如何解決業務長耗時、高併發難題?

詳解非同步任務:函式計算的任務觸發去重

詳解非同步任務:任務的狀態及生命週期管理

詳解非同步任務 | 看 Serverless Task 如何解決任務排程&可觀測性中的問

Serverless 函式計算徵集令

Cloud Native

點選上方卡片瞭解活動詳情!

為了幫助更多開發者們客觀瞭解並使用阿里雲函式計算 (FC) ,阿里雲開發者社群攜手雲原生應用平臺 Serverless 團隊釋出 “Serverless 函式計算徵集令”,

7月31日前釋出函式計算評測文章,即有機會獲得千元好禮 + 千元函式計算資源包!

點選閱讀原文,立即參加!