Rollup函数
在对数据进行小计或合计运算时,rollup和cube一样,算是常用的操作了。Spark的DataFrame提供了rollup函数支持此功能。
假设准备了如下数据:
trait SalesDataFrameFixture extends DataFrameFixture
with SparkSqlSupport {
implicit class StringFuncs(str: String) {
def toTimestamp = new Timestamp(Date.valueOf(str).getTime)
}
import sqlContext.implicits._
val sales = Seq(
(1, "Widget Co", 1000.00, 0.00, "广东省", "深圳市", "2014-02-01".toTimestamp),
(2, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-11".toTimestamp),
(3, "Acme Widgets", 1000.00, 500.00, "四川省", "绵阳市", "2014-02-12".toTimestamp),
(4, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-13".toTimestamp),
(5, "Widget Co", 1000.00, 0.00, "广东省", "广州市", "2015-01-01".toTimestamp),
(6, "Acme Widgets", 1000.00, 500.00, "四川省", "泸州市", "2015-01-11".toTimestamp),
(7, "Widgetry", 1000.00, 200.00, "四川省", "成都市", "2015-02-11".toTimestamp),
(8, "Widgets R Us", 3000.00, 0.0, "四川省", "绵阳市", "2015-02-19".toTimestamp),
(9, "Widgets R Us", 2000.00, 0.0, "广东省", "深圳市", "2015-02-20".toTimestamp),
(10, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "深圳市", "2015-02-28".toTimestamp),
(11, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "广州市", "2015-02-28".toTimestamp)
)
val saleDF = sqlContext.sparkContext.parallelize(sales, 4).toDF("id", "name", "sales", "discount", "province", "city", "saleDate")
}
注册临时表,并执行SQL语句:
saleDF.registerTempTable("sales")
val dataFrame = sqlContext.sql("select province,city,sales from sales")
dataFrame.show
执行的结果如下:
| province |city | sales |
|----------|-----|-------|
| 广东省| 深圳市|1000.0|
| 四川省| 成都市|1000.0|
| 四川省| 绵阳市|1000.0|
| 四川省| 成都市|1000.0|
| 广东省| 广州市|1000.0|
| 四川省| 泸州市|1000.0|
| 四川省| 成都市|1000.0|
| 四川省| 绵阳市|3000.0|
| 广东省| 深圳市|2000.0|
| 广东省| 深圳市|3000.0|
| 广东省| 广州市|3000.0|
对该DataFrame执行rollup:
val resultDF = dataFrame.rollup($"province", $"city").agg(Map("sales" -> "sum"))
resultDF.show
在这个例子中,rollup操作相当于对dataFrame中的province与city进行分组,并在此基础上针对sales进行求和运算,故而获得的结果为:
|province|city|sum(sales)|
|--------|----|----------|
| null|null| 18000.0|
| 广东省|null| 10000.0|
| 广东省| 深圳市| 6000.0|
| 四川省|null| 8000.0|
| 四川省| 成都市| 3000.0|
| 四川省| 绵阳市| 4000.0|
| 广东省| 广州市| 4000.0|
| 四川省| 泸州市| 1000.0|
操作非常简单,然而遗憾地是并不符合我们产品的场景,因为我们需要根据某些元数据直接组装为Spark SQL的sql语句。在Spark的hiveContext中,支持这样的语法:
hiveContext.sql("select province, city, sum(sales) from sales group by province, city with rollup")
可惜,SQLContext并不支持这一功能。我在Spark User Mailing List中咨询了这个问题。Intel的Cheng Hao(Spark的一位非常活跃的contributer)告诉了我为何不支持的原因。因为在Spark SQL 1.x版本中,对SQL语法的解析采用了Scala的Parser机制。这种实现方式较弱,对语法的解析支持不够。Spark的Issue #5080尝试提供此功能,然而并没有被合并到Master中。Spark并不希望在1.x版本的SQLParser中添加新的关键字,它的计划是在Spark 2.0中用HQL Parser来替代目前较为简陋的SQL Parser。
如果希望在sql中使用rollup,那么有三个选择:
- 使用HQLContext;
- pull #5080的代码,自己建立一个Spark的分支;
- 等待Spark 2.0版本发布。