Practical Data Structures and Algorithms (Directed Graphs): A Parallel Processing Framework for Tasks with Dependencies


Background

It started with a refactoring request: several of our system's home-page query interfaces needed to be optimized and merged into a single aggregate interface for the front end. After an initial discussion with the front-end developers, we concluded that these home-page queries had no dependencies between them, so the solution looked straightforward to me: simply run the required queries in parallel:

Each query is abstracted as a Function, and its apply method is invoked on a worker thread.
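
As a minimal sketch of this first version (assuming fully independent queries; the query map and the homePageQuery method below are illustrative names, not the project's actual code):

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class ParallelQuerySketch {

        // aggregate several independent query Functions by running them in parallel
        static Map<String, Object> homePageQuery(Map<String, Function<Long, Object>> queries, long userId) {
            // submit every query to the common pool; each Function's apply runs on a worker thread
            Map<String, CompletableFuture<Object>> futures = queries.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            e -> CompletableFuture.supplyAsync(() -> e.getValue().apply(userId))));
            // wait for all of them, then collect the results
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();
            return futures.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()));
        }
    }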

During joint debugging with the front end, however, we found that one interface's input depends on the result of another. I first considered having the front end pass that value to me, but that would split one interface back out again and hurt the home page's load time, so I decided to handle it on the back end. That meant resolving the ordering dependencies between queries, and I initially came up with several options:

  • Wait on a CountDownLatch
  • Hold the Future of the prerequisite task and call its get method to wait
  • Run the dependent tasks serially

The first two options are essentially the same: both rely on inter-thread waiting, waking the current thread to continue the query once the prerequisite query has finished. The third is simple and blunt. Time was tight, so after briefly weighing the trade-offs I went with the third option; after all, only two of my sub-queries had a dependency, which also made it the most efficient choice here. I tweaked the supplier that produces the queries, combining the dependent Functions into a single Function via andThen, and the requirement was done. The sub-query order then became (assuming B depends on A):
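
A minimal sketch of that stop-gap (the Function names are illustrative): the dependent pair is chained with andThen into one serial task, while the independent query still runs in parallel.

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    public class AndThenSketch {
        public static void main(String[] args) {
            Function<String, String> queryA = in -> in + " -> A";       // prerequisite query
            Function<String, String> queryB = a -> a + " -> B(uses A)"; // B consumes A's result
            Function<String, String> queryC = in -> in + " -> C";       // independent query

            // A and B collapse into one serial task; C stays parallel
            CompletableFuture<String> abFuture = CompletableFuture.supplyAsync(
                    () -> queryA.andThen(queryB).apply("ctx"));
            CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(
                    () -> queryC.apply("ctx"));

            System.out.println(abFuture.join()); // ctx -> A -> B(uses A)
            System.out.println(cFuture.join());  // ctx -> C
        }
    }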

At this point the requirement was met, but from a technical standpoint the solution is incomplete. Suppose a new interface G arrives that also depends on A: with this approach A, B, and G would run serially, even though B and G have no dependency on each other and should run in parallel. Or suppose B depends on both A and C: then A, B, and C would run serially, although A and C should run in parallel. In the extreme case where B depends on A, C, E, and F, the whole aggregate query degrades to single-threaded execution, which is clearly even less reasonable.

So although this approach is fine for the current requirement, it falls far short for complex dependency structures, which led me to look for a better and reasonably general way to handle them.

The Refactored Parallel Task Processing Scheme

The key point in this kind of requirement is how to handle the dependencies between tasks while still running them in parallel as much as possible, which is where graphs come in. My design involves the following concepts (a minimal sketch of these types follows the list):

  • ExecutionContext: the execution context shared by all subtasks, used for data exchange and for passing inputs and outputs; essentially a thread-safe Map.
  • ExecutionNode: a task node, i.e. the abstraction of one subtask, containing its name, the names of the nodes it depends on, and the executable Function.
  • ExecutionGraph: the execution graph of the whole task flow, holding all execution nodes as well as the sorted nodes; when the flow starts, execution follows this description.
  • ExecutionFlow: the abstraction of a task flow; it is essentially a Function as well, and calling apply starts the execution.
  • FlowConfiguration: configuration for the execution, including the pre-action, post-action, executor, timeout, and so on.
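
As a rough sketch of these concepts (the field and method names beyond those mentioned above are assumptions for illustration, not the project's exact code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // shared, thread-safe key-value store for the inputs and outputs of all subtasks
    class ExecutionContext {
        private final Map<String, Object> vars = new ConcurrentHashMap<>();
        public void setVar(String name, Object value) { vars.put(name, value); }
        public Object getVar(String name) { return vars.get(name); }
    }

    // one subtask: its name, the names of the nodes it depends on, and the work itself
    class ExecutionNode {
        private final String name;
        private final String[] depend;
        private final Function<ExecutionContext, ExecutionContext> execution;

        ExecutionNode(String name, String[] depend,
                      Function<ExecutionContext, ExecutionContext> execution) {
            this.name = name;
            this.depend = depend;
            this.execution = execution;
        }
        public String getName() { return name; }
        public String[] getDepend() { return depend; }
        public Function<ExecutionContext, ExecutionContext> getExecution() { return execution; }
    }

    // the whole flow is itself just a Function over the shared context
    interface ExecutionFlow extends Function<ExecutionContext, ExecutionContext> {
    }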

Graph Generation

ExecutionGraph abstracts the task execution process into a directed acyclic graph. To use it, pass in each node's definition together with the execution processes (Functions) defined in advance, and the graph is generated.

The node definition rule is: {nodeName}[{dependencyA},{dependencyB},...]

If a node has no dependencies, the bracketed part is omitted. Building the execution graph then amounts to a simple ordering of this directed graph (a topological sort):

  1. Put all nodes into remainMap; preMap starts out empty
  2. Find the nodes with no dependencies as the first level, remove them from remainMap, and add them to preMap
  3. Traverse remainMap and find every node whose dependencies all already appear in preMap; add those nodes to preMap, remove them from remainMap, and take them as the next level
  4. Repeat step 3 until remainMap is empty

Note that in step 3, if remainMap is the same size before and after the filtering, there must be a circular dependency or a dependency on a node that does not exist, i.e. the directed graph contains a cycle; an exception should be thrown, otherwise we would spin in an infinite loop. The core code is as follows:

    /**
     * Build the execution graph; essentially sorts the nodes into levels
     *
     * @param preNodes   nodes that have already been processed
     * @param remainNode remaining nodes
     * @param levels     ordered nodes (the result set)
     */
    private static void buildGraph(Map<String, ExecutionNode> preNodes, Map<String, ExecutionNode> remainNode, List<List<ExecutionNode>> levels) {

        // if no nodes remain, we are done
        if (remainNode.isEmpty()) {
            return;
        }

        List<String> currentLevel;
        // remember how many nodes remain
        int remainSize = remainNode.size();

        if (preNodes.isEmpty()) {
            // preNodes empty means this is the first level: pick the dependency-free nodes, i.e. those with in-degree 0
            currentLevel = remainNode.values()
                    .stream()
                    .filter(e -> e.getDepend().length == 0)
                    .map(ExecutionNode::getName)
                    .collect(Collectors.toList());
            // if there is no dependency-free node, throw
            if (currentLevel.isEmpty()) {
                throw new IllegalStateException("could not found init node");
            }
        } else {
            // pick the nodes whose prerequisites have all been processed, i.e. the nodes whose in-degree drops to 0 after removing the earlier levels' edges
            currentLevel = remainNode.entrySet()
                    .stream()
                    .filter(e -> Stream.of(e.getValue().getDepend()).allMatch(preNodes::containsKey))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }

        List<ExecutionNode> nodes = new ArrayList<>(currentLevel.size());
        // currentLevel holds this level's nodes: record them in preNodes and remove them from the remaining nodes
        for (String name : currentLevel) {
            nodes.add(remainNode.get(name));
            preNodes.put(name, remainNode.remove(name));
        }

        // if the number of remaining nodes did not change, the graph has a cycle or depends on an undefined node
        if (remainNode.size() == remainSize) {
            throw new IllegalStateException("there may be circular dependencies or dependency does not exist in your graph, exception nodes is: " + remainNode.values());
        }

        // append to the result set
        levels.add(nodes);
        // recurse into the next level
        buildGraph(preNodes, remainNode, levels);
    }

Example

Here is a quick illustration. Given the following node definitions:

A
B
C
D
E[A,B]
F[A,E]
G[C]
H[B,C]
I[G,H]
J[I]
K[D,E]
L[E,H]

The corresponding graph is:

This splits all execution tasks into four levels; tasks within a level do not depend on one another, while each level's execution depends on the level above it.
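
Written out in plain text, the levels produced by the sort for the definitions above are:

Level 1: A, B, C, D
Level 2: E[A,B], G[C], H[B,C]
Level 3: F[A,E], I[G,H], K[D,E], L[E,H]
Level 4: J[I]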

Task Node Execution Scheme

Once the tasks have been split up by this sorting algorithm, the rest is fairly simple. I can think of two approaches:

  1. Run each level in parallel, and start the next level only after the whole level has finished
  2. Use inter-thread waiting: notify the current thread once its prerequisite tasks have completed

I do not think the first approach is great. Taking the graph above, consider this case: L takes a long time, longer than I and J combined, i.e. t(L) > t(I) + t(J). Then J has no need to wait for the entire third level to finish; it can start as soon as I completes, and the longer J itself takes, the more time a per-level barrier wastes. So I block dependent threads with CompletableFuture's get method: as soon as a task's prerequisites have finished, the task can start, without waiting for every task in the previous level to complete.
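
A quick numeric illustration of the difference, with hypothetical figures: suppose t(I) = 2s, t(J) = 1s, and t(L) = 4s, and level three starts at time 0. With per-level barriers, J cannot start until the slowest task of level three (L) finishes, so J completes at roughly 4s + 1s = 5s. With per-dependency waiting, J starts as soon as I finishes and completes at about 2s + 1s = 3s, inside L's 4s window, so the overall flow is no longer stretched by the barrier.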

The scheme is as follows:

  1. Iterate over each level of nodes

    a. For every node in the current level, create a CompletableFuture; when building its Supplier, first fetch the Futures of all dependency nodes from futureMap and call get on each in turn, blocking the current thread (if a dependency has already finished, get returns its result immediately; allOf(...).get() could be used here instead)

    b. Put the constructed CompletableFuture into futureMap

  2. Take all the CompletableFutures out of futureMap and wait for them all to complete with allOf(...).get().

In theory every node starts as soon as its prerequisites finish. In practice, because of thread-pool scheduling and similar factors, a task may still sit in the work queue for a while, but from the point of view of system resource allocation that impact is acceptable. The core implementation code follows. (Incidentally, this pattern of taking parameters and returning a new function even has a fancy name: function currying? =_= )

    /**
     * Build an execution task; the return value is a new function
     *
     * @param executionGraph    the execution task graph
     * @param flowConfiguration the execution configuration
     * @return the task flow as a new function
     */
    static ExecutionFlow buildFlow(@NonNull ExecutionGraph executionGraph, @NonNull FlowConfiguration flowConfiguration) {
        SpelExpressionParser expressionParser = new SpelExpressionParser();
        return ctx -> {
            long startTime = System.currentTimeMillis();
            List<List<ExecutionNode>> sortedNodes = executionGraph.getSortedNodes();
            Map<String, CompletableFuture<ExecutionContext>> futureMap = new HashMap<>(16);
            // build the CompletableFutures in graph order
            for (List<ExecutionNode> nodes : sortedNodes) {
                // nodes within one level have no dependencies on each other, and the CompletableFutures they depend on are guaranteed to have been built already
                for (ExecutionNode node : nodes) {
                    String currNodeName = node.getName();
                    CompletableFuture<ExecutionContext> future = CompletableFuture.supplyAsync(() -> {
                        try {
                            long t1 = System.currentTimeMillis();
                            // pre-action
                            ofNullable(flowConfiguration.getPreAction()).ifPresent(c -> c.accept(ctx));
                            String[] depend = node.getDepend();
                            // walk the dependency nodes and make sure they have all finished
                            for (String s : depend) {
                                try {
                                    // block and wait
                                    futureMap.get(s).get();
                                    logger.info("current node [{}] waiting node [{}] done ({}ms)", currNodeName, s, System.currentTimeMillis() - t1);
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new IllegalStateException("current node [" + currNodeName + "] waiting node [" + s + "] failed", e);
                                }
                            }
                            long t2 = System.currentTimeMillis();
                            logger.info("current node [{}] start", currNodeName);

                            // pre-execution expression check
                            //noinspection ConstantConditions
                            if (Objects.nonNull(node.getBeforeCheckExpression()) &&
                                !node.getBeforeCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                                throw new IllegalStateException(String.format("before check failed by expression [%s]"
                                        , node.getBeforeCheckExpression().getExpressionString()));
                            }
                            // execute the node
                            ExecutionContext ret = node.getExecution().apply(ctx);
                            // flag that the node executed successfully
                            ctx.setVar("$" + currNodeName + "_success", true);

                            // post-execution expression check
                            //noinspection ConstantConditions
                            if (Objects.nonNull(node.getAfterCheckExpression()) &&
                                !node.getAfterCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                                throw new IllegalStateException(String.format("after check failed by expression [%s]"
                                        , node.getAfterCheckExpression().getExpressionString()));
                            }
                            long tl = System.currentTimeMillis();
                            logger.info("current node [{}] execute done (total: {}ms, execution: {}ms)", currNodeName, tl - t1, tl - t2);
                            return ret;
                        } catch (Throwable e) {
                            logger.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
                            if (node.isSkipOnFail()) {
                                return ctx;
                            }
                            throw e;
                        } finally {
                            // post-action
                            ofNullable(flowConfiguration.getFinalAction()).ifPresent(c -> c.accept(ctx));
                        }
                    }, flowConfiguration.getExecutor() == null ? ForkJoinPool.commonPool() : flowConfiguration.getExecutor());
                    // put the constructed CompletableFuture into the map
                    futureMap.put(node.getName(), future);
                }
            }
            try {
                // combine all futures and wait until every node has finished executing
                CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0]));
                logger.info("waiting flow execution down, ttl={}", flowConfiguration.getTimeout());
                if (flowConfiguration.getTimeout() > 0) {
                    completableFuture.get(flowConfiguration.getTimeout(), TimeUnit.SECONDS);
                } else {
                    completableFuture.get();
                }
                logger.info("execution flow success ({}ms)", System.currentTimeMillis() - startTime);
                return ctx;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                logger.warn("execution flow failed: {} ({}ms)", e.getMessage(), System.currentTimeMillis() - startTime);
                throw new IllegalStateException(e);
            }
        };
    }

Extended Features

The functionality we needed is now in place, but it is still not quite polished. For example, when nodes actually run, some can tolerate failure and some cannot, or you may want to check whether certain parameters meet expectations before and after a node runs. So I enhanced the node definition expression to support such extra operations, leaving room for further extensions later:

Building on the original format:

{nodeName}[{dependencyA},{dependencyB}, ...]({extensionNameA}({expression}), {extensionNameB}({expression}), ...)

Skip on failure

Determines whether an exception raised while executing the current node is thrown onward or simply ignored.

Extension name: skipOnFail

Example: E[A,B](skipOnFail(true))

......
} catch (Throwable e) {
    logger.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
    if (node.isSkipOnFail()) {
        return ctx;
    }
    throw e;
} finally {
......

Parameter Checks

With dependencies in play, how does the current node know whether the nodes it depends on executed correctly? If an exception was thrown we learn about the failure from it, but sometimes no exception propagates (for instance when the called sub-method sits behind an interceptor that swallows the exception), and only inspecting the data tells you whether execution was correct. Having each node's own function perform this check seems tedious, and it is something the framework is better placed to handle uniformly. My approach is to check parameters with expressions: each node gets an optional pre-check and post-check expression whose result is a boolean, and if it evaluates to false an exception is thrown.

Expression structure: {nodeName}[{dependencyA},{dependencyB}, ...](beforeCheck({boolean expression}), afterCheck({boolean expression}))

Example: J[I](beforeCheck([c] == 0 && [k] == 9 && [c] != null), afterCheck([a] == 1))

The expressions follow Spring EL syntax, and the evaluation context is the set of variables held in the ExecutionContext.
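
As a minimal, standalone sketch of that evaluation step (the #a variable syntax is plain Spring EL used here for illustration; the [a] placeholders in the article's expressions are the framework's own notation mapped onto the ExecutionContext variables):

    import org.springframework.expression.Expression;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    public class SpelCheckSketch {
        public static void main(String[] args) {
            SpelExpressionParser parser = new SpelExpressionParser();
            StandardEvaluationContext evalCtx = new StandardEvaluationContext();
            evalCtx.setVariable("a", 1); // pretend node A wrote this into the shared context

            // a beforeCheck-style boolean expression; false means the node must not run
            Expression beforeCheck = parser.parseExpression("#a == 1");
            Boolean ok = beforeCheck.getValue(evalCtx, Boolean.class);
            if (ok == null || !ok) {
                throw new IllegalStateException(
                        "before check failed by expression [" + beforeCheck.getExpressionString() + "]");
            }
            System.out.println("before check passed");
        }
    }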

Running Example

Sample code:

public static void main(String[] args) {
    Random random = new Random();
    // Map each letter to a function that sleeps for a random interval to simulate work; some of the functions also set a few simple values to simulate output
    Map<String, Function<ExecutionContext, ExecutionContext>> functionMap = Arrays.stream("A,B,C,D,E,F,G,H,I,J,K,L".split(",")).collect(Collectors.toMap(
            e -> e,
            e -> ctx -> {
                try {
                    int i = 500 + 500 * random.nextInt(5);
                    Thread.sleep(i);
                    ctx.setVar(e, i);
                    switch (e) {
                        case "A":
                            ctx.setVar("a", 1);
                            break;
                        case "B":
                            ctx.setVar("a", 0);
                            break;
                        case "C":
                            ctx.setVar("c", 3);
                            break;
                        case "G":
                            ctx.setVar("g", 9);
                            ctx.setVar("a", 1);
                            break;
                    }
                } catch (InterruptedException ignored) {
                }
                return ctx;
            }
    ));

    // Build the definition of an execution graph
    ExecutionGraph graph = ExecutionGraph.createGraph(new ArrayList<>() {{
        add("A");
        add("B[A](beforeCheck([a] == 1))");
        add("C");
        add("D");
        add("E[A,B](beforeCheck([a]==2); skipOnFail(true))"); // 該節點會發生異常,但是配置了跳過
        add("F[A,E]");
        add("G[C]");
        add("H[B,C]");
        add("I[G,H]");
        add("J[I](beforeCheck( T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null); afterCheck([c] >= 1))");
        add("K[D,E]");
        add("L[E,H]");
    }}, functionMap);

    // CompletableFuture uses ForkJoinPool.commonPool() by default; what changes if you try a cachedThreadPool instead?
    // ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutionFlow executionFlow = ExecutionFlow.buildFlow(graph);
    executionFlow.apply(new ExecutionContext());
}

Run log:

/Library/Java/JavaVirtualMachines/jdk-11.0.8.jdk/Contents/Home/bin/java -Dvisualvm.id=76728482855487 -javaagent:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=55008:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/windlively/IdeaProjects/leisure/frame/target/classes:/Users/windlively/.m2/repository/org/projectlombok/lombok/1.18.10/lombok-1.18.10.jar:/Users/windlively/.m2/repository/org/springframework/spring-expression/5.2.2.RELEASE/spring-expression-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-core/5.2.2.RELEASE/spring-core-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-jcl/5.2.2.RELEASE/spring-jcl-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.2.RELEASE/spring-boot-starter-logging-2.2.2.RELEASE.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/windlively/.m2/repository/org/slf4j/slf4j-api/1.7.29/slf4j-api-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/Users/windlively/.m2/repository/org/slf4j/jul-to-slf4j/1.7.29/jul-to-slf4j-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/commons/commons-lang3/3.9/commons-lang3-3.9.jar ink.windlively.frame.parallelprocessor.Main
12:38:04.122 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - start create graph, node definition: [A, B[A<-](beforeCheck([a] == 1)), C, D, E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true)), F[A<-,E<-], G[C<-], H[B<-,C<-], I[G<-,H<-], J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1)), K[D<-,E<-], L[E<-,H<-]]
12:38:04.165 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - build execution graph success: 
A		C		D
B[A<-](beforeCheck([a] == 1))		G[C<-]
E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true))		H[B<-,C<-]
F[A<-,E<-]		I[G<-,H<-]		K[D<-,E<-]		L[E<-,H<-]
J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1))

12:38:04.183 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - waiting flow execution down, ttl=0
12:38:04.183 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] start
12:38:04.184 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] start
12:38:04.183 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] start
12:38:05.189 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] execute done (total: 1006ms, execution: 1006ms)
12:38:05.687 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] execute done (total: 1504ms, execution: 1503ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [A] done (498ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] waiting node [A] done (1506ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [A] done (1505ms)
12:38:05.690 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] start
12:38:06.688 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] execute done (total: 2505ms, execution: 2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] waiting node [C] done (2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] start
12:38:06.689 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [D] done (0ms)
12:38:07.247 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] execute done (total: 3064ms, execution: 1557ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] start
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [C] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] start
12:38:07.261 [ForkJoinPool.commonPool-worker-15] WARN ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] execute failed: before check failed by expression [[a]==2], skip exception=true
java.lang.IllegalStateException: before check failed by expression [[a]==2]
	at ink.windlively.frame.parallelprocessor.ExecutionFlow.lambda$buildFlow$2(ExecutionFlow.java:81)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [E] done (2071ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] start
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [E] done (573ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [E] done (14ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] start
12:38:07.767 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] execute done (total: 2576ms, execution: 505ms)
12:38:08.194 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] execute done (total: 4010ms, execution: 1505ms)
12:38:08.195 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [G] done (2506ms)
12:38:09.266 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] execute done (total: 2577ms, execution: 2004ms)
12:38:09.749 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] execute done (total: 5565ms, execution: 2501ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [H] done (2502ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [H] done (4062ms)
12:38:09.751 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] start
12:38:09.751 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] start
12:38:10.753 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] execute done (total: 3505ms, execution: 1002ms)
12:38:11.752 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] execute done (total: 6064ms, execution: 2001ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] waiting node [I] done (4491ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] start
12:38:12.301 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] execute done (total: 5039ms, execution: 548ms)
12:38:12.301 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - execution flow success (8130ms)

Process finished with exit code 0

Afterword

With Spring EL expressions in place, the extensibility is already considerable: Spring expressions support much of Java's syntax, Spring bean references, method calls, and so on. For example, a task flow could be configured purely with expressions, defining each step as a bean method call inside the expression and wiring its inputs and outputs automatically, with no need to pre-define a functionMap in code.
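
A minimal sketch of that idea, purely as an assumption about how it could look (the bean name homeQueryService, its method, and the surrounding class are hypothetical, not part of the framework):

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.expression.BeanFactoryResolver;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    public class SpelBeanCallSketch {

        // a node's execution could be nothing more than a configured expression string
        static Object callBeanViaExpression(ApplicationContext applicationContext, long userId) {
            StandardEvaluationContext evalCtx = new StandardEvaluationContext();
            // allow @beanName references inside the expression
            evalCtx.setBeanResolver(new BeanFactoryResolver(applicationContext));
            evalCtx.setVariable("userId", userId);
            return new SpelExpressionParser()
                    .parseExpression("@homeQueryService.queryBanner(#userId)")
                    .getValue(evalCtx);
        }
    }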

This small framework has been running stably in production for a while now, serving the heavy query traffic of our app's home page and cutting the interface's total latency from several seconds down to roughly 300 ms on average. There is still plenty of room for improvement, and feedback and discussion are welcome.

Source Code

GitHub:https://github.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor

Gitee:https://gitee.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor