Elasticsearch:使用 Node.js 將實時資料提取到 Elasticsearch 中(一)

語言: CN / TW / HK

Elasticsearch 是一個強大的 RESTful 搜尋和分析引擎,能夠處理越來越多的用例。 它將集中儲存你的資料,以實現閃電般的快速搜尋、微調相關性以及可輕鬆擴充套件的強大分析。 關於如何使用 Elastic Stack(又名 ELK 堆疊)將資料攝取到 Elasticsearch 的資源有很多。在今天的文章中,我將詳細介紹如何使用 Node.js 從零開始來把地震的實時資料採集到 Elasticsearch 中。

如果你選擇的程式語言是 JavaScript,並且你需要使用 RESTful API 方法從第三方應用程式獲取資料,那麼使用 Node.js 獲取資料是一個不錯的選擇。 你還可以託管伺服器,讓它持續實時攝取資料。 該演示將向您展示如何設定一個 Node.js + Express.js 伺服器,該伺服器實時將資料提取到 Elasticsearch 中,然後可以對這些資料進行分析並以有意義的方式採取行動。

對於此演示,我們將使用 USGS 實時釋出的公開可用的全球地震資料。

準備工作

Elasticsearch 及 Kibana

如果你還沒有安裝好自己的 Elasticsearch 及 Kibana 的話,那麼請參考我之前的文章:

在今天的展示中,我將使用 Elastic Stack 8.x 來進行展示。在安裝的時候,請參考相應的 Elastic Stack 8.x 的文章來進行安裝。

Node.js

你需要安裝好自己的 Node.js 來完成下面的練習。你可以參考 Node.js 連結來進行相應的安裝。

實時資料

根據 USGS 網上所提供的資訊,我們可以在地址  https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson 找到相應的地震資訊資料。我們可以通過如下的命令來進行檢視:

curl https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson | jq .

如上所示,它是一個以 JSON 格式給出來的資料資訊。這個資料會實時發生變化,我們可以通過反覆訪問這個介面來得到所需要的地震資訊。在這裡,我們需要注意的是:

  • "time": 1672471359610,這是一個時間資訊,可以作為我們的 timestamp 來對它進行分析。我們將最終把它存入到 @timestamp 裡。
  • "id": "nc73827101",這是一個地震特有的 id,我們將以這個 id 成為資料的 id。

  • "geometry",這個是地震發生的地理位置。我們可以需要在 Elasticsearch 中為它定一下為 geo_point 資料型別。我們將把它變為:

雖然資料有很多,但是我們最終需要的資料格式是這樣的:

```

  1. {
  2. "mag": 1.13,
  3. "place": "11km ENE of Coachella, CA",
  4. "@timestamp": 2022-05-02T20:07:53.266Z,
  5. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  6. "sig": 20,
  7. "type": "earthquake",
  8. "depth": 2.09,
  9. "coordinates": {
  10. "lat": 33.7276667,
  11. "lon": -116.0736667
  12. }
  13. }

```

在接下來的步驟裡,我來詳細介紹如何達到我們最終的目的。

建立 Node.js 應用

建立最基本的 express 應用

我們將從 0 開始一步一步地建立 Node.js 應用。首先我們在自己的電腦中建立一個目錄:

mkdir earthquake_app

```

  1. $ pwd
  2. /Users/liuxg/demos
  3. $ mkdir earthquake_app
  4. $ cd earthquake_app/

```

我們進入到該目錄中,並打入如下的命令:

npm init -y

``

  1. $ npm init -y
  2. Wrote to /Users/liuxg/demos/earthquake_app/package.json:

  3. {

  4. "name": "earthquake_app",
  5. "version": "1.0.0",
  6. "description": "",
  7. "main": "index.js",
  8. "scripts": {
  9. "test": "echo \"Error: no test specified\" && exit 1"
  10. },
  11. "keywords": [],
  12. "author": "",
  13. "license": "ISC"
  14. }

  15. $ ls

  16. package.json

` ```

上述命令生成一個叫做 package.json 的檔案。在以後安裝的 packages,它也會自動新增到這個檔案中。預設的設定顯然不是我們想要的。我們需要對它做一些修改。

在接下來的程式碼中,我們將會使用如下的一些 packages:

  • @elastic/elasticsearch
  • axios
  • config
  • cors
  • express
  • log-timestamp
  • nodemon

我們可以通過如下的命令來進行安裝:

npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon

```

  1. $ npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
  2. npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/
  3. npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/

  4. added 118 packages in 17s

  5. 11 packages are looking for funding

  6. run npm fund for details

```

由於我之前已經安裝過,所以我上面顯示的資訊和你的可能會有所不同。我們再次來檢視 package.json 檔案:

``

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ ls
  4. node_modules package-lock.json package.json
  5. $ cat package.json
  6. {
  7. "name": "earthquake_app",
  8. "version": "1.0.0",
  9. "description": "",
  10. "main": "index.js",
  11. "scripts": {
  12. "test": "echo \"Error: no test specified\" && exit 1"
  13. },
  14. "keywords": [],
  15. "author": "",
  16. "license": "ISC",
  17. "dependencies": {
  18. "@elastic/elasticsearch": "^8.5.0",
  19. "axios": "^1.2.2",
  20. "config": "^3.3.8",
  21. "cors": "^2.8.5",
  22. "express": "^4.18.2",
  23. "log-timestamp": "^0.3.0",
  24. "nodemon": "^2.0.20"
  25. }
  26. }

` ```

很顯然,我們最新安裝的 packages 已經自動新增到 package.json 檔案中了。

我們接下來建立一個叫做 server 的子目錄,並在它裡面建立一個叫做 server.js 的檔案:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir server
  4. $ touch server/server.js

```

在上面,我們建立了一個叫做 server.js 的檔案。這個將來就是我們需要執行的 server 指令碼。為了能夠讓我們的 package.json 檔案的配置能讓 npm 進行執行,我們需要對它進行修改。

``

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ cat package.json
  4. {
  5. "name": "earthquake_app",
  6. "version": "1.0.0",
  7. "description": "",
  8. "main": "sever.js",
  9. "scripts": {
  10. "start": "nodemon server/server.js",
  11. "test": "echo \"Error: no test specified\" && exit 1"
  12. },
  13. "keywords": [],
  14. "author": "",
  15. "license": "ISC",
  16. "dependencies": {
  17. "@elastic/elasticsearch": "^8.5.0",
  18. "axios": "^1.2.2",
  19. "config": "^3.3.8",
  20. "cors": "^2.8.5",
  21. "express": "^4.18.2",
  22. "log-timestamp": "^0.3.0",
  23. "nodemon": "^2.0.20"
  24. }
  25. }

` ```

很多人可能會奇怪,為啥使用 nodemon 來啟動指令碼。它的好處是當我們修改好 server.js 裡的指令碼,那麼它會自動重新啟動伺服器的執行,而不需要我們每次都需要打入如下的命令:

npm start

接下為了驗證我們的 express 應用是否能成功地執行,我們修改 server.js 為如下的程式碼:

server/server.js

```

  1. onst express = require('express');

  2. const app = express();

  3. const port = 5001;

  4. app.get('/', (req, res) => {

  5. res.send('Hello World!')
  6. })

  7. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

```

我們接下來使用如下的命令來進行啟動:

npm start

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001

```

我們可以看到伺服器已經成功地執行起來了,並且它運行於 5001 埠上。我們可以通過瀏覽器來進行訪問它的網址:

上面顯示我們的伺服器執行正常。

安全地連線 Node.js 伺服器到 Elasticsearch

接下來,我們需要建立程式碼來安全地連線 Node.js 服務到我們本地部署的 Elasticsearch 中。我們可以參考之前的文章 “Elasticsearch:使用最新的 Nodejs client 8.x 來建立索引並搜尋”。我們可以在專案的更目錄下建立如下的兩個子目錄:

```

  1. mkdir config
  2. mkdir -p server/elasticsearch

```

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir config
  4. $ mkdir -p server/elasticsearch
  5. $ ls -d */
  6. config/ node_modules/ server/

```

在 config 子目錄下,我們建立如下的一個叫做 default.json 的檔案。這個是用來配置如何連線到 Elasticsearch 的:

config/default.json

```

  1. {
  2. "elastic": {
  3. "elasticsearch_endpoint": "https://localhost:9200",
  4. "username": "elastic",
  5. "password": "-pK6Yth+mU8O-f+Q*F3i",
  6. "apiKey": "eVBKOFhJVUJUN1gwSDQyLU5halY6R1BVRjNOUmpRYUtkTmpXTUZHdWZVUQ==",
  7. "certificate": "/Users/liuxg/elastic/elasticsearch-8.5.3/config/certs/http_ca.crt",
  8. "caFingerprint": "E3D36275D9FA80CF96F74E6537FC74E7952511A75E01605EBCFB8FC9F08F598C"
  9. }
  10. }

```

我們先不要著急來了解這些配置引數。有些我們可能並不一定要用到。這些設定針對我們每個人的 Elasticsearch 的安裝的不同而不同。在上面的引數解釋如下:

  • elasticsearch_endpoint:這個是 Elasticsearch 的訪問地址
  • username:這個是訪問 Elasticsearch 的使用者名稱,你可以不選用超級使用者 elastic,而且在生產環境中,也不是推薦的方法
  • password:這個是上面 username 賬號的密碼
  • apiKey:這個是訪問 Elasticsearch 所需要的 apiKey。你可以參考  “Elasticsearch:使用最新的 Nodejs client 8.x 來建立索引並搜尋” 來了解如何進行生產。在下面的程式碼中,我們也可以使用 code 來進行生成
  • certificate:這個是證書的位置。每個 Elasticsearch 叢集都會有一個生成的證書位置。我們需要填入這個位置資訊
  • caFingerprint:這個是證書的 fingerprint 資訊。我們可以採用 fingerprint 來進行連線。在本演示中,我將不使用這種方式。更多資訊,請參考 Connecting | Elasticsearch JavaScript Client [master] | Elastic

我們在 elasticsearch 目錄下建立一個叫做 client.js 的檔案:

server/elasticsearch/client.js

``1. const { Client } = require('@elastic/elasticsearch'); 2. const config = require('config'); 3. const fs = require('fs')

  1. const elasticConfig = config.get('elastic');

  2. // const client = new Client ( {

  3. // node: elasticConfig.elasticsearch_endpoint,
  4. // auth: {
  5. // apiKey: elasticConfig.apiKey
  6. // },
  7. // tls: {
  8. // ca: fs.readFileSync(elasticConfig.certificate),
  9. // rejectUnauthorized: true
  10. // }
  11. // });

  12. const client = new Client ( {

  13. node: elasticConfig.elasticsearch_endpoint,
  14. auth: {
  15. username: elasticConfig.username,
  16. password: elasticConfig.password
  17. },
  18. tls: {
  19. ca: fs.readFileSync(elasticConfig.certificate),
  20. rejectUnauthorized: true
  21. }
  22. });

  23. client.ping()

  24. .then(response => console.log("You are connected to Elasticsearch!"))
  25. .catch(error => console.error("Elasticsearch is not connected."))

  26. module.exports = client;` ```

在上面,我使用了兩種方法來連線到 Elasticsearch。一種是通過 username/password 的方式來進行連線:

```

  1. const client = new Client ( {
  2. node: elasticConfig.elasticsearch_endpoint,
  3. auth: {
  4. username: elasticConfig.username,
  5. password: elasticConfig.password
  6. },
  7. tls: {
  8. ca: fs.readFileSync(elasticConfig.certificate),
  9. rejectUnauthorized: true
  10. }
  11. });

```

而另外一種就是被註釋掉的那個方法:

```

  1. const client = new Client ( {
  2. node: elasticConfig.elasticsearch_endpoint,
  3. auth: {
  4. apiKey: elasticConfig.apiKey
  5. },
  6. tls: {
  7. ca: fs.readFileSync(elasticConfig.certificate),
  8. rejectUnauthorized: true
  9. }
  10. });

```

這個也是被推薦的方法。在實際的使用中,我們更推薦使用 API key 來進行連線。

我們首先來使用 username/password 的方式來進行連線。我們需要修改我們的 server.js 來進行驗證:

server/server.js

```

  1. const express = require('express');
  2. const client = require('./elasticsearch/client');

  3. const app = express();

  4. const port = 5001;

  5. app.get('/', (req, res) => {

  6. res.send('Hello World!')
  7. })

  8. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

```

我們重新執行伺服器。我們可以看到如下的輸出:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001
  12. You are connected to Elasticsearch!

```

上面的輸出表明我們已經能夠成功地連線到 Elasticsearch 了。

使用程式碼獲取 API key

我們接下來可以通過程式碼來獲得 API key,儘管我們可以通過其它的方法來獲得。請詳細閱讀 “Elasticsearch:建立 API key 介面訪問 Elasticsearch”。在這裡,我們可以使用 Node.js 程式碼來動態地生成一個 API key。我們在 server 目錄下建立如下的一個檔案:

sever/create-api-key.js

`

  1. const client = require('./elasticsearch/client');

  2. async function generateApiKeys(opts) {

  3. const body = await client.security.createApiKey({
  4. body: {
  5. name: 'earthquake_app',
  6. role_descriptors: {
  7. earthquakes_example_writer: {
  8. cluster: ['monitor'],
  9. index: [
  10. {
  11. names: ['earthquakes'],
  12. privileges: ['create_index', 'write', 'read', 'manage'],
  13. },
  14. ],
  15. },
  16. },
  17. },
  18. });
  19. return Buffer.from(${body.id}:${body.api_key}).toString('base64');
  20. }

  21. generateApiKeys()

  22. .then(console.log)
  23. .catch((err) => {
  24. console.error(err);
  25. process.exit(1);
  26. });

`` ```

我們使用如下的命令來執行這個 Node.js 的程式碼:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ ls server/create-api-key.js
  4. server/create-api-key.js
  5. $ node server/create-api-key.js
  6. You are connected to Elasticsearch!
  7. emZJSGFZVUJUN1gwSDQyLWRLaS06LVpHaXR1bm5RQnEybE4zOWoyd0g5Zw==

```

我們可以把上面命令生成的 API key 寫入到之前的 default.json 檔案中。這樣我們也可以通過 API key 的方式來訪問 Elasticsearch 了,如果我們需要的話。這樣 client.js 實際上可以寫成:

server/elasticsearch/client.js

``1. const { Client } = require('@elastic/elasticsearch'); 2. const config = require('config'); 3. const fs = require('fs')

  1. const elasticConfig = config.get('elastic');

  2. const client = new Client ( {

  3. node: elasticConfig.elasticsearch_endpoint,
  4. auth: {
  5. apiKey: elasticConfig.apiKey
  6. },
  7. tls: {
  8. ca: fs.readFileSync(elasticConfig.certificate),
  9. rejectUnauthorized: true
  10. }
  11. });

  12. // const client = new Client ( {

  13. // node: elasticConfig.elasticsearch_endpoint,
  14. // auth: {
  15. // username: elasticConfig.username,
  16. // password: elasticConfig.password
  17. // },
  18. // tls: {
  19. // ca: fs.readFileSync(elasticConfig.certificate),
  20. // rejectUnauthorized: true
  21. // }
  22. // });

  23. client.ping()

  24. .then(response => console.log("You are connected to Elasticsearch!"))
  25. .catch(error => console.error("Elasticsearch is not connected."))

  26. module.exports = client;` ```

我們重新執行 server.js,我們可以看到如下的輸出:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001
  12. You are connected to Elasticsearch!

```

很顯然,我們的 API key 方式是成功的。使用 API key 的好處是我們不必要暴露使用者的密碼在程式碼中,而且,我們甚至可以為這個 API key 來設定有效時間及許可權。可以授予最小所需要的許可權,以確保安全。

設定 RESTful API 呼叫以從源檢索資料

現在我們的伺服器正在執行並且 Elasticsearch 已連線,我們需要測試對 USGS 的 API 呼叫以接收初始資料。 在專案的根目錄下,建立一個名為 routes 的資料夾和一個名為 api 的子資料夾。 在 api 資料夾中,建立一個名為 data.js 的檔案並新增以下程式碼:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir -p server/routes/api

```

我在 routes/api 目錄下建立一個如下的 data.js 檔案:

server/routes/api/data.js

`

  1. require('log-timestamp');
  2. const express = require('express');
  3. const router = express.Router();
  4. const axios = require('axios')
  5. const client = require('../../elasticsearch/client');
  6. const URL = https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  7. router.get('/earthquakes', async function (req, res) {

  8. console.log('Loading Application...');

  9. //======= Check that Elasticsearch is up and running =======\

  10. pingElasticsearch = async () => {
  11. await client.ping(
  12. function(error,res) {
  13. if (error) {
  14. console.error('elasticsearch cluster is down!');
  15. } else {
  16. console.log('Elasticsearch Ready');
  17. }
  18. }
  19. );
  20. }

  21. // ====== Get Data From USGS and then index into Elasticsearch

  22. indexAllDocs = async () => {
  23. try {
  24. console.log('Getting Data From Host')

  25. const EARTHQUAKES = await axios.get(${URL},{

  26. headers: {
  27. 'Content-Type': [
  28. 'application/json',
  29. 'charset=utf-8'
  30. ]
  31. }
  32. });

  33. console.log('Data Received!')

  34. results = EARTHQUAKES.data.features

  35. console.log('Indexing Data...')

  36. console.log(results)

  37. res.json(results)

  38. if (EARTHQUAKES.data.length) {

  39. indexAllDocs();
  40. } else {
  41. console.log('All Data Has Been Indexed!');
  42. };
  43. } catch (err) {
  44. console.log(err)
  45. };

  46. console.log('Preparing For The Next Data Check...');

  47. }

  48. console.log("Ping the Elasticsearch server");

  49. pingElasticsearch()

  50. console.log("Get data from USGS");

  51. indexAllDocs()
  52. });

  53. module.exports = router;

`` ```

上面的程式碼使用 npm 包 Axios 對 USGS 地震 API 進行非同步 API 呼叫。 收到資料後,它將顯示為 JSON。 你還可以看到我們在頁面頂部匯入了一個名為 log-timestamp 的依賴項。 這將允許我們將時間戳新增到每個 console.log。

我們接下來修改 server.js 如下:

server/server.js

`

  1. const express = require('express');
  2. const client = require('./elasticsearch/client');

  3. const app = express();

  4. const port = 5001;

  5. //Define Routes

  6. const data = require('./routes/api/data')
  7. app.use('/api/data', data);

  8. app.get('/', (req, res) => {

  9. res.send('Hello World!')
  10. })

  11. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

`` ```

重新執行我們的 server.js。我們通過 Postman 或者其它的工具來對我們的 REST 介面進行訪問:

localhost:5000/api/data/earthquakes

從上面的輸出中,我們可以看出來設計的 REST 介面工作是正常的。它含有一些收集來的資料。在所收集來的資料中,有一些資料是我們並不需要的。我們最終需要的資料是這樣的:

```

  1. {
  2. "mag": 1.13,
  3. "place": "11km ENE of Coachella, CA",
  4. "time": 2022-05-02T20:07:53.266Z,
  5. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  6. "sig": 20,
  7. "type": "earthquake",
  8. "depth": 2.09,
  9. "coordinates": {
  10. "lat": 33.7276667,
  11. "lon": -116.0736667
  12. }
  13. }

```

也就是說我們可以刪除一下不需要的欄位,並且我們需要轉換一些欄位,比如把 time 欄位轉換為我們想要的 @timestamp 欄位。另外在寫入 Elasticsearch 時,我們需要預先針對 coodinates 欄位進行定義。它是一個 geo_point 型別的欄位。

定義 mapping 及 pipeline

如上所示,我們需要的欄位如上。我們可以如下的一個 earthquakes 索引。我們在 Kibana 的 console 中打入如下的命令:

``

  1. PUT earthquakes
  2. {
  3. "mappings": {
  4. "properties": {
  5. "@timestamp": {
  6. "type": "date"
  7. },
  8. "coordinates": {
  9. "type": "geo_point"
  10. },
  11. "depth": {
  12. "type": "float"
  13. },
  14. "mag": {
  15. "type": "float"
  16. },
  17. "place": {
  18. "type": "text",
  19. "fields": {
  20. "keyword": {
  21. "type": "keyword"
  22. }
  23. }
  24. },
  25. "sig": {
  26. "type": "short"
  27. },
  28. "type": {
  29. "type": "keyword"
  30. },
  31. "url": {
  32. "enabled": false
  33. }
  34. }
  35. }
  36. }

` ```

在上面,我們針對索引的欄位型別做如下的說明:

  • @timestamp:這是一個 date 欄位型別的欄位。我們希望的格式是 2022-05-02T20:07:53.266Z 而不是以 EPOC 形式顯示的值,比如 1672471359610。這個欄位有 time 轉換而來
  • coordinates:這個是一個 geo_point 的欄位。是地震發生的地理位置
  • place:這是一個 multi-field 欄位。我們希望對這個欄位進行統計,也可以針對它進行搜尋
  • sig:這欄位我們使用 short 型別,而不是 long。這樣可以省去儲存空間
  • type:這是一個 keyword 型別的欄位。它只可以做資料分析統計之用
  • url:這個欄位,我們既不想對它進行搜尋,也不想對它進行統計,所有設定 enabled 為 false。這樣可以省去分詞的時間,從而提高攝入資料的速度

為此,我們可以針對上面的 data.js 做更進一步的修改:

server/routes/api/data.js

`

  1. const express = require('express');
  2. const router = express.Router();
  3. const axios = require('axios')
  4. const client = require('../../elasticsearch/client');
  5. const URL = https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  6. //======= Check that Elasticsearch is up and running =======\

  7. function pingElasticsearch() {
  8. console.log("ping .....")
  9. client.ping({
  10. requestTimeout: 30000,
  11. }, function(error,res) {
  12. if (error) {
  13. console.error('elasticsearch cluster is down!');
  14. } else {
  15. console.log('Elasticsearch Ready');
  16. }
  17. });
  18. };

  19. // ====== Get Data From USGS and then index into Elasticsearch

  20. indexAllDocs = async () => {
  21. try {
  22. const EARTHQUAKES = await axios.get(${URL},{
  23. headers: {
  24. 'Content-Type': [
  25. 'application/json',
  26. 'charset=utf-8'
  27. ]
  28. }
  29. });

  30. console.log('Getting Data From Host')

  31. results = EARTHQUAKES.data.features

  32. results.map(

  33. async (results) => (
  34. (earthquakeObject = {
  35. place: results.properties.place, //
  36. time: results.properties.time, //
  37. url: results.properties.url, //
  38. sig: results.properties.sig, //
  39. mag: results.properties.mag, //
  40. type: results.properties.type, //
  41. longitude: results.geometry.coordinates[0], //
  42. latitude: results.geometry.coordinates[1], //
  43. depth: results.geometry.coordinates[2], //
  44. }),
  45. await client.index({
  46. index: 'earthquakes',
  47. id: results.id,
  48. body: earthquakeObject
  49. })
  50. )
  51. );

  52. if (EARTHQUAKES.data.length) {

  53. indexAllDocs();
  54. } else {
  55. console.log('All Data Has Been Indexed!');
  56. };
  57. } catch (err) {
  58. console.log(err)
  59. };

  60. console.log('Preparing For The Next Data Check...');

  61. }

  62. //================== Official API Call ==================\

  63. router.get('/earthquakes', function (req, res) {
  64. res.send('Running Application...');
  65. console.log('Loading Application...')

  66. indexAllDocs(res);

  67. });

  68. module.exports = router;

`` ```

在上面,我們添加了把文件寫入 Elasticsearch 的程式碼部分。我們使用地震資料的 id 作為 Elasticsearch 文件的 id。等伺服器執行起來後,我們需要在 terminal 中打入如下的命令:

curl -XGET http://localhost:5001/api/data/earthquakes

我們可以在 Kibana 中通過如下的命令來檢視文件:

GET earthquakes/_search?filter_path=**.hits

我們可以看到如下的結果:

``

  1. {
  2. "hits": {
  3. "hits": [
  4. {
  5. "_index": "earthquakes",
  6. "_id": "nc73827281",
  7. "_score": 1,
  8. "_source": {
  9. "place": "10km S of Laytonville, CA",
  10. "time": 1672505649740,
  11. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827281",
  12. "sig": 63,
  13. "mag": 2.02,
  14. "type": "earthquake",
  15. "longitude": -123.4981689,
  16. "latitude": 39.5991669,
  17. "depth": 4.59
  18. }
  19. },
  20. ...

` ```

很顯然,這個文件的 source 和我們之前的想要的格式還是不太一樣。為了能夠使的 time 轉換為 @timestamp,我們可以在 Node.js 的程式碼中進行相應的轉換。我們也可以採用 ingest pipeline 來實現相應的操作。我們定義如下的 ingest pipeine。

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. }
  14. ]
  15. },
  16. "docs": [
  17. {
  18. "_source": {
  19. "place": "16km N of Borrego Springs, CA",
  20. "time": 1672507053210,
  21. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  22. "sig": 10,
  23. "mag": 0.81,
  24. "type": "earthquake",
  25. "longitude": -116.368,
  26. "latitude": 33.4013333,
  27. "depth": 2.91
  28. }
  29. }
  30. ]
  31. }

` ```

在上面的命令中,我們使用 date processor 來把 time 轉換為所需要的格式,並在預設的情況下把 target 設定為 @timestamp。上面命令執行的結果為:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "latitude": 33.4013333,
  14. "place": "16km N of Borrego Springs, CA",
  15. "time": 1672507053210,
  16. "type": "earthquake",
  17. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  18. "longitude": -116.368
  19. },
  20. "_ingest": {
  21. "timestamp": "2023-01-01T00:31:03.544821Z"
  22. }
  23. }
  24. }
  25. ]
  26. }

` ```

從上面的輸出中,我們可以看出來 @timestamp 欄位已經生成。它的值由 time 欄位轉換而來。我們還發現 latitude 及 longitude 並不是按照我們需要的格式來顯示的。我們需要把它轉化為另外一個像如下的物件:

我們可以通過 rename processor 來操作:

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. },
  14. {
  15. "rename": {
  16. "field": "latitude",
  17. "target_field": "coordinates.lat"
  18. }
  19. },
  20. {
  21. "rename": {
  22. "field": "longitude",
  23. "target_field": "coordinates.lon"
  24. }
  25. }
  26. ]
  27. },
  28. "docs": [
  29. {
  30. "_source": {
  31. "place": "16km N of Borrego Springs, CA",
  32. "time": 1672507053210,
  33. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  34. "sig": 10,
  35. "mag": 0.81,
  36. "type": "earthquake",
  37. "longitude": -116.368,
  38. "latitude": 33.4013333,
  39. "depth": 2.91
  40. }
  41. }
  42. ]
  43. }

` ```

在上面的命令中,我們通過 rename processor 來重新命名 longitude 及 latitude 兩個欄位。執行上面的程式碼,我們可以看到如下的結果:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "coordinates": {
  14. "lon": -116.368,
  15. "lat": 33.4013333
  16. },
  17. "place": "16km N of Borrego Springs, CA",
  18. "time": 1672507053210,
  19. "type": "earthquake",
  20. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
  21. },
  22. "_ingest": {
  23. "timestamp": "2023-01-01T00:38:42.729604Z"
  24. }
  25. }
  26. }
  27. ]
  28. }

` ```

很顯然,我們看到了一個新的 coordinates 的欄位。它是一個 object。我們發現有一個多餘的欄位叫做 time。這個並不是我們所需要的。我們可以通過 remove processor 來刪除這個欄位。

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. },
  14. {
  15. "rename": {
  16. "field": "latitude",
  17. "target_field": "coordinates.lat"
  18. }
  19. },
  20. {
  21. "rename": {
  22. "field": "longitude",
  23. "target_field": "coordinates.lon"
  24. }
  25. },
  26. {
  27. "remove": {
  28. "field": "time"
  29. }
  30. }
  31. ]
  32. },
  33. "docs": [
  34. {
  35. "_source": {
  36. "place": "16km N of Borrego Springs, CA",
  37. "time": 1672507053210,
  38. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  39. "sig": 10,
  40. "mag": 0.81,
  41. "type": "earthquake",
  42. "longitude": -116.368,
  43. "latitude": 33.4013333,
  44. "depth": 2.91
  45. }
  46. }
  47. ]
  48. }

` ```

我們執行上面的命令。我們再次檢視輸出的結果:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "coordinates": {
  14. "lon": -116.368,
  15. "lat": 33.4013333
  16. },
  17. "place": "16km N of Borrego Springs, CA",
  18. "type": "earthquake",
  19. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
  20. },
  21. "_ingest": {
  22. "timestamp": "2023-01-01T00:44:46.919265Z"
  23. }
  24. }
  25. }
  26. ]
  27. }

` ```

很顯然這個時候,我們的 time 欄位不見了。

在上面,我們通過 _simulate 的端點測試好了我們的 ingest pipeline。接下來,是我們使用命令來建立這個 pipeline 的時候了。我們使用如下的命令來建立這個 pipeline:

``

  1. PUT _ingest/pipeline/earthquake_data_pipeline
  2. {
  3. "description": "This is for data transform for earthquake data",
  4. "processors": [
  5. {
  6. "date": {
  7. "field": "time",
  8. "formats": [
  9. "UNIX_MS"
  10. ]
  11. }
  12. },
  13. {
  14. "rename": {
  15. "field": "latitude",
  16. "target_field": "coordinates.lat"
  17. }
  18. },
  19. {
  20. "rename": {
  21. "field": "longitude",
  22. "target_field": "coordinates.lon"
  23. }
  24. },
  25. {
  26. "remove": {
  27. "field": "time"
  28. }
  29. }
  30. ]
  31. }

` ```

執行上面的命令。這樣我們就建立了一個叫做 earthquake_data_pipeline 的 ingest pipeline。

接下來,我們需要刪除之前所建立的索引,因為它包含我們不需要的一些欄位:

DELETE earthquakes

我們再次執行之前建立索引 earthquakes 的命令:

``

  1. PUT earthquakes
  2. {
  3. "mappings": {
  4. "properties": {
  5. "@timestamp": {
  6. "type": "date"
  7. },
  8. "coordinates": {
  9. "type": "geo_point"
  10. },
  11. "depth": {
  12. "type": "float"
  13. },
  14. "mag": {
  15. "type": "float"
  16. },
  17. "place": {
  18. "type": "text",
  19. "fields": {
  20. "keyword": {
  21. "type": "keyword"
  22. }
  23. }
  24. },
  25. "sig": {
  26. "type": "short"
  27. },
  28. "type": {
  29. "type": "keyword"
  30. },
  31. "url": {
  32. "enabled": false
  33. }
  34. }
  35. }
  36. }

` ```

我們接下來需要修改 data.js 檔案來使用這個 ingest pipeline:

server/routes/api/data.js

`

  1. const express = require('express');
  2. const router = express.Router();
  3. const axios = require('axios')
  4. const client = require('../../elasticsearch/client');
  5. const URL = https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  6. //======= Check that Elasticsearch is up and running =======\

  7. function pingElasticsearch() {
  8. console.log("ping .....")
  9. client.ping({
  10. requestTimeout: 30000,
  11. }, function(error,res) {
  12. if (error) {
  13. console.error('elasticsearch cluster is down!');
  14. } else {
  15. console.log('Elasticsearch Ready');
  16. }
  17. });
  18. };

  19. // ====== Get Data From USGS and then index into Elasticsearch

  20. indexAllDocs = async () => {
  21. try {
  22. const EARTHQUAKES = await axios.get(${URL},{
  23. headers: {
  24. 'Content-Type': [
  25. 'application/json',
  26. 'charset=utf-8'
  27. ]
  28. }
  29. });

  30. console.log('Getting Data From Host')

  31. results = EARTHQUAKES.data.features

  32. results.map(

  33. async (results) => (
  34. (earthquakeObject = {
  35. place: results.properties.place,
  36. time: results.properties.time,
  37. url: results.properties.url,
  38. sig: results.properties.sig,
  39. mag: results.properties.mag,
  40. type: results.properties.type,
  41. longitude: results.geometry.coordinates[0],
  42. latitude: results.geometry.coordinates[1],
  43. depth: results.geometry.coordinates[2],
  44. }),
  45. await client.index({
  46. index: 'earthquakes',
  47. id: results.id,
  48. body: earthquakeObject,
  49. pipeline: 'earthquake_data_pipeline'
  50. })
  51. )
  52. );

  53. if (EARTHQUAKES.data.length) {

  54. indexAllDocs();
  55. } else {
  56. console.log('All Data Has Been Indexed!');
  57. };
  58. } catch (err) {
  59. console.log(err)
  60. };

  61. console.log('Preparing For The Next Data Check...');

  62. }

  63. //================== Official API Call ==================\

  64. router.get('/earthquakes', function (req, res) {
  65. res.send('Running Application...');
  66. console.log('Loading Application...')

  67. setInterval(() => {

  68. pingElasticsearch()
  69. indexAllDocs(res);
  70. }, 120000);

  71. });

  72. module.exports = router;

`` ```

在上面的程式碼中,我對一下的兩處做了修改:

我們再次使用如下的命令來啟動對資料的採集:

curl -XGET http://localhost:5001/api/data/earthquakes

稍等一點時間(超過2分鐘),我們到 Kibana 中來檢視資料:

GET earthquakes/_search

我們可以看到如下的資料:

``

  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 9,
  13. "relation": "eq"
  14. },
  15. "max_score": 1,
  16. "hits": [
  17. {
  18. "_index": "earthquakes",
  19. "_id": "us7000j1cr",
  20. "_score": 1,
  21. "_source": {
  22. "sig": 340,
  23. "mag": 4.7,
  24. "depth": 181.449,
  25. "@timestamp": "2023-01-01T06:39:45.239Z",
  26. "coordinates": {
  27. "lon": 70.8869,
  28. "lat": 36.5351
  29. },
  30. "place": "36 km S of Jurm, Afghanistan",
  31. "type": "earthquake",
  32. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us7000j1cr"
  33. }
  34. },
  35. ...

` ```

從上面,我們可以看出來有9個地震資料已經被寫入。我們可以讓應用執行一段時間,它可能會有更多的資料進來。比如:

``

  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 10,
  13. "relation": "eq"
  14. },
  15. "max_score": 1,
  16. "hits": [
  17. {
  18. "_index": "earthquakes",
  19. "_id": "nc73827436",
  20. "_score": 1,
  21. "_source": {
  22. "sig": 252,
  23. "mag": 4.04,
  24. "depth": 4.51,
  25. "@timestamp": "2023-01-01T06:49:08.930Z",
  26. "coordinates": {
  27. "lon": -121.220665,
  28. "lat": 36.5789986
  29. },
  30. "place": "9km NW of Pinnacles, CA",
  31. "type": "earthquake",
  32. "url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827436"
  33. }
  34. },

` ```

我們可以看到10個數據。

從上面的資料中,我們可以看到最終的資料結構就是我們想要的資料結構。

在接下來的文章中,我將詳細描述如何對這個資料進行視覺化。我將使用 Kibana 來進行展示,也會使用 Web 來進行搜尋。敬請期待!

為了方便大家的學習,我把原始碼放在這裡:https://github.com/liu-xiao-guo/earthquakes-app

參考:

【1】https://medium.com/@webdevmark16/ingesting-real-time-data-into-elasticsearch-with-node-js-a7aa9b5acf8c