Flink+ice 實現視覺化規則編排與靈活配置(Demo)
ice文件站:http://124.221.148.247/zh
1 Demo倉庫地址:
github:http://github.com/zjn-zjn/flink-ice
gitee:http://gitee.com/waitmoon/flink-ice
2 Demo功能描述
通過netcat製造輸入流(nc -l 9000 windows:nc -l -p 9000)
flink接收本地9000埠輸入流,以回車(\n)分割單詞
輸入流經過IceProcessor處理後列印結果流
3 專案搭建
使用flink-quickstart-java快速搭建flink專案
3.1 新增ice依賴
因flink為非Spring專案,需依賴ice-core並手動初始化,Spring專案直接依賴ice-client-spring-boot-starter即可
<dependency>
<groupId>com.waitmoon.ice</groupId>
<artifactId>ice-core</artifactId>
<version>${ice.version}</version>
</dependency>
3.2 編寫StreamingJob
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 建立 Flink 執行環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//接收本地socket9000埠輸入流,以回車分割單詞
//通過netcat製造輸入流 nc -l 9000 (windows nc -l -p 9000)
DataStreamSource<String> stream = env.socketTextStream("localhost", 9000, "\n");
//按照單詞長度keyBy,使用IceProcessor並列印結果
stream.keyBy(String::length).process(new IceProcessor()).print().setParallelism(1);
//執行程式
env.execute("Flink Streaming Java API Skeleton");
}
}
3.3 編寫ice運算元IceProcessor
在static程式碼塊中初始化ice客戶端,此處直接使用的自己部署的ice-server地址對應的app:2
運算元功能: 將流內資料放入roam,組裝pack並執行ice規則處理(直接根據iceId觸發,iceId在server配置後臺獲取)
/**
* ice運算元
*/
public class IceProcessor extends KeyedProcessFunction<Integer, String, String> {
//ice 客戶端
private static IceNioClient iceNioClient;
static {
//初始化ice客戶端
try {
//配置遠端server地址,app,以及節點掃描路徑
//此處使用了自己搭建的server,後臺地址 http://eg.waitmoon.com/config/list/2
iceNioClient = new IceNioClient(2, "waitmoon.com:18121", "com.waitmoon.flink.ice.node");
//啟動ice客戶端
iceNioClient.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
//組裝IcePack
IcePack pack = new IcePack();
//設定要觸發的iceId(配置後臺中需要觸發的ID)
//http://eg.waitmoon.com/config/detail/2/1081
pack.setIceId(1081);
//初始化roam,將單詞和長度放入roam中
IceRoam roam = new IceRoam();
roam.put("input", value);
roam.put("length", ctx.getCurrentKey());
pack.setRoam(roam);
//同步執行
Ice.syncProcess(pack);
//執行完成後,獲取roam中的result
String result = roam.getMulti("result");
if (result != null) {
//result不為空,將結果放入下游運算元
out.collect(result);
}
}
@Override
public void close() {
if (iceNioClient != null) {
//清理ice 客戶端
iceNioClient.destroy();
iceNioClient = null;
}
}
}
3.4 編寫節點ContainsFlow
節點功能: 判斷根據key去roam裡拿的值是否在set中,是則返回true,否則返回false
/**
* @author waitmoon
* 過濾性質節點
* 判斷值在不在集合中
*/
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class ContainsFlow extends BaseLeafRoamFlow {
//預設input
private String key = "input";
private Set<String> set;
@Override
protected boolean doRoamFlow(IceRoam roam) {
//判斷roam中的key對應的值是否在集合中
return set.contains(roam.<String>getMulti(key));
}
@Override
public void afterPropertiesSet() {
log.info("ContainsFlow init with key:{}, set:{} nodeId:{}", key, set, this.getIceNodeId());
}
public NodeRunStateEnum errorHandle(IceContext ctx, Throwable t) {
log.error("error occur id:{} e:", this.findIceNodeId(), t);
return super.errorHandle(ctx, t);
}
}
3.5 編寫節點PutNone
節點功能: 將value值放入roam的key中,不干擾流程(不返回true/false)
/**
* @author waitmoon
* 不干擾流程性質節點
* 將一個值放入roam
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class PutNone extends BaseLeafRoamNone {
//預設result
private String key = "result";
private Object value;
@Override
protected void doRoamNone(IceRoam roam) {
//將value放到roam中
roam.putMulti(key, value);
}
}
4 專案啟動
4.1 netcat 製造輸入流
mac/linux 使用 nc -l 9000命令,windows使用 nc -l -p 9000 命令 製造一個Socket輸入流
4.2 執行StreamingJob
執行時可以看到ice客戶端啟動相關資訊
5 編排ice規則
在ice-server後臺編輯ice規則,用的是自己部署的ice-server,地址:http://124.221.148.247:8121
5.1 新增app
5.2 新增ice
此處Debug填2表示只打印節點執行過程,pack中的iceId即為此處的ID,點選檢視詳情即可編排規則
5.3編排ice規則
此編排實現邏輯:根據不同的輸入單詞,輸出對應的結果到roam的result欄位中供後續使用
如輸入waitmoon,在管理員列表中,則輸出"you are admin~"到roam的result欄位,並最終由flink列印
6 釋出與執行
在編排完規則後切記要釋出後才會將變更推送到客戶端並生效!!!
在終端輸入單詞並回車
在flink專案日誌裡可以看到:
ice列印了執行過程,[節點ID:節點類名簡稱:節點執行結果:節點執行耗時]
flink因為最後的sink是print(),所以列印了對應的輸出。
這時候你就可以隨意的更改與編排規則去實現自己的業務啦~~~