A. 數據傾斜解決思路詳解
數據傾斜是由於某個task被分配過多數據,而比其他task需要更多的執行時間(如幾十倍,幾百倍),導致其他task執行完進入漫長等待的一種現象。
數據傾斜只會發生在多對多或一對多的數據分發的過程中,如spakr的shuffle操作中,在MapRece中的rece階段,
常見的運算元類型為:join,group by 和窗口函數如row_number 。
這是因為這些運算元會進行shuffle操作,產生一個key值,如group by的欄位,join的on欄位,
為了利用多台機器的並發能力,會按這個key值取數范圍進行均衡的分發,每台機器盡量分到相同長度的取值范圍的key,
然後將這些有key值的數據的數據傳輸過去。
這時如果某個key范圍內的數據量大大多於其他范圍的數據量,就會發生數據傾斜。
解決辦法:
解決數據傾斜的思路在於,先找到產生數據傾斜的運算元操作,然後針對具體的運算元,解決它單個key范圍被分到過多的數據的問題,
按key的類型,由簡便到復雜依次有以下幾種解決思路:
1.直接消滅傾斜的key。
2.直接避免shuffle操作,沒有了shuffle操作也就沒有了數據傾斜
3.通過增多task的數量,減小單個task內的數據量,這個方法適用於某個key范圍的數據多的情況。
4.通過特殊處理key值,減小單個task內的數據量,這個方法適用於某些特定的key值的數據過多的情況
第一個解決思路比較簡單,找到傾斜的key,直接過濾掉。就沒有傾斜問題了。這種操作的適用范圍很窄。比如一些空字元串,一些預設值等等,本身在業務上能接受它們不參與操作。
如果發現造成傾斜的key是這些,就可以直接過濾,非常簡單粗暴,性價比最高。
如果該key不能被過濾,就考慮能否將shuffle操作避免掉。
比如join的時候使用廣播的方式,將其中一張表廣播到所有的機器節點上,這樣一個shuffle操作就變成了一個map操作。
廣播的方式(map join)適用於join的時候某一張表的數據量比較小的時候,如果兩張表都很大,則不適用這種方式。
如果不能避免shuffle操作也不能過濾傾斜的key值,那麼我們就要從key值的類型入手,如果傾斜的key值是連續的,不是由單個key值引起的,就可以增大task的數量,
比如,修改shuffle產生的partition參數為更大,就可以使同一個范圍內的key值分到不同機器上,
或者使key值重新排列,倒排或者其他方式,使他們不再連續,分配到不同的機器上,就可以防止傾斜。
上述3個操作都比較簡單高效,但是應用的場景有限,如果該key不能被過濾,也不能避免shuffle,而且是1個到多個不連續的key引起的,就需要做比較復雜的操作了。
如果是group by,就可以用兩階段聚合法,
將group by a 改成 group by a,b ,然後再group by a
或者增加一個隨機數x,將a通過concat(x,a)改成b,將group by a 改成 group by b, 然後再聚合一次去掉x後的b,group by substr(b,length(x))
如果是join操作,就需要分開join,將傾斜的數據和不傾斜的數據分成兩部分。
然後兩站表不傾斜的部分join得到第一張表。
傾斜的數據,第一張較大的表:增加一個隨機數1-x,隨機數取決於你想把數據切成幾份。
得到 concat(x,a).
另一張較小的表將每一行復制到x份(總共增加x-1份),然後按順序標上序號1到x,如下所示:
源數據,傾斜的key值為a和b,
大表:aaaaa bbbbb
小表:aaaa bbbb
原來的join最後得到40條數據. 每個key分到20條
處理過key的表,
大表:1a 2a 2a 1a 2a 1b 2b 3b 3b 2b (增加一個隨機數前綴1-3)
小表:
1a 2a 3a 1a 2a 3a 1a 2a 3a 1a 2a 3a 這n條數據都按順序附加一個1~x的前綴
1b 2b 3b 1b 2b 3b 1b 2b 3b 1b 2b 3b
生成40條數據,每個key平均分到6.66條.
6.66條的計算公式是5/3*4, 5是大表的key的條數,有5個,增加隨機數之後,被分成了3份,得到了5/3 條,小表雖然也加了隨機數但是復制了x份,
所以小表的key還是4個,所以是:5/3*4
最後將傾斜部分的數據和不傾斜的部分的數據分別join之後再union起來就可以了。
當然,解決數據傾斜的不止這些方法,這些方法只是常用的,本質還是打散集中在某台機器,某個task的的數據量。只要能達到這個目的,就可以。