資料開發流程規範及資料監控

語言: CN / TW / HK

點選上方卡片進入 五分鐘學大資料 主頁

然後點選右上角 “ 設為星標

比別人更快接收好文章

一、背景

在大資料時代,規範地進行資料資產管理已成為推動網際網路、大資料、人工智慧和實體經濟深度融合的必要條件。貼近業務屬性、兼顧研發各階段要點的研發規範,可以切實提高研發效率,保障資料研發工作有條不紊地運作。而不完善的研發流程,會降低研發效率,增加成本與風險。

資料研發規範旨在為廣大資料研發者、管理者提供規範化的研發流程指導方法,目的是簡化、規範日常工作流程,提高工作效率,減少無效與冗餘工作,賦能企業、政府更強大的資料掌控力來應對海量增長的業務資料,從而釋放更多人力與財力專注於業務創新。

二、資料開發流程

鑑於對日常資料倉庫研發工作的總結與歸納,將資料倉庫研發流程抽象為如下幾點:

  1. 需求階段:資料產品經理應如何應對不斷變化的業務需求。

  2. 設計階段:資料產品經理、資料開發者應如何綜合性能、成本、效率、質量等因素,更好地組織與儲存資料。

  3. 開發階段:資料研發者如何高效、規範地進行編碼工作。

  4. 測試階段:測試人員應如何準確地暴露程式碼問題與專案風險,提升產出質量。

  5. 釋出階段:如何將具備釋出條件的程式平穩地釋出到線上穩定產出。

  6. 運維階段:運維人員應如何保障資料產出的時效性和穩定性。

具體開發流程

  1. 需求:與運營產品討論需求。業務方把需求提交到JIRA,並且和產品溝通過。

  2. PRD評審:產品評審PRD文件。

  3. 技術方案討論:最好是負責人先溝通一個初級的方案,然後找大家一起討論(可能比直接頭腦風暴效率搞,根據負責人的經驗來討論);然後找大家一起討論。

  4. 技術設計評審:設計評審叫上測試。

  5. 設計評審的原則是,評審會議應該是設計方案大家基本認同的前提下,做方案的文件。

  6. 設計介面:重點準確描述輸入和輸出。

  7. 設計欄位:根據需求定義欄位,並確定欄位指標和獲取來源,建立資料字典。

  8. 開發:開分支,寫程式碼。做好測試case的建立,然後自測。

  9. 程式碼review:叫上測試和一個其他開發同學,給出review的結果。目的是讓其他同學幫忙review其中的邏輯。

  10. 提測:給出提測報告,包括羅列測試點。

  11. 上線:提前告知運維,提前申請機器資源,根據業務預估好CPU、儲存、頻寬等資源。

  12. 文件:開發完成後,文件記錄一下流程以及提供資料表字段說明,方便重構。

資料需求流程

各個角色職責

這個流程針對的是專案是開發,在專案立項的開始,就需要明確各個角色的職責,而且需要和多個角色進行配合。作為資料開發人員,需要協調和各個角色之間的互動:

  • 需要和產品評估該需求的合理性,現有技術棧能否支援該需求,例如:公司想要做個實時資料大盤,如果沒有實時數倉的架構,是沒法完成這塊需求。一旦確定開發,需要協調資源,包含開發資源、裝置資源等等。

  • 需要和業務方、產品方評估資料可行性,資料開發的資料來源並不是憑空出現的,需要和業務方明確已有資料能否支撐需求開發,如果缺少資料,則需要另行規劃缺失資料的抽取方案。

  • 需要自己評估技術可行性,資料開發可能涉及到資料傳輸、資料同步、ETL、實時開發、離線開發等等,要評估從資料來源獲取到資料展現一套流程的可行性,例如:資料來源如果為多個地方產出,可能需要從binlong獲取、Kafka讀取、業務庫同步、HDFS讀取等等,資料輸出也可能到各個地方,例如:mysql、hive、ES、Kafka、redis等等多個儲存,需要在開發之前確定整套資料的流程。

  • 需要確定是否滿足安全與合規要求,對於一些敏感資料如何處理,是一個很重要的組成部分,作為資料開發人員,可能接觸的資料比較多,但是哪些資料可以展現、哪些資料脫敏後可以展現、哪些資料不能落地等等,而且在資料流轉過程中,也要關注資料的安全性,能否落地、能否轉存等等。

  • 需要和測試同學同步資料處理邏輯,並將一些邏輯的SQL進行文件化,方便測試同學進行單元測試,在交付測試之前,需要對程式碼進行自測,以便保障流入到測試執行環節的程式碼達到一定的質量標準。同時最好能讓程式碼通過配置在不同環境進行切換,方便測試同學在測試環境、預發環境進行測試,測試通過後同一套程式碼能夠直接上線。

三、日常資料支撐

除了專案式的開發外,資料開發人員大部分情況下都會面對產品提出來的一些臨時性的資料需求,例如拉去一下近半年的銷售情況、使用者訪問情況等等,這部分資料支撐不需要後端配合、可能也不需要進行測試,而是在已明確的資料指標的基礎上,定期或者不定期的提供一個數據報表。這部分的資料開發模式相對來說比較簡單和快速,但是也需要明確:

  • 明確資料需求模板、常規需求申請單等等,提供需求單的目的是避免長時間的溝通,特別是已經有的資料指標,只需要讓產品提供一份詳細的資料需求單,按照需求單的模版進行提供資料即可。模版如下:

指標需求中通常會涉及到下表中的約定項,如果需要自定義約定項,可以在自定義格式列進行填寫。

  • 明確需求的指標含義,和所需求的欄位明細、統計週期、開發週期等。

四、注意

  • 需求評審完成後,如果發生需求變更或者迭代, 定需要提供迭代/變更的需求申請單 ,或者提供JIRA,避免需求不可追溯。

  • 對於一些重要指標的定義,就算文件中寫了,也要和產品進行確定,例如產品需要近半年的所有銷量,那麼要明確這個銷量是否包含退款、是按照成交時間還是付款時間來計算等等。避免資料指標不匹配,導致二次開發。

  • 開發過程中,文件要規範,先設計在開發,而且在做系統建設的時候,要有全域性視野,不侷限某一個點,並不是釋出完成了,就算結束,程式碼開發完成只是第一步,後續的文件建設、程式碼覆盤、資料監控、資料告警、穩定性等等,都需要在開始規劃好。

  • 及時反饋,在開發過程,不論進行到哪個階段,專案期間每天都需要和前後端同步一下進度,避免延期的風險。

  • 故障處理,在程式上下後,可能會因為客觀或者程式碼的原因出現一些BUG,不同的故障處理方案不同,但是注意覆盤和故障記錄,避免下次出現相同的BUG。

故障等級定義:

P0:

1.全域性問題,影響所有使用者,例如系統必現崩潰,主要功能不可用,嚴重影響使用者正常交易。

2.涉及到使用者資金損失的問題。

解決時間:2小時內。

反饋時間:0.5小時。

反饋方式:comments自動郵件方式+即時通訊:例如QQ\微信\釘釘\電話

P1:

1.全域性問題,影響所有使用者,例如系統次要功能不可用,系統偶現崩潰且崩潰率超過50%。

2.區域性問題,影響超過20%的使用者,例如系統主要功能不可用,系統必現崩潰。解決時間:待定不過夜。

反饋時間:1小時。

反饋方式:comments自動郵件方式+即時通訊:例如QQ\微信\釘釘\電話

P2:

1.區域性問題,影響使用者10%-20%,例如系統次要功能不可用,或者系統某一個邏輯不可用,系統崩潰率20-50%。

解決時間:待定48小時。

反饋方式:comments自動郵件方式。

P3:

1.區域性問題,影響使用者10%以下,例如系統次要功能不可用,系統部分邏輯不正常,僅在某一單一機型或單一使用者出現的問題。

解決時間:待定下個版本釋出。

反饋方式:下個版本的需求計劃中體現。

P4:

1.系統文字錯誤,系統樣式錯誤,系統互動友好性等不影響使用者正常使用的功能。(包含全域性性質)

解決時間:下個版本上線時。

反饋方式:下個版本的需求計劃中體現。

P0\P1級別問題在規定時間內無法解決的,需要該問題的研發同學在問題comments內說明無法在規定時間內解決的合理的解釋,並告知該問題具體的解決時間點同時郵件說明。

五、資料監控與告警

背景

監控系統的一般套路:採集->儲存->展示->告警。

監控系統對於大資料平臺的重要性不言而喻,一般是對大資料整個架構、各個資料的輸入輸出流、中介軟體的穩定性、資料的準確性、資源的使用情況、任務的執行情況進行監控。一般的監控告警通過採集告警日誌、錯誤資料、關鍵詞匹配等獲取錯誤的資料進行實時展現並告警。

常見的監控系統以 Grafana 為基礎,主要功能是將收集儲存的資料按照不同維度、不同應用、不同使用者進行配置化的展示;為了保證資料安全,每個團隊只能看到自己的應用資料。同時對不同維度的資料,可以進行報警配置,根據最常用的報警方式,提供了釘釘報警、郵件報警、webhook報警三種方式。

不過最近在使用Flink的時候有一個業務場景,需要對歷史資料進行監控,方便檢視各個實時任務的表是否有資料產生。所以提供一個python指令碼版的監控各個業務表的資料,並做釘釘告警的功能。

介紹

在做實時資料開發過程中,由於對接了不同的業務方,起了多個實時任務的程式,而資料的監控在運維那邊,但運維同學只有針對整個叢集的監控,對單個作業的監控還沒建立起來,所以會初選一些實時任務在叢集上runing的狀態,但是對Kafka的消費卻丟失,而Kafka目前只保留7天的資料,一旦資料丟失,需要通過離線任務去校驗,會非常的耗時。

所以在這個背景上,單獨做了針對自己輸出的業務報表資料的監控,每天輸出一些資料產生異常的表,並釘釘告警,方便快速處理。

整體的架構圖如下:

如果業務沒有離線校驗的情況下,如何去監控資料表是否產生。

例如以釘釘告警為例:

1、建立釘釘機器人

釘釘開發文件:https://open.dingtalk.com/document/org/application-types

2、完成安全設定後,複製出機器人的Webhook地址,可用於向這個群傳送訊息,格式如下:

https://oapi.dingtalk.com/robot/send?access_token=XXXXXX

同時指定全設定,一般選擇關鍵詞,例如:監控報警等,這個機器人所傳送的訊息,必須包含監控報警 這個詞,才能傳送成功。建立成功後顯示:

3、編寫監控資料庫表的指令碼:

# coding=utf-8
import datetime
import sys
import os
import requests
import json
import pymysql

class test_monitor():

def __init__(self):
self.database='warehouse'
self.host='localhost'
self.username='test'
self.password='123456'
self.table_list = [
{
"table_name": "ods_tgc_scu_index_online",
"ds": "date_create"
},
{
"table_name": "mid_jx_order_detail_online_result",
"ds": "created_at"
}
]

def get_data(self,table_name,ds_time):
try:
db = pymysql.connect(self.host, self.username, self.password, self.database, charset='utf8')
cursor = db.cursor()
yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
today = datetime.datetime.now().strftime("%Y-%m-%d")
sql = 'select count(1) from '+ table_name + ' where ' + ds_time + ' >= %s and '+ ds_time + ' < %s'
cursor.execute(sql,(yesterday,today))
data = cursor.fetchone()
if(data[0] > 0):
num = data[0]
return num
else:
num = 0
return num
cursor.close()
db.close()
except pymysql.InternalError as error:
code, message = error.args
print(">>>>>>>>>>>>>", code, message)
return -1
def push_data(self,table_name,result):
day = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
db = pymysql.connect(self.host, self.username, self.password, self.database, charset='utf8')
cursor = db.cursor()
sql = 'INSERT INTO souche_enable_market_monitor (datab,table_name,num,ds) value (%s,%s,%s,%s)'
cursor.execute(sql,(self.database,table_name,result,day))
db.commit()
cursor.close()
db.close()


def run(self):
def dingmessage(self):
webhook = "https://oapi.dingtalk.com/robot/send?access_token=XXXXXXXXX"
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
message = {
"msgtype": "text",
"text": {
"content":self
},
"at": {
"atMobiles": [ #此處為需要@什麼人。填寫具體使用者
"此處為需要@什麼人。填寫具體使用者",
],
"isAtAll": True #此處為是否@所有人 True 所有人 False 無需所有人
}
}
message_json = json.dumps(message)
info = requests.post(url=webhook,data=message_json,headers=header)
print('傳送成功')
print(info.json())
day = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
for table in self.table_list:
table_name = table['table_name']
ds_time = table['ds']
result = self.get_data(table_name,ds_time)
self.push_data(table_name,result)
dingmessage('監控報警:\n'+day+'\n'+table_name+"資料量為:"+str(result))


if __name__ == '__main__':
test_monitor().run()

4、任務結果和監控

這樣就可以每天看到昨天的資料是否產生,也可以設定閥值,將沒達到閥值的表輸出告警,然後方便去排查原因和恢復。

優化

上面的指令碼是需要將配置檔案寫到腳本里面的,如果設計到的業務比較多,那麼需要很多人同時修改這個指令碼,沒法做到資料安全的問題,所以下一步準備將這個配置檔案生成一張表,每個人可以通過資料庫insert的操作去新增自己需要監控的表。而且除了釘釘告警還可以發郵件。

例如:

Mysql資料條數的檢測

  • 目的:每天早上檢查配置表中各條記錄是否大於等於閾值,每天一條的,閾值寫1即可。

  • 結果:將不超過閥值的資料釘釘告警

1、表結構設計

CREATE TABLE `warehouse.dj_rpt_check_conf` (
`db` varchar(8) NOT NULL COMMENT '資料庫別名,例如bi,online,warehouse結果庫)' ,
`tbl` varchar(64) NOT NULL COMMENT '表名',
`condition` varchar(256) NOT NULL COMMENT '篩選條件',
`threshold` bigint(20) NOT NULL DEFAULT 0 COMMENT '閾值',
`owner` varchar(16) NOT NULL default 'nobody' COMMENT '負責人:每個人自己固定用一個名字',
`ptype` varchar(8) NOT NULL COMMENT '檢查週期,例如:d(天),w(周,週一),m(月,1號)',
unique index tbl_db (tbl,db)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;

插入資料:
insert into dj_rpt_check_conf values ('bi','dj_share_category_di','stat_date="${ds}"',20,'dy','d');

2、監控指令碼


# coding: utf-8

from djtool import *
import pandas as pd
import sqlalchemy as sq
from sqlalchemy import exc
import sys
import requests
import json
import copy

owner_mobile={'gw':'123456789'}

def check_table(conn,table,condition,threshold):
try:
cursor = conn.cursor()
check_sql= 'select count(1) from ' + table + ' where ' + condition
cursor.execute(check_sql)
data = cursor.fetchone()
if(data[0]<threshold):
return data[0]
else:
return None
except pymysql.InternalError as error:
code, message = error.args
print(">>>>>>>>>>>>>", code, message)
return -1

ds= sys.argv[1]
check_conf = pd.read_sql_table('dj_rpt_check_conf',get_sqlalchemy_conn('mysql','bg'))

check_conf['condition'] = check_conf.condition.str.replace('\$\{ds\}',ds)
check_conf['real_cnt'] = 0
check_conf['failed'] = 0

for db in check_conf.db.unique():
db_conn=get_pymysql_conn("mysql_"+db)
for index,row in check_conf[(check_conf.db==db) & (check_conf.ptype=='d')].iterrows():
real_cnt = check_table(db_conn,row['tbl'],row['condition'],row['threshold'])
if(real_cnt is not None):
check_conf.loc[index,'real_cnt']=real_cnt
check_conf.loc[index,'failed']=1

mail_text = '''
配置表:`warehouse.dj_rpt_check_conf`
'''

fail_conf = check_conf[(check_conf.failed==1)]

if(fail_conf.shape[0]>0):
send_mail(['[email protected]'],[],ds+'--BI任務失敗列表',mail_text + fail_conf.to_html())
else:
send_mail(['[email protected]'],[],ds+'--已經加入監控的BI任務完成:)',mail_text)

headers = {'Content-Type': 'application/json'}
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxx'
msg={
"msgtype": "markdown",
"markdown": {"title":"BI任務失敗了:"+ds,
"text":"#### BI任務失敗了:"+ds+" \n @mobile 失敗任務:\n- fail_task "
},
"at": {
"atMobiles": [
"88888"
]
}
}

for owner in fail_conf.owner.unique():
tmp_msg = copy.deepcopy(msg)
tmp_msg['at']['atMobiles']=[owner_mobile[owner]]
tmp_msg['markdown']['text']=tmp_msg['markdown']['text'].replace('mobile',owner_mobile[owner])
tmp_msg['markdown']['text']=tmp_msg['markdown']['text'].replace('fail_task',"\n- ".join(fail_conf[(fail_conf.owner==owner)].tbl.values.tolist()))
requests.post(ding_url, headers=headers,data=json.dumps(tmp_msg))

3、處理釘釘發郵件,可以通過自定義的方式傳送郵件

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import smtplib
import sys
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr
from sys import argv

def main():
sender = 'test'
receivers = ['[email protected]']
password = 'xxxxxxxx'
message = MIMEMultipart()
message['From'] = formataddr(["資料組",sender])
message['To'] = formataddr(["資料組成員",receivers])
subject = 'rest'
message['Subject'] = Header(subject, 'utf-8')
message.attach(MIMEText(sys.argv[1]+'資料見附件\n', 'plain', 'utf-8'))
att1 = MIMEText(open("aa.csv", 'rb').read(), 'base64', 'utf-8')
att1["Content-Type"] = 'application/octet-stream'
att1["Content-Disposition"] = 'attachment; '+'filename='+sys.argv[1]+'.csv'
message.attach(att1)
try:
server=smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
server.login(sender, password)
server.sendmail(sender,receivers,message.as_string())
print "郵件傳送成功"
except smtplib.SMTPException:
print "Error: 無法傳送郵件"

if __name__ == '__main__':
main()

不過這個執行過程,需要將要發的文字先下載到本地,然後才能傳送,所以一般的執行指令碼如下:

#!/usr/bin/env bash
export JAVA_HOME=/opt/jdk1.8.0_121
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin:{JRE_HOME}/bin:$PATH
export PATH=$PATH:/opt/mysql/bin
dateStrDay=$1
if [ -z "$1" ] ; then
dateStrDay=`date +%Y-%m-%d`
fi
echo $dateStrDay
dateStr=`date +%Y-%m-%d-%M-%S`

hadoop fs -rm -r /tmp/gaowei/test

cd /opt/task/gaowei/warehouse/test
rm *.csv

spark2-submit --class cn.idongjia.data.auction.AuctionOrder --master yarn --deploy-mode cluster /opt/task/gaowei/warehouse/test/datawarehouse_2.11-1.0.jar

hadoop fs -getmerge /tmp/gaowei/test/* /opt/task/gaowei/warehouse/test/bb.csv

sed '1i\使用者id,拍賣訂單數,跑單數,異常訂單數,是否禁言禁拍(0表示否,1表示是),是否在白名單(0表示否,1表示是),是否遮蔽(0表示否,1表示是,時間)' /opt/task/gaowei/warehouse/test/bb.csv > /opt/task/gaowei/warehouse/test/aa.csv

python Email.py ${dateStrDay}

擴充套件

除了python發郵件,Scala和Java也可以直接發郵件,程式碼如下:

package spark_tmp.utils
import java.io.File

import com.typesafe.config.ConfigFactory
import org.apache.spark.rdd.RDD
import play.api.libs.mailer._

object TaskSendMail {
/**
* 定義一個發郵件的人
* @param host STMP服務地址
* @param port STMP服務埠號
* @param user STMP服務使用者郵箱
* @param password STMP服務郵箱密碼
* @param timeout setSocketTomeout 預設: 60s
* @param connectionTimeout setSocketConnectionTimeout 預設:60s
* @return 返回一個可以發郵件的使用者
*/

def createMailer(host:String, port: Int, user: String, password: String, timeout:Int = 10000, connectionTimeout:Int = 10000):SMTPMailer ={
// STMP服務SMTPConfiguration
val configuration = new SMTPConfiguration(
host, port, false, false, false,
Option(user), Option(password), false, timeout = Option(timeout),
connectionTimeout = Option(connectionTimeout), ConfigFactory.empty(), false
)
val mailer: SMTPMailer = new SMTPMailer(configuration)
mailer
}


/**
* 生成一封郵件
* @param subject 郵件主題
* @param from 郵件傳送地址
* @param to 郵件接收地址
* @param bodyText 郵件內容
* @param bodyHtml 郵件的超文字內容
* @param charset 字元編碼 預設:utf-8
* @param attachments 郵件的附件
* @return 一封郵件
*/

def createEmail(subject:String, from:String, to:Seq[String], bodyText:String = "ok", bodyHtml:String = "", charset:String = "utf-8", attachments:Seq[Attachment] = Seq.empty): Email = {

val email = Email(subject, from, to,
bodyText = Option[String](bodyText), bodyHtml = Option[String](bodyHtml),
charset= Option[String](charset),attachments = attachments

)
email

}

/**
* 生成一個附件
* @param name 附件的名字
* @param fileStr 以本地檔案為附件相關引數
* @param rdd 以hdfs檔案或rdd或df為附件相關引數
* @return
*/

def createAttachments(name: String, fileStr: String = "", rdd:RDD[String] = null): Attachment = {
var attachment: Attachment = null
if(fileStr.contains(":")){
val file: File = new File(fileStr)
attachment = AttachmentFile(name, file)
}else{
val data: Array[Byte] = rdd.collect().mkString("\n").getBytes()
// 根據檔案型別選擇MimeTypes對應的值
val mimetype = "text/plain"
attachment = AttachmentData(name, data, mimetype)
}
attachment
}


/**
* 主要針對日常簡單結果的快速傳送
* @param subject 郵件主題名字
* @param toStr 郵件的接收人,多名以,分割
* @param bodyText 郵件的內容
* @return 使用者裝置 <[email protected]>
*/

def dailyEmail(subject:String, toStr:String, bodyText: String):String={
val to = toStr.split(",").toList
// 阿里雲企業 郵箱
val host = "smtp.189.cn"
val port = 25
val user = "[email protected]"
val password = "xxxxxxxxxx"
val from = user


val mailer: SMTPMailer = TaskSendMail.createMailer(host, port, user, password)
val email: Email = TaskSendMail.createEmail(subject, from, to, bodyText = bodyText)
val userdev: String = mailer.send(email)
userdev
}
}

作者:高威

連結:https://zhuanlan.zhihu.com/p/146063232

--END--

非常歡迎大家加我 個人微信 有關大資料的問題我們在 群內 一起討論

長按上方掃碼二維碼,加我微信,拉你進群