GoBatch簡介 —— 一款基於go語言的企業級批處理框架(Golang下的SpringBatch)

語言: CN / TW / HK

GoBatch是一款用go語言實現的企業級批處理框架,其設計思想來源於SpringBatch,相當於golang下的SpringBatch框架。

專案倉庫地址: Github: https://github.com/chararch/gobatch Gitee: https://gitee.com/chararch/gobatch

功能

GoBatch的主要功能包括:

  1. 以模組化方式構建批處理應用程式。
  2. 管理多個批處理任務的執行。
  3. 任務被分為多個序列執行的步驟,一個步驟可以通過分割槽由多執行緒並行執行。
  4. 自動記錄任務執行狀態,支援任務失敗後斷點續跑。
  5. 內建檔案讀寫元件,支援tsv、csv、json等格式的檔案讀寫及校驗。
  6. 提供多種Listener,便於對任務和步驟進行擴充套件。

架構

GoBatch整體架構分為三層:

  1. 介面層:提供上層應用呼叫的介面,主要包括任務編排、任務管理、任務的啟動和暫停。
  2. 核心層:提供一套任務執行引擎,包括資料處理、檔案讀寫、並行處理和錯誤處理等通用元件或能力。
  3. 基礎層:包括協程池、事務管理、任務執行狀態記錄、日誌列印等

作為一款批處理框架,GoBatch的核心能力是任務編排和任務執行。應用程式需要首先通過GoBatch介面完成任務編排,才可以執行任務。

從任務結構編排上來說,一個任務(Job)由多個步驟(Step)構成,每個步驟包含一段業務邏輯,各步驟按照先後順序依次執行。 任務編排即是將不同的業務邏輯構造成多個步驟(Step)並按照一定順序組裝成任務(Job),交由GoBatch執行時管理。作為批處理框架,GoBatch可以管理多個任務。

任務執行時,應用程式可以傳遞引數給指定任務,GoBatch根據傳入的任務引數生成一個任務例項(JobInstance)。一個任務例項可能會被多次執行,每次執行時,GoBatch會建立一條任務執行記錄(JobExecution)用於記錄任務的執行狀態資訊。 同理,每個Step的執行也會生成一條步驟執行記錄(StepExecution)。GoBatch會將任務例項(JobInstance)、任務執行記錄(JobExecution)、步驟執行記錄(StepExecution)通過Repository儲存到資料庫中。

GoBatch沒有限定任務執行的觸發方式,應用程式既可以通過定時任務、也可以通過實時事件、還可以通過命令列觸發任務的執行。

GoBatch批處理應用的執行流程如下:

核心設計

任務模型

  1. 一個任務(Job)由多個步驟(Step)構成,Step包含業務邏輯
  2. 任務帶有 引數 、引數是一個KV map,不同的引數代表不同的 任務例項(JobInstance) ,在任務的步驟中可以獲取引數值
  3. 每個任務例項都會從頭執行所有的任務步驟,如果執行失敗,可以從斷點繼續執行
  4. 任務的每次執行對應一個執行記錄(JobExecution),Step每次執行也對應一個執行記錄(StepExecution)
  5. 在執行記錄中會記錄任務和步驟的開始、結束時間、狀態,其中步驟執行記錄中會記錄該步驟處理的資料範圍以及遊標,以便在中斷時從斷點繼續執行

Step分類

步驟(Step)分為三類:

  • 簡單步驟(Simple Step) :在單執行緒中執行使用者編寫的業務邏輯。
  • 分塊步驟(Chunk Step) :在單執行緒中分塊處理大量的資料。分塊步驟包含Reader、Processor、Writer 3個使用者自定義元件,執行順序是按指定分塊大小讀取源資料、處理資料、寫入結果資料,分片處理邏輯在一個事務中,重複執行該流程直到源資料讀取完畢。
  • 分割槽步驟(Partition Step) :將要處理的資料劃分為多個分割槽,每個分割槽由一個單獨的執行緒(子步驟)分別處理,最後將各個分割槽的處理結果進行聚合。Partitioner和Aggregator由使用者自定義,處理每個分割槽的子步驟可以是簡單步驟或分塊步驟。

上圖中的Handler、Reader、Processor、Writer、Partitioner、Aggregator是介面型別,需要應用程式自己實現。

任務編排

任務編排主要由stepBuilder和jobBuilder兩個物件來完成,應用程式先將不同的業務邏輯handler通過stepBuilder生成Step物件,再將Step物件傳給jobBuilder生成Job。 如下圖:

在構造Step物件時,stepBuilder會根據傳入的handler型別選擇合適的Step型別:

  1. 如果傳入了普通Handler,則會生成simpleStep。
  2. 如果傳入了Reader物件(以及可選的Processor和Writer),則會生成chunkStep。
  3. 在滿足前兩個條件之一時,如果還傳入了Partitioner物件,則會生成partitionStep。

stepBuilder的執行流程如下:

任務編排的整體流程如下:

任務執行

應用層可以根據定時任務、外部請求/事件、命令列等觸發GoBatch任務的執行。應用程式通過呼叫jobOperator的Start()/StartAsync()方法啟動任務的執行,在啟動任務時,需要指定任務名稱並傳遞引數。

  1. StartAsync()方法為非同步執行任務,呼叫該方法後,會啟動一個goroutine用於執行任務並立即返回。
  2. Start()方法為同步執行任務,呼叫該方法後,會一直阻塞直到整個任務的所有步驟執行完畢才返回。

同步執行任務的流程圖如下:

從流程圖中可以看出,Job的執行就是遍歷所有的Step並執行Step中的業務邏輯。Step有三種類型,每種型別Step的執行流程有一定區別,我們可以根據業務需要使用不同Step型別。

簡單步驟的執行

簡單步驟直接在當前Job的執行緒中執行Handler中的業務邏輯,Handler中的業務邏輯應當是原子性的或具有冪等性。執行流程如下:

從名稱可知,簡單步驟適合於業務邏輯比較簡單的場景。

分塊步驟的執行

分塊步驟也是在當前任務的執行緒中執行(單執行緒),它的執行流程是:

  1. 通過事務管理器啟動一個事務。
  2. 從Reader中讀取分塊大小數量的資料項(分塊大小在構造Step時通過chunkSize引數指定)。
  3. 呼叫Processor逐條處理從Reader中讀取到的資料項,得到處理後的新資料項。
  4. 呼叫Writer將Processor返回的資料項寫入到底層儲存。
  5. 如果2~3步執行成功,則提交事務;否則回滾事務。
  6. 如果前述事務提交成功,則重複執行1~5步,直到Reader返回空(nil)或事務失敗。

流程圖如下:

分塊步驟適合用於處理大批量資料、但需要序列執行的場景,或者是資料量較大、計算資源有限、但對處理時限要求不高的場景。

分割槽步驟的執行

分割槽步驟的執行流程是:

  1. 呼叫Partitioner對該步驟需要處理的全部資料進行分割槽,生成子步驟的StepExecution例項列表,分割槽資訊儲存在返回的StepExecution中。
  2. 對Partitioner生成的分割槽StepExecution列表,逐個生成子步驟task並提交到taskpool執行。
  3. taskpool並行執行所有子步驟。
  4. 等待所有子步驟執行完成。
  5. 呼叫Aggregator對子步驟結果進行聚合。

流程圖如下:

分割槽步驟適合用於處理大批量資料、計算資源比較充足的場景。

執行狀態記錄

GoBatch通過以下4個物件記錄批量任務的執行時的狀態資訊:

  • JobInstance - 對應某個Job的某一組不同的引數,使用相同的引數啟動任務,只會對應同一個JobInstance
  • JobExecution - 對應某個JobInstance的一次執行,如果中途暫停或失敗之後重啟,則會生成一個新的JobExecution
  • StepContext - 對應某個JobInstance下某個Step的上下文資訊,該上下文只和JobInstance和step_name有關,與Job或Step的執行次數無關。
  • StepExecution - 對應某個JobExecution下某個Step的一次執行,如果中途暫停或失敗之後重啟,則會生成一個新的StepExecution

這4個物件對應資料庫裡的4張表,主要的欄位及相互關係如下圖:

具體的表結構,可以檢視:https://github.com/chararch/gobatch/blob/master/sql/schema_mysql.sql

JobExecution和StepExecution中包含Job和Step的執行開始和結束時間、狀態等資訊。對於chunkStep,在StepExecution裡面會記錄已經Read、Process、Write的資料量,已提交或回滾的chunk次數等。 Job和Step的執行狀態有:

  • STARTING: Job或Step提交到協程池等待執行
  • STARTED: Job或Step正在執行
  • STOPPING: Job或Step正在停止(已收到停止指令)
  • STOPPED: Job或Step已停止
  • COMPLETED: Job或Step已成功完成
  • FAILED: Job或Step已執行失敗
  • UNKNOWN: 未知狀態(目前未使用)

狀態轉移如下:

通過記錄Job和Step的執行狀態,GoBatch可以實現斷點續跑功能。試想,當我們在執行一個處理大資料量、耗時很長的任務時,中途由於某些原因(如程序crash、部分資料不規整造成程式出錯等)導致失敗, 在我們解決問題後,如果從頭開始執行勢必會花費很長時間、甚至會造成重複處理。這時我們可能希望從中斷位置繼續處理剩餘資料,GoBatch能夠很好地滿足這一訴求。

並行處理

GoBatch可以管理多個Job,每個Job又包含一個或多個Step,在Job和Step層級都可以並行執行。GoBatch在內部維護了兩個協程池(大小可配置):一個用於Job的執行,一個用於Step(專指partitionStep)的執行。

事務管理

在執行分塊步驟(chunkStep)時,對每個分塊內一小批資料的處理需要處在一個事務之中,GoBatch引入了一個事務管理器元件,其介面定義如下:

type TransactionManager interface {
	BeginTx() (tx interface{}, err BatchError)
	Commit(tx interface{}) BatchError
	Rollback(tx interface{}) BatchError
}

這個事務管理器需要使用方根據自身的基礎設施自行實現並註冊到GoBatch。為簡便起見,GoBatch提供了一個基於*sql.DB的預設事務管理器實現,如果使用方是採用關係資料庫來進行資料處理,可以直接使用預設的事務管理器,如下:

gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))

三方依賴

  1. GoBatch執行時需要將任務執行狀態資訊儲存到關係資料庫,因此依賴database/sql及資料庫驅動github.com/go-sql-driver/mysql
  2. 協程池使用了github.com/panjf2000/ants/v2
  3. 錯誤處理使用了github.com/pkg/errors
  4. FTP檔案處理使用了github.com/jlaffaye/ftp

使用

  1. 獲取包
go get -u github.com/chararch/gobatch
  1. 建立gobatch資料庫和批量任務執行狀態表
create database xxx;
//create table from https://github.com/chararch/gobatch/blob/master/sql/schema_mysql.sql
  1. 初始化資料庫連線並註冊*sql.DB到GoBatch:
db, _ := sql.Open("mysql", "user:[email protected](localhost:3306)/xxx?charset=utf8&parseTime=true")
gobatch.SetDB(db)
  1. 如果使用chunkStep,還需要註冊事務管理器:
gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))
  1. 編寫業務邏輯,實現Handler、Reader、Processor、Writer等介面。示例可以參考:https://github.com/chararch/gobatch/
  2. 構造Step和Job,註冊到GoBatch
step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

job := gobatch.NewJob("my_job").Step(step1, step2).Build()

gobatch.Register(job)
  1. 啟動任務
gobatch.Start(context.Background(), "my_job", "{\"k\":\"v\"}")
//gobatch.StartAsync(context.Background(), "my_job", "{\"k\":\"v\"}")

更多資訊和示例程式碼,可以參見:https://github.com/chararch/gobatch

小結

本文對批處理框架GoBatch的功能、架構、設計和使用方法,做了一個簡要介紹。GoBatch適合用於開發業務邏輯比較複雜的企業級批處理應用,例如金融等領域,也可用於企業在從Java向Go轉型時替換SpringBatch的框架。 作為開源專案,GoBatch目前還處於比較早期的階段,未來致力於將GoBatch打造成golang下的企業級批處理解決方案,後續會有更多相關內容介紹。

問題反饋

https://github.com/chararch/gobatch/issues/new