这次分享多维分析优化的另一种情况

【本文大纲】

1、描述问题背景

2、讲一下解决思路

3、解决办法(spark sql处理parquet row group原理及分区原理,参数测试,解决方案)

4、效果

1、描述问题

代码如下:

select

netease_user,

if(campaign_id is null, 'all', campaign_id) as campaign_id,

if(spec_id is null, 'all', spec_id) as spec_id,

if(app_bundle is null, 'all', app_bundle) as app_bundle,

if(render_name is null, 'all', render_name) as render_name,

platform,

sum(bidfloor) as success_bidfloor,

count(distinct clk_request_id) as click_pv,

count(distinct exp_deviceid) as exp_uv,

count(distinct exp_request_id) as exp_pv,

count(distinct clk_deviceid) as click_uv,

round(sum(case when winprice<0 then 0 else winprice end)/1000, 4) as cost

from

(

select distinct

nvl(netease_user , 'true') as netease_user,

nvl(render_name , 'null') as render_name,

platform,

nvl(campaign_id, 'null') as campaign_id,

nvl(spec_id, 'null') as spec_id,

nvl(app_bundle , 'null') as app_bundle,

clk_request_id, exp_deviceid, exp_request_id, clk_deviceid, winprice, bidfloor

from table_a where day = '20190815' and platform is not null

) tmp

group by netease_user, campaign_id, spec_id, app_bundle, render_name, platform

grouping sets(

( netease_user, platform),

( netease_user, platform, campaign_id),

( netease_user, platform, spec_id),

( netease_user, platform,app_bundle),

( netease_user, platform,render_name),

( netease_user, platform,campaign_id, spec_id),

( netease_user, platform,campaign_id, app_bundle),

( netease_user, platform,campaign_id, render_name),

( netease_user, platform, spec_id, app_bundle),

( netease_user, platform, spec_id, render_name),

( netease_user, platform, app_bundle, render_name),

( netease_user, platform, campaign_id, spec_id, app_bundle),

( netease_user, platform, spec_id, app_bundle, render_name),

( netease_user, platform, campaign_id, app_bundle, render_name),

( netease_user, platform, campaign_id, spec_id, render_name),

( netease_user, campaign_id, spec_id, app_bundle, render_name, platform)

)

;

整体逻辑与上一篇:【spark sql多维分析优化——细节是魔鬼】  差不多。

不同的是上一篇的基础表 table_a的总量很大,有几十亿,但是这次的基础表数据量有几百万,并不算很大。

但是运行时长还是挺长的:

需要60分钟左右。

来看一下日志:

第二个job比较慢,一定就是expand 慢了:

从上面可以看到,数据过滤后是582w,经过两次expand 后,变成了4.6个亿,4.6个亿的量本来不算大,但因为只有2个task在处理,就显的异常的慢

2、解决思路

解决多维分析的办法一般是:把逻辑拆开,分别计算指标,然后再 join 起来,这个也是上一篇【spark sql多维分析优化——细节是魔鬼】用到的一个办法。但这个办法有个缺点就是如果指标比较多的情况下,代码会写的很长,数据也会被多加载几遍。

对于这次案例来说,不用拆代码,因为5亿左右的量并不算很大,我们只用把task给扩展一下,从2个扩展到20个应该就能很快处理完了。

该怎么扩展呢?

首先我们先简化一下代码:

这里的distinct 是没必要的,从对业务的了解以及日志的数据来看,distinct 并没使数据大量减少,并且由于distinct引起了shuffle,也会占用一部分时间,因此可以把distinct去掉。

去掉distinct后,expand  操作就会被合并到Job 1 中,这样以来我们只要在读取文件时增加task, 让每个task处理更少的数据,就能提高效率。

3、解决办法及遇到的问题

该怎么提高读取文件的并行度呢?

基础表 table_a 存储格式为parquet,我们首先要了解spark sql 是怎么来处理parquet文件的。

3.1 spark sql分区方式(parquet)

spark 通过FileSourceScanExec 来处理hdfs文件:

/** 基础表table_a不为分桶表,读取数据的分区方式走此方法*/

private def createNonBucketedReadRDD(

readFile: (PartitionedFile) => Iterator[InternalRow],

selectedPartitions: Seq[PartitionDirectory],

fsRelation: HadoopFsRelation): RDD[InternalRow] = {

/**defaultMaxSplitBytes 即为spark.sql.files.maxPartitionBytes 参数,默认为128M*/

val defaultMaxSplitBytes =

fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes

/**openCostInBytes 即为spark.sql.files.openCostInBytes 参数,默认为4M*/

val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes

/**defaultParallelism 并行度参数 即 spark.default.parallelism */

val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism

val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum

val bytesPerCore = totalBytes / defaultParallelism

/**分片方法的计算公式*/

/**openCostInBytes与bytesPerCore取最大,然后再与defaultMaxSplitBytes取最小*/

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +

s"open cost is considered as scanning $openCostInBytes bytes.")

/**遍历文件*/

val splitFiles = selectedPartitions.flatMap { partition =>

partition.files.flatMap { file =>

val blockLocations = getBlockLocations(file)

/**判断文件是否支持分割,parquet可分割*/

if (fsRelation.fileFormat.isSplitable(

fsRelation.sparkSession, fsRelation.options, file.getPath)) {

/**依据分片大小maxSplitBytes计算要多少分区来处理数据*/

(0L until file.getLen by maxSplitBytes).map { offset =>

val remaining = file.getLen - offset

/**假如剩余量不足,那么该文件剩余的作为一个分区*/

val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining

val hosts = getBlockHosts(blockLocations, offset, size)

PartitionedFile(

partition.values, file.getPath.toUri.toString, offset, size, hosts)

}

} else {

/**判断文件是否支持分割,如果不能分割,一个文件一个partition*/

val hosts = getBlockHosts(blockLocations, 0, file.getLen)

Seq(PartitionedFile(

partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))

}

}

}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

.....

如果想要增加分区,即task 数量,就要降低最终分片 maxSplitBytes的值,可以通过降低spark.sql.files.maxPartitionBytes 的值来降低 maxSplitBytes 的值

3.2 参数测试及问题

spark.sql.files.maxPartitionBytes 参数默认为128M,生成了四个分区:

table_a 在hdfs 20190815日的数据情况:

205.2 M part-00000-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000205.2 M part-00001-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c0003.8 M part-00002-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000

 共三个数据文件,如果设置参数 spark.sql.files.maxPartitionBytes为64M,会把数据分8个块:

##part-00000 四块range: 0-67108864 ; range: 67108864-134217728; range: 134217728-201326592range: 201326592-215189723

##part-00001 四块range: 0-67108864 ; range: 67108864-134217728; range: 134217728-201326592range: 201326592-215167669

##part-00002 一块range: 0-4002630

启动7个task:  

理论上有6个task分别负责每个64M的块数据,然后最后一个task负责part-00000,part-00001剩余的不足64M的两个块以及part-00002。

然而事实是:

分区数确实增加了,由四个增加到了7个,但是新增的3个却没处理什么数据,大部分的数据还是4个partition在处理,所以还是很慢~~~~

task数增加了,但是数据并没有均分到每个task,为什么呢?

仔细研究了一下parquet 文件的结构:

parquet 文件的数据是以row group 存储,一个parquet 文件可能只含有一个row group,也有可能含有多个row group  ,row group  的大小 主要由parquet.block.size 决定。

spark 在处理parquet 文件时,一个row group 只能由一个task 来处理,在hdfs 中一个row group 可能横跨hdfs block ,那么spark是怎么保证一个task只处理一个 row group 的呢?

static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {

List rowGroups = metaData.getRow_groups();

List newRowGroups = new ArrayList();

for (RowGroup rowGroup : rowGroups) {

long totalSize = 0;

long startIndex = getOffset(rowGroup.getColumns().get(0));

for (ColumnChunk col : rowGroup.getColumns()) {

totalSize += col.getMeta_data().getTotal_compressed_size();

}

/**计算row group中点*/

long midPoint = startIndex + totalSize / 2;

/**谁拥有这个row group的中点,谁就可以处理这个row group*/

if (filter.contains(midPoint)) {

newRowGroups.add(rowGroup);

}

}

metaData.setRow_groups(newRowGroups);

return metaData;

}

这就导致并不是所有task 都能够分到数据。

检查table_a发现,生成table_a时,parquet.block.size  用的默认值128M ,这样就导致一个row group 有128M 的大小。

parquet.block.size 是可以依据实际使用情况来调优的,对于做多维分析表,可以设置稍小一点。

最终 经过调试设置parquet.block.size 为16M ;设置spark.sql.files.maxPartitionBytes为16M

4、效果

修改参数后:

读取hdfs文件时,并行了22个task,并且每个task处理数据均匀。

2分40秒就能完成,有没有棒棒哒?

推荐阅读:

有关用户留存模型的一种设计方法

spark sql多维分析优化——细节是魔鬼

记录一次spark sql的优化过程

从一个sql引发的hive谓词下推的全面复盘及源码分析(上)

从一个sql引发的hive谓词下推的全面复盘及源码分析(下)

Hey!

我是小萝卜算子

欢迎关注:数据仓库践行者

分享是最好的学习,这里记录我对数据仓库的实践的思考和总结

每天学习一点点

知识增加一点点

思考深入一点点

在成为最厉害最厉害最厉害的道路上

很高兴认识你

好文推荐

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。