A Step-by-Step Guide to IoT Time-Series Data Storage and Analysis with Amazon Timestream

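Amazon Timestream is a fast, scalable, serverless time-series database service for IoT and operational applications. It makes it easy to store and analyze trillions of events per day at up to 1,000 times the speed and as little as one-tenth the cost of relational databases. Timestream keeps recent data in memory and moves historical data to a cost-optimized storage tier according to user-defined policies, which saves customers the time and cost of managing the lifecycle of time-series data. Its purpose-built query engine can access and analyze both recent and historical data, and you do not need to state in the query whether the data lives in memory or in the cost-optimized tier. Timestream has built-in time-series analytics functions that help you identify trends and patterns in your data in near real time. Timestream is serverless and scales capacity and performance automatically, so you do not manage the underlying infrastructure and can focus on building your applications.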

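This post shows how to ingest, store, and analyze IoT time-series data in real time, using PM2.5 monitoring as the example scenario. It uses the managed services Amazon Timestream and Amazon Kinesis Data Streams together with two open-source tools, Grafana and the Timestream Flink connector. It covers the deployment architecture, environment setup, data ingestion, data storage, and analysis. We hope it gives you ideas when you have similar requirements for storing and analyzing IoT time-series data.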

Architecture

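Amazon Timestream can quickly analyze the time-series data that IoT applications generate, using built-in analytics functions such as smoothing, approximation, and interpolation. For example, a smart-home device manufacturer can use Timestream to collect motion or temperature data from device sensors. It can then interpolate that data to find time ranges with no motion, and alert consumers to take action (such as turning down the heat) to save energy.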

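Using PM2.5 monitoring as the IoT example, this post implements real-time PM2.5 data ingestion, time-series storage, and real-time analysis. The architecture has three main parts: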

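  • Real-time time-series data ingestion: a Python data-collection program simulates PM2.5 monitoring devices. Together with Kinesis Data Streams and the Kinesis Data Analytics for Apache Flink connector, it ingests the readings into Timestream in real time.
  • Time-series data storage: Amazon Timestream stores the time-series data. You set retention periods for the memory store and the magnetic store (the cost-optimized tier). Recent data then stays in memory, and historical data moves to the cost-optimized tier according to user-defined policies.
  • Real-time time-series data analysis: Grafana, with the Timestream plugin for Grafana installed, queries Timestream data in real time. Grafana's rich chart types, combined with Timestream's built-in time-series functions, let you identify trends and patterns in IoT data in near real time.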
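As a rough mental model of the two storage tiers described above, the following Python sketch classifies a record by its age against the memory-store retention period. It is a simplification under stated assumptions. `tier_for_age` is a hypothetical helper. Magnetic-store expiry and late-arriving writes are not modelled. Timestream itself decides which tier answers a query.

```python
from datetime import timedelta


def tier_for_age(age: timedelta, memory_retention_hours: int) -> str:
    """Hypothetical helper: which storage tier holds a record of the given age.

    Simplified model: data stays in the memory store until it is older than the
    memory-store retention period, then lives in the magnetic (cost-optimized)
    store. Deletion after the magnetic retention period is not modelled.
    """
    if age < timedelta(0):
        raise ValueError("age must be non-negative")
    if age <= timedelta(hours=memory_retention_hours):
        return "memory"
    return "magnetic"


# With a 24-hour memory retention, a 2-hour-old reading is in the memory store,
# while a 3-day-old reading has moved to the magnetic store.
print(tier_for_age(timedelta(hours=2), 24), tier_for_age(timedelta(days=3), 24))
```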

The detailed architecture is shown in the following diagram:

Deploying the Environment

1.1 Create the CloudFormation stack

Use your own AWS account (select the us-east-1 Region).

Download the CloudFormation YAML file from GitHub:

git clone http://github.com/bingbingliu18/Timestream-pm25

The Timestream-pm25 directory contains the template file used by CloudFormation: timestream-short-new.yaml

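In the CloudFormation console, create a stack from this template. Leave all other settings at their defaults and click the Create Stack button.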

The CloudFormation stack is created successfully.
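Several later steps need values from the stack's Outputs tab, such as the bastion endpoint, the EC2 public IP, and the role ARN. Instead of copying them from the console, you can read them with boto3's CloudFormation `describe_stacks` call, as in this minimal Python sketch. The client is passed in so the helper can be tried without AWS access. The stack name and output keys depend on the template, so any names you use with it are assumptions.

```python
def stack_outputs(cfn_client, stack_name: str) -> dict:
    """Return {OutputKey: OutputValue} for one CloudFormation stack.

    `cfn_client` is expected to behave like boto3.client("cloudformation").
    """
    response = cfn_client.describe_stacks(StackName=stack_name)
    stacks = response.get("Stacks", [])
    if not stacks:
        return {}
    # Outputs may be missing, for example while the stack is still being created.
    return {o["OutputKey"]: o["OutputValue"] for o in stacks[0].get("Outputs", [])}
```

For example, call `stack_outputs(boto3.client("cloudformation", region_name="us-east-1"), "my-timestream-stack")`, where `my-timestream-stack` is a placeholder for the name you gave the stack.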

1.2 Connect to the new EC2 bastion host:

Change the permissions of the key-pair file:

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]

Run aws configure:

aws configure

For Default region name, enter "us-east-1"; keep the defaults for the other settings.

1.3 Connect to the EC2 bastion host and install the required software

Set the time zone:

TZ='Asia/Shanghai'; export TZ

Install python3

sudo yum install -y python3

Install python3 pip

sudo yum install -y python3-pip

pip3 install boto3

sudo pip3 install boto3

pip3 install numpy

sudo pip3 install numpy

install git

sudo yum install -y git

1.4 Download the Timestream sample repository from GitHub

git clone http://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools

1.5 Install Grafana Server

Connect to the EC2 bastion host:

sudo vi /etc/yum.repos.d/grafana.repo

For OSS releases (copy the following content into grafana.repo):

[grafana]
name=grafana
baseurl=http://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=http://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt

Install grafana server:

sudo yum install -y grafana

Start grafana server:

sudo service grafana-server start
sudo service grafana-server status

Configure grafana server to start automatically when the operating system boots:

sudo /sbin/chkconfig --add grafana-server

1.6 Install the Timestream plugin

sudo grafana-cli plugins install grafana-timestream-datasource

Restart Grafana:

sudo service grafana-server restart

1.7 Configure the IAM role that Grafana uses to access Timestream

Get the IAM role name

Open the IAM console and select the role to modify. The role name is:

timestream-iot-grafanaEC2rolelabview-us-east-1

Edit the role's trust relationship:

Select the entire policy document and replace it with the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid":"",
      "Effect": "Allow",
      "Principal": {
        "AWS": "[replace with the role ARN from the CloudFormation output]"
      },
      "Action": "sts:AssumeRole"
    } 
  ]
}
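You can also build and apply the same trust policy in code. This minimal Python sketch assembles the policy shown above, with the role ARN as a parameter. It then applies the policy through IAM's `update_assume_role_policy`. The client is injected (normally `boto3.client("iam")`), and any ARN used to try it out is a placeholder.

```python
import json


def grafana_trust_policy(role_arn: str) -> dict:
    """Trust policy letting EC2 and the given role (from the stack outputs) assume this role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": "sts:AssumeRole",
            },
        ],
    }


def apply_trust_policy(iam_client, role_name: str, role_arn: str) -> str:
    """Replace the role's trust relationship; returns the JSON document sent to IAM."""
    document = json.dumps(grafana_trust_policy(role_arn))
    iam_client.update_assume_role_policy(RoleName=role_name, PolicyDocument=document)
    return document
```

Usage might look like `apply_trust_policy(boto3.client("iam"), "timestream-iot-grafanaEC2rolelabview-us-east-1", role_arn)`, with `role_arn` taken from the stack outputs.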

The modified trust relationship:

1.8 Log in to the Grafana server

Logging in to the Grafana server for the first time:

  1. Open a browser and go to http://[Grafana server public ip]:3000
  2. By default, Grafana Server listens on port 3000.

To get the EC2 public IP address, open the CloudFormation Outputs tab, as shown in the following screenshot:

  1. On the login page, enter username admin and password admin.
  2. Click Log In. After you log in, you are prompted to change the password.

1.9 Add a Timestream data source in the Grafana server

Add the Timestream data source

1.10 Configure the Timestream data source in the Grafana server

Copy the role ARN needed for the configuration from the CloudFormation Outputs tab, and set Default Region to us-east-1.

IoT Data Storage

2.1 Create the Timestream database iot

2.2 Create the Timestream table pm25
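These steps create the database and table in the console. To script them instead, you can use the `timestream-write` API calls `create_database` and `create_table`, as in the following Python sketch. The retention values (24 hours in the memory store, 7 days in the magnetic store) are demo assumptions, not the settings used in this walkthrough. Choose values that match your own data lifecycle policy. Both calls raise an error if the database or table already exists.

```python
def table_request(database="iot", table="pm25", memory_hours=24, magnetic_days=7) -> dict:
    """Parameters for timestream-write create_table (retention values are assumed demo defaults)."""
    return {
        "DatabaseName": database,
        "TableName": table,
        "RetentionProperties": {
            "MemoryStoreRetentionPeriodInHours": memory_hours,
            "MagneticStoreRetentionPeriodInDays": magnetic_days,
        },
    }


def create_iot_table(write_client, **kwargs) -> dict:
    """Create the database, then the table. `write_client` is boto3.client("timestream-write")."""
    request = table_request(**kwargs)
    write_client.create_database(DatabaseName=request["DatabaseName"])
    write_client.create_table(**request)
    return request
```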

IoT Data Ingestion

3.1 Install the Flink connector for Timestream

Install Java 8:

sudo yum install -y java-1.8.0-openjdk*

java -version

Install the debug info; otherwise jmap throws an exception:

sudo yum --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo

Install maven

sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo 
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo 
sudo yum install -y apache-maven 
mvn --version

Switch the default Java version from 1.7 to 1.8:

sudo update-alternatives --config java

sudo update-alternatives --config javac

Install Apache Flink

At the time of writing, the latest Apache Flink version supported by Kinesis Data Analytics is 1.8.2.

  1. Create a flink folder:

cd

mkdir flink

cd flink

  2. Download the Apache Flink 1.8.2 source code:

wget http://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz

  3. Extract the Apache Flink source code:

tar -xvf flink-1.8.2-src.tgz

  4. Change to the Apache Flink source directory:

cd flink-1.8.2

  5. Compile and install Apache Flink (the build takes a while, roughly 20 minutes):

mvn clean install -Pinclude-kinesis -DskipTests

3.2 Create the Kinesis data stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
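Stream creation is asynchronous: the stream stays in `CREATING` status briefly before it becomes `ACTIVE`. The following minimal Python sketch does the same step and also waits for the stream to be ready. It uses boto3's Kinesis `create_stream`, the `stream_exists` waiter, and `describe_stream_summary`. The client is injected (normally `boto3.client("kinesis", region_name="us-east-1")`).

```python
def create_stream_and_wait(kinesis_client, name="Timestreampm25Stream", shard_count=1) -> str:
    """Create a Kinesis data stream, block until it exists, and return its status."""
    kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    # CreateStream returns immediately; the waiter polls until the stream is ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=name)
    summary = kinesis_client.describe_stream_summary(StreamName=name)
    return summary["StreamDescriptionSummary"]["StreamStatus"]
```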

3.3 Run the Flink connector to connect Kinesis to Timestream:

cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile

Keep the following command running throughout data ingestion:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"
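Conceptually, the connector reads JSON records from the Kinesis stream and turns each one into a Timestream record written through the `WriteRecords` API. The Python sketch below illustrates that mapping; it is not the connector's Java code. It assumes a hypothetical input record with `city`, `location`, `pm25`, and `time_ms` fields. These names were chosen to match what the queries in section 4 use: the `city` and `location` dimensions, the `pm2.5` measure name, and the `bigint` measure type. `WriteRecords` accepts at most 100 records per request, hence the batching.

```python
def to_timestream_record(reading: dict) -> dict:
    """Map one JSON reading (hypothetical field names) to a Timestream WriteRecords record."""
    return {
        "Dimensions": [
            {"Name": "city", "Value": str(reading["city"])},
            {"Name": "location", "Value": str(reading["location"])},
        ],
        "MeasureName": "pm2.5",
        "MeasureValue": str(int(reading["pm25"])),  # Timestream expects values as strings
        "MeasureValueType": "BIGINT",
        "Time": str(int(reading["time_ms"])),
        "TimeUnit": "MILLISECONDS",
    }


def batches(items, size=100):
    """Yield successive chunks; WriteRecords takes at most 100 records per request."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def write_readings(write_client, readings, database="iot", table="pm25") -> int:
    """Write readings in batches with boto3.client("timestream-write"); returns the request count."""
    records = [to_timestream_record(r) for r in readings]
    requests = 0
    for batch in batches(records):
        write_client.write_records(DatabaseName=database, TableName=table, Records=batch)
        requests += 1
    return requests
```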

3.4 Prepare the PM2.5 demo data:

Connect to the EC2 bastion host.

Download the PM2.5 demo data generator:

cd

mkdir pm25

cd pm25

  1. Download the Python data-collection program from GitHub:

git clone http://github.com/bingbingliu18/Timestream-pm25

cd Timestream-pm25

  2. Run the PM2.5 demo data generator. The Python program takes two parameters: --region (default: us-east-1) and --stream (default: Timestreampm25Stream).

Keep the following command running throughout data ingestion:

python3 pm25_new_kinisis_test.py
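The repository's `pm25_new_kinisis_test.py` is the generator actually used here. To show the general shape of such a program, here is a hypothetical Python sketch; it is not a copy of that script. It accepts the same `--region` and `--stream` parameters with the same defaults. It builds random readings for a small, made-up subset of stations and sends each one with the Kinesis `put_record` call. The record fields (`city`, `location`, `pm25`, `time_ms`) are assumptions.

```python
import argparse
import json
import random
import time

# Hypothetical subset of stations; the real generator covers many more cities.
STATIONS = {
    "Beijing": ["dongsi", "tiantan"],
    "Shenzhen": ["huaqiao city", "xixiang"],
}


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="PM2.5 demo data generator (sketch)")
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--stream", default="Timestreampm25Stream")
    return parser.parse_args(argv)


def make_reading(rng: random.Random, now_ms: int) -> dict:
    """Build one random PM2.5 reading (field names are assumptions)."""
    city = rng.choice(sorted(STATIONS))
    return {
        "city": city,
        "location": rng.choice(STATIONS[city]),
        "pm25": rng.randint(5, 300),
        "time_ms": now_ms,
    }


def send_readings(kinesis_client, stream: str, count: int, interval_s=1.0, rng=None) -> None:
    """Send `count` readings with put_record, pausing `interval_s` seconds between them."""
    rng = rng or random.Random()
    for _ in range(count):
        reading = make_reading(rng, int(time.time() * 1000))
        kinesis_client.put_record(
            StreamName=stream,
            Data=json.dumps(reading).encode("utf-8"),
            PartitionKey=reading["location"],  # spreads stations across shards
        )
        time.sleep(interval_s)
```

A run might look like `args = parse_args()` followed by `send_readings(boto3.client("kinesis", region_name=args.region), args.stream, count=600)`.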

IoT Data Analysis

4.1 Log in to the Grafana server and create a dashboard and panel

When creating the dashboard, set its time zone to the local browser time zone:
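Timestream stores `time` as a UTC timestamp, so the dashboard's time-zone setting only changes how points are displayed. The small Python illustration below renders one epoch-millisecond timestamp in UTC and in UTC+8, the offset that Asia/Shanghai (set on the bastion host earlier) uses without daylight saving time. The helper name `render` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset; Asia/Shanghai currently observes no daylight saving time.
UTC_PLUS_8 = timezone(timedelta(hours=8))


def render(epoch_ms: int, tz: timezone) -> str:
    """Format an epoch-millisecond timestamp in the given time zone."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz).strftime("%Y-%m-%d %H:%M:%S")


print(render(1609459200000, timezone.utc))  # 2021-01-01 00:00:00
print(render(1609459200000, UTC_PLUS_8))    # 2021-01-01 08:00:00
```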

Create a new panel:

Select the data source to query, then paste the SQL statement for the analysis into the new panel:

4.2 Create the time-series analysis dashboard PM2.5 Analysis 1 (Save as PM2.5 Analysis 1)

4.2.1 Query the average PM2.5 at each monitoring station in Beijing

New Panel

SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotun,
CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,
CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,
CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,
CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,
CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,
CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,
CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,
CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,
CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,
CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,
CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,
CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,
CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,
CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,
CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,
CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,
CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,
CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,
CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyi
FROM 
(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
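The inner query averages each station's readings per 30-second bin. The outer `CASE WHEN ... ELSE NULL` then turns each row into one column per station, so the gauge can show every station separately. Below is a pure-Python emulation of that logic on made-up rows, for illustration only. It models `BIN` as rounding the timestamp down to a multiple of 30 seconds. It models `round(..., 0)` as round-half-up, which is an assumption about the SQL rounding mode.

```python
import math
from collections import defaultdict


def bin_start(epoch_s: int, width_s: int = 30) -> int:
    """Like BIN(time, 30s): round down to a multiple of the bin width."""
    return epoch_s - epoch_s % width_s


def avg_by_location(rows, width_s=30) -> dict:
    """rows: (location, epoch_s, pm25) -> {(location, bin_start): rounded average}."""
    groups = defaultdict(list)
    for location, epoch_s, value in rows:
        groups[(location, bin_start(epoch_s, width_s))].append(value)
    # Round half up (assumed to match SQL round() for these positive values).
    return {key: math.floor(sum(vals) / len(vals) + 0.5) for key, vals in groups.items()}


def pivot(averages: dict, locations) -> list:
    """One row per (location, bin), highest first; other stations' columns are None (NULL)."""
    ordered = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)
    return [
        {name: (avg if name == location else None) for name in locations}
        for (location, _bin), avg in ordered
    ]
```

Note that a reading at 35 seconds lands in a different bin than one at 5 seconds. Because `ago(30s)` is a sliding window, it can straddle two bins, so one station may contribute two rows.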

For the visualization, select Gauge.

Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis

Save Dashboard PM2.5 analysis 1:

4.2.2 Query the average PM2.5 at each monitoring station in Shanghai

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, 
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, 
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang, 
CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo, 
CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, 
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, 
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, 
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, 
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, 
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, 
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, 
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, 
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.3 Query the average PM2.5 at each monitoring station in Guangzhou

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, 
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, 
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, 
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, 
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, 
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, 
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, 
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, 
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, 
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, 
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, 
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, 
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(
    SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.4 Query the average PM2.5 at each monitoring station in Shenzhen

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.5 Time-series analysis for Huaqiao City, Shenzhen (PM2.5 over the last 5 minutes)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5' 
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location
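`CREATE_TIME_SERIES` aggregates each group's rows into a single `timeseries` value, a time-ordered list of (time, value) points. As a result, this query returns one row per location. A rough Python analogue, for intuition only (the helper name is hypothetical):

```python
from collections import defaultdict


def create_time_series(rows) -> dict:
    """rows: (location, epoch_s, value) -> {location: [(epoch_s, value), ...] in time order}."""
    series = defaultdict(list)
    for location, epoch_s, value in rows:
        series[location].append((epoch_s, value))
    return {location: sorted(points) for location, points in series.items()}
```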

For the visualization, select Lines, then select Points:

Save Panel as Shenzhen Huaqiao City PM2.5 analysis

Edit Panel Title: Shenzhen Huaqiao City PM2.5, last 5 minutes

Save Dashboard PM2.5 analysis 1

4.2.6 Find the average PM2.5 for Huaqiao City, Shenzhen, in 30-second intervals over the past 2 hours (missing values filled by linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND location='huaqiao city'
        AND time > ago(2h)
    GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)
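The two CTEs work as follows. `binned_timeseries` averages readings into 30-second bins. `interpolated_timeseries` fills any empty bin between the first and last one by drawing a straight line between the neighbouring averages. The Python sketch below mirrors those steps for a single location on made-up data. It illustrates linear interpolation over `SEQUENCE(min, max, 30s)`; it is not Timestream's implementation.

```python
from collections import defaultdict


def bin_average(points, width_s=30):
    """points: [(epoch_s, value)] -> sorted [(bin_start, average)], like BIN() + AVG()."""
    groups = defaultdict(list)
    for epoch_s, value in points:
        groups[epoch_s - epoch_s % width_s].append(value)
    return sorted((b, sum(vals) / len(vals)) for b, vals in groups.items())


def interpolate_linear(series, step_s=30):
    """Evaluate a sorted [(t, v)] series at every t in SEQUENCE(first, last, step_s)."""
    if not series:
        return []
    result, j = [], 0
    for t in range(series[0][0], series[-1][0] + 1, step_s):
        # Advance j so that series[j] is the last known point at or before t.
        while j + 1 < len(series) and series[j + 1][0] <= t:
            j += 1
        t0, v0 = series[j]
        if t0 == t or j + 1 == len(series):
            result.append((t, v0))
        else:
            t1, v1 = series[j + 1]
            result.append((t, v0 + (v1 - v0) * (t - t0) / (t1 - t0)))
    return result


# Readings at 5s and 20s fall in the 0s bin; the bins at 30s and 60s are empty.
print(interpolate_linear(bin_average([(5, 10), (20, 14), (95, 42)])))
# [(0, 12.0), (30, 22.0), (60, 32.0), (90, 42.0)]
```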

For the visualization, select Lines:

Save Panel as Shenzhen Huaqiao City PM2.5 analysis 1

Edit Panel Title: Average PM2.5 for Huaqiao City, Shenzhen, past 2 hours (missing values filled by linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.7 Ranking of average PM2.5 across all cities over the past 5 minutes (linear interpolation)

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), all_location_interpolated as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)
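The final step averages every interpolated point of every station in a city. Stations with longer series therefore carry more weight than they would in a per-station average. The Python sketch below reproduces that `avg(...) GROUP BY city` step over already-interpolated series (made-up data, hypothetical helper name). It then sorts the result like `ORDER BY ... DESC`.

```python
from collections import defaultdict


def rank_cities(interpolated: dict) -> list:
    """interpolated: {(city, location): [(t, value), ...]} -> [(city, average)], highest first.

    Every point counts once, matching avg() over the unnested rows grouped by city.
    """
    totals = defaultdict(lambda: [0.0, 0])
    for (city, _location), points in interpolated.items():
        for _t, value in points:
            totals[city][0] += value
            totals[city][1] += 1
    averages = [(city, total / count) for city, (total, count) in totals.items()]
    return sorted(averages, key=lambda item: item[1], reverse=True)
```

Take Beijing with one station at 10 and 20 and another at 60. Pooling every point gives 30.0, whereas averaging per station first would give 37.5.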

Select the panel visualization type:

Save Panel as all city analysis 1

Edit Panel Title: Average PM2.5 for all cities, past 5 minutes

Save Dashboard PM2.5 analysis 1

4.2.8 The ten monitoring points with the highest PM2.5 over the past 5 minutes (linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10
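The ranking is a per-station average followed by `ORDER BY ... DESC LIMIT 10`, or `ASC LIMIT 10` in section 4.2.9. The small Python sketch below does the same top-N and bottom-N selection over already-interpolated series, using `heapq.nlargest` and `heapq.nsmallest` (made-up data; helper names are hypothetical).

```python
import heapq


def location_averages(interpolated: dict) -> dict:
    """{(city, location): [(t, value), ...]} -> {(city, location): average value}."""
    return {key: sum(v for _t, v in points) / len(points)
            for key, points in interpolated.items() if points}


def top_n(averages: dict, n=10) -> list:
    """Highest averages first, like ORDER BY avg DESC LIMIT n."""
    return heapq.nlargest(n, averages.items(), key=lambda kv: kv[1])


def bottom_n(averages: dict, n=10) -> list:
    """Lowest averages first, like ORDER BY avg ASC LIMIT n."""
    return heapq.nsmallest(n, averages.items(), key=lambda kv: kv[1])
```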

For the visualization, select Table.

Save Panel as all city analysis 2

Edit Panel Title: Top 10 monitoring points by PM2.5, past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

4.2.9 The ten monitoring points with the lowest PM2.5 over the past 5 minutes (linear interpolation)

New Panel

WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10

For the visualization, select Table.

Save Panel as all city analysis 3

Edit Panel Title: Bottom 10 monitoring points by PM2.5, past 5 minutes (linear interpolation)

Save Dashboard PM2.5 analysis 1

Set the dashboard to refresh every 5 seconds:

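This post showed how to ingest, store, and analyze IoT time-series data in real time with Amazon Timestream, Amazon Kinesis Data Streams, and Grafana, using PM2.5 monitoring as the example. It covered the deployment architecture, environment setup, data ingestion, data storage, and analysis. We hope it helps when you have similar requirements, so that you can manage large volumes of IoT time-series data efficiently, uncover the patterns and value in that data, and support your business.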

About the author

Bingbing Liu (劉冰冰)

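Database Solutions Architect at AWS, responsible for consulting on and designing AWS-based database solutions, and dedicated to research and advocacy in big data. Before joining AWS, Bingbing worked at Oracle for many years and has extensive experience in database cloud planning, design, operations and tuning, DR solutions, big data and data warehousing, and enterprise applications.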