Flink+ice 實現視覺化規則編排與靈活配置(Demo)

語言: CN / TW / HK

ice文件站:http://124.221.148.247/zh

1 Demo倉庫地址:

github:http://github.com/zjn-zjn/flink-ice

gitee:http://gitee.com/waitmoon/flink-ice

2 Demo功能描述

通過netcat製造輸入流(nc -l 9000 windows:nc -l -p 9000)

flink接收本地9000埠輸入流,以回車(\n)分割單詞

輸入流經過IceProcessor處理後列印結果流

3 專案搭建

使用flink-quickstart-java快速搭建flink專案

3.1 新增ice依賴

因flink為非Spring專案,需依賴ice-core並手動初始化,Spring專案直接依賴ice-client-spring-boot-starter即可

<dependency>
   <groupId>com.waitmoon.ice</groupId>
   <artifactId>ice-core</artifactId>
   <version>${ice.version}</version>
</dependency>

3.2 編寫StreamingJob

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // 建立 Flink 執行環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //接收本地socket9000埠輸入流,以回車分割單詞
        //通過netcat製造輸入流 nc -l 9000 (windows nc -l -p 9000)
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9000, "\n");
        //按照單詞長度keyBy,使用IceProcessor並列印結果
        stream.keyBy(String::length).process(new IceProcessor()).print().setParallelism(1);
        //執行程式
        env.execute("Flink Streaming Java API Skeleton");
    }
}

3.3 編寫ice運算元IceProcessor

在static程式碼塊中初始化ice客戶端,此處直接使用的自己部署的ice-server地址對應的app:2

運算元功能: 將流內資料放入roam,組裝pack並執行ice規則處理(直接根據iceId觸發,iceId在server配置後臺獲取)

/**
 * ice運算元
 */
public class IceProcessor extends KeyedProcessFunction<Integer, String, String> {
    //ice 客戶端
    private static IceNioClient iceNioClient;
    static {
        //初始化ice客戶端
        try {
            //配置遠端server地址,app,以及節點掃描路徑
            //此處使用了自己搭建的server,後臺地址 http://eg.waitmoon.com/config/list/2
            iceNioClient = new IceNioClient(2, "waitmoon.com:18121", "com.waitmoon.flink.ice.node");
            //啟動ice客戶端
            iceNioClient.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        //組裝IcePack
        IcePack pack = new IcePack();
        //設定要觸發的iceId(配置後臺中需要觸發的ID)
        //http://eg.waitmoon.com/config/detail/2/1081
        pack.setIceId(1081);
        //初始化roam,將單詞和長度放入roam中
        IceRoam roam = new IceRoam();
        roam.put("input", value);
        roam.put("length", ctx.getCurrentKey());
        pack.setRoam(roam);
        //同步執行
        Ice.syncProcess(pack);
        //執行完成後,獲取roam中的result
        String result = roam.getMulti("result");
        if (result != null) {
            //result不為空,將結果放入下游運算元
            out.collect(result);
        }
    }
    @Override
    public void close() {
        if (iceNioClient != null) {
            //清理ice 客戶端
            iceNioClient.destroy();
            iceNioClient = null;
        }
    }
}

3.4 編寫節點ContainsFlow

節點功能: 判斷根據key去roam裡拿的值是否在set中,是則返回true,否則返回false

/**
 * @author waitmoon
 * 過濾性質節點
 * 判斷值在不在集合中
 */
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class ContainsFlow extends BaseLeafRoamFlow {
    //預設input
    private String key = "input";
    private Set<String> set;
    @Override
    protected boolean doRoamFlow(IceRoam roam) {
        //判斷roam中的key對應的值是否在集合中
        return set.contains(roam.<String>getMulti(key));
    }
    @Override
    public void afterPropertiesSet() {
        log.info("ContainsFlow init with key:{}, set:{} nodeId:{}", key, set, this.getIceNodeId());
    }
    public NodeRunStateEnum errorHandle(IceContext ctx, Throwable t) {
        log.error("error occur id:{} e:", this.findIceNodeId(), t);
        return super.errorHandle(ctx, t);
    }
}

3.5 編寫節點PutNone

節點功能: 將value值放入roam的key中,不干擾流程(不返回true/false)

/**
 * @author waitmoon
 * 不干擾流程性質節點
 * 將一個值放入roam
 */
@Data
@EqualsAndHashCode(callSuper = true)
public class PutNone extends BaseLeafRoamNone {
    //預設result
    private String key = "result";

    private Object value;

    @Override
    protected void doRoamNone(IceRoam roam) {
        //將value放到roam中
        roam.putMulti(key, value);
    }
}

4 專案啟動

4.1 netcat 製造輸入流

mac/linux 使用 nc -l 9000命令,windows使用 nc -l -p 9000 命令 製造一個Socket輸入流

在這裡插入圖片描述

4.2 執行StreamingJob

執行時可以看到ice客戶端啟動相關資訊

在這裡插入圖片描述

5 編排ice規則

在ice-server後臺編輯ice規則,用的是自己部署的ice-server,地址:http://124.221.148.247:8121

5.1 新增app

在這裡插入圖片描述

5.2 新增ice

此處Debug填2表示只打印節點執行過程,pack中的iceId即為此處的ID,點選檢視詳情即可編排規則

在這裡插入圖片描述

5.3編排ice規則

在這裡插入圖片描述

此編排實現邏輯:根據不同的輸入單詞,輸出對應的結果到roam的result欄位中供後續使用

如輸入waitmoon,在管理員列表中,則輸出"you are admin~"到roam的result欄位,並最終由flink​列印

在這裡插入圖片描述

在這裡插入圖片描述

6 釋出與執行

在編排完規則後切記要釋出後才會將變更推送到客戶端並生效!!!

在終端輸入單詞並回車

在這裡插入圖片描述

在flink專案日誌裡可以看到:

在這裡插入圖片描述

ice列印了執行過程,[節點ID:節點類名簡稱:節點執行結果:節點執行耗時]

flink因為最後的sink是print(),所以列印了對應的輸出。

這時候你就可以隨意的更改與編排規則去實現自己的業務啦~~~