GoBatch簡介 —— 一款基於go語言的企業級批處理框架(Golang下的SpringBatch)
GoBatch是一款用go語言實現的企業級批處理框架,其設計思想來源於SpringBatch,相當於golang下的SpringBatch框架。
專案倉庫地址: Github: https://github.com/chararch/gobatch Gitee: https://gitee.com/chararch/gobatch
功能
GoBatch的主要功能包括:
- 以模組化方式構建批處理應用程式。
- 管理多個批處理任務的執行。
- 任務被分為多個序列執行的步驟,一個步驟可以通過分割槽由多執行緒並行執行。
- 自動記錄任務執行狀態,支援任務失敗後斷點續跑。
- 內建檔案讀寫元件,支援tsv、csv、json等格式的檔案讀寫及校驗。
- 提供多種Listener,便於對任務和步驟進行擴充套件。
架構
GoBatch整體架構分為三層:
- 介面層:提供上層應用呼叫的介面,主要包括任務編排、任務管理、任務的啟動和暫停。
- 核心層:提供一套任務執行引擎,包括資料處理、檔案讀寫、並行處理和錯誤處理等通用元件或能力。
- 基礎層:包括協程池、事務管理、任務執行狀態記錄、日誌列印等
作為一款批處理框架,GoBatch的核心能力是任務編排和任務執行。應用程式需要首先通過GoBatch介面完成任務編排,才可以執行任務。
從任務結構編排上來說,一個任務(Job)由多個步驟(Step)構成,每個步驟包含一段業務邏輯,各步驟按照先後順序依次執行。 任務編排即是將不同的業務邏輯構造成多個步驟(Step)並按照一定順序組裝成任務(Job),交由GoBatch執行時管理。作為批處理框架,GoBatch可以管理多個任務。
任務執行時,應用程式可以傳遞引數給指定任務,GoBatch根據傳入的任務引數生成一個任務例項(JobInstance)。一個任務例項可能會被多次執行,每次執行時,GoBatch會建立一條任務執行記錄(JobExecution)用於記錄任務的執行狀態資訊。 同理,每個Step的執行也會生成一條步驟執行記錄(StepExecution)。GoBatch會將任務例項(JobInstance)、任務執行記錄(JobExecution)、步驟執行記錄(StepExecution)通過Repository儲存到資料庫中。
GoBatch沒有限定任務執行的觸發方式,應用程式既可以通過定時任務、也可以通過實時事件、還可以通過命令列觸發任務的執行。
GoBatch批處理應用的執行流程如下:
核心設計
任務模型
- 一個任務(Job)由多個步驟(Step)構成,Step包含業務邏輯
- 任務帶有 引數 、引數是一個KV map,不同的引數代表不同的 任務例項(JobInstance) ,在任務的步驟中可以獲取引數值
- 每個任務例項都會從頭執行所有的任務步驟,如果執行失敗,可以從斷點繼續執行
- 任務的每次執行對應一個執行記錄(JobExecution),Step每次執行也對應一個執行記錄(StepExecution)
- 在執行記錄中會記錄任務和步驟的開始、結束時間、狀態,其中步驟執行記錄中會記錄該步驟處理的資料範圍以及遊標,以便在中斷時從斷點繼續執行
Step分類
步驟(Step)分為三類:
- 簡單步驟(Simple Step) :在單執行緒中執行使用者編寫的業務邏輯。
- 分塊步驟(Chunk Step) :在單執行緒中分塊處理大量的資料。分塊步驟包含Reader、Processor、Writer 3個使用者自定義元件,執行順序是按指定分塊大小讀取源資料、處理資料、寫入結果資料,分片處理邏輯在一個事務中,重複執行該流程直到源資料讀取完畢。
-
分割槽步驟(Partition Step)
:將要處理的資料劃分為多個分割槽,每個分割槽由一個單獨的執行緒(子步驟)分別處理,最後將各個分割槽的處理結果進行聚合。Partitioner和Aggregator由使用者自定義,處理每個分割槽的子步驟可以是簡單步驟或分塊步驟。
上圖中的Handler、Reader、Processor、Writer、Partitioner、Aggregator是介面型別,需要應用程式自己實現。
任務編排
任務編排主要由stepBuilder和jobBuilder兩個物件來完成,應用程式先將不同的業務邏輯handler通過stepBuilder生成Step物件,再將Step物件傳給jobBuilder生成Job。
如下圖:
在構造Step物件時,stepBuilder會根據傳入的handler型別選擇合適的Step型別:
- 如果傳入了普通Handler,則會生成simpleStep。
- 如果傳入了Reader物件(以及可選的Processor和Writer),則會生成chunkStep。
- 在滿足前兩個條件之一時,如果還傳入了Partitioner物件,則會生成partitionStep。
stepBuilder的執行流程如下:
任務編排的整體流程如下:
任務執行
應用層可以根據定時任務、外部請求/事件、命令列等觸發GoBatch任務的執行。應用程式通過呼叫jobOperator的Start()/StartAsync()方法啟動任務的執行,在啟動任務時,需要指定任務名稱並傳遞引數。
- StartAsync()方法為非同步執行任務,呼叫該方法後,會啟動一個goroutine用於執行任務並立即返回。
- Start()方法為同步執行任務,呼叫該方法後,會一直阻塞直到整個任務的所有步驟執行完畢才返回。
同步執行任務的流程圖如下:
從流程圖中可以看出,Job的執行就是遍歷所有的Step並執行Step中的業務邏輯。Step有三種類型,每種型別Step的執行流程有一定區別,我們可以根據業務需要使用不同Step型別。
簡單步驟的執行
簡單步驟直接在當前Job的執行緒中執行Handler中的業務邏輯,Handler中的業務邏輯應當是原子性的或具有冪等性。執行流程如下:
從名稱可知,簡單步驟適合於業務邏輯比較簡單的場景。
分塊步驟的執行
分塊步驟也是在當前任務的執行緒中執行(單執行緒),它的執行流程是:
- 通過事務管理器啟動一個事務。
- 從Reader中讀取分塊大小數量的資料項(分塊大小在構造Step時通過chunkSize引數指定)。
- 呼叫Processor逐條處理從Reader中讀取到的資料項,得到處理後的新資料項。
- 呼叫Writer將Processor返回的資料項寫入到底層儲存。
- 如果2~3步執行成功,則提交事務;否則回滾事務。
- 如果前述事務提交成功,則重複執行1~5步,直到Reader返回空(nil)或事務失敗。
流程圖如下:
分塊步驟適合用於處理大批量資料、但需要序列執行的場景,或者是資料量較大、計算資源有限、但對處理時限要求不高的場景。
分割槽步驟的執行
分割槽步驟的執行流程是:
- 呼叫Partitioner對該步驟需要處理的全部資料進行分割槽,生成子步驟的StepExecution例項列表,分割槽資訊儲存在返回的StepExecution中。
- 對Partitioner生成的分割槽StepExecution列表,逐個生成子步驟task並提交到taskpool執行。
- taskpool並行執行所有子步驟。
- 等待所有子步驟執行完成。
- 呼叫Aggregator對子步驟結果進行聚合。
流程圖如下:
分割槽步驟適合用於處理大批量資料、計算資源比較充足的場景。
執行狀態記錄
GoBatch通過以下4個物件記錄批量任務的執行時的狀態資訊:
- JobInstance - 對應某個Job的某一組不同的引數,使用相同的引數啟動任務,只會對應同一個JobInstance
- JobExecution - 對應某個JobInstance的一次執行,如果中途暫停或失敗之後重啟,則會生成一個新的JobExecution
- StepContext - 對應某個JobInstance下某個Step的上下文資訊,該上下文只和JobInstance和step_name有關,與Job或Step的執行次數無關。
- StepExecution - 對應某個JobExecution下某個Step的一次執行,如果中途暫停或失敗之後重啟,則會生成一個新的StepExecution
這4個物件對應資料庫裡的4張表,主要的欄位及相互關係如下圖:
具體的表結構,可以檢視:https://github.com/chararch/gobatch/blob/master/sql/schema_mysql.sql
JobExecution和StepExecution中包含Job和Step的執行開始和結束時間、狀態等資訊。對於chunkStep,在StepExecution裡面會記錄已經Read、Process、Write的資料量,已提交或回滾的chunk次數等。 Job和Step的執行狀態有:
- STARTING: Job或Step提交到協程池等待執行
- STARTED: Job或Step正在執行
- STOPPING: Job或Step正在停止(已收到停止指令)
- STOPPED: Job或Step已停止
- COMPLETED: Job或Step已成功完成
- FAILED: Job或Step已執行失敗
- UNKNOWN: 未知狀態(目前未使用)
狀態轉移如下:
通過記錄Job和Step的執行狀態,GoBatch可以實現斷點續跑功能。試想,當我們在執行一個處理大資料量、耗時很長的任務時,中途由於某些原因(如程序crash、部分資料不規整造成程式出錯等)導致失敗, 在我們解決問題後,如果從頭開始執行勢必會花費很長時間、甚至會造成重複處理。這時我們可能希望從中斷位置繼續處理剩餘資料,GoBatch能夠很好地滿足這一訴求。
並行處理
GoBatch可以管理多個Job,每個Job又包含一個或多個Step,在Job和Step層級都可以並行執行。GoBatch在內部維護了兩個協程池(大小可配置):一個用於Job的執行,一個用於Step(專指partitionStep)的執行。
事務管理
在執行分塊步驟(chunkStep)時,對每個分塊內一小批資料的處理需要處在一個事務之中,GoBatch引入了一個事務管理器元件,其介面定義如下:
type TransactionManager interface { BeginTx() (tx interface{}, err BatchError) Commit(tx interface{}) BatchError Rollback(tx interface{}) BatchError }
這個事務管理器需要使用方根據自身的基礎設施自行實現並註冊到GoBatch。為簡便起見,GoBatch提供了一個基於*sql.DB的預設事務管理器實現,如果使用方是採用關係資料庫來進行資料處理,可以直接使用預設的事務管理器,如下:
gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))
三方依賴
- GoBatch執行時需要將任務執行狀態資訊儲存到關係資料庫,因此依賴database/sql及資料庫驅動github.com/go-sql-driver/mysql
- 協程池使用了github.com/panjf2000/ants/v2
- 錯誤處理使用了github.com/pkg/errors
- FTP檔案處理使用了github.com/jlaffaye/ftp
使用
- 獲取包
go get -u github.com/chararch/gobatch
- 建立gobatch資料庫和批量任務執行狀態表
create database xxx; //create table from https://github.com/chararch/gobatch/blob/master/sql/schema_mysql.sql
- 初始化資料庫連線並註冊*sql.DB到GoBatch:
db, _ := sql.Open("mysql", "user:[email protected](localhost:3306)/xxx?charset=utf8&parseTime=true") gobatch.SetDB(db)
- 如果使用chunkStep,還需要註冊事務管理器:
gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))
- 編寫業務邏輯,實現Handler、Reader、Processor、Writer等介面。示例可以參考:https://github.com/chararch/gobatch/
- 構造Step和Job,註冊到GoBatch
step1 := gobatch.NewStep("mytask").Handler(mytask).Build() //step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build() step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build() job := gobatch.NewJob("my_job").Step(step1, step2).Build() gobatch.Register(job)
- 啟動任務
gobatch.Start(context.Background(), "my_job", "{\"k\":\"v\"}") //gobatch.StartAsync(context.Background(), "my_job", "{\"k\":\"v\"}")
更多資訊和示例程式碼,可以參見:https://github.com/chararch/gobatch
小結
本文對批處理框架GoBatch的功能、架構、設計和使用方法,做了一個簡要介紹。GoBatch適合用於開發業務邏輯比較複雜的企業級批處理應用,例如金融等領域,也可用於企業在從Java向Go轉型時替換SpringBatch的框架。 作為開源專案,GoBatch目前還處於比較早期的階段,未來致力於將GoBatch打造成golang下的企業級批處理解決方案,後續會有更多相關內容介紹。
問題反饋
https://github.com/chararch/gobatch/issues/new
- 優維低程式碼:構件渲染子構件
- 優維低程式碼:構件 slot 說明
- Go語言愛好者週刊:第 149 期 — 正確率只有 22%
- 優維低程式碼:構件引數傳遞
- 教育業IT運維怎麼做?這家機構給出了他們的答案
- 優維低程式碼:構件基本說明
- GO專案實戰—開發上傳圖片功能
- 熬夜運維必看!監控觀測夠有效,你就可以睡好覺
- 用 Golang 跑「佇列任務」,也可以像 Laravel 一樣簡單
- GitHub 倉庫對比工具 —— github-compare
- 優維低程式碼:編排詳解選單配置
- GoBatch簡介 —— 一款基於go語言的企業級批處理框架(Golang下的SpringBatch)
- Go語言愛好者週刊:第 144 期 — 一道切片的題目
- Wind分散式遊戲伺服器引擎的實現
- Go專案實戰之日誌必備篇[開源十年專案第11次更新]
- 深入Go底層原理,重寫Redis中介軟體實戰
- 從原始碼解讀切片容量增加的計算步驟
- 什麼是中介軟體
- 優維低程式碼:Storyboard 整體結構與路由配置
- 優維低程式碼:事件與互動