Elasticsearch:Hadoop 大資料整合 (Hadoop => Elasticsearch)

語言: CN / TW / HK

在本文章中,我們將學習如何使用 Elasticsearch Hadoop 處理大量資料。 對於我們的練習,我們將使用一個簡單的 Apache access 日誌來表示我們的 “大資料”。 我們將學習如何編寫 MapReduce 作業以使用 Hadoop 攝取檔案並將其索引到 Elasticsearch 中。在我們今天的練習中,我們將使用如下的架構來搭建我們的系統:

如上所示,我們在左邊的 macOS 中安裝 Elasticsearch 及 Kibana,而在 Ubuntu OS 中安裝 Hadoop。我們將以最新的 Elastic Stack 8.4.2 來進行展示。

Hadoop 是什麼?

當我們需要收集、處理/轉換和/或儲存數千 GB、數千 TB 甚至更多的資料時,Hadoop 可能是完成這項工作的合適工具。它是從頭開始構建的,考慮到這樣的想法:

  • 一次使用多臺計算機(形成一個叢集),以便它可以並行處理資料,從而更快地完成工作。我們可以這樣想。如果一臺伺服器需要處理 100 TB 的資料,它可能會在 500 小時內完成。但是如果我們有 100 臺伺服器,每臺只能取一部分資料,例如 server1 可以取第一個 TB,server2 可以取第二個 TB,以此類推。現在他們每個人都只有 1 TB 的資料要處理,而且他們都可以同時處理自己的資料部分。這樣,工作可以在 5 小時內完成,而不是 500 小時。當然,這是理論上的和想象的,因為在實踐中我們不會減少 100 倍所需的時間,但我們可以非常接近如果條件理想。
  • 在需要時可以很容易地調整計算能力。有更多的資料要處理,而問題要複雜得多?將更多計算機新增到叢集。從某種意義上說,這就像在超級計算機上增加了更多的 CPU 核心。
  • 資料不斷增長,因此 Hadoop 也必須能夠輕鬆靈活地擴充套件其儲存容量,以滿足需求。我們新增到叢集的每臺計算機都會擴充套件 Hadoop 分散式檔案系統 (HDFS) 的可用總儲存空間。
  • 與其他軟體不同,它不僅會在硬體故障發生時嘗試從硬體故障中恢復。設計理念實際上假設某些硬體肯定會失敗。當有數千臺計算機並行工作時,可以保證某處某處會不時出現故障。因此,預設情況下,Hadoop 建立資料塊的副本並將它們分佈在單獨的硬體上,因此當偶爾的伺服器起火或硬碟或 SSD 宕機時,不會丟失任何內容。

總而言之,Hadoop 非常擅長攝取和處理大量資訊。它將資料分佈在叢集中可用的多個節點上,並使用 MapReduce 程式設計模型在多臺機器上同時處理資料(並行處理)。

但這聽起來可能有點類似於 Elasticsearch 資料攝取工具所做的事情。儘管它們是為處理相當不同的場景而設計的,但它們有時可能會有些重疊。那麼我們為什麼以及何時使用其中一個而不是另一個呢?

Hadoop vs Logstash/Elasticsearch

首先,我們不應該考慮哪個比哪個更好。 每個人都擅長為其創造的工作。 每個都有優點和缺點。

為了嘗試給你繪製一個圖片並讓你瞭解我們何時使用其中一個,讓我們考慮以下場景:

  • 當我們需要從數十億個網站中提取資料時,就像谷歌這樣的搜尋引擎所做的那樣,我們會發現像 Elasticsearch 及 Hadoop 這樣的工具非常有用和高效。
  • 當我們需要以這樣一種方式儲存資料並對其進行索引以便以後可以快速有效地搜尋時,我們會發現像 Elasticsearch 這樣的東西非常有用。
  • 最後,當我們想要收集實時資料時,例如來自網際網路上許多交易所的美元/歐元價格,我們會發現像 Logstash 這樣的工具非常適合這項工作。

MapReduce 是如何工作的?

如前所述,雖然像 Logstash 甚至 Spark 這樣的工具更易於使用,但它們也將我們限制在它們使用的方法中。也就是說,我們只能微調他們允許我們調整的設定,我們不能改變他們的程式設計邏輯在幕後的工作方式。這通常不是問題,只要我們能做我們想做的事。

然而,使用 Hadoop,我們可以在低得多的級別上更好地控制事物的工作方式,從而允許更多的定製,更重要的是優化。當我們處理 PB 級資料時,優化非常重要。它可以幫助我們將工作所需的時間從幾個月縮短到幾周,並顯著降低運營成本和所需資源。

讓我們先來看看 MapReduce,它增加了我們工作的複雜性,但也允許前面提到的更高級別的控制。

MapReduce 過程通常包括三個主要階段:MapShuffleReduce

最初,資料被分成更小的塊,可以分佈在不同的計算節點上。接下來,每個節點都可以對其接收到的資料塊執行 map 任務。這種並行處理大大加快了程序。叢集擁有的節點越多,工作完成的速度就越快。

以鍵/值(key/value)對的形式對映的資料現在位於不同的伺服器上。所有具有相同鍵的值都需要組合在一起。這是 shuffle 階段。接下來,shuffle 後的資料經過reduce 階段。

在上面的圖片舉例說明了這些階段在三行單詞的集合上的作用。

在這裡,我們假設我們有一個簡單的文字檔案,我們需要計算每個單詞出現的次數。

第一步是讀取資料並將其拆分為可以有效傳送到所有處理節點的塊。在我們的例子中,我們假設檔案被分成三行。

Map 階段

接下來是 Map 階段。 每行文字被用作 map(key, value, context) 方法的輸入。 這是我們必須編寫所需的自定義邏輯的地方。 對於這個字數統計示例,“value” 引數將儲存行輸入(檔案中的文字行)。 然後,我們將使用空格字元作為單詞分隔符拆分行,然後遍歷每個拆分(單詞)並使用 context.write(key, value) 發出 map 輸出。 在這裡,我們的 key 是單詞,例如 “Banana”,值是 1,表示它是單詞的單次出現。 從上圖中我們可以看到,第一行我們得到 , , 作為鍵/值(key/value)對。

Shuffle 階段

Shuffle 階段負責從 map 器中獲取 對,並根據分割槽器(partitioner)決定每個去往哪個 reducer。

從顯示每個階段執行的影象中,我們可以看到我們最終在 reduce 階段有五個分割槽。 Shuffle 是由框架在內部完成的,所以我們在這裡不會有任何自定義程式碼。

Reduce 階段

Shuffle 階段的輸出被饋送到 reduce 階段:作為其輸入,每個 reducer 接收在 shuffle 階段形成的組之一。 這由一個鍵和與該鍵相關的值列表組成。 在這裡,我們再次必須編寫我們希望在此階段執行的自定義邏輯。 在此示例中,對於每個鍵,我們必須計算其值列表中元素的總和。 這樣,我們就得到了每個鍵的總計數,它最終代表了我們文字檔案中每個唯一單詞的計數。

Reduce 階段的輸出也遵循 格式。 如前所述,在此示例中,鍵將表示單詞,值表示單詞重複的次數。

安裝

Elasticsearch 及 Kibana

如果你還沒有安裝好自己的 Elasticsearch 及 Kibana,請參考我之前的文章來進行安裝:

請注意:你需要選擇 Elastic Stack 8.x 的安裝指南。

Hadoop

我們需要在 Ubuntu OS 上安裝 Hadoop。我們可以按照如下的步驟來進行安裝。

安裝 Java

我們可以安裝最新版本的 Java:

sudo apt install default-jdk default-jre -y

我們可以通過如下的命令來檢視 Java 的版本:

java -version

```

  1. $ java -version
  2. openjdk version "11.0.16" 2022-07-19
  3. OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu122.04)
  4. OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu122.04, mixed mode)

```

建立 Hadoop 使用者及配置無需使用密碼的 SSH 連線

新增一個新的使用者 hadoop

sudo adduser hadoop

`

  1. $ sudo adduser hadoop
  2. Adding user `hadoop' ...
  3. Adding new group `hadoop' (1001) ...
  4. Adding new user hadoop' (1001) with grouphadoop' ...
  5. Creating home directory `/home/hadoop' ...
  6. Copying files from `/etc/skel' ...
  7. New password:
  8. BAD PASSWORD: The password is shorter than 8 characters
  9. Retype new password:
  10. Sorry, passwords do not match.
  11. New password:
  12. Retype new password:
  13. passwd: password updated successfully
  14. Changing the user information for hadoop
  15. Enter the new value, or press ENTER for the default
  16. Full Name []:
  17. Room Number []:
  18. Work Phone []:
  19. Home Phone []:
  20. Other []:
  21. Is the information correct? [Y/n] y

`` ```

新增 hadoop 使用者到 sudo 組:

sudo usermod -aG sudo hadoop

切換到 hadoop 使用者:

sudo su - hadoop

```

  1. $ sudo su - hadoop
  2. To run a command as administrator (user "root"), use "sudo ".
  3. See "man sudo_root" for details.

```

安裝 OpenSSH 伺服器及客戶端:

sudo apt install openssh-server openssh-client -y

如果你得到一個提示時,請回復:

keep the local version currently installed

切換到 hadoop 使用者:

sudo su - hadoop

生成公鑰和私鑰對:

ssh-keygen -t rsa

``

  1. $ ssh-keygen -t rsa
  2. Generating public/private rsa key pair.
  3. Enter file in which to save the key (/home/hadoop/.ssh/id_rsa):
  4. Created directory '/home/hadoop/.ssh'.
  5. Enter passphrase (empty for no passphrase):
  6. Enter same passphrase again:
  7. Your identification has been saved in /home/hadoop/.ssh/id_rsa
  8. Your public key has been saved in /home/hadoop/.ssh/id_rsa.pub
  9. The key fingerprint is:
  10. SHA256:tD9JRpv+c6KMlsevCMGJSLSdGPsDrqwl1yhFu51NIy8 [email protected]
  11. The key's randomart image is:
  12. +---[RSA 3072]----+
  13. | o |
  14. | . * . |
  15. | B o . . |
  16. | + = o o o o |
  17. | = = * S = |
  18. |.o = B o = . |
  19. |+.= E + o= |
  20. |.= . .o+ooo . |
  21. |. .o.++o+ |
  22. +----[SHA256]-----+

` ```

在上面,我們可以把 passphrase 設定為空。將生成的公鑰從 id_rsa.pub 新增到 authorized_keys:

sudo cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

1. $ sudo cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 2. [sudo] password for hadoop:

更改 authorized_keys 檔案的許可權:

sudo chmod 640 ~/.ssh/authorized_keys

驗證無密碼 SSH 是否正常工作:

ssh localhost

``1. [email protected]:~$ ssh localhost 2. The authenticity of host 'localhost (127.0.0.1)' can't be established. 3. ED25519 key fingerprint is SHA256:lN78YGD118UAp/ZmzrtWnrqicHaFkJbs5pIZfTH06b0. 4. This key is not known by any other names 5. Are you sure you want to continue connecting (yes/no/[fingerprint])? y 6. Please type 'yes', 'no' or the fingerprint: yes 7. Warning: Permanently added 'localhost' (ED25519) to the list of known hosts. 8. Enter passphrase for key '/home/hadoop/.ssh/id_rsa': 9. Welcome to Ubuntu 22.04.1 LTS (GNU/Linux 5.15.0-48-generic aarch64)

    • Documentation: https://help.ubuntu.com
    • Management: https://landscape.canonical.com
    • Support: https://ubuntu.com/advantage
  1. This system has been minimized by removing packages and content that are

  2. not required on a system that users do not log into.

  3. To restore this content, you can run the 'unminimize' command.

  4. 0 updates can be applied immediately.

  5. The programs included with the Ubuntu system are free software;

  6. the exact distribution terms for each program are described in the
  7. individual files in /usr/share/doc/*/copyright.

  8. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by

  9. applicable law.

  10. [email protected]:~$` ```

安裝 Hadoop

首先我們使用 hadoop 使用者登入:

sudo su - hadoop

下載最新的穩定版 Hadoop。 要獲取最新版本,請訪問 Apache Hadoop 官方下載頁面

wget https://downloads.apache.org/hadoop/common/stable/hadoop-3.3.4.tar.gz

解壓縮下載的檔案:

tar -xvzf hadoop-3.3.4.tar.gz

將解壓後的目錄移動到 /usr/local/ 目錄

sudo mv hadoop-3.3.4 /usr/local/hadoop

1. [email protected]:~$ pwd 2. /home/hadoop 3. [email protected]:~$ ls 4. hadoop-3.3.4 hadoop-3.3.4.tar.gz 5. [email protected]:~$ sudo mv hadoop-3.3.4 /usr/local/hadoop 6. [sudo] password for hadoop:

建立目錄來儲存系統日誌:

sudo mkdir /usr/local/hadoop/logs

更改 hadoop 目錄的所有權:

sudo chown -R hadoop:hadoop /usr/local/hadoop

配置 Hadoop

我們還是在 hadoop 使用者下,編輯檔案 ~/.bashrc 以配置 Hadoop 環境變數:

sudo nano ~/.bashrc

啟用環境變數:

source ~/.bashrc

配置 Java 環境變數

Hadoop 有許多元件使其能夠執行其核心功能。 要配置YARN、HDFS、MapReduce 等這些元件,以及 Hadoop 相關的專案設定,需要在 hadoop-env.sh 配置檔案中定義 Java 環境變數。

找到 Java 路徑:

$ which javac

```

  1. $ which javac
  2. /usr/bin/javac

```

找到 OpenJDK 目錄:

$ readlink -f /usr/bin/javac

```

  1. $ readlink -f /usr/bin/javac
  2. /usr/lib/jvm/java-11-openjdk-arm64/bin/javac

```

我們也可以直接使用一個命令來完成上面的兩個操作:

readlink -f $(which javac)

編輯 hadoop-env.sh 檔案:

$ sudo nano $HADOOP_HOME/etc/hadoop/hadoop-env.sh

將以下行新增到檔案中。 然後,關閉並儲存檔案:

```

  1. export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
  2. export HADOOP_CLASSPATH+=" $HADOOP_HOME/lib/*.jar"

```

瀏覽到 hadoop lib 目錄:

$ cd /usr/local/hadoop/lib

下載 Javax 啟用檔案:

$ sudo wget https://jcenter.bintray.com/javax/activation/javax.activation-api/1.2.0/javax.activation-api-1.2.0.jar

驗證 Hadoop 版本:

$ hadoop version

```

  1. $ hadoop version
  2. Hadoop 3.3.4
  3. Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
  4. Compiled by stevel on 2022-07-29T12:32Z
  5. Compiled with protoc 3.7.1
  6. From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
  7. This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-3.3.4.jar

```

至此,我們完成了 hadoop 的安裝。我們不再需要針對 NameNode 等進行配置了。

建立 MapReduce 專案

在本練習中,我們將索引以 Apache 組合日誌格式生成的示例訪問日誌檔案。我們將使用 maven 構建工具將我們的 MapReduce 程式碼編譯成 JAR 檔案。

在實際場景中,你必須執行一些額外的步驟:

  • 安裝包含程式碼編輯器(如 Eclipse)的整合開發環境 (IDE),以建立專案並編寫必要的程式碼。
  • 在本地桌面上使用 maven 編譯專案。
  • 將已編譯的專案 (JAR) 從本地桌面傳輸到你的 Hadoop 例項。

我們將解釋如何建立這樣一個專案背後的理論,但我們還將提供一個 GitHub 儲存庫,其中包含一個現成的簡單 Java 專案。這樣,你現在不必浪費時間編寫程式碼,而可以立即開始試驗並檢視 MapReduce 的執行情況。此外,如果你不熟悉 Java 程式設計,你可以檢視示例程式碼以更好地瞭解所有部分的位置以及它們的適用性。

所以,首先,讓我們看一下理論,看看我們將如何構建 MapReduce 程式碼,以及它背後的邏輯是什麼。

為了說明問題的方便,你可以下載程式碼 https://github.com/liu-xiao-guo/elasticsearch-hadoop

設定 pom.xml 依賴項

要開始,我們首先必須使用我們喜歡的程式碼編輯器建立一個空的 Maven 專案。 Eclipse 和 IntelliJ 都有內建模板來執行此操作。 我們可以在建立 maven 專案時跳過原型選擇; 我們只需要一個空的 Maven 專案。

建立專案後,我們將編輯 pom.xml 檔案並使用以下屬性和依賴項。 將來使用新的穩定版本的 Hadoop 和 Elasticsearch 時,可能需要更改下面指定的某些版本號。

pom.xml

``

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. 4.0.0

  6. com.liuxg

  7. elasticsearchhadoop
  8. 1.0-SNAPSHOT

  9. 1.8
  10. 1.8
  11. org.apache.hadoop
  12. hadoop-client
  13. 3.3.4
  14. org.elasticsearch
  15. elasticsearch-hadoop
  16. 8.4.2
  17. commons-httpclient
  18. commons-httpclient
  19. 3.1
  20. org.apache.maven.plugins
  21. maven-shade-plugin
  22. 3.2.4
  23. package
  24. shade
  25. com.liuxg.AccessLogIndexIngestion
  26. 123

` ```

在上面,我們使用了最新的 Elastic Stack 8.4.2 版本。我們可以獲得 Maven 的相關資訊在地址: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/8.4.2

如上所示,我們也選擇了相應的 hadoop 版本資訊。這個在我們上面的 hadoop 的安裝中可以看到版本資訊。

```

  1. $ hadoop version
  2. Hadoop 3.3.4
  3. Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
  4. Compiled by stevel on 2022-07-29T12:32Z
  5. Compiled with protoc 3.7.1
  6. From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
  7. This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-3.3.4.jar

```

編寫 MapReduce 作業需要 hadoop-client 庫。 為了寫入 Elasticsearch 索引,我們使用了官方的 elasticsearch-hadoop 庫。 commons-httpclient 也是需要的,因為 elasticsearch-hadoop-使用它能夠通過 HTTP 協議對 Elasticsearch 伺服器進行 REST 呼叫。

定義 Mapper 類的邏輯

我們將定義 AccessLogMapper 並將其用作我們的 mapper 類。 在其中,我們將覆蓋預設的 map() 方法並定義我們想要使用的程式設計邏輯。

``

  1. import org.apache.hadoop.mapreduce.Mapper;
  2. import java.io.IOException;

  3. public class AccessLogIndexIngestion {

  4. public static class AccessLogMapper extends Mapper {

  5. @Override
  6. protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
  7. }
  8. }

  9. public static void main(String[] args) {

  10. }

  11. }

` ```

如前所述,在這個例子中我們不需要 reducer 類。

為 Hadoop 定義 Elasticsearch 索引

這是日誌檔案的示例:

```

  1. 77.0.42.68 - - [17/May/2015:23:05:48 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"
  2. 77.0.42.68 - - [17/May/2015:23:05:32 +0000] "GET /images/jordan-80.png HTTP/1.1" 200 6146 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"
  3. 77.0.42.68 - - [18/May/2015:00:05:08 +0000] "GET /images/web/2009/banner.png HTTP/1.1" 200 52315 "http://www.semicomplete.com/style2.css" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0"
  4. 207.241.237.101 - - [18/May/2015:00:05:42 +0000] "GET /blog/geekery/find-that-lost-screen-session.html HTTP/1.0" 200 11214 "http://www.semicomplete.com/blog/tags/tools" "Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)"
  5. 120.202.255.147 - - [18/May/2015:00:05:57 +0000] "GET /files/logstash/logstash-1.1.0-monolithic.jar HTTP/1.1" 304 - "-" "Mozilla/5.0 Gecko/20100115 Firefox/3.6"
  6. 207.241.237.104 - - [18/May/2015:00:05:43 +0000] "GET /geekery/find-that-lost-screen-session-2.html HTTP/1.0" 404 328 "http://www.semicomplete.com/blog/tags/tools" "Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)"

```

為了簡化問題,我在 github 的倉庫中已經包含了一個叫做 access.log 的檔案。它含有上面的 6 個文件。到目前為止,我們只處理了理論,但在這裡,重要的是我們執行下一個命令。

``

  1. curl -k -u elastic:H8_aSaFv0G*muE-Hrmp0 -XPUT "https://10.211.55.2:9200/mylogs" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
  2. {
  3. "mappings": {
  4. "properties": {
  5. "ip": {
  6. "type": "keyword"
  7. },
  8. "dateTime": {
  9. "type": "date",
  10. "format": ["dd/MMM/yyyy:HH:mm:ss"]
  11. },
  12. "httpStatus": {
  13. "type": "keyword"
  14. },
  15. "url": {
  16. "type": "keyword"
  17. },
  18. "responseCode": {
  19. "type": "keyword"
  20. },
  21. "size": {
  22. "type": "integer"
  23. }
  24. }
  25. }
  26. }'

` ```

我們在執行 Elasticsearch 的機器中的 terminal 中執行上面的命令。請注意在上面,由於我們使用了 HTTPS 的安裝,我們必須使用 -k 選項,或者我們新增證書資訊。我們需要根據自己的配置替換上面的 elastic 超級使用者的密碼。我們也可以在 Kibana 中直接打入如下的命令:

``

  1. PUT mylogs
  2. {
  3. "mappings": {
  4. "properties": {
  5. "ip": {
  6. "type": "keyword"
  7. },
  8. "dateTime": {
  9. "type": "date",
  10. "format": ["dd/MMM/yyyy:HH:mm:ss"]
  11. },
  12. "httpStatus": {
  13. "type": "keyword"
  14. },
  15. "url": {
  16. "type": "keyword"
  17. },
  18. "responseCode": {
  19. "type": "keyword"
  20. },
  21. "size": {
  22. "type": "integer"
  23. }
  24. }
  25. }
  26. }

` ```

上面的命令將生成一個叫做 mylogs 的索引。

將 dateTime 欄位定義為日期至關重要,因為它使我們能夠使用 Kibana 視覺化各種指標。 當然,我們還需要指定訪問日誌中使用的日期/時間格式“dd/MMM/yyyy:HH:mm:ss”,以便正確解析傳遞給 Elasticsearch 的值。

定義 map() 邏輯

由於我們的輸入資料是一個文字檔案,我們使用 TextInputFormat.class。 日誌檔案的每一行都將作為輸入傳遞給 map() 方法。最後,我們可以定義程式的核心邏輯:在 EsOutputFormat.class 的幫助下,我們希望如何處理每一行文字並準備好將其傳送到 Elasticsearch 索引。

map() 方法的 value 輸入引數儲存當前從日誌檔案中提取並準備處理的文字行。 我們可以忽略這個簡單示例的關鍵引數。

``

  1. import org.elasticsearch.hadoop.util.WritableUtils;
  2. import org.apache.hadoop.io.NullWritable;
  3. import java.io.IOException;
  4. import java.util.LinkedHashMap;
  5. import java.util.Map;

  6. @Override

  7. protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {

  8. String logEntry = value.toString();

  9. // Split on space
  10. String[] parts = logEntry.split(" ");
  11. Map entry = new LinkedHashMap<>();

  12. // Combined LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\"" combined

  13. entry.put("ip", parts[0]);
  14. // Cleanup dateTime String
  15. entry.put("dateTime", parts[3].replace("[", ""));
  16. // Cleanup extra quote from HTTP Status
  17. entry.put("httpStatus", parts[5].replace("\"", ""));
  18. entry.put("url", parts[6]);
  19. entry.put("responseCode", parts[8]);
  20. // Set size to 0 if not present
  21. entry.put("size", parts[9].replace("-", "0"));
  22. context.write(NullWritable.get(), WritableUtils.toWritable(entry));
  23. }

` ```

我們使用空格字元作為分隔符將行拆分為單獨的部分。 由於我們知道日誌檔案中的第一列表示一個 IP 地址,因此我們知道 parts[0] 儲存了這樣一個地址,因此我們可以準備將該部分作為 IP 欄位傳送到 Elasticsearch。 同樣,我們可以從日誌中傳送其餘的列,但其中一些需要事先進行特殊處理。 例如,當我們使用空格字元作為分隔符拆分輸入字串時,時間欄位被拆分為兩個條目,因為它包含秒數和時區之間的空格(在我們的日誌中為 +0000)。 出於這個原因,我們需要重新組裝時間戳並將第 3 部分和第 4 部分連線起來。

EsOutputFormat.class 忽略 Mapper 類輸出的 “key”,因此在 context.write() 我們將鍵設定為 NullWriteable.get()

MapReduce 作業配置

我們需要告訴我們的程式它可以在哪裡到達 Elasticsearch 以及要寫入哪個索引。 我們使用 conf.set(“es.nodes”, “localhost:9200”); 和 conf.set(“es.resource”, “mylogs”);。由於在 Elastic Stack 8.x 中,HTTPS 是標準的配置,我們需要對訪問 Elasticsearch 做一下特殊的配置。具體可以訪問文件 Configuration | Elasticsearch for Apache Hadoop [8.4] | Elastic

``1. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 2. Configuration conf = new Configuration(); 3. conf.setBoolean("mapred.map.tasks.speculative.execution", false); 4. conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 5. conf.setBoolean("es.net.ssl.cert.allow.self.signed", true); 6. conf.setBoolean("es.net.ssl", true); 7. conf.setStrings("es.net.ssl.protocol", "TLS"); 8. conf.setStrings("es.net.ssl.truststore.location", "file:///home/hadoop/hadoop/truststore.p12"); 9. conf.setStrings("es.net.ssl.truststore.pass", "password"); 10. conf.setStrings("es.net.http.auth.user", "elastic"); 11. conf.setStrings("es.net.http.auth.pass", "H8_aSaFv0G*muE-Hrmp0"); 12. conf.set("es.nodes", "https://10.211.55.2:9200"); 13. conf.set("es.resource", "mylogs");

  1. Job job = Job.getInstance(conf);
  2. job.setInputFormatClass(TextInputFormat.class);
  3. job.setOutputFormatClass(EsOutputFormat.class);
  4. job.setMapperClass(AccessLogMapper.class);
  5. job.setNumReduceTasks(0);

  6. FileInputFormat.addInputPath(job, new Path(args[0]));

  7. System.exit(job.waitForCompletion(true) ? 0 : 1);
  8. }` ```

我們需要根據自己的配置修改上面的 elastic 超級使用者的密碼。在上面的配置中,我們使用了一個叫做 trusstore.p12 的 truststore 證書,而它的密碼是 password。如果你不知道如何得到這個證書,你可以閱讀我之前的文章  “Elasticsearch:在 Java 客戶端中使用 truststore 來建立 HTTPS 連線”。我們首先進行到 Elasticsearch 的安裝目錄的如下子目錄:

```

  1. $ pwd
  2. /Users/liuxg/test/elasticsearch-8.4.2/config/certs
  3. $ ls
  4. http.p12 http_ca.crt transport.p12
  5. $ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
  6. Certificate was added to keystore
  7. $ ls
  8. http.p12 http_ca.crt transport.p12 truststore.p12

```

上面的命令將生成一個叫做 truststore.p12 的證書。我們把這個證書拷貝到 Ubuntu OS 機器的一個目錄中,比如 /home/hadoop/hadoop。

在正常情況下,Hadoop 中的推測執行有時可以優化作業。 但是,在這種情況下,由於輸出被髮送到 Elasticsearch,它可能會意外地導致重複條目或其他問題。 這就是為什麼建議在這種情況下禁用推測執行。 你可以在此處閱讀有關此內容的更多資訊:Runtime options | Elasticsearch for Apache Hadoop [8.4] | Elastic

這些行禁用該功能:

```

  1. conf.setBoolean(“mapred.map.tasks.speculative.execution”, false);
  2. conf.setBoolean(“mapred.reduce.tasks.speculative.execution”, false);

```

由於在這種情況下 MapReduce 作業本質上將讀取文字檔案,因此我們使用 TextInputFormat 類作為輸入:job.setInputFormatClass(TextInputFormat.class); 而且,由於我們要寫入 Elasticsearch 索引,因此我們使用 EsOutputFormat 類作為輸出:job.setOutputFormatClass(EsOutputFormat.class); 接下來,我們將要使用的 Mapper 類設定為我們在本練習中建立的類:job.setMapperClass(AccessLogMapper.class);最後,由於我們不需要 reducer,我們將 reduce 任務的數量設定為零:job.setNumReduceTasks(0)。

構建 JAR 檔案

一旦所有程式碼都到位,我們必須構建一個可執行的 JAR。 我們在專案的根目錄下打入如下的命令:

$ mvn clean install

我們會看到很多輸出和檔案被拉入,當這個過程完成時,我們應該看到 “BUILD SUCCESS” 訊息。

```

  1. [INFO] ------------------------------------------------------------------------
  2. [INFO] BUILD SUCCESS
  3. [INFO] ------------------------------------------------------------------------
  4. [INFO] Total time: 16.227 s
  5. [INFO] Finished at: 2022-10-08T14:35:00+08:00
  6. [INFO] ------------------------------------------------------------------------

```

我們可以在當前的目錄下面的 target 子目錄下看到一個 jar 檔案:

```

  1. $ ls ./target/*.jar
  2. ./target/elasticsearchhadoop-1.0-SNAPSHOT.jar
  3. ./target/original-elasticsearchhadoop-1.0-SNAPSHOT.jar

```

我們把上面的 elasticsearchhadoop-1.0-SNAPSHOT.jar 拷貝到 Ubuntu OS 的一個目錄中,比如 /home/hadoop/hadoop。

執行 MapReduce 作業

在上面,我們把 access.log,truststore.p12 檔案 及 elasticsearchhadoop-1.0-SNAPSHOT.jar 拷貝到 Ubuntu OS 機器上,並保存於一個目錄 /home/hadoop/hadoop 中:

```

  1. [email protected]:~/hadoop$ pwd
  2. /home/hadoop/hadoop
  3. [email protected]:~/hadoop$ ls
  4. access.log elasticsearchhadoop-1.0-SNAPSHOT.jar truststore.p12

```

我們接下來使用如下的命令來執行這個作業:

hadoop jar elasticsearchhadoop-1.0-SNAPSHOT.jar access.log

我們應該密切關注 Map-Reduce 框架部分。 在這種情況下,我們可以看到一切都按計劃進行:我們有 6 條輸入記錄和 6 條輸出記錄。

我們可以在 Kibana 中執行如下的命令來進行檢視:

GET mylogs/_count

上面的命令返回結果:

```

  1. {
  2. "count": 6,
  3. "_shards": {
  4. "total": 1,
  5. "successful": 1,
  6. "skipped": 0,
  7. "failed": 0
  8. }
  9. }

```

它表明,我們有 6 個文件被攝入進來了。我們可以使用如下的命令來搜尋:

GET mylogs/_search?filter_path=**.hits

``

  1. {
  2. "hits": {
  3. "hits": [
  4. {
  5. "_index": "mylogs",
  6. "_id": "AgVLt4MBlWqpuHbH3mqB",
  7. "_score": 1,
  8. "_source": {
  9. "size": "3638",
  10. "dateTime": "17/May/2015:23:05:48",
  11. "ip": "77.0.42.68",
  12. "httpStatus": "GET",
  13. "responseCode": "200",
  14. "url": "/favicon.ico"
  15. }
  16. },
  17. {
  18. "_index": "mylogs",
  19. "_id": "AwVLt4MBlWqpuHbH3mqB",
  20. "_score": 1,
  21. "_source": {
  22. "size": "6146",
  23. "dateTime": "17/May/2015:23:05:32",
  24. "ip": "77.0.42.68",
  25. "httpStatus": "GET",
  26. "responseCode": "200",
  27. "url": "/images/jordan-80.png"
  28. }
  29. },
  30. {
  31. "_index": "mylogs",
  32. "_id": "BAVLt4MBlWqpuHbH3mqB",
  33. "_score": 1,
  34. "_source": {
  35. "size": "52315",
  36. "dateTime": "18/May/2015:00:05:08",
  37. "ip": "77.0.42.68",
  38. "httpStatus": "GET",
  39. "responseCode": "200",
  40. "url": "/images/web/2009/banner.png"
  41. }
  42. },
  43. {
  44. "_index": "mylogs",
  45. "_id": "BQVLt4MBlWqpuHbH3mqB",
  46. "_score": 1,
  47. "_source": {
  48. "size": "11214",
  49. "dateTime": "18/May/2015:00:05:42",
  50. "ip": "207.241.237.101",
  51. "httpStatus": "GET",
  52. "responseCode": "200",
  53. "url": "/blog/geekery/find-that-lost-screen-session.html"
  54. }
  55. },
  56. {
  57. "_index": "mylogs",
  58. "_id": "BgVLt4MBlWqpuHbH3mqB",
  59. "_score": 1,
  60. "_source": {
  61. "size": "0",
  62. "dateTime": "18/May/2015:00:05:57",
  63. "ip": "120.202.255.147",
  64. "httpStatus": "GET",
  65. "responseCode": "304",
  66. "url": "/files/logstash/logstash-1.1.0-monolithic.jar"
  67. }
  68. },
  69. {
  70. "_index": "mylogs",
  71. "_id": "BwVLt4MBlWqpuHbH3mqB",
  72. "_score": 1,
  73. "_source": {
  74. "size": "328",
  75. "dateTime": "18/May/2015:00:05:43",
  76. "ip": "207.241.237.104",
  77. "httpStatus": "GET",
  78. "responseCode": "404",
  79. "url": "/geekery/find-that-lost-screen-session-2.html"
  80. }
  81. }
  82. ]
  83. }
  84. }

` ```

我們可以清楚地看到資料已經被成功地攝入到 Elasticsearch 中了。

我們接下來可以使用 Kibana 來做如何進行視覺化的需求了。在這裡就不再詳述了。

參考:

【1】 Configuration | Elasticsearch for Apache Hadoop [8.4] | Elastic

【2】apache-pig Tutorial => Loading data from ElasticSearch

【3】https://www.vultr.com/docs/install-and-configure-apache-hadoop-on-ubuntu-20-04/

【4】Use ES-Hadoop to write HDFS data to Elasticsearch - Elasticsearch - Alibaba Cloud Documentation Center

【5】Elasticsearch Hadoop Tutorial with Hands-on Examples - Coralogix