Spark SQL实现原理-逻辑计划优化-操作下推:EliminateOuterJoin规则
该规则对outer join操作进行优化,目的是尽可能的消除outer join操作,把它转化成inner或其他的join类型。EliminateOuterJoin优化规则能够生效的情况是:join操作后面跟一个filter操作(按逻辑计划树的节点组织来说,就是:当filter操作是join操作的父节点时)。
EliminateOuterJoin规则的逻辑
EliminateOuterJoin优化规则的执行逻辑可以分成几种情况来说明,具体的逻辑如下:
1.若join是full outer类型,且对join的双方都使用了filter(过滤)操作,则把full outer类型转换成inner类型。
2.若join是left outer类型,且对join的右侧使用了filter(过滤)操作,则把left outer类型转换成inner类型。
3.若join是right outer类型,且对join的左侧使用了filter(过滤)操作,则把right outer类型转换成inner类型。
4.若join是full outer类型,且只对join的左侧使用了filter(过滤)操作,则把full outer类型转换成left outer类型。
5.若join是full outer类型,且只对join的右侧使用了filter(过滤)操作,则把full outer类型转换成right outer类型。
我们知道,一般在实际的开发过程中会比较谨慎使用inner join,因为inner join会丢数据。但Spark SQL为什么还要outer类型优化成inner类型呢?下面继续分析。
优化规则的效果
我们通过例子来看一下以上各种情况的实际效果,从而能更加深刻的理解这个优化过程。
1.使用join,并对两边都使用了filter操作
import spark.implicits._
case class Person(id: String, age: Int)
case class Record(no: String, age: Int)
val data1 = Seq(Person("Michael", 9), Person("Andy", 7), Person("Justin", 11))
val data2 = Seq(Record("Michael", 29), Record("Andy2", 30), Record("Justin2", 11))
val ds1 = spark.createDataset(data1)
val ds2 = spark.createDataset(data2)
// 对join的两边的数据集都添加过滤条件
ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").\
where(ds1("age")>=9 and ds2("age") >=20).explain(true)
以上代码,我们使用fullouter join,并对参与join两端的数据集都添加了过滤条件,可以看到下面通过EliminateOuterJoin规则优化后的逻辑计划。右边是优化后的逻辑计划,可以看到FullOuter转换成了Inner。
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter ((age#147 >= 9) && (age#152 >= 20)) Filter ((age#147 >= 9) && (age#152 >= 20))
!+- Join FullOuter, (id#146 = no#151) +- Join Inner, (id#146 = no#151)
:- LocalRelation [id#146, age#147] :- LocalRelation [id#146, age#147]
+- LocalRelation [no#151, age#152] +- LocalRelation [no#151, age#152]
可以看到join的类型从FullOuter变成了Inner。那么,为什么能把full outer类型转换成inner类型呢?
我们按我们的逻辑把处理过程拆成2步,第一步是进行full outer其结果如下:
id | no | ds1.age | ds2.age |
Michael | Michael | 9 | 29 |
Justin2 | 11 | ||
Justin | 11 | ||
Andy | 7 | ||
Andy2 | 30 |
第二步进行过滤操作,这样最终的结果就是,只剩一条记录:
id | no | ds1.age | ds2.age |
Michael | Michael | 9 | 29 |
所以,通过以上的分析可以看出,由于full outer类型的join会添加空值,这样只要有过滤的操作空值都将无法匹配上,这相当于排除了所有空值的行,也就是相当于inner join的操作。通过优化成inner join,可以过滤掉一些数据,可以加速整个数据处理的过程。下面的处理逻辑和这个类似。
2.若是left outer类型,且对右侧进行了Filter操作,left outer会转换成inner
我们使用leftouter类型的join,并对右侧进行过滤操作,代码如下:
ds1.join(ds2, ds1("id")===ds2("no"), "leftouter").where(ds2("age") >=20).explain(true)
通过查看TRACE日志,可以看到逻辑计划的Join被优化成了Inner类型。
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter (age#152 >= 20) Filter (age#152 >= 20)
!+- Join LeftOuter, (id#146 = no#151) +- Join Inner, (id#146 = no#151)
:- LocalRelation [id#146, age#147] :- LocalRelation [id#146, age#147]
+- LocalRelation [no#151, age#152] +- LocalRelation [no#151, age#152]
这个优化的逻辑和第1种情况类似,当进行left outer join操作时,会保留左侧所有的数据行,若左侧存在而右侧不存在时填充空值。当对右侧数据集进行过滤操作时,空值一定会被过滤掉,也就是说和左侧不匹配的数据行一定会被过滤掉。
这相当于进行了一次inner join的操作。
3.若是right outer类型,且对左侧进行了Filter操作,则把right outer转换成inner
执行以下rightouter join的操作代码:
ds1.join(ds2, ds1("id")===ds2("no"), "rightouter").where(ds1("age") >=9).explain(true)
可以得到优化后的逻辑计划:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter (age#147 >= 9) Filter (age#147 >= 9)
!+- Join RightOuter, (id#146 = no#151) +- Join Inner, (id#146 = no#151)
:- LocalRelation [id#146, age#147] :- LocalRelation [id#146, age#147]
+- LocalRelation [no#151, age#152] +- LocalRelation [no#151, age#152]
这条优化规则的逻辑和第2条类似。这里不再进行分析了。
4.若是full outer类型,且对左侧使用了Filter操作,则把full outer转换成left outer
执行fullouter的join:
ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").where(ds1("age") >=9).explain(true)
该操作被优化后的逻辑计划如下:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter (age#147 >= 9) Filter (age#147 >= 9)
!+- Join FullOuter, (id#146 = no#151) +- Join LeftOuter, (id#146 = no#151)
:- LocalRelation [id#146, age#147] :- LocalRelation [id#146, age#147]
+- LocalRelation [no#151, age#152] +- LocalRelation [no#151, age#152]
若对左侧进行过滤操作,会过滤掉所有对右侧填空的行(可以参考第1条的full outer结果表)(注意:要是能join上就不会是空值),从而保留了左侧满足条件的行。这相当于是进行了一次left outer join操作。
5.若是full outer类型,且对右侧使用了Filter操作,则把full outer转换成right outer
执行以下代码:
ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").where(ds2("age") >=20).explain(true)
得到优化后的逻辑计划如下:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter (age#152 >= 20) Filter (age#152 >= 20)
!+- Join FullOuter, (id#146 = no#151) +- Join RightOuter, (id#146 = no#151)
:- LocalRelation [id#146, age#147] :- LocalRelation [id#146, age#147]
+- LocalRelation [no#151, age#152] +- LocalRelation [no#151, age#152]
若对右侧进行过滤操作,会过滤掉所有对左侧填空的行(可以参考第1条的full outer结果表)(注意:要是能join上就不会是空值),从而保留了右侧满足条件的行。这相当于是进行了一次right outer join操作。
规则的实现
该优化规则的核心实现代码(有删减)如下:
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
...
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
// 对过滤条件进行检查,看是否是符合该优化规则的过滤条件
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
// 获取左侧数据集的过滤条件
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
// 获取右侧数据集的过滤条件
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
// 根据现有的join类型,返回优化后的join类型
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// 判断是否是本规则可以优化的情况
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
// 判断join类型是否和原来的相等,若不相当,重新构建Filter逻辑计划
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
}
小结
由于full outer join操作的代价很大,所以若对数据集进行了过滤操作就可以对full outer操作进行优化。对join的逻辑计划的优化后面还有很多规则,后面的文章会继续分析。