DolphinDB 流計算應用:引擎級聯監測門禁異常狀態

語言: CN / TW / HK

物聯網的發展為智慧安防和自動化監控帶來了更多便利,與此同時,新型城鎮建設、智慧城市與智慧社群的發展也為門禁管理等安防問題智慧化提出了更高的要求。在智慧化發展的背景下,門禁不僅僅是門禁,更是一套集成了訪客、考勤、消費、巡更、梯控等更多功能的全面便捷的系統安全應用。目前門禁系統主要用於出入口管理,在我國加速推動智慧城市、智慧工地、智慧社群等智慧化建設發展的前提下,門禁系統智慧化升級的趨勢成為必然,其普及率和使用率也將更加廣泛,隨著接入門禁系統裝置越來越多,對其產生的海量資料進行實時快速的處理也成為了日益重要的問題。

DolphinDB 提供了流資料表流計算引擎用於實時資料處理,為智慧安防提供了有力支援。本教程將介紹如何使用流計算引擎多級級聯實現對門禁裝置異常狀態的實時監測

1. 背景介紹

1.1 行業背景

我國新型城鎮化建設、智慧城市以及智慧社群的不斷髮展,給智慧安防產業提供了巨大的市場拓展空間。智慧安防系統正在向安全、軍事、交通、政府、電力、通訊、能源、金融、文博、倉庫、別墅、工廠等眾多行業領域延伸,涵蓋廣泛的智慧化系列產品及解決方案,主要包括智慧安防視訊分析系統、智慧交通視訊監控系統、智慧城市智慧監控系統、基於異常行為的定製化系統以及計算機視覺分析的前瞻性技術探索等。

根據人們對智慧安防系統日益增長的需求,智慧安防系統逐漸向指標效能優異、環境適應性強、執行可靠穩定和技術更加相容的方向發展。常見的智慧安防系統一般包含有監控、報警、門禁和遠端控制 4 個主要功能,可以單獨執行也可統一管理。而門禁則是整個智慧安防系統中的基礎應用,同時也關係到物聯網領域中的公共安全、城市管理、智慧家居等多個方面。

1.2 真實場景

目前一般的門禁系統工作於客戶端/伺服器端方式,包含以下功能設定,可以完成事件監控、報警聯動。

①門禁、報警綜合管理系統伺服器:提供集中管理及監控,輸出,聯動功能。

②門禁工作站:門禁工作站提供功能設定及事件監控,可以接上髮卡裝置作為髮卡工作站。

③報警輸入:每個控制箱具有獨立的報警輸入介面,接報警輸入裝置,如紅外報警器等。

④報警輸出:每個控制箱具有獨立的報警輸出介面,接報警輸出裝置,如聲光報警器等。

⑤門禁控制器:是門禁管理系統的核心部分,對系統的卡直接管理及控制相關裝置,具有儲存功能,可存放持卡人資料及各種事件記錄。

⑥讀卡器:工作於射頻方式,採集感應卡的資料傳輸到門禁控制器,以便控制器進行各種管理及相應的控制。

⑦電鎖:電子方式開關,實現開門及鎖門,由門禁控制器直接控制。

⑧開門按鈕:提供方便的開門方式。

⑨門磁:檢測門的狀態資訊,然後傳輸到控制器。

⑩報警輸入輸出裝置:為加強系統的保安,可以將輸入輸出裝置接入門禁控器的輸入輸出介面,實現系統的報警及聯動。

 圖1 工業中心門禁管理系統結構

上圖展示了一個常見的門禁管理系統結構,一般而言,報警系統是安防及門禁系統中保障安全問題的重要功能元件及環節,也可以實現與其他監控裝置的聯防聯控。隨著接入智慧門禁系統的終端越來越多,如何對海量資料進行實時高效計算,及時反饋報警訊息,成為智慧門禁以及智慧社群建設的關鍵問題。

1.3 DolphinDB優勢

DolphinDB 是一款高效能分散式時序資料庫,集成了功能強大的程式語言和高容量高速度的流資料分析系統,為海量結構化資料的快速儲存、檢索、分析及計算提供一站式解決方案,適用於工業物聯網領域。DolphinDB 提供的流式計算框架具備高效能實時流資料處理能力,支援毫秒甚至微秒級別資料計算,非常適合用於門禁安防資料的處理和分析。

2. 需求

假定有一個監控系統,對所有門禁裝置每5秒鐘採集1次資料,同時開門或關門的事件會主動上報資料,採集後的資料以 json 格式寫入 mqtt 伺服器,本文使用到的資料示例如下:

| recordType | doorEventCode | eventDate           | readerType | sn    | doorNum | card     |
|------------|---------------|---------------------|------------|-------|---------|----------|
| 0          | 11            | 2022.12.01 00:00:00 | false      | a1008 | 1       | ic100000 |
| 1          | 65            | 2022.12.01 00:00:00 | false      | a1010 | 2       | ic100000 |
| 3          | 61            | 2022.12.01 00:00:53 | true       | a1004 | 1       | ic100044 |
| 2          | 66            | 2022.12.01 00:00:53 | true       | a1002 | 2       | ic100020 |
| 2          | 60            | 2022.12.01 00:19:54 | false      | a1008 | 1       | ic100000 |
| 3          | 11            | 2022.12.01 00:19:54 | true       | a1000 | 2       | ic100000 |
| 2          | 66            | 2022.12.01 00:23:21 | true       | a1009 | 1       | ic100082 |
| 2          | 61            | 2022.12.01 00:23:21 | false      | a1006 | 2       | ic100068 |
| 3          | 12            | 2022.12.01 00:45:26 | true       | a1003 | 1       | ic100000 |
| 1          | 11            | 2022.12.01 00:45:26 | false      | a1004 | 2       | ic100000 |

本教程實現門禁異常狀態檢測需要用到的資料欄位說明如下:

欄位名

說明

doorEventCode

事件碼``11: 合法開門 12: 密碼開門 56: 按鈕開門 60: 開門 61: 關門 65: 軟體開門 66:軟體關門

eventDate

事件時間

doorNum

門號 0-4

保持門禁正常關閉狀態是保證社群或樓宇內居民安全的基礎需求之一,因此本案例需要實現的門禁異常狀態檢測需求是:開門狀態連續存在超過5分鐘報警。

3. 實驗環境

實驗環境的配置如下:

  • 伺服器環境:

  • CPU型別:Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz

  • 邏輯 CPU 總數:12

  • 記憶體:16 GB

  • OS:64位 Windows

  • DolphinDB server 部署

  • server 版本:2.00.8 Windows 64,社群版

  • 部署模式:單節點模式

  • DolphinDB GUI:1.30.14 版本

4. 設計思路

DolphinDB 的流計算框架目前已提供時序聚合引擎、橫截面聚合引擎、異常檢測引擎、會話視窗引擎和響應式狀態引擎等10餘種計算引擎應對不同計算場景。本文主要介紹如何用響應式狀態引擎會話視窗引擎實現門禁異常狀態的實時監測。

4.1 使用 DolphinDB 內建流計算引擎監測門禁異常狀態

  • 響應式狀態引擎(createReactiveStateEngine)

DolphinDB 流資料引擎所計算的因子可分為無狀態因子與有狀態因子。無狀態因子僅根據最新一條資料即可完成計算,不需要之前的資料,亦不依賴之前的計算結果。有狀態因子計算除需要最新的資料,還需要歷史資料或之前計算得到的中間結果,統稱為“狀態”。因此有狀態因子計算需要儲存狀態,以供後續因子計算使用,且每次計算都會更新狀態。響應式狀態引擎每輸入一條資料都將觸發一條結果輸出,因此輸入和輸出資料量一致。響應式狀態引擎的運算元中只能包含向量函式,DolphinDB 針對生產業務中的常見狀態運算元(滑動視窗函式、累積函式、序列相關函式和 topN 相關函式等)進行了優化,大幅提升了這些運算元在響應式狀態引擎中的計算效率。

  • 會話視窗引擎(creatSessionWindowEngine)

會話視窗可以理解為一個活動階段(資料產生階段),其前後都是非活動階段(無資料產生階段)。會話視窗引擎與時間序列引擎極為相似,它們計算規則和觸發計算的方式相同。不同之處在於時間序列引擎具有固定的視窗長度和滑動步長,但會話視窗引擎的視窗不是按照固定的頻率產生的,其視窗長度也不是固定的。會話視窗引擎以引擎收到的第一條資料的時間戳作為第一個會話視窗的起始時間。會話視窗收到某條資料之後,若在指定的等待時間內仍未收到下一條新資料,則(該資料的時間戳 + 等待時間)是該視窗的結束時間。視窗結束後收到的第一條新資料的時間戳是新的會話視窗的起始時間。

在物聯網領域應用場景中,由於裝置線上的時間段不同,可能某些時間段有大量資料產生,而某些時間段完全沒有資料。若對這類特徵的資料進行滑動視窗計算,無資料的視窗會增加不必要的計算開銷。因此 DolphinDB 開發了會話視窗引擎,以解決此類問題。

4.2 設計思路與方案

對於本案例的需求,由於門禁監控裝置採用輪詢方式5秒採集一次資料,在沒有新事件上報的時間裡,會產生重複記錄的資料,因此需要首先對採集資料進行去重處理,再檢測出資料中狀態持續超時的記錄。此時的記錄會包括所有狀態持續超過 5 分鐘的資料,因此仍需將資料接入下一級引擎去除關門告警,只保留開門狀態超時報警。根據 DolphinDB 各個引擎的特點,採用響應式狀態引擎完成第一個與第三個過濾篩選資料的任務,並通過會話視窗引擎檢測出超時資料。將三個引擎級聯,實現多級引擎級聯檢測開門時間大於5分鐘的異常門禁狀態的流水線處理模式。

在 DolphinDB 中的處理流程如下圖所示:

圖2 門禁異常狀態資料處理流程

5. 實現步驟

5.1 定義並共享輸入輸出流資料表

首先定義一個用於實時接收門禁監控裝置資料的流資料表,表結構共包含七列,通過 enableTableShareAndPersistence函式共享流資料表並持久化到硬碟上。通過 cacheSize 引數將記憶體中可儲存的最大資料量設定為10萬行。程式碼如下:

st=streamTable(
	array(INT,0) as recordype, //記錄型別
	array(INT,0) as doorEventCode, //事件碼
    array(DATETIME,0) as eventDate, //事件時間 
    array(BOOL,0) as readerType, //進出型別 1:入 0:出
   	array(SYMBOL,0) as sn, //裝置SN號
    array(INT,0) as doorNum, //門號
    array(SYMBOL,0) as card //卡號            
	)
enableTableShareAndPersistence(st,`doorRecord, false, true, 100000, 100, 0);

其次定義異常狀態流資料表 outputSt1 ,用於響應式狀態引擎的輸出,並將其持久化到磁碟上。createReactiveStateEngine 響應式狀態引擎對輸出表的格式有嚴格要求,它的第一列必須是分組列,其中,根據 keyColumn 的設定,輸出表的前幾列必須和 keyColumn 設定的列及其順序保持一致。在本例中,分組列為門號 doorNum ,資料型別為 INT 。之後的兩列分別為 DATETIME 型別和 INT 型別,用於記錄時間和事件碼。建立及共享流資料表程式碼如下:

out1 =streamTable(10000:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME, INT])
enableTableShareAndPersistence(out1,`outputSt,false,true,100000)

有關函式及各引數的詳細說明,參考 DolphinDB使用者手冊

5.2 建立響應式狀態引擎過濾重複資料

響應式狀態引擎會對輸入的每一條訊息做出計算響應,產生一條記錄作為結果,計算的結果在預設情況下都會輸出到結果表,也就是說輸入 n 個訊息,輸出 n 條記錄。如果希望僅輸出一部分結果,可以啟用過濾條件,只有滿足條件的結果才會輸出。

下面的例子檢查記錄資料是否有變化,只有事件型別有變化的記錄才會輸出。設定分組列為 doorNum,輸出表各列的順序為分組列、計算結果列,需要注意保持下一級引擎 dummyTable 的 Schema 與該順序一致。設定 filter 為 prev(doorEventCode)!=doorEventCode,這裡以元程式碼的形式表示過濾條件,只有符合過濾條件的結果,即事件碼有變化的資料資料才會被輸出到通過 outputTable 設定的輸出表中。兩個計算指標為 eventDatedoorEventCode,表示原樣輸出。

DolphinDB 內建的流計算引擎均實現了資料表(table)的介面,因此多個引擎流水線處理變得異常簡單,只要將後一個引擎作為前一個引擎的輸出即可。引入流水線處理,可以解決更為複雜的計算問題。在本例中,將輸出表通過 getStreamEngine() 方法接入下一級會話視窗引擎。具體建立引擎程式碼如下:

有關函式及各引數的詳細說明,參考 DolphinDB使用者手冊

reactivEngine1 = createReactiveStateEngine(name=`reactivEngine1,metrics=<[eventDate,doorEventCode]>,
    dummyTable=objByName(`doorRecord),outputTable= getStreamEngine("swEngine"),keyColumn=`doorNum,
    filter=<prev(doorEventCode)!=doorEventCode>)

5.3 通過級聯會話視窗引擎檢測狀態超時資料

首先建立一張記憶體表,為響應式狀態引擎提供輸入的表結構,該表結構需要與上一級引擎輸出表的結構一致。在會話視窗引擎中,設定分組列 keyColumn為門號 doorNum,時間列 timeColumn 為時間 eventDate。檢測需求是五分鐘內無資料報警,因此 sessionGap 為300(單位為秒,同 eventDate 列),表示收到某條資料後經過該時間的等待仍無新資料到來,就終止當前視窗。metrics 設為 last(doorEventCode),即返回視窗內的最後一條記錄資料。設定 useSessionStartTimefalse,表示輸出表中的時刻為資料視窗結束時刻,即每個視窗中最後一條資料的時刻 + sessionGap。訂閱流資料後,會話視窗引擎的輸入資料為上一級響應式狀態引擎的輸出,輸出作為下一級響應式狀態引擎的輸入。參考 DolphinDB 使用者手冊中 createSessionWindowEngine頁面內容完成對其他引數的設定。程式碼如下:

swOut2 = table(1:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME,INT])
swEngine = createSessionWindowEngine(name="swEngine",sessionGap = 300,metrics=<last(doorEventCode)>,
    dummyTable = objByName(`doorRecord), outputTable = getStreamEngine("reactivEngine"), 
    timeColumn = `eventDate, keyColumn =`doorNum, useSessionStartTime = false)

5.4 響應式狀態引擎過濾關門告警

上級會話視窗引擎獲取到的資料包括開門和關門超過5分鐘的資料,因此需要再通過響應式狀態引擎過濾掉關門狀態超時資料,只保留開門告警。與上一級引擎類似,首先同樣建立一張記憶體表,為響應式狀態引擎提供輸入的表結構,在該響應式狀態引擎中,設定分組列 keyColumn 為門號 doorNum ,兩個計算指標為 eventDatedoorEventCode,表示原樣輸出。filter引數設定為 doorEventCode in [11,12,56,60,65,67],即只輸出記錄的事件碼為開門事件的資料。參考 DolphinDB 使用者手冊中 createReactiveStateEngine頁面內容完成對其他引數的設定。程式碼如下:

swOut1 =table(1:0,`eventDate`doorNum`doorEventCode,[DATETIME,INT, INT])
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[eventDate,doorEventCode]>, 
    dummyTable=swOut1,outputTable= objByName(`outputSt),keyColumn= "doorNum",
    filter=<doorEventCode in [11,12,56,60,65,67]>)

5.5 訂閱流資料

過濾了關門告警資料後,訂閱流資料表 doorRecord 並將 handler 設定為 “向 reactivEngine1 中新增資料”,把收到的流資料寫入上述會話視窗引擎,msgAsTable 設為 true ,表示訂閱的資料是由列組成的元組。程式碼如下:

subscribeTable(tableName="doorRecord", actionName="monitor", offset=0,
               handler=append!{reactivEngine1}, msgAsTable=true

5.6 從 MQTT 伺服器接收資料

DolphinDB 提供了 MQTT 外掛用於訂閱 MQTT 伺服器的資料。DolphinDB server 2.00.8 linux 64 JIT 版本已包含 MQTT 外掛在 server/plugins/mqtt 目錄下,不用下載外掛即可直接載入使用。使用者可以使用 mqtt::subscribe 從 MQTT 伺服器訂閱資料,在訂閱時需要資料格式解析函式,目前外掛提供了 jsoncsv 格式的解析函式,本例使用 mqtt::createJsonParser 解析 json 格式資料。示例程式碼如下:

loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt")
sp = createJsonParser([INT,INT,DATETIME, BOOL,SYMBOL,INT,SYMBOL], 
    `recordType`doorEventCode`eventDate`readerType`sn`doorNum`card)
mqtt::subscribe(host, port, topic, sp, objByName(`doorRecord))

6. 模擬寫入與驗證

6.1 模擬門禁裝置寫入資料

下列程式碼模擬門禁裝置寫入開門事件與關門事件,每五秒產生一次資料,共產生350條門禁資料記錄,非重複記錄有7次,超時資料有3條,其中開門超時記錄有兩條,模擬寫入資料程式碼如下:

def duplicateData(mutable st, num, doorCode, time){
    for(i in 0:num){
        eventTime = time
        st.append!(table(rand(0..5,1) as recordType, doorCode as doorEventCode, eventTime as eventDate, rand([true,false],1) as readerType, rand(`a+string(1000..1010),1) as sn, 1 as doorNum, rand(`ic+string(100000..100000),1) as card))
        eventTime = datetimeAdd(eventTime, 5, `s)
    }
}
startEventDate = 2022.12.01T00:00:00
duplicateData(st, 75, 11, startEventDate)
startEventDate=datetimeAdd(startEventDate , 375, `s)
duplicateData(st, 25, 56, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 100, 61, startEventDate)
startEventDate=datetimeAdd(startEventDate , 500, `s)
duplicateData(st, 25, 66, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 70, 12, startEventDate)
startEventDate=datetimeAdd(startEventDate , 350, `s)
duplicateData(st, 30, 60, startEventDate)
startEventDate=datetimeAdd(startEventDate , 150, `s)
duplicateData(st, 25, 67, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)

6.2 驗證監測結果準確性

從模擬的資料中查詢出開門超時且符合過濾條件的資料,通過 eqObj()方法比較流計算引擎獲取到的異常資料與真實異常資料是否相同,從而驗證監測結果的準確性。

t = select *, deltas(eventDate), prev(doorNum), prev(eventDate), prev(doorEventCode) 
    from doorRecord context by doorNum 
resultTable = select prev_doorNum as doorNum,prev_eventDate+300 as eventDate,
              prev_doorEventCode as doorEventCode from t 
              where deltas_eventDate>= 300 and prev_doorEventCode in [11,12,56,60,65,67] 
              and (prev(eventDate)!=eventDate or prev(doorEventCode)!=doorEventCode)
              order by eventDate
eqObj(resultTable.values(),outputSt1.values())

7. 總結

在網路與數字技術飛速發展的今天,門禁系統早已不再是單純的門道及鑰匙管理,而是逐漸發展成為一套完整的出入門禁安全管理系統。如今的門禁管理系統集微機自動識別技術和現代安全管理措施為一體,涉及電子、機械、光學、聲學、計算機技術、通訊技術、生物技術等諸多新技術,為各類機要部門實現出入口安全防範管理提供了有效措施。

本教程基於 DolphinDB 流資料處理框架,提供了一種實時監測門禁裝置異常狀態的低延時解決方案,能夠有效提升對於海量資料的實時計算效率,滿足了門禁系統智慧化的計算需求。利用 DolphinDB 流資料處理框架中引擎的流水線處理方式,實現了會話視窗引擎和響應式狀態引擎級聯,將開發難度大大降低。本教程旨在提高開發人員在使用 DolphinDB 內建的流資料框架開發物聯網領域流計算業務場景時的開發效率、降低開發難度,更好地挖掘 DolphinDB 在複雜實時流計算場景中的價值。

參考文獻

  1. 《智慧化樓宇技術》

  1. 《安防&智慧化》

  1. 《智慧安防對於智慧社群的重要性》

  1. 《淺談門禁為何是智慧家居安防的基礎》

  1. 《智慧安防系統在智慧城市中的應用及展望》

  1. 《淺談智慧化門禁系統的當下與未來》

附錄

程式碼: streaming_engine_anomaly_alerts