玖叶教程网

前端编程开发入门

Spark SQL实现原理-逻辑计划优化-操作下推:EliminateOuterJoin

Spark SQL实现原理-逻辑计划优化-操作下推:EliminateOuterJoin规则

该规则对outer join操作进行优化,目的是尽可能的消除outer join操作,把它转化成inner或其他的join类型。EliminateOuterJoin优化规则能够生效的情况是:join操作后面跟一个filter操作(按逻辑计划树的节点组织来说,就是:当filter操作是join操作的父节点时)。

EliminateOuterJoin规则的逻辑

EliminateOuterJoin优化规则的执行逻辑可以分成几种情况来说明,具体的逻辑如下:

1.若join是full outer类型,且对join的双方都使用了filter(过滤)操作,则把full outer类型转换成inner类型。

2.若join是left outer类型,且对join的右侧使用了filter(过滤)操作,则把left outer类型转换成inner类型。

3.若join是right outer类型,且对join的左侧使用了filter(过滤)操作,则把right outer类型转换成inner类型。

4.若join是full outer类型,且只对join的左侧使用了filter(过滤)操作,则把full outer类型转换成left outer类型。

5.若join是full outer类型,且只对join的右侧使用了filter(过滤)操作,则把full outer类型转换成right outer类型。

我们知道,一般在实际的开发过程中会比较谨慎使用inner join,因为inner join会丢数据。但Spark SQL为什么还要outer类型优化成inner类型呢?下面继续分析。

优化规则的效果

我们通过例子来看一下以上各种情况的实际效果,从而能更加深刻的理解这个优化过程。

1.使用join,并对两边都使用了filter操作

 import spark.implicits._
 case class Person(id: String, age: Int)
 case class Record(no: String, age: Int)
 
 val data1 = Seq(Person("Michael", 9), Person("Andy", 7), Person("Justin", 11))
 val data2 = Seq(Record("Michael", 29), Record("Andy2", 30), Record("Justin2", 11))
 
 val ds1 = spark.createDataset(data1)
 val ds2 = spark.createDataset(data2)
 // 对join的两边的数据集都添加过滤条件
 ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").\
         where(ds1("age")>=9 and ds2("age") >=20).explain(true)

以上代码,我们使用fullouter join,并对参与join两端的数据集都添加了过滤条件,可以看到下面通过EliminateOuterJoin规则优化后的逻辑计划。右边是优化后的逻辑计划,可以看到FullOuter转换成了Inner。

 === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
  Filter ((age#147 >= 9) && (age#152 >= 20))   Filter ((age#147 >= 9) && (age#152 >= 20))
 !+- Join FullOuter, (id#146 = no#151)         +- Join Inner, (id#146 = no#151)
     :- LocalRelation [id#146, age#147]           :- LocalRelation [id#146, age#147]
     +- LocalRelation [no#151, age#152]           +- LocalRelation [no#151, age#152]

可以看到join的类型从FullOuter变成了Inner。那么,为什么能把full outer类型转换成inner类型呢?

我们按我们的逻辑把处理过程拆成2步,第一步是进行full outer其结果如下:

id

no

ds1.age

ds2.age

Michael

Michael

9

29


Justin2


11

Justin


11


Andy


7



Andy2


30

第二步进行过滤操作,这样最终的结果就是,只剩一条记录:

id

no

ds1.age

ds2.age

Michael

Michael

9

29

所以,通过以上的分析可以看出,由于full outer类型的join会添加空值,这样只要有过滤的操作空值都将无法匹配上,这相当于排除了所有空值的行,也就是相当于inner join的操作。通过优化成inner join,可以过滤掉一些数据,可以加速整个数据处理的过程。下面的处理逻辑和这个类似。

2.若是left outer类型,且对右侧进行了Filter操作,left outer会转换成inner

我们使用leftouter类型的join,并对右侧进行过滤操作,代码如下:

 ds1.join(ds2, ds1("id")===ds2("no"), "leftouter").where(ds2("age") >=20).explain(true)

通过查看TRACE日志,可以看到逻辑计划的Join被优化成了Inner类型。

 === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
  Filter (age#152 >= 20)                  Filter (age#152 >= 20)
 !+- Join LeftOuter, (id#146 = no#151)    +- Join Inner, (id#146 = no#151)
     :- LocalRelation [id#146, age#147]      :- LocalRelation [id#146, age#147]
     +- LocalRelation [no#151, age#152]      +- LocalRelation [no#151, age#152]

这个优化的逻辑和第1种情况类似,当进行left outer join操作时,会保留左侧所有的数据行,若左侧存在而右侧不存在时填充空值。当对右侧数据集进行过滤操作时,空值一定会被过滤掉,也就是说和左侧不匹配的数据行一定会被过滤掉。

这相当于进行了一次inner join的操作。

3.若是right outer类型,且对左侧进行了Filter操作,则把right outer转换成inner

执行以下rightouter join的操作代码:

 ds1.join(ds2, ds1("id")===ds2("no"), "rightouter").where(ds1("age") >=9).explain(true)

可以得到优化后的逻辑计划:

 === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
  Filter (age#147 >= 9)                   Filter (age#147 >= 9)
 !+- Join RightOuter, (id#146 = no#151)   +- Join Inner, (id#146 = no#151)
     :- LocalRelation [id#146, age#147]      :- LocalRelation [id#146, age#147]
     +- LocalRelation [no#151, age#152]      +- LocalRelation [no#151, age#152]

这条优化规则的逻辑和第2条类似。这里不再进行分析了。

4.若是full outer类型,且对左侧使用了Filter操作,则把full outer转换成left outer

执行fullouter的join:

 ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").where(ds1("age") >=9).explain(true)

该操作被优化后的逻辑计划如下:

 === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
  Filter (age#147 >= 9)                   Filter (age#147 >= 9)
 !+- Join FullOuter, (id#146 = no#151)    +- Join LeftOuter, (id#146 = no#151)
     :- LocalRelation [id#146, age#147]      :- LocalRelation [id#146, age#147]
     +- LocalRelation [no#151, age#152]      +- LocalRelation [no#151, age#152]

若对左侧进行过滤操作,会过滤掉所有对右侧填空的行(可以参考第1条的full outer结果表)(注意:要是能join上就不会是空值),从而保留了左侧满足条件的行。这相当于是进行了一次left outer join操作。

5.若是full outer类型,且对右侧使用了Filter操作,则把full outer转换成right outer

执行以下代码:

ds1.join(ds2, ds1("id")===ds2("no"), "fullouter").where(ds2("age") >=20).explain(true)

得到优化后的逻辑计划如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
 Filter (age#152 >= 20)                  Filter (age#152 >= 20)
!+- Join FullOuter, (id#146 = no#151)    +- Join RightOuter, (id#146 = no#151)
    :- LocalRelation [id#146, age#147]      :- LocalRelation [id#146, age#147]
    +- LocalRelation [no#151, age#152]      +- LocalRelation [no#151, age#152]

若对右侧进行过滤操作,会过滤掉所有对左侧填空的行(可以参考第1条的full outer结果表)(注意:要是能join上就不会是空值),从而保留了右侧满足条件的行。这相当于是进行了一次right outer join操作。

规则的实现

该优化规则的核心实现代码(有删减)如下:

object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
	...
  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
    // 对过滤条件进行检查,看是否是符合该优化规则的过滤条件
    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
    // 获取左侧数据集的过滤条件
    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
    // 获取右侧数据集的过滤条件
    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

    // 根据现有的join类型,返回优化后的join类型
    join.joinType match {
      case RightOuter if leftHasNonNullPredicate => Inner
      case LeftOuter if rightHasNonNullPredicate => Inner
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
      case FullOuter if leftHasNonNullPredicate => LeftOuter
      case FullOuter if rightHasNonNullPredicate => RightOuter
      case o => o
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // 判断是否是本规则可以优化的情况
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
      val newJoinType = buildNewJoinType(f, j)
    	// 判断join类型是否和原来的相等,若不相当,重新构建Filter逻辑计划
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
  }
}

小结

由于full outer join操作的代价很大,所以若对数据集进行了过滤操作就可以对full outer操作进行优化。对join的逻辑计划的优化后面还有很多规则,后面的文章会继续分析。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言