HBase資料同步|HBase表資料匯出至Hive

語言: CN / TW / HK

目錄導讀

  • 1. 前言

  • 2. 環境介紹

  • 3. 核心程式碼

    • 3.1 執行命令及注意事項

    • 3.2 核心程式碼

  • 4. 優化建議

  • 5. 參考文章

1. 前言

本篇文章和大家分享使用 Spark 讀取 HBase 錶快照資料的實現細節,此方案適用於對 HBase 表中的資料進行離線 OLAP 處理或同步至 Hive 中。

2. 環境介紹

  • spark2.4 跑在 kerberos 認證下的 hadoop2.6.0-cdh5.13.1 的 yarn 上,即大叢集 A。

  • HBase 在單獨的叢集上,即 HBase 叢集 B,叢集版本是 hbase2.1.0-cdh6.3.2

3. 核心程式碼

3.1 執行命令及注意事項

 sh /opt/spark-2.4.0-bin-hadoop2.6/bin/spark-submit \
--master yarn \
--queue hbase_to_hive_queue \
--name hbase_to_hive \
# driver-memory 可以調大點,否則可能會出現ApplicationMaster UI 開啟卡死的情況
--driver-memory 16g \
--deploy-mode cluster \
# 非kerberos叢集無需
--principal {your principal} \
--keytab /home/{current_user}/{current_user}.keytab \
--executor-memory 12G \
--executor-cores 4 \
--num-executors 20 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.shuffle.service.enabled=true \
--conf spark.speculation=true \
--conf spark.network.timeout=100000000 \
# 解決讀取壓縮表,發生sanppy等相關的異常
--conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
--conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
--conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
--files /etc/hive/conf/hive-site.xml \
--class org.bigdata.leo.hbase.bukload.HBaseExportSnapshotToHive /data/hbase1/bigdata-hbase-bukload.jar \
# HBaseToHive需要的main函式傳參,具體作用後文
-sourceHBaseZkAddress={sourceHBaseZkAddress} \
-sourceHBaseZkPort={sourceHBaseZkPort} \
-sourceHBaseTableName={sourceHBaseTableName} \
-sourceHBaseTableFamilyName={sourceHBaseTableFamilyName} \
-sourceHBaseFieldNameToLower={sourceHBaseFieldNameToLower} \
-sourceHBaseTablePerRegionSplitNum={sourceHBaseTablePerRegionSplitNum} \
-sourceHBaseTableSnapshotName={sourceHBaseTableSnapshotName} \
-sourceHBaseDefaultFS={sourceHBaseDefaultFS} \
-sourceHBaseRootDir={sourceHBaseRootDir} \
-tmpStoreSnapshotPath={tmpStoreSnapshotPath} \
-targetHiveTableName={targetHiveTableName} \
-targetHiveTableNamePrimaryKeyName={targetHiveTableNamePrimaryKeyName} \
-targetHiveTablePartitionOrNot={targetHiveTablePartitionOrNot} \
-targetHiveTablePartitionFieldName={targetHiveTablePartitionFieldName} \
-scanHBaseTableStartTimestamp={scanHBaseTableStartTimestamp} \
-scanHBaseTableEndTimestamp={scanHBaseTableEndTimestamp}

如上命令需要在大叢集上提交併執行,然後讀取 HBase 獨立叢集 B 上的資料。

大叢集在 Kerberos 環境下,HBase 叢集無 Kerberos,所以,Spark 命令提交時需要指定 kerberos 相關的引數,同時,Kerberos 叢集訪問非 Kerberos 叢集的檔案時,需要為設定 Hadoop 的引數: ipc.client.fallback-to-simple-auth-allowed=true

但是,這個引數無論指定在 spark-conf 中隨命令一塊提交,亦或是顯式宣告在程式碼裡,貌似都不起作用,原因猜測,newAPIHadoopRDD 這個 API 強制載入了本地 hdfs-site.xml 檔案,原始碼如下:

  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()

// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
// newAPIHadoopRDD 這個API強制載入了本地hdfs-site.xml檔案
FileSystem.getLocal(conf)

// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jconf)
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
}

最終有效的解決辦法是,在大叢集的客戶端 hdfs-site.xml 高階配置中設定此引數,然後在 yarn 上分發給所有的 node-manager 節點客戶端,(此步驟不用重啟 hadoop 相關的服務)。

操作 HBase Snappy 壓縮的表時,遇到如下報錯:

·21/11/25 17:47:16 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
0.0 (TID 0, centos-bigdata-datanode, executor 1): org.apache.hadoop.hbas
e.DoNotRetryIOException: Compression algorithm 'snappy' previously faile
d test.

一方面解決 snappy 相關 so 檔案的安裝部署問題(網上有很多資料可供參考,CDH 叢集中預設會部署好),一方面在 Spark 提交命令(或環境變數)中指定 snappy(native)目錄:

--conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/*
--conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/*
--conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native

可以用如下命令來檢驗 snappy 的有效性:

hbase -Djava.library.path=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native/ org.apache.hadoop.hbase.util.CompressionTest /user/leo_jie/requirements.txt snappy

引數解釋

  • sourceHBaseZkAddress: HBase 叢集的 ZK 地址

  • sourceHBaseZkPort: HBase 叢集 ZK 埠

  • sourceHBaseTableName:HBase 叢集的表

  • sourceHBaseTableFamilyName:HBase 叢集表的列簇

  • sourceHBaseFieldNameToLower:HBase 叢集表字段名是否轉大寫去讀

  • sourceHBaseTablePerRegionSplitNum:設定對 HBase 叢集表的每個 region 的切片數

  • sourceHBaseTableSnapshotName:HBase 叢集表的快照名稱,對錶建立快照後去讀是為了避免讀 HBase 表的過程中 Region 或 HFile 發生變化,同時讀快照可以減輕對 HBase 叢集的壓力

  • sourceHBaseDefaultFS:HBase 叢集 HDFS 的 DefaultFS,如:hdfs://hbase_activate_nn_ip:8020/

  • sourceHBaseRootDir:HBase 叢集的 HDFS RootDir,如:hdfs://hbase_activate_nn_ip:8020/hbase

  • tmpStoreSnapshotPath:快照資料臨時儲存目錄,需要是 HBase 叢集的 HDFS 地址,如:hdfs://hbase_activate_nn_ip:8020/tmp/hbase_export_snapshot,這個目錄用於臨時儲存快照的一些資訊資料

如下引數是同步 HBase 表到 hive 時的 hive 相關的引數配置

  • targetHiveTableName

  • targetHiveTableNamePrimaryKeyName

  • targetHiveTablePartitionOrNot

  • targetHiveTablePartitionFieldName

  • scanHBaseTableStartTimestamp:傳入掃描快照資料的開始時間戳,毫秒級別

  • scanHBaseTableEndTimestamp:傳入掃描快照資料的結束時間戳,毫秒級別

3.2 核心程式碼

sparkSession 相關類初始化

sparkSession相關類初始化

HBase 叢集配置資訊

HBase叢集配置資訊

設定對 hbase 快照資料掃描時,scan 物件的配置資訊,如掃描指定列簇、掃描限制的起止時間戳

快照資料掃描

初始化 MR job,使用 TableSnapshotInputFormat 來讀取 HBase 的快照資料,HBaseExportSnapshotSplit 是對 Region 的自定義切片類,即用於分割 Region,sourceHBaseTablePerRegionSplitNum,即切片數。下文會有更詳細的說明

初始化MR job

使用 newAPIHadoopRDD API 讀取 HBase 錶快照資料,並動態生成 RDD,及建立 DataFrame。

newAPIHadoopRDD

HBaseExportSnapshotSplit 切片類和 sourceHBaseTablePerRegionSplitNum 切片數詳解:

預設情況下:

val hFileRdd: RDD[Row] = sparkSession.sparkContext
.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

TableSnapshotInputFormat 類來讀取 HFile 時,首個 Stage 會按照 HBase 表的 Region 個數來劃分 Task,即,如果 HBase 表的 Region 數目為 50,那麼,在該 Stage 中會產生 50 個 Task。

同時,我們線上每個 Region 的最大大小設定為 20G,這意味著,每個 Region 最大可能會有 20G 的資料交給一個 Task 去處理,資料量越大的表,程式需要處理的時間就越長。

我測試 6 個 Region, 1 億資料量的表進行 Spark 讀取時,切片前好幾個小時跑不下來,切片後,任務併發數會提高几倍,程式的整體執行耗時會縮短至 30 分鐘左右。

除此之外,分割槽策略也應當是我們要考慮的點,我們 HBase 叢集中的表建立時採取的預分割槽策略大多是十六進位制,因此, HBaseExportSnapshotSplit 類繼承了 org.apache.hadoop.hbase.util.RegionSplitter.HexStringSplit,並重寫了 split(start: Array[Byte], end: Array[Byte], numSplits: Int, inclusive: Boolean)方法,其作用是對傳入 startKey 和 endKey 的 Region 的起止區域進行切分操作,即把一個 Region 下的資料儘可能切分成相等資料量的多份。

這個操作的原則是,儘可能切分的準確,儘可能切分的合理。

準確是指,切分規則觸發後,儘可能保證每份的資料條數是相等的(至於每份資料的空間大小是否均衡,便不太好保證啦);

合理是指,把一個 Region 分割的份數要合理,即,分割後所得到的切片數不能太多也不能太少。

舉個例子,用 Spark 讀取一張具有 50 個 Region 的表,一個 Region 的最大大小如果是 20G,包含 1000 萬條資料,那麼,切片後的理想情況是,1000 萬條資料分割為 10 份,每份資料佔據的大小為 2G,資料條目為 100 萬。在 Spark 程式執行的過程中,原先首個 Stage 劃分的 50 個 Task 將變成 500 個,併發可以擴大為原來的十倍,Spark 程式執行的也會更加穩定,同時也能極大地縮短整體的執行耗時。

但是, 如果併發數設定的過大,HBase叢集將會產生大量的讀IO ,可能會影響 HBase 叢集的線上讀寫。這時可以平衡 Spark 的 num-executors 和 executor-cores,假如是 10 * 5 的比例,那麼 Spark 任務執行的並行度可以控制為 50,這兩個引數可以根據 HBase 叢集的讀 IO 的承受能力來進行設定。

HBaseExportSnapshotSplit 實現參考:

package org.bigdata.leo.hbase.bukload

import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.ArrayUtils
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.RegionSplitter.{HexStringSplit, UniformSplit}
import org.bigdata.leo.hbase.bukload.util.RowKeySplitUtil

/**
* @author leojie 2021/11/22 11:13 上午
*/

class HBaseExportSnapshotSplit extends HexStringSplit {
override def split(start: Array[Byte], end: Array[Byte], numSplits: Int, inclusive: Boolean): Array[Array[Byte]] = {
var keyStart = subRow(start)
var keyEnd = subRow(end)

if (StringUtils.isBlank(Bytes.toStringBinary(keyStart))) {
if (!judgeConvertHexOrNot(keyEnd)) {
keyStart = ArrayUtils.EMPTY_BYTE_ARRAY
new UniformSplit().split(keyStart, keyEnd, numSplits, inclusive)
} else {
keyStart = "00000000".getBytes
super.split(keyStart, keyEnd, numSplits, inclusive)
}
} else if (StringUtils.isBlank(Bytes.toStringBinary(keyEnd))) {
if (!judgeConvertHexOrNot(keyStart)) {
val xFF = 0xFF.toByte
keyEnd = Array[Byte](xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF)
new UniformSplit().split(keyStart, keyEnd, numSplits, inclusive)
} else {
keyEnd = "FFFFFFFF".getBytes
super.split(keyStart, keyEnd, numSplits, inclusive)
}
} else {
if (judgeConvertHexOrNot(keyStart) && judgeConvertHexOrNot(keyEnd)) {
super.split(keyStart, keyEnd, numSplits, inclusive)
} else {
RowKeySplitUtil.getStartAndEndRowBreakpoint(Bytes.toStringBinary(keyStart), Bytes.toStringBinary(keyEnd), numSplits)
}
}
}

def judgeConvertHexOrNot(row: Array[Byte]): Boolean = {
try {
this.convertToBigInteger(row)
true
} catch {
case e: NumberFormatException => false
case _ => false
}
}

def subRow(row: Array[Byte]): Array[Byte] = {
val strRow = Bytes.toStringBinary(row)
if (StringUtils.isBlank(strRow)) {
return row
}
val strBuffer = new StringBuffer(strRow)
if (strRow.length < 8) {
val addLength = 8 - strRow.length
for (i <- 0 until addLength) {
strBuffer.append("0")
}
strBuffer.toString.getBytes()
} else {
strRow.substring(0, 8).getBytes
}
}
}

涉及到的工具類, RowKeySplitUtil

package org.bigdata.leo.hbase.bukload.util;


import org.apache.hadoop.hbase.util.Bytes;
import java.util.*;
import java.util.stream.Collectors;

/**
* @author leojie 2021/11/23 10:55 上午
*/

public class RowKeySplitUtil {
private static final int DEFAULT_PREFIX_LENGTH = 2;

public static void main(String[] args) {
List<String> totalList = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "q");
System.out.println(getListBreakpoint(totalList, 3));
final byte[][] split = getStartAndEndRowBreakpoint("CWWxUAeGeE3qyOxC0uWysFA==", "FuOEUqXfXzVvsuRZul2nGg==", 10);
for (byte[] bytes : split) {
System.out.println(new String(bytes));
}
}

/**
* 傳入開始和結束的rowKey,獲取劃分為n份的分割點
*
* @param startRow 開始rowKey
* @param endRow 結束rowKey
* @param n n份
* @return 分割點列表
*/

public static byte[][] getStartAndEndRowBreakpoint(String startRow, String endRow, int n) {

List<String> startRowList = getListStrCharFromRowKey(startRow);
List<String> endRowList = getListStrCharFromRowKey(endRow);

List<String> startRowPrefixList = extractNEleFromList(startRowList, DEFAULT_PREFIX_LENGTH);
List<String> endRowPrefixList = extractNEleFromList(endRowList, DEFAULT_PREFIX_LENGTH);
return Bytes.split(String.join("", startRowPrefixList).getBytes(), String.join("", endRowPrefixList).getBytes(), n);

/* List<List<String>> sortedCharList = new ArrayList<>(DEFAULT_PREFIX_LENGTH);
for (int i = 0; i < startRowPrefixList.size(); i++) {
int start = startRowPrefixList.get(i).toCharArray()[0];
int end = endRowPrefixList.get(i).toCharArray()[0];
List<String> tempList = new ArrayList<>();
for (int j = start; j <= end; j++) {
tempList.add(String.valueOf((char) j));
}
sortedCharList.add(tempList);
}
final List<String> totalList = descartes(sortedCharList);
return getListBreakpoint(totalList, n);*/

}

/**
* 獲取傳入rowKey的不重複字元字串列表
*
* @param rowKey rowKey
* @return 不重複字元字串列表
*/

private static List<String> getListStrCharFromRowKey(String rowKey) {
List<String> rowList = new ArrayList<>();
for (char c : rowKey.toCharArray()) {
String tempC = String.valueOf(c);
if (!rowList.contains(tempC)) {
rowList.add(String.valueOf(c));
}
}
return rowList;
}

/**
* 從一個列表中提取前n個元素的子列表
*
* @param dataList 資料列表
* @param n 提取總數
* @return 前n個元素的子列表
*/

private static List<String> extractNEleFromList(List<String> dataList, int n) {
if (dataList.size() < n) {
return dataList;
} else {
return dataList.subList(0, n);
}
}

/**
* 計算n個字串列表的笛卡爾積
*
* @param lists 總的列表
* @return 多個列表的笛卡爾積
*/

private static List<String> descartes(List<List<String>> lists) {
List<String> tempList = new ArrayList<>();
for (List<String> list : lists) {
if (tempList.isEmpty()) {
tempList = list;
} else {
tempList = tempList.stream().flatMap(item -> list.stream().map(item2 -> item + "" + item2)).collect(Collectors.toList());
}
}
return tempList;
}

/**
* 獲取視窗為n的分割點
*
* @param totalList 總列表
* @param n 視窗大小
* @return 分割點列表
*/

private static List<List<String>> getListBreakpoint(List<String> totalList, int n) {
final int total = totalList.size();
final int m = total / n;

List<List<String>> resList = new ArrayList<>();
if (m <= 1) {
resList.add(Arrays.asList(totalList.get(0), totalList.get(total - 1)));
} else {
int loop = total / m;
int remainder = total % m;
for (int i = 0; i < loop; i++) {
int start = i * m;
int end = m * (i + 1) - 1;
if (end >= total) {
end = total - 1;
}
resList.add(Arrays.asList(totalList.get(start), totalList.get(end)));
}
if (remainder > 0) {
final List<String> lastResList = resList.remove(resList.size() - 1);
final List<String> remainderList = totalList.subList(m * loop, m * loop + remainder);
resList.add(Arrays.asList(lastResList.get(0), remainderList.get(remainderList.size() - 1)));
}
}
return resList;
}
}

4. 優化建議

參考http://blog.csdn.net/u012477420/article/details/94324932,將 hbase.mapreduce.splits.per.region 設定為大於1之後,出現了以下問題,即第一個Stage中出現了task failed,報錯資訊類似如下:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/gwjiang/reflux/benchmark/output/v2/restore/bfe4e1ff-20c5-41ec-b6db-5b48f808344c/data/default/leyden-2019-02-28-21/313225b6e5b440c96f161f8cb54670f0/recovered.edits/3780577.seqid for DFSClient_NONMAPREDUCE_623145177_155 on 10.101.5.10 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1558587010_154 on 10.101.5.10

在參考文章裡,作者從原始碼的角度上分析了上述報錯出現的原因,但是解決方案還是有些模糊,我的做法是,直接複製了類 TableSnapshotInputFormatImpl ,把 try catch 的程式碼加了進去,如下圖:

TableSnapshotInputFormatImpl

5. 參考文章

  • http://blog.csdn.net/u012477420/article/details/94324932

  • http://blog.csdn.net/weixin_42047967/article/details/105571856