A. 数据倾斜解决思路详解
数据倾斜是由于某个task被分配过多数据,而比其他task需要更多的执行时间(如几十倍,几百倍),导致其他task执行完进入漫长等待的一种现象。
数据倾斜只会发生在多对多或一对多的数据分发的过程中,如spakr的shuffle操作中,在MapRece中的rece阶段,
常见的算子类型为:join,group by 和窗口函数如row_number 。
这是因为这些算子会进行shuffle操作,产生一个key值,如group by的字段,join的on字段,
为了利用多台机器的并发能力,会按这个key值取数范围进行均衡的分发,每台机器尽量分到相同长度的取值范围的key,
然后将这些有key值的数据的数据传输过去。
这时如果某个key范围内的数据量大大多于其他范围的数据量,就会发生数据倾斜。
解决办法:
解决数据倾斜的思路在于,先找到产生数据倾斜的算子操作,然后针对具体的算子,解决它单个key范围被分到过多的数据的问题,
按key的类型,由简便到复杂依次有以下几种解决思路:
1.直接消灭倾斜的key。
2.直接避免shuffle操作,没有了shuffle操作也就没有了数据倾斜
3.通过增多task的数量,减小单个task内的数据量,这个方法适用于某个key范围的数据多的情况。
4.通过特殊处理key值,减小单个task内的数据量,这个方法适用于某些特定的key值的数据过多的情况
第一个解决思路比较简单,找到倾斜的key,直接过滤掉。就没有倾斜问题了。这种操作的适用范围很窄。比如一些空字符串,一些缺省值等等,本身在业务上能接受它们不参与操作。
如果发现造成倾斜的key是这些,就可以直接过滤,非常简单粗暴,性价比最高。
如果该key不能被过滤,就考虑能否将shuffle操作避免掉。
比如join的时候使用广播的方式,将其中一张表广播到所有的机器节点上,这样一个shuffle操作就变成了一个map操作。
广播的方式(map join)适用于join的时候某一张表的数据量比较小的时候,如果两张表都很大,则不适用这种方式。
如果不能避免shuffle操作也不能过滤倾斜的key值,那么我们就要从key值的类型入手,如果倾斜的key值是连续的,不是由单个key值引起的,就可以增大task的数量,
比如,修改shuffle产生的partition参数为更大,就可以使同一个范围内的key值分到不同机器上,
或者使key值重新排列,倒排或者其他方式,使他们不再连续,分配到不同的机器上,就可以防止倾斜。
上述3个操作都比较简单高效,但是应用的场景有限,如果该key不能被过滤,也不能避免shuffle,而且是1个到多个不连续的key引起的,就需要做比较复杂的操作了。
如果是group by,就可以用两阶段聚合法,
将group by a 改成 group by a,b ,然后再group by a
或者增加一个随机数x,将a通过concat(x,a)改成b,将group by a 改成 group by b, 然后再聚合一次去掉x后的b,group by substr(b,length(x))
如果是join操作,就需要分开join,将倾斜的数据和不倾斜的数据分成两部分。
然后两站表不倾斜的部分join得到第一张表。
倾斜的数据,第一张较大的表:增加一个随机数1-x,随机数取决于你想把数据切成几份。
得到 concat(x,a).
另一张较小的表将每一行复制到x份(总共增加x-1份),然后按顺序标上序号1到x,如下所示:
源数据,倾斜的key值为a和b,
大表:aaaaa bbbbb
小表:aaaa bbbb
原来的join最后得到40条数据. 每个key分到20条
处理过key的表,
大表:1a 2a 2a 1a 2a 1b 2b 3b 3b 2b (增加一个随机数前缀1-3)
小表:
1a 2a 3a 1a 2a 3a 1a 2a 3a 1a 2a 3a 这n条数据都按顺序附加一个1~x的前缀
1b 2b 3b 1b 2b 3b 1b 2b 3b 1b 2b 3b
生成40条数据,每个key平均分到6.66条.
6.66条的计算公式是5/3*4, 5是大表的key的条数,有5个,增加随机数之后,被分成了3份,得到了5/3 条,小表虽然也加了随机数但是复制了x份,
所以小表的key还是4个,所以是:5/3*4
最后将倾斜部分的数据和不倾斜的部分的数据分别join之后再union起来就可以了。
当然,解决数据倾斜的不止这些方法,这些方法只是常用的,本质还是打散集中在某台机器,某个task的的数据量。只要能达到这个目的,就可以。