導(dǎo)讀:B站千億級數(shù)據(jù)同步,每天100T+數(shù)據(jù)導(dǎo)入是如何實(shí)現(xiàn)的?本文將介紹Apache SeaTunnel在嗶哩嗶哩的實(shí)踐。包括以下幾方面內(nèi)容:
- 工具選擇
- 日志
- 提速/限流
- 監(jiān)控自理
01
工具選擇
數(shù)據(jù)集成和數(shù)據(jù)出倉的大體流程如下圖所示,主要以數(shù)倉為中心,從HTTP、MySQL等外部數(shù)據(jù)源抽數(shù)入倉,在數(shù)倉內(nèi)做相應(yīng)業(yè)務(wù)處理后,出倉到對應(yīng)的Clickhouse、MySQL等存儲(chǔ),供業(yè)務(wù)使用。

B站數(shù)據(jù)平臺(tái)在離線出入倉工具上目前主要有兩類。一類是基于DataX二次開發(fā)的Rider項(xiàng)目,另一類是基于Seatunnel 1.1.3二次開發(fā)的AlterEgo項(xiàng)目。

上圖展示了Rider的架構(gòu)。
Rider在使用上支持T+1、H+1定時(shí)調(diào)度,在數(shù)據(jù)源上支持HTTP、MySQL、BOSS等作為數(shù)據(jù)源,此外Rider在使用上主要原生讀寫Hdfs文件。
DataX雖然在單個(gè)進(jìn)程下已經(jīng)足夠優(yōu)秀,但是不支持分布式,另外在大數(shù)據(jù)量下表現(xiàn)不是很好,Seatunnel在分布式場景下表現(xiàn)優(yōu)秀,一方面構(gòu)建在spark之上,天然分布式,另外且自帶了很多插件,非常適合二次開發(fā)。我們調(diào)研后,在Seatunnel基礎(chǔ)上二次開發(fā)了AlterEgo項(xiàng)目。

AlterEgo的工作流程:Input[數(shù)據(jù)源輸入] -> Filter[數(shù)據(jù)處理] -> Output[結(jié)果輸出]。
集群規(guī)模方面,由于歷史原因,目前離線集群與出入倉工具集群是分開部署的。出入倉工具集群節(jié)點(diǎn)數(shù)20+,CPU核數(shù)750+,內(nèi)存1.8T+;每日出入倉調(diào)度任務(wù)同步數(shù)據(jù)方面,日均記錄數(shù)上千億,日均數(shù)據(jù)量在100T以上。
在落地方面,主要提供了界面化操作,對任務(wù)做了抽象和封裝,平臺(tái)化后提供給用戶使用。平臺(tái)會(huì)根據(jù)用戶不同選擇,把任務(wù)封裝為Seatunnel的配置格式或DataX的Json格式配置。目前平臺(tái)的功能如圖,用戶可以選擇數(shù)倉數(shù)據(jù)源的庫名、表名和出倉后存儲(chǔ)的目標(biāo)數(shù)據(jù)源的信息。這里為了管理接入血緣考慮,和安全規(guī)范使用流程,用戶在界面內(nèi)不被允許填寫用戶名密碼,可以選擇對應(yīng)創(chuàng)建的數(shù)據(jù)源。在存儲(chǔ)個(gè)性上,比如為兼容MySQL協(xié)議的數(shù)據(jù)源的導(dǎo)入上,支持了Insert Ignore、Insert Update方式導(dǎo)入數(shù)據(jù)。字段映射可以通過界面上拖拽完成配置。

在任務(wù)運(yùn)維界面,通過DAG查看調(diào)度任務(wù)上下文,出倉和入倉的整個(gè)過程中,任務(wù)是互相依賴的,前面的任務(wù)出問題會(huì)導(dǎo)致后面的任務(wù)產(chǎn)出慢、數(shù)據(jù)延遲等。因此排查問題的過程中,往往需要在任務(wù)DAG中找到上游依賴最長的鏈路或是未完成鏈路并排查問題。
02
日志
平臺(tái)化落地其實(shí)難度不大,套個(gè)皮就可以了,但是很多時(shí)候我們要考慮的是面向運(yùn)維開發(fā),細(xì)節(jié)都是對用戶是封裝好的,需要為用戶提供足夠的運(yùn)維工具支持。這里以日志為例,排錯(cuò)是很常見的場景,當(dāng)用戶排錯(cuò)時(shí),用戶并不希望看到密密麻麻且無用的Yarn日志,但如果使用spark日志,由于Spark環(huán)境配置繁瑣,直接暴露Spark UI給用戶也會(huì)讓用戶的使用體驗(yàn)不佳。此外,在后期我們整合入離線大集群后,集群節(jié)點(diǎn)數(shù)目有四五千個(gè)節(jié)點(diǎn),集群規(guī)模大就導(dǎo)致日志聚合會(huì)慢,日志響應(yīng)時(shí)間長。

為解決日志查詢困難的問題,我們對日志層做了優(yōu)化,在Spark上使用LogAgent把業(yè)務(wù)日志轉(zhuǎn)發(fā)到我們的日志服務(wù)上。為方便查詢,且讓日志歷史信息可追溯,LogAgent在日志中追加了jobId、jobHistoryId、priority等信息。這樣采集日志后,我們會(huì)根據(jù)日志內(nèi)信息做各類告警,例如當(dāng)任務(wù)出倉條數(shù)為0時(shí)發(fā)送告警等。此外,當(dāng)日志有報(bào)錯(cuò)打出,用戶可以直接在日志界面展示的日志里定位程序的問題,所有操作直接在平臺(tái)就可以完成,而不需要其他復(fù)雜的配置。
03
提速/限流
- ClickHouse出倉
Clickhouse數(shù)據(jù)出倉方式有三種:
寫分布式表:寫入性能偏低,代碼比較簡單,不需要依賴RDD Repartition。
寫Local表:需要在本地做一次repartition,會(huì)有性能壓力。但寫入性能會(huì)更高,和寫分布式表一樣,主要用Jdbc協(xié)議。
BulkLoad:BulkLoad將寫壓力前置到Spark層,寫入速度快,降低了Clickhouse側(cè)壓力,寫入不影響讀性能,做到讀寫分離,更加安全。依賴的是文件復(fù)制。
Clickhouse出倉任務(wù)調(diào)度記錄達(dá)到60億以上,數(shù)據(jù)量達(dá)到13T以上;手動(dòng)補(bǔ)數(shù)據(jù)數(shù)據(jù)量在70T,數(shù)據(jù)量和記錄數(shù)都在不斷增長。
- 創(chuàng)作中心-出倉加速
簡單介紹下我們對創(chuàng)作中心的數(shù)據(jù)出倉做了一系列優(yōu)化,加速了數(shù)據(jù)出倉過程。
創(chuàng)作中心使用大量使用TiDB,我們利用jdbc協(xié)議批量寫數(shù)據(jù)。當(dāng)寫得快會(huì)導(dǎo)致TiDB-Server IO高、壓力大。另外在數(shù)據(jù)出倉過程中,可能有新建分區(qū)表的需求,當(dāng)出現(xiàn)DDL操作時(shí),寫入很容易出現(xiàn)Information schema is changed導(dǎo)致失敗。如果存在更新數(shù)據(jù)場景,也會(huì)由于Insert update時(shí)需要把數(shù)據(jù)全部讀出,當(dāng)任務(wù)出現(xiàn)失敗后重試時(shí)任務(wù)耗時(shí)會(huì)增長,性能降低。此外,TIDB集群是有限的,多個(gè)業(yè)務(wù)同時(shí)寫入TiDB時(shí)候會(huì)出現(xiàn)多實(shí)例競爭寫入資源,導(dǎo)致寫入時(shí)間耗時(shí)增加。
以上問題,我們的應(yīng)對方案主要是基于業(yè)務(wù)大都為KV查詢,用自研的分布式KV存儲(chǔ)TaiShan替代TiDB。創(chuàng)作中心業(yè)務(wù)主要集中在點(diǎn)查和Range查詢,比較適合KV類存儲(chǔ)。TaiShan是B站自研的分布式KV存儲(chǔ),經(jīng)過多次出倉實(shí)際壓測,TaiShan的Batch寫入方式和TiDB性能接近,實(shí)際使用并沒有多少性能提升,寫入多時(shí)也會(huì)影響到讀的業(yè)務(wù)。我們最終采用正在做的Bulk Load方式寫入TaiShan。Bulk Load優(yōu)化和前面介紹的Clickhouse的優(yōu)化類似,將寫入壓力前置,放到SeaTunnel層來生成數(shù)據(jù)文件;對于業(yè)務(wù)庫能實(shí)現(xiàn)簡單的讀寫分離,但可能會(huì)存在一些熱點(diǎn)問題,需要前置一次repartion。

我們對jdbc協(xié)議寫入TiDB和BulkLoad的方案分別做了壓測,TiDB寫入3、4億條數(shù)據(jù)多實(shí)例寫入的情況下,壓測任務(wù)要運(yùn)行兩小時(shí)以上,TaiShan只需要十幾分鐘即可跑完壓測,從結(jié)果來看Bulkload簡直不要太好,但有個(gè)無法回避的問題是TiDB集群是多個(gè)業(yè)務(wù)同時(shí)寫入的,分散到單個(gè)任務(wù)看起來寫入時(shí)間長。
我們也在嘗試?yán)@過TiDB直接將數(shù)據(jù)寫入TiKV,這個(gè)方案我們也在調(diào)研和實(shí)踐中,感興趣的小伙伴可以看下:
https://github.com/tidb-incubator/TiBigData

- 限速
在出倉場景,實(shí)際上還要考慮限流以及熔斷,沒有限速可能導(dǎo)致業(yè)務(wù)庫有一些問題,畢竟服務(wù)器能力有限,寫的太快將導(dǎo)致讀有影響。最開始我們使用的方法是在代碼內(nèi)Sleep,簡單實(shí)現(xiàn)就是假設(shè)數(shù)據(jù)寫入很快,可以在一毫秒內(nèi)完成,那么寫入的耗時(shí)就是代碼中sleep的時(shí)間,假如為例限速1w或5k,我們會(huì)通過sleep的時(shí)長就可以得出Spark需要的Executor數(shù),達(dá)到間接的、不準(zhǔn)確的限流。漏斗桶和令牌桶一個(gè)限制流入一個(gè)限制流出,我們用的不是很多。分布式的話我們小范圍使用了Sentinel,但分布式限流如果觸發(fā)熔斷可能由于寫入資源有限而寫入一直處于熔斷狀態(tài),導(dǎo)致寫入時(shí)間長、數(shù)據(jù)任務(wù)破線。BBR算法是個(gè)很好的工具,有很多種實(shí)現(xiàn),依賴的參數(shù)很多,甚至可以有對端的CPU及內(nèi)存水位,不斷嘗試得到最佳寫入量,在使用上可以很好的改善峰值問題。
04
監(jiān)控自理
- 監(jiān)控
在使用上,我們承接了幾乎所有的離線出倉和入倉任務(wù),作為數(shù)倉的零層和尾層,在入倉和出倉時(shí)需要及時(shí)感知可能存在的問題,一方面任務(wù)打優(yōu)先級,方便分級處理,在運(yùn)行時(shí),基于歷史指標(biāo)預(yù)測當(dāng)前任務(wù)的指標(biāo),當(dāng)出現(xiàn)問題時(shí)及時(shí)告警接入檢查。
AlterEgo中,我們在Spark的Application Job內(nèi)定時(shí)上報(bào)寫入速度和寫入的數(shù)據(jù)量。Rider是常駐的可以運(yùn)行多個(gè)Job,在Job中以Job為單位上報(bào)監(jiān)控?cái)?shù)據(jù)。AlterEgo和Rider的數(shù)據(jù)全部接入到消息隊(duì)列里,消息最后被Aulick消費(fèi)。Aulick內(nèi)設(shè)計(jì)了多種指標(biāo)監(jiān)聽器,用于任務(wù)的監(jiān)控,包括運(yùn)行時(shí)間、起止時(shí)間、速度、數(shù)據(jù)量、失敗重試次數(shù)和TiDB和MySQL特有的插入/更新數(shù)據(jù)量?;谶@些采集的指標(biāo)數(shù)據(jù),可以做到任務(wù)實(shí)例頁,方便用戶查看,另外匯總信息可以通過Grafana工具以及其他BI工具功能展示,異常告警交由Sensor做告警觸發(fā)。

數(shù)據(jù)采集上,Rider方式實(shí)現(xiàn)有的內(nèi)存Channel可以拿到同步的數(shù)據(jù)量等指標(biāo)信息。AlterEgo方式由于是分布式,會(huì)有多個(gè)Executor進(jìn)程同時(shí)上報(bào),目前我們主要是通過自定義了累加器完成指標(biāo)的上報(bào),Executor端在使用累加器時(shí)實(shí)例化定時(shí)采集線程,由于各個(gè)Executor進(jìn)程啟動(dòng)時(shí)間不同,所以在上報(bào)時(shí)的時(shí)間點(diǎn)是不準(zhǔn)確的,在使用上我們把時(shí)間按照10秒一個(gè)窗口進(jìn)行規(guī)整,如在0-10秒上報(bào)的數(shù)據(jù)會(huì)全部規(guī)整到0秒上進(jìn)行匯總。

- 自理
在數(shù)據(jù)同步過程中,在數(shù)據(jù)維度上,需要發(fā)現(xiàn)異常讀寫速度、異常讀寫流量和異常走勢,出現(xiàn)問題及時(shí)監(jiān)控和報(bào)警,報(bào)警會(huì)有電話報(bào)警到對應(yīng)負(fù)責(zé)人,對數(shù)據(jù)異常進(jìn)行處理,防止由于上游數(shù)據(jù)導(dǎo)入任務(wù)異常導(dǎo)致下游數(shù)據(jù)產(chǎn)出問題。這類其實(shí)也可以通過DQC去做,但側(cè)重點(diǎn)不同,這里更關(guān)注事中觸發(fā)。
在時(shí)間維度上,基于任務(wù)歷史預(yù)測,數(shù)據(jù)同步任務(wù)到時(shí)間未啟動(dòng),或者任務(wù)已提交到Y(jié)arn但是由于資源不足沒有啟動(dòng),數(shù)據(jù)同步任務(wù)執(zhí)行時(shí)間過長,也是需要及時(shí)處理。
在診斷方面,在任務(wù)失敗后需要解析Error日志進(jìn)行失敗歸因以及跟蹤,方便用戶自理,有一定量以后,還可以做任務(wù)的統(tǒng)計(jì),以及資源優(yōu)化。
05
精彩問答
Q1:3-4億的BulkLoad壓測性能提升是如何實(shí)現(xiàn)的?
A:BulkLoad是先寫數(shù)據(jù)到本地磁盤,然后推到存儲(chǔ)系統(tǒng),由存儲(chǔ)系統(tǒng)加載到內(nèi)存,首先是Seatunnel分布式的,在執(zhí)行時(shí)是分布式多進(jìn)程生成各自的數(shù)據(jù)文件,最后再把數(shù)據(jù)文件推到存儲(chǔ)系統(tǒng), 實(shí)際的性能就看開多少并發(fā)度了,并且整個(gè)過程不會(huì)太消耗存儲(chǔ)系統(tǒng)的CPU及IO壓力,對讀也是非常友好。
Q2:限速階段如何感知下游壓力?
A:感知下游壓力,目前簡單做法可以通過后端返回的RT時(shí)間或者失敗來感知到存儲(chǔ)端壓力,目前熔斷可以簡單這么做,但這種不精細(xì),無法處理好峰值問題,高級玩法可以參考BBR算法,依賴的參數(shù)很多,可以有存儲(chǔ)段的CPU及內(nèi)存水位,可以很好的改善峰值問題。
Q3:能否監(jiān)控精細(xì)到Byte字節(jié)數(shù)?
A:大部分細(xì)節(jié)指標(biāo)需要自己試下,B站這邊是通過自定義d累加器實(shí)的現(xiàn),在寫入數(shù)據(jù)時(shí)記錄數(shù)據(jù)字節(jié)數(shù)、條數(shù)等,簡單點(diǎn)可以自己getBytes拿到,我們這邊累加器會(huì)定時(shí)上報(bào)到消息隊(duì)列,然后由Aulick消費(fèi)數(shù)據(jù)后再進(jìn)行相應(yīng)地做報(bào)警動(dòng)作。比如如果傳輸?shù)淖止?jié)量,字節(jié)速率存在異常,就可以及時(shí)的發(fā)下,找相應(yīng)的同學(xué)協(xié)同排查問題。
Q4:DataX和Seatunnel是否可以互相替代?
A:工具平臺(tái)在落地上,互相替換是很有必要的,出去性能差別外,在執(zhí)行上也只是配置文件的區(qū)別。在集群規(guī)模很大后會(huì)收到很多環(huán)境問題、異常及各類問題影響,在工具使用上有個(gè)降級方案還是很有必要的。比如我們最近遇到了JDK的Bug,在kerberos認(rèn)證時(shí)認(rèn)證隊(duì)列可能拉長,引起寫入速度慢,事故當(dāng)時(shí),DataX在寫數(shù)據(jù)時(shí)需要經(jīng)過認(rèn)證,無法及時(shí)定位問題,大部分集成任務(wù)運(yùn)行出現(xiàn)速度慢、以及超時(shí)。為了防止事故再次出現(xiàn),我們已經(jīng)實(shí)現(xiàn)了大部分任務(wù)的可互相替換運(yùn)行數(shù)據(jù)同步任務(wù),實(shí)現(xiàn)任務(wù)的高可用。
今天的分享就到這里,謝謝大家。
本文經(jīng)授權(quán)發(fā)布,不代表增長黑客立場,如若轉(zhuǎn)載,請注明出處:http://m.allfloridahomeinspectors.com/quan/60849.html