DolphinDB 流計算應用:引擎級聯監測門禁異常狀態

語言: CN / TW / HK

物聯網的發展為智能安防和自動化監控帶來了更多便利,與此同時,新型城鎮建設、智慧城市與智慧社區的發展也為門禁管理等安防問題智能化提出了更高的要求。在智能化發展的背景下,門禁不僅僅是門禁,更是一套集成了訪客、考勤、消費、巡更、梯控等更多功能的全面便捷的系統安全應用。目前門禁系統主要用於出入口管理,在我國加速推動智慧城市、智慧工地、智慧社區等智慧化建設發展的前提下,門禁系統智能化升級的趨勢成為必然,其普及率和使用率也將更加廣泛,隨着接入門禁系統設備越來越多,對其產生的海量數據進行實時快速的處理也成為了日益重要的問題。

DolphinDB 提供了流數據表流計算引擎用於實時數據處理,為智能安防提供了有力支持。本教程將介紹如何使用流計算引擎多級級聯實現對門禁設備異常狀態的實時監測

1. 背景介紹

1.1 行業背景

我國新型城鎮化建設、智慧城市以及智慧社區的不斷髮展,給智能安防產業提供了巨大的市場拓展空間。智能安防系統正在向安全、軍事、交通、政府、電力、通信、能源、金融、文博、倉庫、別墅、工廠等眾多行業領域延伸,涵蓋廣泛的智能化系列產品及解決方案,主要包括智能安防視頻分析系統、智能交通視頻監控系統、智慧城市智能監控系統、基於異常行為的定製化系統以及計算機視覺分析的前瞻性技術探索等。

根據人們對智能安防系統日益增長的需求,智能安防系統逐漸向指標性能優異、環境適應性強、運行可靠穩定和技術更加兼容的方向發展。常見的智能安防系統一般包含有監控、報警、門禁和遠程控制 4 個主要功能,可以單獨運行也可統一管理。而門禁則是整個智能安防系統中的基礎應用,同時也關係到物聯網領域中的公共安全、城市管理、智能家居等多個方面。

1.2 真實場景

目前一般的門禁系統工作於客户端/服務器端方式,包含以下功能設置,可以完成事件監控、報警聯動。

①門禁、報警綜合管理系統服務器:提供集中管理及監控,輸出,聯動功能。

②門禁工作站:門禁工作站提供功能設置及事件監控,可以接上髮卡設備作為髮卡工作站。

③報警輸入:每個控制箱具有獨立的報警輸入接口,接報警輸入設備,如紅外報警器等。

④報警輸出:每個控制箱具有獨立的報警輸出接口,接報警輸出設備,如聲光報警器等。

⑤門禁控制器:是門禁管理系統的核心部分,對系統的卡直接管理及控制相關設備,具有存儲功能,可存放持卡人資料及各種事件記錄。

⑥讀卡器:工作於射頻方式,採集感應卡的數據傳輸到門禁控制器,以便控制器進行各種管理及相應的控制。

⑦電鎖:電子方式開關,實現開門及鎖門,由門禁控制器直接控制。

⑧開門按鈕:提供方便的開門方式。

⑨門磁:檢測門的狀態信息,然後傳輸到控制器。

⑩報警輸入輸出設備:為加強系統的保安,可以將輸入輸出設備接入門禁控器的輸入輸出接口,實現系統的報警及聯動。

 圖1 工業中心門禁管理系統結構

上圖展示了一個常見的門禁管理系統結構,一般而言,報警系統是安防及門禁系統中保障安全問題的重要功能組件及環節,也可以實現與其他監控設備的聯防聯控。隨着接入智能門禁系統的終端越來越多,如何對海量數據進行實時高效計算,及時反饋報警消息,成為智能門禁以及智慧社區建設的關鍵問題。

1.3 DolphinDB優勢

DolphinDB 是一款高性能分佈式時序數據庫,集成了功能強大的編程語言和高容量高速度的流數據分析系統,為海量結構化數據的快速存儲、檢索、分析及計算提供一站式解決方案,適用於工業物聯網領域。DolphinDB 提供的流式計算框架具備高性能實時流數據處理能力,支持毫秒甚至微秒級別數據計算,非常適合用於門禁安防數據的處理和分析。

2. 需求

假定有一個監控系統,對所有門禁設備每5秒鐘採集1次數據,同時開門或關門的事件會主動上報數據,採集後的數據以 json 格式寫入 mqtt 服務器,本文使用到的數據示例如下:

| recordType | doorEventCode | eventDate           | readerType | sn    | doorNum | card     |
|------------|---------------|---------------------|------------|-------|---------|----------|
| 0          | 11            | 2022.12.01 00:00:00 | false      | a1008 | 1       | ic100000 |
| 1          | 65            | 2022.12.01 00:00:00 | false      | a1010 | 2       | ic100000 |
| 3          | 61            | 2022.12.01 00:00:53 | true       | a1004 | 1       | ic100044 |
| 2          | 66            | 2022.12.01 00:00:53 | true       | a1002 | 2       | ic100020 |
| 2          | 60            | 2022.12.01 00:19:54 | false      | a1008 | 1       | ic100000 |
| 3          | 11            | 2022.12.01 00:19:54 | true       | a1000 | 2       | ic100000 |
| 2          | 66            | 2022.12.01 00:23:21 | true       | a1009 | 1       | ic100082 |
| 2          | 61            | 2022.12.01 00:23:21 | false      | a1006 | 2       | ic100068 |
| 3          | 12            | 2022.12.01 00:45:26 | true       | a1003 | 1       | ic100000 |
| 1          | 11            | 2022.12.01 00:45:26 | false      | a1004 | 2       | ic100000 |

本教程實現門禁異常狀態檢測需要用到的數據字段説明如下:

字段名

説明

doorEventCode

事件碼``11: 合法開門 12: 密碼開門 56: 按鈕開門 60: 開門 61: 關門 65: 軟件開門 66:軟件關門

eventDate

事件時間

doorNum

門號 0-4

保持門禁正常關閉狀態是保證社區或樓宇內居民安全的基礎需求之一,因此本案例需要實現的門禁異常狀態檢測需求是:開門狀態連續存在超過5分鐘報警。

3. 實驗環境

實驗環境的配置如下:

  • 服務器環境:

  • CPU類型:Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz

  • 邏輯 CPU 總數:12

  • 內存:16 GB

  • OS:64位 Windows

  • DolphinDB server 部署

  • server 版本:2.00.8 Windows 64,社區版

  • 部署模式:單節點模式

  • DolphinDB GUI:1.30.14 版本

4. 設計思路

DolphinDB 的流計算框架目前已提供時序聚合引擎、橫截面聚合引擎、異常檢測引擎、會話窗口引擎和響應式狀態引擎等10餘種計算引擎應對不同計算場景。本文主要介紹如何用響應式狀態引擎會話窗口引擎實現門禁異常狀態的實時監測。

4.1 使用 DolphinDB 內置流計算引擎監測門禁異常狀態

  • 響應式狀態引擎(createReactiveStateEngine)

DolphinDB 流數據引擎所計算的因子可分為無狀態因子與有狀態因子。無狀態因子僅根據最新一條數據即可完成計算,不需要之前的數據,亦不依賴之前的計算結果。有狀態因子計算除需要最新的數據,還需要歷史數據或之前計算得到的中間結果,統稱為“狀態”。因此有狀態因子計算需要存儲狀態,以供後續因子計算使用,且每次計算都會更新狀態。響應式狀態引擎每輸入一條數據都將觸發一條結果輸出,因此輸入和輸出數據量一致。響應式狀態引擎的算子中只能包含向量函數,DolphinDB 針對生產業務中的常見狀態算子(滑動窗口函數、累積函數、序列相關函數和 topN 相關函數等)進行了優化,大幅提升了這些算子在響應式狀態引擎中的計算效率。

  • 會話窗口引擎(creatSessionWindowEngine)

會話窗口可以理解為一個活動階段(數據產生階段),其前後都是非活動階段(無數據產生階段)。會話窗口引擎與時間序列引擎極為相似,它們計算規則和觸發計算的方式相同。不同之處在於時間序列引擎具有固定的窗口長度和滑動步長,但會話窗口引擎的窗口不是按照固定的頻率產生的,其窗口長度也不是固定的。會話窗口引擎以引擎收到的第一條數據的時間戳作為第一個會話窗口的起始時間。會話窗口收到某條數據之後,若在指定的等待時間內仍未收到下一條新數據,則(該數據的時間戳 + 等待時間)是該窗口的結束時間。窗口結束後收到的第一條新數據的時間戳是新的會話窗口的起始時間。

在物聯網領域應用場景中,由於設備在線的時間段不同,可能某些時間段有大量數據產生,而某些時間段完全沒有數據。若對這類特徵的數據進行滑動窗口計算,無數據的窗口會增加不必要的計算開銷。因此 DolphinDB 開發了會話窗口引擎,以解決此類問題。

4.2 設計思路與方案

對於本案例的需求,由於門禁監控設備採用輪詢方式5秒採集一次數據,在沒有新事件上報的時間裏,會產生重複記錄的數據,因此需要首先對採集數據進行去重處理,再檢測出數據中狀態持續超時的記錄。此時的記錄會包括所有狀態持續超過 5 分鐘的數據,因此仍需將數據接入下一級引擎去除關門告警,只保留開門狀態超時報警。根據 DolphinDB 各個引擎的特點,採用響應式狀態引擎完成第一個與第三個過濾篩選數據的任務,並通過會話窗口引擎檢測出超時數據。將三個引擎級聯,實現多級引擎級聯檢測開門時間大於5分鐘的異常門禁狀態的流水線處理模式。

在 DolphinDB 中的處理流程如下圖所示:

圖2 門禁異常狀態數據處理流程

5. 實現步驟

5.1 定義並共享輸入輸出流數據表

首先定義一個用於實時接收門禁監控設備數據的流數據表,表結構共包含七列,通過 enableTableShareAndPersistence函數共享流數據表並持久化到硬盤上。通過 cacheSize 參數將內存中可保存的最大數據量設定為10萬行。代碼如下:

st=streamTable(
	array(INT,0) as recordype, //記錄類型
	array(INT,0) as doorEventCode, //事件碼
    array(DATETIME,0) as eventDate, //事件時間 
    array(BOOL,0) as readerType, //進出類型 1:入 0:出
   	array(SYMBOL,0) as sn, //設備SN號
    array(INT,0) as doorNum, //門號
    array(SYMBOL,0) as card //卡號            
	)
enableTableShareAndPersistence(st,`doorRecord, false, true, 100000, 100, 0);

其次定義異常狀態流數據表 outputSt1 ,用於響應式狀態引擎的輸出,並將其持久化到磁盤上。createReactiveStateEngine 響應式狀態引擎對輸出表的格式有嚴格要求,它的第一列必須是分組列,其中,根據 keyColumn 的設置,輸出表的前幾列必須和 keyColumn 設置的列及其順序保持一致。在本例中,分組列為門號 doorNum ,數據類型為 INT 。之後的兩列分別為 DATETIME 類型和 INT 類型,用於記錄時間和事件碼。創建及共享流數據表代碼如下:

out1 =streamTable(10000:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME, INT])
enableTableShareAndPersistence(out1,`outputSt,false,true,100000)

有關函數及各參數的詳細説明,參考 DolphinDB用户手冊

5.2 創建響應式狀態引擎過濾重複數據

響應式狀態引擎會對輸入的每一條消息做出計算響應,產生一條記錄作為結果,計算的結果在默認情況下都會輸出到結果表,也就是説輸入 n 個消息,輸出 n 條記錄。如果希望僅輸出一部分結果,可以啟用過濾條件,只有滿足條件的結果才會輸出。

下面的例子檢查記錄數據是否有變化,只有事件類型有變化的記錄才會輸出。設置分組列為 doorNum,輸出表各列的順序為分組列、計算結果列,需要注意保持下一級引擎 dummyTable 的 Schema 與該順序一致。設置 filter 為 prev(doorEventCode)!=doorEventCode,這裏以元代碼的形式表示過濾條件,只有符合過濾條件的結果,即事件碼有變化的數據數據才會被輸出到通過 outputTable 設置的輸出表中。兩個計算指標為 eventDatedoorEventCode,表示原樣輸出。

DolphinDB 內置的流計算引擎均實現了數據表(table)的接口,因此多個引擎流水線處理變得異常簡單,只要將後一個引擎作為前一個引擎的輸出即可。引入流水線處理,可以解決更為複雜的計算問題。在本例中,將輸出表通過 getStreamEngine() 方法接入下一級會話窗口引擎。具體創建引擎代碼如下:

有關函數及各參數的詳細説明,參考 DolphinDB用户手冊

reactivEngine1 = createReactiveStateEngine(name=`reactivEngine1,metrics=<[eventDate,doorEventCode]>,
    dummyTable=objByName(`doorRecord),outputTable= getStreamEngine("swEngine"),keyColumn=`doorNum,
    filter=<prev(doorEventCode)!=doorEventCode>)

5.3 通過級聯會話窗口引擎檢測狀態超時數據

首先創建一張內存表,為響應式狀態引擎提供輸入的表結構,該表結構需要與上一級引擎輸出表的結構一致。在會話窗口引擎中,設置分組列 keyColumn為門號 doorNum,時間列 timeColumn 為時間 eventDate。檢測需求是五分鐘內無數據報警,因此 sessionGap 為300(單位為秒,同 eventDate 列),表示收到某條數據後經過該時間的等待仍無新數據到來,就終止當前窗口。metrics 設為 last(doorEventCode),即返回窗口內的最後一條記錄數據。設置 useSessionStartTimefalse,表示輸出表中的時刻為數據窗口結束時刻,即每個窗口中最後一條數據的時刻 + sessionGap。訂閲流數據後,會話窗口引擎的輸入數據為上一級響應式狀態引擎的輸出,輸出作為下一級響應式狀態引擎的輸入。參考 DolphinDB 用户手冊中 createSessionWindowEngine頁面內容完成對其他參數的設置。代碼如下:

swOut2 = table(1:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME,INT])
swEngine = createSessionWindowEngine(name="swEngine",sessionGap = 300,metrics=<last(doorEventCode)>,
    dummyTable = objByName(`doorRecord), outputTable = getStreamEngine("reactivEngine"), 
    timeColumn = `eventDate, keyColumn =`doorNum, useSessionStartTime = false)

5.4 響應式狀態引擎過濾關門告警

上級會話窗口引擎獲取到的數據包括開門和關門超過5分鐘的數據,因此需要再通過響應式狀態引擎過濾掉關門狀態超時數據,只保留開門告警。與上一級引擎類似,首先同樣創建一張內存表,為響應式狀態引擎提供輸入的表結構,在該響應式狀態引擎中,設置分組列 keyColumn 為門號 doorNum ,兩個計算指標為 eventDatedoorEventCode,表示原樣輸出。filter參數設置為 doorEventCode in [11,12,56,60,65,67],即只輸出記錄的事件碼為開門事件的數據。參考 DolphinDB 用户手冊中 createReactiveStateEngine頁面內容完成對其他參數的設置。代碼如下:

swOut1 =table(1:0,`eventDate`doorNum`doorEventCode,[DATETIME,INT, INT])
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[eventDate,doorEventCode]>, 
    dummyTable=swOut1,outputTable= objByName(`outputSt),keyColumn= "doorNum",
    filter=<doorEventCode in [11,12,56,60,65,67]>)

5.5 訂閲流數據

過濾了關門告警數據後,訂閲流數據表 doorRecord 並將 handler 設置為 “向 reactivEngine1 中添加數據”,把收到的流數據寫入上述會話窗口引擎,msgAsTable 設為 true ,表示訂閲的數據是由列組成的元組。代碼如下:

subscribeTable(tableName="doorRecord", actionName="monitor", offset=0,
               handler=append!{reactivEngine1}, msgAsTable=true

5.6 從 MQTT 服務器接收數據

DolphinDB 提供了 MQTT 插件用於訂閲 MQTT 服務器的數據。DolphinDB server 2.00.8 linux 64 JIT 版本已包含 MQTT 插件在 server/plugins/mqtt 目錄下,不用下載插件即可直接加載使用。用户可以使用 mqtt::subscribe 從 MQTT 服務器訂閲數據,在訂閲時需要數據格式解析函數,目前插件提供了 jsoncsv 格式的解析函數,本例使用 mqtt::createJsonParser 解析 json 格式數據。示例代碼如下:

loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt")
sp = createJsonParser([INT,INT,DATETIME, BOOL,SYMBOL,INT,SYMBOL], 
    `recordType`doorEventCode`eventDate`readerType`sn`doorNum`card)
mqtt::subscribe(host, port, topic, sp, objByName(`doorRecord))

6. 模擬寫入與驗證

6.1 模擬門禁設備寫入數據

下列代碼模擬門禁設備寫入開門事件與關門事件,每五秒產生一次數據,共產生350條門禁數據記錄,非重複記錄有7次,超時數據有3條,其中開門超時記錄有兩條,模擬寫入數據代碼如下:

def duplicateData(mutable st, num, doorCode, time){
    for(i in 0:num){
        eventTime = time
        st.append!(table(rand(0..5,1) as recordType, doorCode as doorEventCode, eventTime as eventDate, rand([true,false],1) as readerType, rand(`a+string(1000..1010),1) as sn, 1 as doorNum, rand(`ic+string(100000..100000),1) as card))
        eventTime = datetimeAdd(eventTime, 5, `s)
    }
}
startEventDate = 2022.12.01T00:00:00
duplicateData(st, 75, 11, startEventDate)
startEventDate=datetimeAdd(startEventDate , 375, `s)
duplicateData(st, 25, 56, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 100, 61, startEventDate)
startEventDate=datetimeAdd(startEventDate , 500, `s)
duplicateData(st, 25, 66, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 70, 12, startEventDate)
startEventDate=datetimeAdd(startEventDate , 350, `s)
duplicateData(st, 30, 60, startEventDate)
startEventDate=datetimeAdd(startEventDate , 150, `s)
duplicateData(st, 25, 67, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)

6.2 驗證監測結果準確性

從模擬的數據中查詢出開門超時且符合過濾條件的數據,通過 eqObj()方法比較流計算引擎獲取到的異常數據與真實異常數據是否相同,從而驗證監測結果的準確性。

t = select *, deltas(eventDate), prev(doorNum), prev(eventDate), prev(doorEventCode) 
    from doorRecord context by doorNum 
resultTable = select prev_doorNum as doorNum,prev_eventDate+300 as eventDate,
              prev_doorEventCode as doorEventCode from t 
              where deltas_eventDate>= 300 and prev_doorEventCode in [11,12,56,60,65,67] 
              and (prev(eventDate)!=eventDate or prev(doorEventCode)!=doorEventCode)
              order by eventDate
eqObj(resultTable.values(),outputSt1.values())

7. 總結

在網絡與數字技術飛速發展的今天,門禁系統早已不再是單純的門道及鑰匙管理,而是逐漸發展成為一套完整的出入門禁安全管理系統。如今的門禁管理系統集微機自動識別技術和現代安全管理措施為一體,涉及電子、機械、光學、聲學、計算機技術、通訊技術、生物技術等諸多新技術,為各類機要部門實現出入口安全防範管理提供了有效措施。

本教程基於 DolphinDB 流數據處理框架,提供了一種實時監測門禁設備異常狀態的低延時解決方案,能夠有效提升對於海量數據的實時計算效率,滿足了門禁系統智能化的計算需求。利用 DolphinDB 流數據處理框架中引擎的流水線處理方式,實現了會話窗口引擎和響應式狀態引擎級聯,將開發難度大大降低。本教程旨在提高開發人員在使用 DolphinDB 內置的流數據框架開發物聯網領域流計算業務場景時的開發效率、降低開發難度,更好地挖掘 DolphinDB 在複雜實時流計算場景中的價值。

參考文獻

  1. 《智能化樓宇技術》

  1. 《安防&智能化》

  1. 《智能安防對於智慧社區的重要性》

  1. 《淺談門禁為何是智能家居安防的基礎》

  1. 《智能安防系統在智慧城市中的應用及展望》

  1. 《淺談智能化門禁系統的當下與未來》

附錄

代碼: streaming_engine_anomaly_alerts