// Solution (解题方案)
package com.tzk.sparksql.module
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
/**
 * Spark SQL exercise ("ClickTop3"): for every area, find the three most-clicked
 * products and annotate each with the share of clicks coming from its top cities.
 *
 * NOTE(review): `user_visit_action` and `product_info` are queried below but are not
 * registered anywhere in this file; they presumably have to be provided by other
 * code or a catalog. Confirm before running.
 */
object Demo03 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ClickTop3")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      run(spark)
    } finally {
      // Release the session even when one of the queries fails.
      spark.stop()
    }
  }

  /** Registers the city view and the `rateCity` UDAF, then prints the top-3 products per area. */
  private def run(spark: SparkSession): Unit = {
    import spark.implicits._

    // city_info.txt is tab-separated: city_id, city_name, area.
    val cities = spark.sparkContext.textFile("datas/city_info.txt").map { line =>
      val fields = line.split("\t")
      City(fields(0).toLong, fields(1), fields(2))
    }.toDS()
    cities.show()
    // The join below reads `city_info`, so the parsed rows must be exposed under that name.
    cities.createOrReplaceTempView("city_info")

    spark.udf.register("rateCity", functions.udaf(new RateCity()))

    // t1: actions with click_product_id > -1 (presumably -1 marks non-click actions),
    // enriched with the product name and the city name/area.
    spark.sql(
      """
        |select
        |u.*,
        |p.product_name,
        |c.city_name,c.area
        |from user_visit_action u
        |join product_info p on u.click_product_id = p.product_id
        |join city_info c on u.city_id = c.city_id
        |where u.click_product_id > -1
        |""".stripMargin).createOrReplaceTempView("t1")

    // t2: click count and city-share remark per (area, product).
    spark.sql(
      """
        |select
        |t1.area,
        |t1.product_name,
        |count(*) as click_num,
        |rateCity(t1.city_name) rate_city_remark
        |from
        |t1
        |group by t1.area,t1.product_name
        |""".stripMargin).createOrReplaceTempView("t2")

    // t3: rank products inside each area by click count, highest first.
    spark.sql(
      """
        |select
        |area,
        |product_name,
        |click_num,
        |rate_city_remark,
        |rank() over(partition by area order by click_num desc) rank_num
        |from
        |t2
        |""".stripMargin).createOrReplaceTempView("t3")

    // rank_num only exists in t3, so the filter must read from t3 rather than t2.
    // rank() gives tied products the same rank, so an area may yield more than three rows.
    spark.sql(
      """
        |select *
        |from t3
        |where rank_num <= 3
        |""".stripMargin).show()
  }

  /** One parsed line of datas/city_info.txt. */
  case class City(city_id: BigInt, city_name: String, area: String)

  /** Aggregation buffer for [[RateCity]]: total rows seen and the row count per city name. */
  case class Buff(var total: Long, cityMap: mutable.Map[String, Long])

  /**
   * UDAF (registered as `rateCity`) that takes one city name per row and produces a
   * remark such as `A50%,B30%,20%`. The remark lists the two most frequent cities
   * with their share of rows, followed by the share of all remaining cities when
   * there are more than two. Percentages are truncated towards zero.
   */
  class RateCity extends Aggregator[String, Buff, String] {

    override def zero: Buff = Buff(0L, mutable.Map[String, Long]())

    /** Counts one more row for city `a`; the buffer is updated in place. */
    override def reduce(b: Buff, a: String): Buff = {
      b.total += 1
      b.cityMap(a) = b.cityMap.getOrElse(a, 0L) + 1L
      b
    }

    /** Folds `b2`'s totals and per-city counts into `b1` and returns `b1`. */
    override def merge(b1: Buff, b2: Buff): Buff = {
      b1.total += b2.total
      b2.cityMap.foreach { case (city, count) =>
        b1.cityMap(city) = b1.cityMap.getOrElse(city, 0L) + count
      }
      b1
    }

    /**
     * Formats the remark. Groups with fewer than three cities list only the cities
     * present, instead of indexing past the end of the list. An empty buffer yields "".
     */
    override def finish(reduction: Buff): String = {
      val total = reduction.total
      if (total <= 0L) {
        // Nothing was aggregated; avoid a division by zero.
        ""
      } else {
        def percent(count: Long): String =
          s"${((count.toDouble / total.toDouble) * 100).toLong}%"

        val ranked = reduction.cityMap.toList.sortWith(_._2 > _._2)
        val top = ranked.take(2)
        val topParts = top.map { case (city, count) => s"$city${percent(count)}" }
        val parts =
          if (ranked.size > 2) topParts :+ percent(total - top.map(_._2).sum)
          else topParts
        parts.mkString(",")
      }
    }

    override def bufferEncoder: Encoder[Buff] = Encoders.product
    override def outputEncoder: Encoder[String] = Encoders.STRING
  }
}