A Step-by-Step Guide to Storing and Analyzing IoT Time-Series Data with Timestream


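Amazon Timestream is a fast, scalable, serverless time-series database service for IoT and operational applications. It makes it easy to store and analyze trillions of events per day, up to 1,000 times faster and at as little as one-tenth the cost of relational databases. By keeping recent data in memory and moving historical data to a cost-optimized storage tier according to user-defined policies, Amazon Timestream saves customers the time and cost of managing the time-series data lifecycle. Its purpose-built query engine lets you access and analyze recent and historical data together, without specifying in the query whether the data resides in memory or in the cost-optimized tier. Amazon Timestream has built-in time-series analytics functions that help you identify trends and patterns in your data in near real time. Because Amazon Timestream is serverless and scales automatically to adjust capacity and performance, there is no underlying infrastructure to manage, and you can focus on building applications.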

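This post shows how to collect, store, and analyze IoT time-series data in real time (using a PM2.5 scenario as the example) with the managed services Timestream and Kinesis Data Streams and the open-source software Grafana and the Flink connector. It covers the deployment architecture, environment setup, data collection, data storage, and analysis. We hope it offers some inspiration when you face similar IoT time-series storage and analysis requirements, and helps your business grow.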

Architecture

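Amazon Timestream can quickly analyze time-series data generated by IoT applications using built-in analytics functions such as smoothing, approximation, and interpolation. For example, a smart-home device manufacturer can use Amazon Timestream to collect motion or temperature data from device sensors, interpolate it to identify time ranges with no motion, and alert consumers to take action (such as turning down the heat) to save energy.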

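Using PM2.5 monitoring as the IoT example, this post implements real-time PM2.5 data collection, time-series storage, and real-time analysis. The architecture has three main parts: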

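  • Real-time time-series data collection: a Python data collection program, combined with Kinesis Data Streams and the Kinesis Data Analytics for Apache Flink connector, simulates PM2.5 monitoring devices and streams their data into Timestream in real time.
  • Time-series data storage: the Amazon Timestream time-series database stores the data. By setting retention periods for the memory store and the magnetic store (the cost-optimized tier), recent data stays in memory and historical data moves to the cost-optimized tier according to user-defined policies.
  • Real-time time-series data analysis: Grafana (with the Timestream plugin for Grafana installed) accesses Timestream data in real time. Grafana's rich set of charts, combined with Amazon Timestream's built-in time-series analytics functions, makes it possible to identify trends and patterns in IoT data in near real time.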

The detailed architecture diagram is shown below:

Environment deployment

1.1 Create the CloudFormation stack

Use your own account (select us-east-1 as the region).

Download the CloudFormation YAML file from GitHub:

git clone http://github.com/bingbingliu18/Timestream-pm25

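The Timestream-pm25 directory contains the file used by the CloudFormation step below, timestream-short-new.yaml.

Leave all other options at their defaults and click the Create Stack button.

The CloudFormation stack is created successfully.

1.2 Connect to the newly created EC2 bastion host:

Change the permissions of the certificate file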

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] [user]@[bastionEndpoint]

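Run aws configure:

aws configure

For the default region name, enter "us-east-1"; accept the defaults for everything else.

1.3 Connect to the EC2 bastion host and install the required software

Set the time zone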

TZ='Asia/Shanghai'; export TZ

Install python3

sudo yum install -y python3

Install python3 pip

sudo yum install -y python3-pip

pip3 install boto3

sudo pip3 install boto3

pip3 install numpy

sudo pip3 install numpy

install git

sudo yum install -y git

1.4 Download the Timestream sample repository from GitHub

git clone http://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools

1.5 Install Grafana Server

Connect to the EC2 bastion host:

sudo vi /etc/yum.repos.d/grafana.repo

For OSS releases (copy the following into grafana.repo):

[grafana]
name=grafana
baseurl=http://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=http://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt

Install Grafana server:

sudo yum install -y grafana

Start Grafana server:

sudo service grafana-server start
sudo service grafana-server status

Configure Grafana server to start automatically at boot:

sudo /sbin/chkconfig --add grafana-server

1.6 Install the Timestream plugin

sudo grafana-cli plugins install grafana-timestream-datasource

Restart Grafana

sudo service grafana-server restart

1.7 Configure the IAM role that Grafana uses to access the Timestream service

Get the IAM role name

Open the IAM service and select the role to modify. Role name:

timestream-iot-grafanaEC2rolelabview-us-east-1

Modify the role's trust relationship:

Select the entire policy document and replace it with the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "AWS": "[replace with the role ARN from the CloudFormation output]"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
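The same trust policy can also be built and applied from a script. The following is a minimal sketch, assuming boto3 is installed and the caller has permission to update the role; the helper names build_trust_policy and update_trust_policy are hypothetical and not part of the original walkthrough.

```python
import json


def build_trust_policy(grafana_role_arn: str) -> dict:
    """Return the trust policy from this step with the role ARN filled in."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": grafana_role_arn},
                "Action": "sts:AssumeRole",
            },
        ],
    }


def update_trust_policy(role_name: str, grafana_role_arn: str) -> None:
    """Replace the role's trust relationship (hypothetical helper)."""
    import boto3  # imported lazily so the builder works without boto3

    iam = boto3.client("iam")
    iam.update_assume_role_policy(
        RoleName=role_name,
        PolicyDocument=json.dumps(build_trust_policy(grafana_role_arn)),
    )
```

Calling update_trust_policy with the role name from this step and the role ARN from the CloudFormation output should have the same effect as editing the policy document in the console.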

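The trust relationship after modification:

1.8 Log in to Grafana server

First login to Grafana server:

  1. Open a browser and go to http://[Grafana server public ip]:3000
  2. Grafana server listens on port 3000 by default.

To find the EC2 public IP address, check the CloudFormation outputs, as shown below:

  3. On the login page, enter username: admin; password: admin (both the user name and the password are admin).
  4. Click Log In. After logging in, you are prompted to change the password.

1.9 Add a Timestream data source in Grafana server

Add the Timestream data source

1.10 Configure the Timestream data source in Grafana server

Copy the role ARN required for the configuration (from the CloudFormation Outputs tab). Default Region: us-east-1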

IoT data storage

2.1 Create the Timestream database iot

2.2 Create the Timestream table pm25
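The database and table can be created in the console, or with a short script. The sketch below uses the boto3 timestream-write client; the retention values (24 hours in the memory store, 7 days in the magnetic store) are assumptions chosen only for illustration, and the helper names are hypothetical.

```python
def table_request(database: str, table: str,
                  memory_hours: int, magnetic_days: int) -> dict:
    """Build the keyword arguments for timestream-write create_table."""
    return {
        "DatabaseName": database,
        "TableName": table,
        "RetentionProperties": {
            # How long recent data stays in the memory store.
            "MemoryStoreRetentionPeriodInHours": memory_hours,
            # How long data is kept in the cost-optimized magnetic store.
            "MagneticStoreRetentionPeriodInDays": magnetic_days,
        },
    }


def create_iot_storage(region: str = "us-east-1") -> None:
    """Create database iot and table pm25 (retention values are assumed)."""
    import boto3  # imported lazily so table_request works without boto3

    client = boto3.client("timestream-write", region_name=region)
    client.create_database(DatabaseName="iot")
    client.create_table(
        **table_request("iot", "pm25", memory_hours=24, magnetic_days=7)
    )
```

The two retention settings are what implement the memory and cost-optimized tiers described in the architecture section; adjust them to match how long recent data needs to stay in memory.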

IoT data ingestion

3.1 Install the Flink connector for Timestream

Install Java 8

sudo yum install -y java-1.8.0-openjdk*

java -version

Install debug info; otherwise jmap will throw an exception


sudo yum --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo

Install maven

sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo 
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo 
sudo yum install -y apache-maven 
mvn --version

Change the Java version from 1.7 to 1.8

sudo update-alternatives --config java

sudo update-alternatives --config javac

Install Apache Flink

At the time of writing, the latest Apache Flink version supported by Kinesis Data Analytics is 1.8.2.

  1. Create the flink folder:

cd

mkdir flink

cd flink

  2. Download the Apache Flink version 1.8.2 source code:

wget http://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz

  3. Extract the Apache Flink source code:

tar -xvf flink-1.8.2-src.tgz

  4. Change to the Apache Flink source directory:

cd flink-1.8.2

  5. Compile and install Apache Flink (the build takes a while, roughly 20 minutes):

mvn clean install -Pinclude-kinesis -DskipTests

3.2 Create the Kinesis Data Stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
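A newly created stream is not usable until it becomes ACTIVE. As a rough sketch of the same step in Python, the function below creates the stream and then blocks on the boto3 stream_exists waiter. The function name create_stream_and_wait is hypothetical, and the Kinesis client is passed in so the logic can be exercised without AWS access.

```python
def create_stream_and_wait(kinesis, name: str, shards: int = 1) -> None:
    """Create a Kinesis data stream and wait until it is ready for use.

    `kinesis` is expected to be a boto3 Kinesis client, for example
    boto3.client("kinesis", region_name="us-east-1").
    """
    kinesis.create_stream(StreamName=name, ShardCount=shards)
    # The waiter polls DescribeStream until the stream is ACTIVE.
    kinesis.get_waiter("stream_exists").wait(StreamName=name)


# Example usage (requires AWS credentials):
#   import boto3
#   client = boto3.client("kinesis", region_name="us-east-1")
#   create_stream_and_wait(client, "Timestreampm25Stream")
```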

3.3 Run the Flink connector to connect Kinesis to Timestream:

cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile

Keep the following command running throughout data collection:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"

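3.4 Prepare the PM2.5 demo data:

Connect to the EC2 bastion host

Download the PM2.5 demo data generator:

cd

mkdir pm25

cd pm25

  1. Download the data collection Python program from GitHub:

git clone http://github.com/bingbingliu18/Timestream-pm25

cd Timestream-pm25

  2. Run the PM2.5 demo data generator (the Python program takes two parameters: --region, default us-east-1; --stream, default Timestreampm25Stream)

Keep the following command running throughout data collection: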

python3 pm25_new_kinisis_test.py
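To make the producer side more concrete, here is a minimal sketch of what a simulated PM2.5 sender could look like. It is not the code of pm25_new_kinisis_test.py: the record schema, the value range, and the function names are assumptions for illustration, and the real program and the Flink connector may expect a different payload format.

```python
import json
import random
import time


def make_reading(city: str, location: str, now=None) -> dict:
    """Build one simulated PM2.5 reading (hypothetical schema)."""
    timestamp = now if now is not None else time.time()
    return {
        "city": city,
        "location": location,
        "measure_name": "pm2.5",
        "measure_value": random.randint(5, 200),  # assumed value range
        "time": int(timestamp * 1000),  # epoch milliseconds
    }


def send_reading(kinesis, stream: str, reading: dict) -> None:
    """Put one reading onto the Kinesis stream as a JSON document."""
    kinesis.put_record(
        StreamName=stream,
        Data=json.dumps(reading).encode("utf-8"),
        # Readings from the same station go to the same shard.
        PartitionKey=reading["location"],
    )


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("kinesis", region_name="us-east-1")
#   send_reading(client, "Timestreampm25Stream",
#                make_reading("Shenzhen", "huaqiao city"))
```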

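IoT data analysis

4.1 Log in to Grafana Server and create a dashboard and panel

When creating dashboard queries, set the time zone to the local browser time zone:

Create a new panel:

Select the data source to query and paste the SQL statement to run into the new panel:

4.2 Create the time-series analysis dashboard PM2.5 Analysis 1 (Save as PM2.5 Analysis 1)

4.2.1 Query the average PM2.5 of each monitoring station in Beijing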

New Panel

SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotun,
CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,
CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,
CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,
CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,
CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,
CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,
CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,
CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,
CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,
CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,
CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,
CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,
CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,
CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,
CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,
CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,
CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,
CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,
CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyi
FROM 
(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
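The query above pivots rows into columns: each CASE WHEN expression turns one station into its own column so that the Gauge panel can show one gauge per station. Because the per-city queries differ only in the city and the station list, they can be generated rather than typed by hand. The helper below is a sketch under that assumption; pivot_query and its alias rule are hypothetical, and it does no escaping, so it should only be used with trusted station names.

```python
import re
from typing import List


def pivot_query(city: str, locations: List[str], window: str = "30s") -> str:
    """Generate a per-station pivot query in the style used in this section."""

    def alias(name: str) -> str:
        # Column aliases cannot contain spaces, so replace them with "_".
        return re.sub(r"\W+", "_", name)

    cases = ",\n".join(
        f"CASE WHEN location = '{loc}' THEN avg_pm25 ELSE NULL END AS {alias(loc)}"
        for loc in locations
    )
    return (
        f"SELECT {cases}\n"
        "FROM (\n"
        "SELECT location, round(avg(measure_value::bigint), 0) AS avg_pm25\n"
        'FROM "iot"."pm25"\n'
        "WHERE measure_name = 'pm2.5'\n"
        f"AND city = '{city}'\n"
        f"AND time >= ago({window})\n"
        f"GROUP BY location, bin(time, {window})\n"
        "ORDER BY avg_pm25 DESC)"
    )
```

For example, pivot_query("Shanghai", ["songjiang", "no 15 factory"]) produces a two-column query whose second column is aliased no_15_factory.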

Choose the visualization: select Gauge

Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis

Save Dashboard PM2.5 analysis 1:

4.2.2 Query the average PM2.5 of each monitoring station in Shanghai

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, 
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, 
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang,
CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo,
CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, 
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, 
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, 
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, 
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, 
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, 
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, 
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, 
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.3 Query the average PM2.5 of each monitoring station in Guangzhou

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, 
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, 
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, 
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, 
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, 
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, 
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, 
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, 
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, 
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, 
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, 
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, 
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(
    SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.4 Query the average PM2.5 of each monitoring station in Shenzhen

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
 CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.5 Time-series analysis for Huaqiao City, Shenzhen (PM2.5 over the last 5 minutes)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5' 
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location

Choose the visualization: select Lines; select Points:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis

Edit Panel Title: Shenzhen Huaqiao City PM2.5 over the last 5 minutes

Save Dashboard PM2.5 analysis 1

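4.2.6 Find the average PM2.5 for Huaqiao City, Shenzhen at 30-second intervals over the past 2 hours (using linear interpolation to fill in missing values)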

New Panel

WITH binned_timeseries AS (
    SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND location='huaqiao city'
        AND time > ago(2h)
    GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)
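To see what the query above computes, it can help to reproduce the two steps on a handful of points: average the readings per 30-second bin, then fill any empty bins between the first and last bin along a straight line. The pure-Python sketch below only illustrates the idea; it is not how Timestream implements BIN or INTERPOLATE_LINEAR, and the function names and the use of integer second timestamps are assumptions.

```python
from collections import defaultdict


def bin_average(points, width):
    """Average (timestamp, value) pairs per bin of `width` seconds."""
    buckets = defaultdict(list)
    for ts, value in points:
        buckets[ts - ts % width].append(value)  # start of the bin
    return {start: sum(vs) / len(vs) for start, vs in sorted(buckets.items())}


def fill_linear(binned, width):
    """Fill missing bins between the first and last bin linearly."""
    if not binned:
        return {}
    known = sorted(binned)
    filled = {}
    for left, right in zip(known, known[1:]):
        steps = (right - left) // width
        delta = binned[right] - binned[left]
        for i in range(steps):
            # i == 0 reproduces the known left value; later steps interpolate.
            filled[left + i * width] = binned[left] + delta * i / steps
    filled[known[-1]] = binned[known[-1]]
    return filled


# Readings at 0 s and 10 s share the first bin; bins 30 and 60 are empty.
binned = bin_average([(0, 10), (10, 20), (95, 40)], 30)
filled = fill_linear(binned, 30)
```

In this example the bins starting at 30 and 60 seconds have no readings, so their values are placed on the line between the bin averages at 0 and 90 seconds.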

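Choose the visualization: select Lines:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1

Edit Panel Title: Average PM2.5 for Huaqiao City, Shenzhen over the past 2 hours (missing values filled by linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.7 Ranking of average PM2.5 across all cities over the past 5 minutes (linear interpolation)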

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), all_location_interpolated as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)

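Choose the panel visualization type:

Save Panel as all city analysis 1

Edit Panel Title: Average PM2.5 of all cities over the past 5 minutes

Save Dashboard PM2.5 analysis 1

4.2.8 The ten collection points with the highest PM2.5 over the past 5 minutes (linear interpolation)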

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10
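The same SQL can be run outside Grafana, for example to check the results from a script. The sketch below uses the boto3 timestream-query client and its query paginator; the helper names rows_to_dicts and run_query are hypothetical, and all scalar values are returned as strings, which is how the query API delivers them.

```python
def rows_to_dicts(page: dict) -> list:
    """Convert one Timestream query response page into a list of dicts."""
    names = [col["Name"] for col in page["ColumnInfo"]]
    return [
        # Null cells carry "NullValue" instead of "ScalarValue" and map to None.
        {name: cell.get("ScalarValue") for name, cell in zip(names, row["Data"])}
        for row in page["Rows"]
    ]


def run_query(sql: str, region: str = "us-east-1") -> list:
    """Run a query and collect all result pages (needs AWS credentials)."""
    import boto3  # imported lazily so rows_to_dicts works without boto3

    client = boto3.client("timestream-query", region_name=region)
    results = []
    for page in client.get_paginator("query").paginate(QueryString=sql):
        results.extend(rows_to_dicts(page))
    return results
```

Passing the top-ten query above to run_query should return up to ten dictionaries with the keys city, location, and avg_PM25_loc.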

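Select Table

Save Panel as all city analysis 2

Edit Panel Title: The ten collection points with the highest PM2.5 over the past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.9 The ten collection points with the lowest PM2.5 over the past 5 minutes (linear interpolation)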

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10

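Select Table

Save Panel as all city analysis 3

Edit Panel Title: The ten collection points with the lowest PM2.5 over the past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

Set the dashboard to refresh every 5 seconds: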

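This post showed how to use the managed services Timestream and Kinesis Data Streams together with Grafana to collect, store, and analyze IoT time-series data in real time, using a PM2.5 scenario as the example. It covered the deployment architecture, environment setup, data collection, data storage, and analysis. We hope it offers some inspiration when you have similar requirements, helping you manage massive volumes of IoT time-series data efficiently, uncover the regularities, patterns, and value hidden in that data, and grow your business.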

Appendix

Amazon Timestream Developer Guide

AWS Timestream sample applications

AWS Timestream and Grafana integration example

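About the author

Bingbing Liu (劉冰冰)

Database Solutions Architect at AWS, responsible for consulting on and designing AWS-based database solutions, and dedicated to research and advocacy in big data. Before joining AWS, Liu worked at Oracle for many years and has extensive experience in database cloud planning, design, operations and tuning, DR solutions, big data and data warehousing, and enterprise applications.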