数据准备
创表语句
-- User behaviour log: one row per action, tab-separated.
-- The solution below treats a row as a click when click_product_id > -1.
CREATE TABLE `user_visit_action`(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint,
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint)
row format delimited fields terminated by '\t';
-- Load the local sample file into the table created above.
load data local inpath 'input/user_visit_action.txt' into table user_visit_action;
-- Product dimension table (tab-separated); joined to the action log on
-- product_id = click_product_id to obtain product_name.
CREATE TABLE `product_info`(
`product_id` bigint,
`product_name` string,
`extend_info` string)
row format delimited fields terminated by '\t';
-- Load the local sample file into the table created above.
load data local inpath 'input/product_info.txt' into table product_info;
-- City dimension table (tab-separated); joined to the action log on city_id
-- to obtain city_name and the area the city belongs to.
CREATE TABLE `city_info`(
`city_id` bigint,
`city_name` string,
`area` string)
row format delimited fields terminated by '\t';
-- Load the local sample file into the table created above.
load data local inpath 'input/city_info.txt' into table city_info;
需求
这里的热门商品是从点击量的维度来看的:计算各个区域点击量前三的热门商品,并备注每个商品在主要城市中的分布比例;
城市超过两个时,只列出点击量最高的两个城市,其余城市合并显示为"其他"。
例表
地区 | 商品名称 | 点击次数 | 城市备注 |
---|---|---|---|
华北 | 商品A | 13423 | 北京21.2%,天津12.23%,其他*% |
需求分析
- 按照点击量排序的
- 每个大区选出点击量TOP3榜单
- 需计算大区内部,城市分布比例(最后计算)
功能实现
- 先联立三张表,得到包含所有所需信息的基础表t1(只保留点击数据,即 click_product_id > -1,过滤掉非点击行为)
- 计算点击量
- 按area distribute by,按 点击量 sort by
- 取出top3
- 然后按area计算城市分布比例
SQL实现
1.解题步骤
-- t1: base table. Joins the action log with the product and city dimensions
--     and keeps only click rows (click_product_id > -1).
select
u.*,
p.product_name,
c.city_name,c.area
from user_visit_action u
join product_info p on u.click_product_id = p.product_id
join city_info c on u.city_id = c.city_id
where u.click_product_id > -1
-- t2: one row per (area, product) with its click count.
--     rateCity is the custom aggregate registered by the Spark program below;
--     it turns the group's city names into the city-distribution remark.
select
t1.area,
t1.product_name,
count(*) as click_num,
rateCity(t1.city_name) rate_city_remark
from
t1
group by t1.area,t1.product_name
-- t3: ranks the products inside each area by click count, highest first.
--     rank() gives tied products the same rank_num.
select
area,
product_name,
click_num,
rate_city_remark,
rank() over(distribute by area sort by click_num desc) rank_num
from
t2
-- t4: keep only the three best-ranked products of each area.
--     The source must be t3 (the query that defines rank_num); reading from
--     t2 fails because t2 has no rank_num column.
select *
from
t3
where rank_num <= 3
完整Spark代码
package com.tzk.sparksql.module
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
/**
 * Spark SQL job that lists, for every area, the three most-clicked products
 * together with a remark describing how the clicks are split between cities.
 */
object Demo03 {

  /** Number of cities listed by name in the remark; the remainder is folded into [[OtherLabel]]. */
  private val NamedCityLimit = 2

  /** Label for the combined share of every city beyond the first [[NamedCityLimit]]. */
  private val OtherLabel = "其他"

  /**
   * Builds the temp views t1 -> t2 -> t3 and prints the top-3 products per area.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ClickTop3")
    // NOTE(review): the source tables are created with Hive DDL, but this session is
    // built without enableHiveSupport() - confirm how the tables reach the catalog.
    val spark = SparkSession.builder().config(conf).getOrCreate()

    try {
      // Make the typed aggregator callable from SQL as rateCity(city_name).
      spark.udf.register("rateCity", functions.udaf(new RateCity()))

      // t1: click rows enriched with product name, city name and area.
      spark.sql(
        """
          |select
          |u.*,
          |p.product_name,
          |c.city_name,c.area
          |from user_visit_action u
          |join product_info p on u.click_product_id = p.product_id
          |join city_info c on u.city_id = c.city_id
          |where u.click_product_id > -1
          |""".stripMargin).createOrReplaceTempView("t1")

      // t2: click count and city-distribution remark per (area, product).
      spark.sql(
        """
          |select
          |t1.area,
          |t1.product_name,
          |count(*) as click_num,
          |rateCity(t1.city_name) rate_city_remark
          |from
          |t1
          |group by t1.area,t1.product_name
          |""".stripMargin).createOrReplaceTempView("t2")

      // t3: rank of every product inside its area, most clicked first.
      spark.sql(
        """
          |select
          |area,
          |product_name,
          |click_num,
          |rate_city_remark,
          |rank() over(distribute by area sort by click_num desc) rank_num
          |from
          |t2
          |""".stripMargin).createOrReplaceTempView("t3")

      // Final result: must read t3, the only view that carries rank_num.
      // truncate = false keeps the full remark visible (show() cuts cells at 20 chars).
      spark.sql(
        """
          |select *
          |from
          |t3
          |where rank_num <= 3
          |""".stripMargin).show(truncate = false)
    } finally {
      // Release the session even when one of the queries fails.
      spark.stop()
    }
  }

  /**
   * Aggregation buffer of [[RateCity]].
   *
   * @param total   number of city names seen so far in the group
   * @param cityMap number of occurrences per city name
   */
  case class Buff(var total: Long, cityMap: mutable.Map[String, Long])

  /**
   * Aggregator that turns the city names of a group into a remark such as
   * `北京21.2%,天津12.2%,其他66.6%`.
   *
   * Input: one city name per row. Buffer: [[Buff]]. Output: the remark string.
   */
  class RateCity extends Aggregator[String, Buff, String] {

    /** Empty buffer: nothing counted yet. */
    override def zero: Buff = Buff(0L, mutable.Map[String, Long]())

    /** Counts one more occurrence of city `a` in buffer `b` and returns `b`. */
    override def reduce(b: Buff, a: String): Buff = {
      b.total += 1
      b.cityMap.update(a, b.cityMap.getOrElse(a, 0L) + 1L)
      b
    }

    /** Adds the totals and per-city counts of `b2` into `b1` and returns `b1`. */
    override def merge(b1: Buff, b2: Buff): Buff = {
      b1.total += b2.total
      b2.cityMap.foreach { case (city, count) =>
        b1.cityMap.update(city, b1.cityMap.getOrElse(city, 0L) + count)
      }
      b1
    }

    /**
     * Renders the final remark.
     *
     * The [[NamedCityLimit]] cities with the most occurrences are listed by name;
     * an [[OtherLabel]] entry is appended only when further cities exist. A group
     * with one city (or none) therefore no longer fails on a missing list index.
     *
     * @param reduction fully merged buffer of the group
     * @return comma-separated remark, or an empty string when nothing was counted
     */
    override def finish(reduction: Buff): String = {
      val total = reduction.total
      if (total <= 0L) {
        ""
      } else {
        // Highest count first; ties are ordered by city name so the output is deterministic.
        val ranked = reduction.cityMap.toList.sortBy { case (city, count) => (-count, city) }
        val (named, rest) = ranked.splitAt(NamedCityLimit)

        val namedRemarks = named.map { case (city, count) => s"$city${percent(count, total)}" }
        val otherRemark =
          if (rest.isEmpty) Nil
          else List(s"$OtherLabel${percent(rest.map(_._2).sum, total)}")

        (namedRemarks ++ otherRemark).mkString(",")
      }
    }

    /** Formats `part / total` as a percentage with one decimal, independent of the JVM locale. */
    private def percent(part: Long, total: Long): String =
      "%.1f%%".formatLocal(java.util.Locale.ROOT, part * 100.0 / total)

    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }
}