SpringBatch從入門到精通-6.4 讀和寫處理-實戰4【掘金日新計劃】

語言: CN / TW / HK

持續創作,加速成長,6月更文活動來啦!| 掘金·日新計劃

持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第15天,點選檢視活動詳情

SpringBatch從入門到精通-1【掘金日新計劃】

SpringBatch從入門到精通-2-StepScope作用域和用法【掘金日新計劃】

SpringBatch從入門到精通-3-並行處理【掘金日新計劃】

SpringBatch從入門到精通-3.2-並行處理-遠端分割槽【掘金日新計劃】

SpringBatch從入門到精通-3.3-並行處理-遠端分割槽(訊息聚合)【掘金日新計劃】

SpringBatch從入門到精通-4 監控和指標【掘金日新計劃】

SpringBatch從入門到精通-4.2 監控和指標-原理【掘金日新計劃】

SpringBatch從入門到精通-5 資料來源配置相關【掘金日新計劃】

SpringBatch從入門到精通-5.2 資料來源配置相關-原理【掘金日新計劃】

SpringBatch從入門到精通-6 讀和寫處理【掘金日新計劃】

SpringBatch從入門到精通-6.1 讀和寫處理-實戰【掘金日新計劃】

SpringBatch從入門到精通-6.2 讀和寫處理-實戰2【掘金日新計劃】

SpringBatch從入門到精通-6.3 讀和寫處理-實戰3【掘金日新計劃】

重用現有服務

批處理系統通常與其他應用程式樣式結合使用。最常見的是線上系統,但它也可以通過移動每種應用程式樣式使用的必要批量資料來支援整合甚至胖客戶端應用程式。出於這個原因,許多使用者希望在他們的批處理作業中重用現有的 DAO 或其他服務是很常見的。Spring 容器本身通過允許注入任何必要的類使這變得相當容易。但是,可能存在現有服務需要充當ItemReaderorItemWriter的情況,以滿足另一個 Spring Batch 類的依賴關係,或者因為它確實是主要的ItemReader一步。為每個需要包裝的服務編寫一個介面卡類是相當簡單的,但因為這是一個常見的問題,Spring Batch 提供了實現: ItemReaderAdapter和ItemWriterAdapter. 這兩個類都通過呼叫委託模式來實現標準的 Spring 方法,並且設定起來相當簡單。

以下 Java 示例使用ItemReaderAdapter:

Java 配置

@Bean public ItemReaderAdapter itemReader() { ItemReaderAdapter reader = new ItemReaderAdapter(); ​ reader.setTargetObject(fooService()); reader.setTargetMethod("generateFoo"); ​ return reader; } ​ @Bean public FooService fooService() { return new FooService(); }

需要注意的重要一點是,合同的合同targetMethod必須與合同相同read:用盡時,它會返回null。否則,它返回一個 Object. 其他任何事情都會阻止框架知道處理何時結束,導致無限迴圈或不正確的失敗,具體取決於ItemWriter.

以下 Java 示例使用ItemWriterAdapter:

Java 配置

@Bean public ItemWriterAdapter itemWriter() { ItemWriterAdapter writer = new ItemWriterAdapter(); ​ writer.setTargetObject(fooService()); writer.setTargetMethod("processFoo"); ​ return writer; } ​ @Bean public FooService fooService() { return new FooService(); }

防止狀態持續存在

預設情況下,所有ItemReader和ItemWriter實現都將其當前狀態儲存在ExecutionContext提交之前。但是,這可能並不總是期望的行為。例如,許多開發人員選擇通過使用程序指示器使他們的資料庫讀取器“可重新執行”。一個額外的列被新增到輸入資料以指示它是否已被處理。當讀取(或寫入)特定記錄時,已處理標誌從 翻轉false到true。然後 SQL 語句可以在子句中包含額外的語句where,例如where PROCESSED_IND = false,從而確保在重新啟動的情況下只返回未處理的記錄。在這種情況下,最好不要儲存任何狀態,例如當前行號,因為它在重新啟動時無關緊要。因此,所有讀取器和寫入器都包含“saveState”屬性。

以下 bean 定義顯示瞭如何防止 Java 中的狀態永續性:

Java 配置

@Bean public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) { return new JdbcCursorItemReaderBuilder<PlayerSummary>() .dataSource(dataSource) .rowMapper(new PlayerSummaryMapper()) .saveState(false) .sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"  + "from games, players where players.player_id ="  + "games.player_id group by games.player_id, games.year_no") .build(); ​ }

上面的ItemReader配置不會ExecutionContext在它參與的任何執行中建立任何條目。

建立自定義 ItemReaders 和 ItemWriters

自定義ItemReader示例

出於本示例的目的,我們建立了一個ItemReader從提供的列表中讀取的簡單實現。我們從實現最基本的合約 ItemReader,read方法開始,如下程式碼所示:

public class CustomItemReader<T> implements ItemReader<T> { ​    List<T> items; ​    public CustomItemReader(List<T> items) {        this.items = items;   } ​    public T read() throws Exception, UnexpectedInputException,       NonTransientResourceException, ParseException { ​        if (!items.isEmpty()) {            return items.remove(0);       }        return null;   } }

前面的類接受一個專案列表並一次返回一個專案,從列表中刪除每個專案。當列表為空時,返回null,從而滿足了 ItemReader 的最基本要求,如下測試程式碼所示:

List<String> items = new ArrayList<>(); items.add("1"); items.add("2"); items.add("3"); ​ ItemReader itemReader = new CustomItemReader<>(items); assertEquals("1", itemReader.read()); assertEquals("2", itemReader.read()); assertEquals("3", itemReader.read()); assertNull(itemReader.read());

使ItemReader可重啟

最後的挑戰是使ItemReader可重新啟動。目前,如果處理被中斷並重新開始,則ItemReader必須從頭開始。這在許多情況下實際上是有效的,但有時最好從停止的地方重新啟動批處理作業。關鍵的判別通常是讀者是有狀態的還是無狀態的。無狀態閱讀器不需要擔心可重新啟動性,但有狀態閱讀器必須嘗試在重新啟動時重建其最後一個已知狀態。因此,我們建議您儘可能讓自定義閱讀器保持無狀態,這樣您就不必擔心可重新啟動性。

如果確實需要儲存狀態,ItemStream則應使用該介面:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream { ​    List<T> items;    int currentIndex = 0;    private static final String CURRENT_INDEX = "current.index"; ​    public CustomItemReader(List<T> items) {        this.items = items;   } ​    public T read() throws Exception, UnexpectedInputException,        ParseException, NonTransientResourceException { ​        if (currentIndex < items.size()) {            return items.get(currentIndex++);       } ​        return null;   } ​    public void open(ExecutionContext executionContext) throws ItemStreamException {        if (executionContext.containsKey(CURRENT_INDEX)) {            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();       }        else {            currentIndex = 0;       }   } ​    public void update(ExecutionContext executionContext) throws ItemStreamException {        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());   } ​    public void close() throws ItemStreamException {} }

在每次呼叫該ItemStream update方法時,當前的索引ItemReader 都儲存在提供ExecutionContext的鍵“current.index”中。呼叫該 ItemStream open方法時,ExecutionContext會檢查它是否包含具有該鍵的條目。如果找到該鍵,則將當前索引移動到該位置。這是一個相當瑣碎的例子,但它仍然符合一般合同:

ExecutionContext executionContext = new ExecutionContext(); ((ItemStream)itemReader).open(executionContext); assertEquals("1", itemReader.read()); ((ItemStream)itemReader).update(executionContext); ​ List<String> items = new ArrayList<>(); items.add("1"); items.add("2"); items.add("3"); itemReader = new CustomItemReader<>(items); ​ ((ItemStream)itemReader).open(executionContext); assertEquals("2", itemReader.read());

大多數ItemReaders都有更復雜的重啟邏輯。JdbcCursorItemReader例如,將最後處理的行的行 ID 儲存在遊標中。

還值得注意的是,其中使用的金鑰ExecutionContext不應該是微不足道的。那是因為相同ExecutionContext的內容用於 a 中的所有ItemStreams內容Step。在大多數情況下,只需在鍵前面加上類名就足以保證唯一性。但是,在同一步驟中使用兩個相同型別的極少數情況下 ItemStream(如果需要兩個檔案進行輸出,可能會發生這種情況),需要一個更唯一的名稱。出於這個原因,許多 Spring Batch ItemReader和ItemWriter實現都有一個setName()屬性,可以覆蓋這個鍵名。

自定義ItemWriter示例

實現自定義ItemWriter在許多方面與ItemReader上面的示例相似,但在足以保證其自己的示例的方式上有所不同。但是,新增可重啟性本質上是相同的,因此在此示例中不涉及。與 ItemReader示例一樣,List使用 a 是為了使示例儘可能簡單:

public class CustomItemWriter<T> implements ItemWriter<T> { ​    List<T> output = TransactionAwareProxyFactory.createTransactionalList(); ​    public void write(List<? extends T> items) throws Exception {        output.addAll(items);   } ​    public List<T> getOutput() {        return output;   } }

使可ItemWriter重啟

為了使ItemWriter可重新啟動,我們將遵循與 相同的過程 ItemReader,新增和實現ItemStream介面以同步執行上下文。在示例中,我們可能必須計算處理的專案數並將其新增為頁尾記錄。如果我們需要這樣做,我們可以 ItemStream在我們的中實現,ItemWriter以便在重新開啟流時從執行上下文中重構計數器。

在許多實際情況下,customItemWriters還委託給另一個本身可重新啟動的寫入程式(例如,寫入檔案時),或者它寫入事務性資源,因此不需要重新啟動,因為它是無狀態的。當你有一個有狀態的 writer 時,你可能應該確保實現ItemStream以及ItemWriter. 還請記住,編寫器的客戶端需要知道ItemStream,因此您可能需要在配置中將其註冊為流。

專案讀取器和寫入器實現

在本節中,我們將向您介紹前幾節中尚未討論的讀者和作者。

裝飾器

在某些情況下,使用者需要將特殊行為附加到預先存在的 ItemReader. Spring Batch 提供了一些開箱即用的裝飾器,可以為您的實現新增額外的ItemReader行為ItemWriter。

Spring Batch 包括以下裝飾器:

SynchronizedItemStreamReader

當使用ItemReader非執行緒安全的時,Spring Batch 提供 SynchronizedItemStreamReader裝飾器,可用於使ItemReader 執行緒安全。Spring Batch 提供了一個SynchronizedItemStreamReaderBuilder用於構造SynchronizedItemStreamReader.

SingleItemPeekableItemReader

Spring Batch 包含一個裝飾器,該裝飾器將 peek 方法新增到ItemReader. 這種檢視方法讓使用者可以檢視前面的一項。對 peek 的重複呼叫返回相同的專案,這是該read方法返回的下一個專案。Spring Batch 提供了一個 SingleItemPeekableItemReaderBuilder用於構造 SingleItemPeekableItemReader.

SingleItemPeekableItemReader 的 peek 方法不是執行緒安全的,因為不可能在多個執行緒中實現 peek。只有一個偷看的執行緒會在下一次讀取呼叫中獲得該專案。

SynchronizedItemStreamWriter

當使用ItemWriter非執行緒安全的時,Spring Batch 提供 SynchronizedItemStreamWriter裝飾器,可用於使ItemWriter 執行緒安全。Spring Batch 提供了一個SynchronizedItemStreamWriterBuilder用於構造SynchronizedItemStreamWriter.

MultiResourceItemWriter

MultiResourceItemWriter包裝 a並在ResourceAwareItemWriterItemStream當前資源中寫入的專案數超過 時建立新的輸出資源 itemCountLimitPerResource。Spring Batch 提供了一個MultiResourceItemWriterBuilder用於構造MultiResourceItemWriter.

ClassifierCompositeItemWriter

基於ClassifierCompositeItemWriter通過ItemWriter 提供的 Classifier. 如果所有委託都是執行緒安全的,則實現是執行緒安全的。Spring Batch 提供了一個ClassifierCompositeItemWriterBuilder用於構造 ClassifierCompositeItemWriter.

ClassifierCompositeItemProcessor

這ClassifierCompositeItemProcessor是一個ItemProcessor基於ItemProcessor通過提供的Classifier. Spring Batch 提供了一個 ClassifierCompositeItemProcessorBuilder用於構造 ClassifierCompositeItemProcessor.

訊息讀者和作家

Spring Batch 為常用的訊息傳遞系統提供了以下閱讀器和編寫器:

AmqpItemReader

TheAmqpItemReader是一個ItemReader使用 anAmqpTemplate來接收或轉換來自交換的訊息。Spring Batch 提供了一個AmqpItemReaderBuilder用於構造AmqpItemReader.

AmqpItemWriter

是AmqpItemWriter一個ItemWriter使用AmqpTemplate傳送訊息到 AMQP 交換。如果提供的名稱未指定,則訊息將傳送到無名交換器AmqpTemplate。Spring Batch 提供了一個AmqpItemWriterBuilder用於構造AmqpItemWriter.

JmsItemReader

JmsItemReader是用於 JMS的ItemReader,它使用JmsTemplate. 模板應該有一個預設目的地,用於為read() 方法提供專案。Spring Batch 提供了一個JmsItemReaderBuilder用於構造 JmsItemReader.

JmsItemWriter

JmsItemWriter是用於 JMS的ItemWriter,它使用JmsTemplate. 模板應該有一個預設目的地,用於在write(List). Spring Batch 提供了一個JmsItemWriterBuilder用於構造JmsItemWriter.

KafkaItemReader

這KafkaItemReader是ItemReader一個 Apache Kafka 主題。它可以配置為從同一主題的多個分割槽讀取訊息。它將訊息偏移量儲存在執行上下文中以支援重新啟動功能。Spring Batch 提供了一個 KafkaItemReaderBuilder用於構造KafkaItemReader.

KafkaItemWriter

這KafkaItemWriter是ItemWriterApache Kafka 的一個,它使用一個KafkaTemplate將事件傳送到預設主題。Spring Batch 提供了一個KafkaItemWriterBuilder用於構造KafkaItemWriter.

資料庫閱讀器

Spring Batch 提供以下資料庫閱讀器:

Neo4jItemReader

這Neo4jItemReader是一個ItemReader通過使用分頁技術從圖形資料庫 Neo4j 中讀取物件的方法。Spring Batch 提供了一個Neo4jItemReaderBuilder用於構造Neo4jItemReader.

MongoItemReader

這MongoItemReader是一個ItemReader使用分頁技術從 MongoDB 讀取文件的方法。Spring Batch 提供了一個MongoItemReaderBuilder用於構造MongoItemReader.

HibernateCursorItemReader

這HibernateCursorItemReader是一個ItemStreamReader用於讀取建立在 Hibernate 之上的資料庫記錄。它執行 HQL 查詢,然後在初始化時,在呼叫方法時迭代結果集read(),依次返回與當前行對應的物件。Spring Batch 提供了一個 HibernateCursorItemReaderBuilder用於構造 HibernateCursorItemReader.

HibernatePagingItemReader

這HibernatePagingItemReader是一個ItemReader用於讀取建立在 Hibernate 之上的資料庫記錄,並且一次只能讀取固定數量的專案。Spring Batch 提供了一個HibernatePagingItemReaderBuilder用於構造 HibernatePagingItemReader.

RepositoryItemReader

RepositoryItemReader是一個ItemReader使用 a 讀取記錄的 a PagingAndSortingRepository。Spring Batch 提供了一個RepositoryItemReaderBuilder用於構造RepositoryItemReader.

資料庫編寫器

Spring Batch 提供以下資料庫編寫器:

Neo4jItemWriter

這Neo4jItemWriter是一個ItemWriter寫入 Neo4j 資料庫的實現。Spring Batch 提供了一個Neo4jItemWriterBuilder用於構造 Neo4jItemWriter.

MongoItemWriter

這MongoItemWriter是一個ItemWriter使用 Spring Data 的實現寫入 MongoDB 儲存的實現MongoOperations。Spring Batch 提供了一個 MongoItemWriterBuilder用於構造MongoItemWriter.

RepositoryItemWriter

這RepositoryItemWriter是來自 Spring Data的ItemWriter包裝器。CrudRepositorySpring Batch 提供了一個RepositoryItemWriterBuilder用於構造RepositoryItemWriter.

HibernateItemWriter

它HibernateItemWriter使用ItemWriterHibernate 會話來儲存或更新不屬於當前 Hibernate 會話的實體。Spring Batch 提供了一個HibernateItemWriterBuilder用於構造HibernateItemWriter.

JdbcBatchItemWriter

這JdbcBatchItemWriter是一個ItemWriter使用批處理功能 NamedParameterJdbcTemplate為所有提供的專案執行批處理語句的方法。Spring Batch 提供了一個JdbcBatchItemWriterBuilder用於構造 JdbcBatchItemWriter.

JpaItemWriter

這JpaItemWriter是一個ItemWriter使用 JPAEntityManagerFactory來合併不屬於永續性上下文的任何實體。Spring Batch 提供了一個 JpaItemWriterBuilder用於構造JpaItemWriter.

GemfireItemWriter

GemfireItemWriter是一個ItemWriter使用 aGemfireTemplate將 GemFire 中的專案作為鍵/值對儲存的一個。Spring Batch 提供了一個GemfireItemWriterBuilder 用於構造GemfireItemWriter.

專業讀者

Spring Batch 提供以下專業閱讀器:

LdifReader

從LdifReadera 讀取 LDIF(LDAP 資料交換格式)記錄Resource,解析它們,並LdapAttribute為每個read執行返回一個物件。Spring Batch 提供了一個LdifReaderBuilder用於構造LdifReader.

MappingLdifReader

從MappingLdifReadera 讀取 LDIF(LDAP 資料交換格式)記錄 Resource,解析它們,然後將每個 LDIF 記錄對映到 POJO(普通舊 Java 物件)。每次讀取都會返回一個 POJO。Spring Batch 提供了一個MappingLdifReaderBuilder用於構造MappingLdifReader.

AvroItemReader

從資源中AvroItemReader讀取序列化的 Avro 資料。每次讀取都會返回一個由 Java 類或 Avro Schema 指定的型別的例項。可以選擇將閱讀器配置為嵌入或不嵌入 Avro 模式的輸入。Spring Batch 提供了一個AvroItemReaderBuilder用於構造AvroItemReader.

專業作家

Spring Batch 提供以下專業編寫器:

SimpleMailMessageItemWriter

SimpleMailMessageItemWriter是一個ItemWriter可以傳送郵件的訊息。它將訊息的實際傳送委託給MailSender. Spring Batch 提供了一個SimpleMailMessageItemWriterBuilder用於構造 SimpleMailMessageItemWriter.

AvroItemWriter

根據AvroItemWrite給定的型別或模式將 Java 物件序列化為 WriteableResource。可以選擇將編寫器配置為在輸出中嵌入或不嵌入 Avro 模式。Spring Batch 提供了一個AvroItemWriterBuilder用於構造AvroItemWriter.

專用處理器

Spring Batch 提供以下專用處理器:

ScriptItemProcessor

這ScriptItemProcessor是一個ItemProcessor將要處理的當前專案傳遞給提供的指令碼,並且指令碼的結果由處理器返回。Spring Batch 提供了一個ScriptItemProcessorBuilder用於構造 ScriptItemProcessor.

程式碼位置: http://github.com/jackssybin/jackssybin_springBatch