如何高效實現 MySQL 與 elasticsearch 的數據同步
MySQL 自身簡單、高效、可靠,是又拍雲內部使用最廣泛的數據庫。但是當數據量達到一定程度的時候,對整個 MySQL 的操作會變得非常遲緩。而公司內部 robin/logs 表的數據量已經達到 800w,後續又有全文檢索的需求。這個需求直接在 MySQL 上實施是難以做到的。
原數據庫的同步問題
由於傳統的 mysql 數據庫並不擅長海量數據的檢索,當數據量到達一定規模時(估算單表兩千萬左右),查詢和插入的耗時會明顯增加。同樣,當需要對這些數據進行模糊查詢或是數據分析時,MySQL作為事務型關係數據庫很難提供良好的性能支持。使用適合的數據庫來實現模糊查詢是解決這個問題的關鍵。
但是,切換數據庫會迎來兩個問題,一是已有的服務對現在的 MySQL 重度依賴,二是 MySQL 的事務能力和軟件生態仍然不可替代,直接遷移數據庫的成本過大。我們綜合考慮了下,決定同時使用多個數據庫的方案,不同的數據庫應用於不同的使用場景。而在支持模糊查詢功能的數據庫中,elasticsearch 自然是首選的查詢數據庫。這樣後續對業務需求的切換也會非常靈活。
那具體該如何實現呢?在又拍雲以往的項目中,也有遇到相似的問題。之前採用的方法是在業務中編寫代碼,然後同步到 elasticsearch 中。具體是這樣實施的:每個系統編寫特定的代碼,修改 MySQL 數據庫後,再將更新的數據直接推送到需要同步的數據庫中,或推送到隊列由消費程序來寫入到數據庫中。
但這個方案有一些明顯的缺點:
-
系統高耦合,侵入式代碼,使得業務邏輯複雜度增加
-
方案不通用,每一套同步都需要額外定製,不僅增加業務處理時間,還會提升軟件複復雜度
-
工作量和複雜度增加
在業務中編寫同步方案,雖然在項目早期比較方便,但隨着數據量和系統的發展壯大,往往最後會成為業務的大痛點。
解決思路及方案
調整架構
既然以往的方案有明顯的缺點,那我們如何來解決它呢?優秀的解決方案往往是 “通過架構來解決問題“,那麼能不能通過架構的思想來解決問題呢?
答案是可以的。我們可以將程序偽裝成 “從數據庫”,主庫的增量變化會傳遞到從庫,那這個偽裝成 “從數據庫” 的程序就能實時獲取到數據變化,然後將增量的變化推送到消息隊列 MQ,後續消費者消耗 MQ 的數據,然後經過處理之後再推送到各自需要的數據庫。
這個架構的核心是通過監聽 MySQL 的 binlog 來同步增量數據,通過基於 query 的查詢舊錶來同步舊數據,這就是本文要講的一種異構數據庫同步的實踐。
改進數據庫
經過深度的調研,成功得到了一套異構數據庫同步方案,並且成功將公司生產環境下的 robin/logs 的表同步到了 elasticsearch 上。
首先對 MySQL 開啟 binlog,但是由於 maxwell 需要的 binlog_format=row 原本的生產環境的數據庫不宜修改。這裏請教了海楊前輩,他提供了”從庫聯級“的思路,在從庫中監聽 binlog 繞過了操作生產環境重啟主庫的操作,大大降低了系統風險。
後續操作比較順利,啟動 maxwell 監聽從庫變化,然後將增量變化推送到 kafka ,最後配置 logstash 消費 kafka中的數據變化事件信息,將結果推送到 elasticsearch。配置 logstash需要結合表結構,這是整套方案實施的重點。
這套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 與 kafka已經在生產環境中有部署,所以無需單獨部署維護。而 logstash 與 maxwell 只需要修改配置文件和啟動命令即可快速上線。整套方案的意義不僅在於成本低,而且可以大規模使用,公司內有 MySQL 同步到其它數據庫的需求時,都可以上任。
成果展示前後對比
使用該方案同步和業務實現同步的對比
寫入到 elasticsearch 性能對比 (8核4G內存)
經過對比測試,800w 數據量全量同步,使用 logstash 寫到 elasticsearch,實際需要大概 3 小時,而舊方案的寫入時間需要 2.5 天。
方案實施細節
接下來,我們來看看具體是如何實現的。
本方案無需編寫額外代碼,非侵入式的,實現 MySQL 數據與 elasticsearch 數據庫的同步。
下列是本次方案需要使用所有的組件:
-
MySQL
-
Kafka
-
Maxwell(監聽 binlog)
-
Logstash(將數據同步給 elasticsearch)
-
Elasticsearch
1. MySQL配置
本次使用 MySQL 5.5 作示範,其他版本的配置可能稍許不同需要
首先我們需要增加一個數據庫只讀的用户,如果已有的可以跳過。
-- 創建一個 用户名為 maxwell 密碼為 xxxxxx 的用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
開啟數據庫的 binlog
,修改 mysql
配置文件,注意 maxwell
需要的 binlog
格式必須是row
。
# /etc/mysql/my.cnf
[mysqld]
# maxwell 需要的 binlog 格式必須是 row
binlog_format=row
# 指定 server_id 此配置關係到主從同步需要按情況設置,
# 由於此mysql沒有開啟主從同步,這邊默認設置為 1
server_id=1
# logbin 輸出的文件名, 按需配置
log-bin=master
重啟 MySQL 並查看配置是否生效:
sudo systemctl restart mysqld
select @@log_bin;
-- 正確結果是 1
select @@binlog_format;
-- 正確結果是 ROW
如果要監聽的數據庫開啟了主從同步,並且不是主數據庫,需要再從數據庫開啟 binlog 聯級同步。
# /etc/my.cnf
log_slave_updates = 1
需要被同步到 elasticsearch 的表結構。
-- robin.logs
show create table robin.logs;
-- 表結構
CREATE TABLE `logs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`content` text NOT NULL,
`user_id` int(11) NOT NULL,
`status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
`type` varchar(20) DEFAULT '',
`meta` text,
`created_at` bigint(15) NOT NULL,
`idx_host` varchar(255) DEFAULT '',
`idx_domain_id` int(11) unsigned DEFAULT NULL,
`idx_record_value` varchar(255) DEFAULT '',
`idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
`idx_orig_record_value` varchar(255) DEFAULT '',
PRIMARY KEY (`id`),
KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8
2. Maxwell 配置
本次使用 maxwell-1.39.2 作示範, 確保機器中包含 java 環境, 推薦 openjdk11
下載 maxwell 程序
wget http://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2
maxwell 使用了兩個數據庫:
-
一個是需要被監聽binlog的數據庫(只需要讀權限)
-
另一個是記錄maxwell服務狀態的數據庫,當前這兩個數據庫可以是同一個
重要參數説明:
-
host 需要監聽binlog的數據庫地址
-
port 需要監聽binlog的數據庫端口
-
user 需要監聽binlog的數據庫用户名
-
password 需要監聽binlog的密碼
-
replication_host 記錄maxwell服務的數據庫地址
-
replication_port 記錄maxwell服務的數據庫端口
-
replication_user 記錄maxwell服務的數據庫用户名
-
filter 用於監聽binlog數據時過濾不需要的數據庫數據或指定需要的數據庫
-
producer 將監聽到的增量變化數據提交給的消費者 (如 stdout、kafka)
-
kafka.bootstrap.servers kafka 服務地址
-
kafka_version kafka 版本
-
kafka_topic 推送到kafka的主題
啟動 maxwell
注意,如果 kafka 配置了禁止自動創建主題,需要先自行在 kafka 上創建主題,kafka_version 需要根據情況指定, 此次使用了兩張不同的庫
./bin/maxwell
--host=mysql-maxwell.mysql.svc.cluster.fud3
--port=3306
--user=root
--password=password
--replication_host=192.168.5.38
--replication_port=3306
--replication_user=cloner
--replication_password=password
--filter='exclude: *.*, include: robin.logs'
--producer=kafka
--kafka.bootstrap.servers=192.168.30.10:9092
--kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1
3. 安裝 Logstash
Logstash 包中已經包含了 openjdk,無需額外安裝。
wget http://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz
刪除不需要的配置文件。
rm config/logstash.yml
修改 logstash 配置文件,此處語法參考官方文檔(http://www.elastic.co/guide/en/logstash/current/input-plugins.html) 。
# config/logstash-sample.conf
input {
kafka {
bootstrap_servers => "192.168.30.10:9092"
group_id => "main"
topics => ["maxwell-robinlogs"]
}
}
filter {
json {
source => "message"
}
# 將maxwell的事件類型轉化為es的事件類型
# 如增加 -> index 修改-> update
translate {
source => "[type]"
target => "[action]"
dictionary => {
"insert" => "index"
"bootstrap-insert" => "index"
"update" => "update"
"delete" => "delete"
}
fallback => "unknown"
}
# 過濾無效的數據
if ([action] == "unknown") {
drop {}
}
# 處理數據格式
if [data][idx_host] {
mutate {
add_field => { "idx_host" => "%{[data][idx_host]}" }
}
} else {
mutate {
add_field => { "idx_host" => "" }
}
}
if [data][idx_domain_id] {
mutate {
add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
}
} else {
mutate {
add_field => { "idx_domain_id" => "" }
}
}
if [data][idx_record_value] {
mutate {
add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
}
} else {
mutate {
add_field => { "idx_record_value" => "" }
}
}
if [data][idx_record_opt] {
mutate {
add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
}
} else {
mutate {
add_field => { "idx_record_opt" => "" }
}
}
if [data][idx_orig_record_value] {
mutate {
add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
}
} else {
mutate {
add_field => { "idx_orig_record_value" => "" }
}
}
if [data][type] {
mutate {
replace => { "type" => "%{[data][type]}" }
}
} else {
mutate {
replace => { "type" => "" }
}
}
mutate {
add_field => {
"id" => "%{[data][id]}"
"content" => "%{[data][content]}"
"user_id" => "%{[data][user_id]}"
"status" => "%{[data][status]}"
"meta" => "%{[data][meta]}"
"created_at" => "%{[data][created_at]}"
}
remove_field => ["data"]
}
mutate {
convert => {
"id" => "integer"
"user_id" => "integer"
"idx_domain_id" => "integer"
"created_at" => "integer"
}
}
# 只提煉需要的字段
mutate {
remove_field => [
"message",
"original",
"@version",
"@timestamp",
"event",
"database",
"table",
"ts",
"xid",
"commit",
"tags"
]
}
}
output {
# 結果寫到es
elasticsearch {
hosts => ["http://es-zico2.service.upyun:9500"]
index => "robin_logs"
action => "%{action}"
document_id => "%{id}"
document_type => "robin_logs"
}
# 結果打印到標準輸出
stdout {
codec => rubydebug
}
}
執行程序:
# 測試配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit
# 啟動*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic
4. 全量同步
完成啟動後,後續的增量數據 maxwell 會自動推送給 logstash 最終推送到 elasticsearch ,而之前的舊數據可以通過 maxwell 的 bootstrap 來同步,往下面表中插入一條任務,那麼 maxwell 會自動將所有符合條件的 where_clause 的數據推送更新。
INSERT INTO maxwell.bootstrap
( database_name, table_name, where_clause, client_id )
values
( 'robin', 'logs', 'id > 1', 'maxwell' );
後續可以在 elasticsearch 檢測數據是否同步完成,可以先查看數量是否一致,然後抽樣對比詳細數據。
# 檢測 elasticsearch 中的數據量
GET robin_logs/robin_logs/_count
- 國密證書 VS 傳統 SSL 證書,到底區別在哪?
- 如何高效實現 MySQL 與 elasticsearch 的數據同步
- 又拍雲邵海楊 - 25年Linux老兵,聊聊運維的“術”與“道”
- 從 AI 繪畫到 ChatGPT,聊聊生成式 AI
- 從 B 站出發,用 Chrome devTools performance 分析頁面如何渲染
- 【白話科普】聊聊網絡架構變革的關鍵——SDN
- 二狗子翻車了,只因上了這個網站……
- 一文讀懂 Kubernetes 存儲設計
- 從實戰出發,聊聊緩存數據庫一致性
- 當談論 React hook,我們究竟説的是什麼?
- 驚!揭露視頻網站節約 30% 成本的祕密
- 紅利風口下,企業出海如何強勢突圍?
- 紅利風口下,企業出海如何強勢突圍?
- 海外雲主機的選擇要注意什麼?
- 安全防禦直播專場|上好“安全鎖”,為跨境支付保駕護航
- 詳解 SSL(三):SSL 證書該如何選擇?
- 詳解 SSL(二):SSL 證書對網站的好處
- 詳解 SSL(二):SSL 證書對網站的好處
- 詳解 SSL(一):網址欄的小綠鎖有什麼意義?
- 詳解 SSL(一):網址欄的小綠鎖有什麼意義?