教程|運營資料庫 Phoenix SQL:在CDP公有云上使用HBase、Nifi和Kafka

語言: CN / TW / HK

介紹

本教程將向您展示如何使用Apache NiFi將CSV檔案形式的資料放置在雲端儲存的解決方案上,然後對其進行格式化以將其傳送到訊息傳遞佇列(Kafka),並從該佇列中進行消費以將其提取到運營資料庫(HBase),然後使用Phoenix通過SQL語法檢索資料,所有這些都在Cloudera資料平臺-公有云(CDP-PC)中。 

先決條件

  • 在Cloudera Data Platform(CDP)公有云上擁有已建立的環境的管理員訪問許可權。

  • 建立了CDP工作負載使用者

  • 基本的AWS CLI技能

大綱

  • 看視訊

  • 下載資產

  • 設定資料中心叢集

  • 資料中心配置

    • 流程管理

    • 運營資料庫

    • 流式訊息

  • 構建和配置NiFi資料流

  • 執行資料流

  • 檢視HBase資料

  • 總結

  • 進一步閱讀

看視訊

以下視訊簡要概述了本教程的內容:

 視訊如果無法觀看,請移步到https://www.cloudera.com/tutorials/cdp-cod-phoenix-sql-using-hbase-nifi-kafka.html 進行觀看。

下載資產

您有兩(2)個選項來獲取本教程所需的資產:

  1. 下載一個ZIP檔案

它僅包含本教程中使用的必要檔案。解壓縮tutorial-files.zip並記住其位置。

2. 克隆我們的GitHub儲存庫

它提供了本教程和其他教程中使用的資產,該資產按教程標題進行組織。

 使用AWS CLI,複製corvette-data-june2020.csv到S3桶,s3a://<storage.location>/tutorial-opdb/,其中<storage.location> 是為您的環境中storage.location.base的屬性值。

注意:您可能需要請環境的管理員獲取storage.location.base的屬性值 。

在此示例中,屬性storage.location.base的值為s3a://usermarketing-cdp-demo;因此,請使用以下命令複製檔案:

aws s3 cp corvette-data-june2020.csv s3://usermarketing-cdp-demo/tutorial-opdb/corvette-data-june2020.csv

配置資料中心叢集

本教程要求我們提供三(3)個數據中心叢集,它們分別為:

1. um-nifi-demo,使用叢集定義7.2.0 - Flow Management Light Duty for AWS

2. um-streaming-demo,使用叢集定義7.2.0 - Streams Messaging Light Duty for AWS 

3. um-opdb-demo,使用叢集定義7.2.0 - Operational Database with SQL for AWS

注意:您的CDP環境可能具有不同的叢集定義。

有關如何設定資料中心的詳細資訊,請參閱如何在Cloudera資料平臺上建立資料中心。

資料中心配置

在本節中,我們將配置資料中心叢集以相互通訊。 

流程管理 

下載HBase配置檔案hbase-clientconfig.zip。解壓縮檔案並記住其位置。

從運營資料庫資料中心開始:

um-opdb-demo> CM-UI>叢集> HBase>操作>下載客戶端配置

將hbase-site.xml檔案複製到每個NiFi工作程式節點中。

注意:客戶端配置檔案中還有其他檔案已下載-我們只需要複製hbase-site.xml。

a)確定NiFi工作程式節點的公共IP地址

從Flow Management Data Hub開始:um-nifi-demo>硬體> NiFi>公共IP

b) 在命令列上,在每個NiFi工作程式節點上發出以下命令。這將複製HBase配置檔案hbase-site.xml並相應地設定許可權:

注意:您需要更新<USERNAME><PUBLIC_IP>,其中<PUBLIC_IP> 是每個NiFi工作程式節點的公共IP地址。

 scp hbase-conf/hbase-site.xml <USERNAME>@<PUBLIC_IP>:/tmp/hbase-site.xmlssh <USERNAME>@<PUBLIC_IP>chmod 777 /tmp/hbase-site.xml && exit

運營資料庫 

我們需要建立一個HBase表和列族-這是我們最終將儲存資料的位置。

確定HBase工作節點的公共IP地址

從運營資料庫資料中心開始:

um-opdb-demo>硬體>工作器>公共IP

建表和列簇

在命令提示符下,SSH進入任何HBase worker節點,然後發出以下命令來建立表和列簇:

注意:您需要更新<USERNAME><PUBLIC_IP>,其中<PUBLIC_IP>是任何(一個)  HBase工作節點的公共IP地址。

sh username@<PUBLIC_IP>hbase shellcreate 'corvette_demo','corvette_data'exitexit

流式訊息 

新增新的Schema

從Streams Messaging Data Hub開始:um-streaming-demo>Schema Registry

  • 單擊   以新增新的Schema

  • 名稱:

corvette_schema

  • 描述:

corvette sensor data

  • 型別:

Avro schema provider

  • Schema組:

Kafka

  • 相容性:

Backward

  • 選擇複選框

EVOLVE

  • 將檔案拖放corvette-schema.json到SCHEMA TEXT中

  • 儲存

 

新增新的主題

從Streams Messaging Data Hub開始:um-streaming-demo>流式訊息管理器

  • 選擇

Topics

  • 點選新增新

  • 主題名稱:

corvette_stream

  • 分割槽:

5

  • 可用性:

Maximum

  • 清理策略:

delete

  • 儲存

 

構建和配置NiFi資料流 

我們將使用下載資源中提供的模板,而不是從頭開始構建資料流 。如果您想學習從頭開始構建資料流,請檢視 將RDBMS資料匯入Hive

從流程管理資料中心開始:um-nifi-demo> NiFi

 

資料流模板匯入Nifi

從下載的檔案中,讓我們將資料流模板corvette-dataflow-template.xml匯入NiFi:

  • 點選  上傳模板

  • 單擊並拖動  到畫布中

建立資料流所需的變數: 

右鍵單擊畫布上的任意位置,然後選擇“變數”。

 

  • 名稱:  cdpuser,值:<使用您的CDP使用者ID>

  • 名稱:  kafkabrokers,值:<Kafka經紀人地址列表,用逗號分隔>

 

Kafka經紀人地址取決於:

從Streams Messaging Data Hub開始:um-streaming-demo>流式訊息管理器>代理

 

根據上圖,該值為:

um-streaming-demo-broker2.usermark.a465-9q4k.cloudera.site:9093um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093
  • 名稱:schemaurl,值:使用格式 FQDN的架構登錄檔URL由以下方式確定:https://<FQDN>:7790/api/v1 

從Streams Messaging Data Hub開始:um-streaming-demo>硬體>主伺服器> FQDN

 根據上圖,該值為:

https://um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site:7790/api/v1

更新處理器和控制器服務:

處理器:

  • Fetch_from_S3  >屬性

    • Kerberos密碼:<your-cdp-password>

  • Publish_Kafka_Topic  >屬性

    • SSL上下文服務:選擇預設NiFi SSL上下文服務

    • 密碼:<您的cdp密碼>

  • ConsumeKafkaRecord_2_0  >屬性 

    • SSL上下文服務:選擇預設NiFi SSL上下文服務

    • 密碼:<您的cdp密碼> 

控制器服務:

  • HortonworksSchemaRegistry  >屬性

    • Kerberos密碼:<your-cdp-password>

    • SSL上下文服務:選擇預設NiFi SSL上下文服務

  • HBase_2_ClientService  >屬性

    • Kerberos密碼:<your-cdp-password>

 啟用所有控制器服務

  • JsonTreeReader

  • JsonRecordSetWriter

  • HortonworksSchemaRegistry

  • HBase_2_ClientService

  • CorvetteAvroReader

  • CSVReader

  • AvroRecordSetWriter

    注意:如果您看到兩(2)個名為“預設NiFi SSL上下文服務”的服務,請刪除標記為“無效”的重複服務。

執行資料流

讓我們執行剛剛建立的資料流。您可以一次執行整個處理器組或單個處理器。對於常規除錯和診斷,建議一次執行一個處理器。

通過單擊 “操作”選單,一次執行所有處理器 。

幾秒鐘後,您將看到資料流過所有處理器。 從“操作”選單中單擊 以立即停止所有處理器。

讓我們來看看我們建立的有關Kafka主題corvette_stream的一些資料吞吐量指標:

從Streams Messaging Data Hub開始:

  1. um-streaming-demo>流式訊息管理器>主題

  2. 搜尋corvette_stream

  3. 單擊 以檢視配置檔案

  1. 針對我們的Kafka主題的流式資料吞吐量指標:

檢視HBase資料 

我們將使用Apache Phoenix來針對我們的HBase資料建立一個檢視,並使用類似於SQL的語句來檢視我們的資料。

在命令提示符下,像在“運營資料庫配置”中一樣,SSH進入任何(一個) HBase worker節點:

ssh username@<PUBLIC_IP>phoenix-sqlline
  • 建立檢視

create view "corvette_demo" (scanID                                          VARCHAR PRIMARY KEY, "corvette_data"."Offset"                        VARCHAR,"corvette_data"."IntakeManifoldAbsolutePressure" VARCHAR,"corvette_data"."ShortTermFuelTrimBank2"         VARCHAR,"corvette_data"."ShortTermFuelTrimBank1"         VARCHAR,"corvette_data"."FuelSystemStatus"               VARCHAR,"corvette_data"."MassAirflow"                    VARCHAR,"corvette_data"."O2VoltageB1S1"                  VARCHAR,"corvette_data"."O2VoltageB2S1"                  VARCHAR,"corvette_data"."TimingAdvance"                  VARCHAR,"corvette_data"."EquivalenceRatioCommanded"      VARCHAR,"corvette_data"."EngineRPM"                      VARCHAR,"corvette_data"."CommandedThrottleActuator"      VARCHAR,"corvette_data"."ThrottlePosition"               VARCHAR,"corvette_data"."AbsoluteLoad"                   VARCHAR,"corvette_data"."MassAirflowSensor"              VARCHAR,"corvette_data"."VehicleSpeed"                   VARCHAR,"corvette_data"."IntakeAirTemp"                  VARCHAR,"corvette_data"."EngineCoolantTemp"              VARCHAR,"corvette_data"."Wideband02Sensor"               VARCHAR);
  • 執行查詢以查詢異常資料,FuelSystemStatus不等於“ CL-Normal”

SELECT "IntakeManifoldAbsolutePressure", "MassAirflow", "EngineRPM", "MassAirflowSensor", "FuelSystemStatus"FROM "corvette_demo"WHERE "FuelSystemStatus" NOT IN ('CL - Normal');
  • 退出phoenix-sqlline:

!quit

總結 

恭喜您完成了本教程。

正如您現在所體驗到的,使用Cloudera資料平臺-公有云(CDP-PC),可以配置多個Data Hub叢集以相互通訊,這需要花費很少的精力。

NiFi的靈活處理器使提取,轉換和載入資料到HBase變得很容易-希望本教程激發您的想象力,並激發其他創造性的解決方案。 

進一步閱讀 

影片

  • 集合-資料中心的視訊庫

  • 集合-資料流/視訊流庫

網誌

  • 在CDP上使用NiFi,Kafka和HBase構建可擴充套件流程

  • CDP中的運營資料庫效能概述

Meetup

  • 構建可擴充套件的IoT端到端資料收集系統:真實使用案例

其他

  • CDP使用者頁面-其他CDP資源,包括視訊,教程,部落格和事件

  • 有一個問題?加入Cloudera社群

  • Cloudera Data Hub文件

  • 運營資料庫產品資訊

原文連結:https://www.cloudera.com/tutorials/cdp-cod-phoenix-sql-using-hbase-nifi-kafka.html



本文分享自微信公眾號 - 大資料雜貨鋪(bigdataGrocery)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。