從零開始學YC-Framework之Gobrs-Async

語言: CN / TW / HK

一、Gobrs-Async是什麼?

Gobrs-Async是一款功能強大、配置靈活、帶有全鏈路異常回調、內存優化、異常狀態管理於一身的高性能異步編排框架。為企業提供在複雜應用場景下動態任務編排的能力。 針對於複雜場景下,異步線程複雜性、任務依賴性、異常狀態難控制性; Gobrs-Async為此而生。

二、Gobrs-Async能解決什麼問題?

能解決CompletableFuture所不能解決的問題。 怎麼理解呢?

傳統的Future、CompleteableFuture一定程度上可以完成任務編排,並可以把結果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執行單元的回調。譬如A執行完畢成功了,後面是B,我希望A在執行完後就有個回調結果,方便我監控當前的執行狀況,或者打個日誌什麼的。失敗了,我也可以記錄個異常信息什麼的。

此時,CompleteableFuture就無能為力了。

Gobrs-Async框架提供了這樣的回調功能。並且,如果執行成功、失敗、異常、超時等場景下都提供了管理線程任務的能力!

三、Gobrs-Async的場景概述是什麼?

1.場景一

説明:

任務A執行完了之後,繼續執行B、C、D。

2.場景二

説明:

任務A執行完了之後執行B然後再執行C、D。

3.場景三

説明:

任務A 執行完了之後執行B、E 然後按照順序 B的流程走C、D、G。 E的流程走F、G

四、Gobrs-Async的核心能力包含哪些?

五、Gobrs-Async與其它多任務異步編排框架對比的結果是怎樣的?

六、Gobrs-Async能解決哪些問題?

  • 1.客户端請求服務端接口,該接口需要調用其他N個微服務的接口。
  • 2.並行執行N個任務,後續根據這1-N個任務的執行結果來決定是否繼續執行下一個任務。
  • 3.需要進行線程隔離的多批次任務。
  • 4.單機工作流任務編排。
  • 5.其他有順序編排的需求。

七、Gobrs-Async具有怎樣的特性?

Gobrs-Async 在開發時考慮了眾多使用者的開發喜歡,對異常處理的使用場景。並被運用到電商生產環境中,在京東經歷這嚴酷的高併發考驗。同時框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內存優化、靈活的接入方式、提供SpringBoot Start 接入方式。更加考慮使用者的開發習慣。僅需要注入GobrsTask的Spring Bean 即可實現全流程接入。

Gobrs-Async 項目目錄及其精簡:

  • 1.gobrs-async-example:Gobrs-Async 接入實例,提供測試用例。
  • 2.gobrs-async-starter:Gobrs-Async 框架核心組件。

Gobrs-Async在設計時,就充分考慮了開發者的使用習慣,沒有依賴任何中間件。對併發框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列併發技術開發設計。

八、Gobrs-Async的整體架構是怎樣的?

1.任務觸發器

任務流的啟動者,負責啟動任務執行流。

2.規則解析引擎

負責解析使用者配置的規則,同時於Spring結合,將配置的 Spring Bean 解析成 TaskBean,進而通過解析引擎加載成 任務裝飾器。進而組裝成任務樹。

3.任務啟動器

負責通過使用解析引擎解析的任務樹。結合 JUC 併發框架調度實現對任務的統一管理,核心方法有

  • (1)trigger 觸發任務加載器,為加載任務準備環境。

4.任務加載器

負責加載任務流程,開始調用任務執行器執行核心流程:

  • (1)load 核心任務流程方法,在這裏阻塞等待整個任務流程。
  • (2)getBeginProcess 獲取子任務開始流程。
  • (3)completed 任務完成。
  • (4)errorInterrupted 任務失敗 中斷任務流程。
  • (5)error 任務失敗。

5.任務執行器

最終的任務執行,每一個任務對應一個TaskActuator 任務的 攔截、異常、執行、線程複用 等必要條件判斷都在這裏處理

  • (1)prepare 任務前置處理。
  • (2)preInterceptor 統一任務前置處理。
  • (3)task 核心任務方法,業務執行內容。
  • (4)postInterceptor 統一後置處理。
  • (5)onSuccess 任務執行成功回調。
  • (6)onFail 任務執行失敗回調。

6.任務總線

任務流程傳遞總線,包括 請求參數、任務加載器、 響應結果, 該對象暴露給使用者,拿到匹配業務的數據信息,例如: 返回結果、主動中斷任務流程等功能 需要任務總線(TaskSupport)支持。

九、Gobrs-Async的核心類圖是怎樣的?

十、在YC-Framework中如何使用Gobrs-Async?

1.引入依賴

<dependency>
    <groupId>com.yc.framework</groupId>
    <artifactId>yc-common-gobrs-async</artifactId>
</dependency>

2.編寫配置類

@Configuration
public class ThreadPoolConfig {

    @Autowired
    private GobrsAsyncThreadPoolFactory factory;

    /**
     * Gobrs thread pool executor.
     */
    @PostConstruct
    public void gobrsThreadPoolExecutor(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue());
        factory.setThreadPoolExecutor(threadPoolExecutor);
    }

}

3.編寫多個Task(具體參照YC-Framework對應的Example)

4.編寫Service

@Service
public class GobrsService {

    @Autowired
    private AService aService;

    @Autowired
    private BService bService;
    
    @Autowired
    private CService cService;
    
    @Autowired
    private DService dService;

    @Autowired
    private EService eService;

    @Autowired
    private FService fService;

    @Autowired
    private GService gService;

    @Resource
    private GobrsAsync gobrsAsync;

    @Resource
    private RuleThermalLoad ruleThermalLoad;

    /**
     * The Executor service.
     */
    ExecutorService executorService = Executors.newCachedThreadPool();


    /**
     * Gobrs async.
     */
    public void gobrsAsync() {
        gobrsAsync.go("test", () -> new Object());
    }


    /**
     * Future.
     */
    public void future() {
        List<AsyncTask> abList = new ArrayList<>();
        abList.add(aService);
        abList.add(bService);
        List<Future> futures = new ArrayList<>();
        for (AsyncTask task : abList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futures.add(submit);
        }

        for (Future future : futures) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        List<AsyncTask> cdList = new ArrayList<>();
        cdList.add(cService);
        cdList.add(dService);
        List<Future> futurescd = new ArrayList<>();
        for (AsyncTask task : cdList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futurescd.add(submit);
        }

        for (Future future : futurescd) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }


        List<AsyncTask> efList = new ArrayList<>();
        efList.add(eService);
        efList.add(fService);
        List<Future> futuresef = new ArrayList<>();
        for (AsyncTask task : efList) {
            Future<Object> submit = executorService.submit(() -> task.task(new Object(), null));
            futuresef.add(submit);
        }

        for (Future future : futuresef) {
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }


        Future<Object> submit = executorService.submit(() -> gService.task(new Object(), null));
        try {
            submit.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


    }

    /**
     * Update rule.
     *
     * @param rule the rule
     */
    public void updateRule(Rule rule) {
        Rule r = new Rule();
        r.setName("ruleName");
        r.setContent("AService->CService->EService->GService; BService->DService->FService->HService;");
        ruleThermalLoad.load(rule);
    }
}

5.編寫攔截器

@Slf4j
@Component
public class GobrsInterceptor implements AsyncTaskExceptionInterceptor {


    @SneakyThrows
    @Override
    public void exception(ErrorCallback errorCallback) {

        log.error("Execute global interceptor  error{}", JsonUtil.obj2String(errorCallback.getThrowable()));
    }
}

6.編寫Controller

@RestController
@RequestMapping("gobrs")
public class GobrsController {

    @Resource
    private GobrsAsync gobrsAsync;

    @Autowired
    private GobrsService gobrsService;

    /**
     * Gobrs test string.
     *
     * @return the string
     */
    @RequestMapping("testGobrs")
    public String gobrsTest() {
        Map<Class, Object> params = new HashMap<>();
        params.put(AService.class, "A的參數");
        AsyncResult test = gobrsAsync.go("test", () -> params);
        return "success";
    }


    /**
     * Future.
     */
    @RequestMapping("future")
    public void future() {
        long start = System.currentTimeMillis();
        gobrsService.future();

        long coust = System.currentTimeMillis() - start;
        System.out.println("future " + coust);

    }


    /**
     * Sets gobrs async.
     */
    @RequestMapping("gobrsAsync")
    public void setGobrsAsync() {
        //開始時間: 獲取當前時間毫秒數
        long start = System.currentTimeMillis();
        gobrsService.gobrsAsync();
        //結束時間: 當前時間 - 開始時間
        long coust = System.currentTimeMillis() - start;
        System.out.println("gobrs-Async " + coust);

    }

    /**
     * Update rule.
     *
     * @param rule the rule
     */
    @RequestMapping("updateRule")
    public void updateRule(@RequestBody Rule rule) {
        gobrsService.updateRule(rule);
    }
}

YC-Framework官網:

https://framework.youcongtech.com/

YC-Framework Github源代碼:

https://github.com/developers-youcong/yc-framework

YC-Framework Gitee源代碼:

https://gitee.com/developers-youcong/yc-framework

以上源代碼均已開源,開源不易,如果對你有幫助,不妨給個star!!!