SparkSQL小题


数据准备

创表语句

-- Creates user_visit_action as a tab-delimited text table and loads
-- input/user_visit_action.txt into it.
-- The queries in this document treat rows with click_product_id > -1 as clicks.
CREATE TABLE `user_visit_action`(
 `date` string,
 `user_id` bigint,
 `session_id` string,
 `page_id` bigint,
 `action_time` string,
 `search_keyword` string,
 `click_category_id` bigint,
 `click_product_id` bigint,
 `order_category_ids` string,
 `order_product_ids` string,
 `pay_category_ids` string,
 `pay_product_ids` string,
 `city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath 'input/user_visit_action.txt' into table user_visit_action;
-- Creates product_info as a tab-delimited text table and loads
-- input/product_info.txt into it.
CREATE TABLE `product_info` (
  `product_id`   BIGINT,
  `product_name` STRING,
  `extend_info`  STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH 'input/product_info.txt'
  INTO TABLE product_info;
-- Creates city_info as a tab-delimited text table and loads
-- input/city_info.txt into it.
CREATE TABLE `city_info` (
  `city_id`   BIGINT,
  `city_name` STRING,
  `area`      STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH 'input/city_info.txt'
  INTO TABLE city_info;

需求

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每
个商品在主要城市中的分布比例,超过两个城市用其他显示。

例表


地区 商品名称 点击次数 城市备注
华北 商品A 13423 北京21.2%,天津12.23%,其他*%

需求分析

  1. 按照点击量排序的
  2. 每个大区选出点击量前三(TOP3)的商品榜单
  3. 需计算大区内部,城市分布比例(最后计算)

功能实现

  1. 先联立三张表,得到包含所有所需信息的基础表t1(初步过滤掉非点击量数据click_product_id > -1)
  2. 计算点击量
  3. 按area distribute by,按 点击量 sort by
  4. 取出top3
  5. 然后按area计算城市分布比例

SQL实现

1.解题步骤

-- t1: every click row (click_product_id > -1) of user_visit_action,
--     extended with the product name and the city name / area it joins to.
SELECT
  u.*,
  p.product_name,
  c.city_name,
  c.area
FROM user_visit_action u
INNER JOIN product_info p
  ON u.click_product_id = p.product_id
INNER JOIN city_info c
  ON u.city_id = c.city_id
WHERE u.click_product_id > -1

-- t2: one row per (area, product_name) of t1 with its click count and the
--     city remark produced by the rateCity aggregate over city_name.
SELECT
  t1.area,
  t1.product_name,
  COUNT(*)                 AS click_num,
  rateCity(t1.city_name)   AS rate_city_remark
FROM t1
GROUP BY
  t1.area,
  t1.product_name

-- t3: t2 plus rank_num, the rank of each product inside its area when the
--     area's products are ordered by click_num from high to low.
SELECT
  area,
  product_name,
  click_num,
  rate_city_remark,
  RANK() OVER (PARTITION BY area ORDER BY click_num DESC) AS rank_num
FROM t2

-- t4: keep only the rows of t3 ranked 1..3 inside their area.
--     The filter must read from t3: rank_num is computed there and does not
--     exist in t2.
select *
from t3
where rank_num <= 3

完整Spark代码

package com.tzk.sparksql.module

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable

/**
 * Spark SQL job: for every area, list the three top-ranked products by click count,
 * together with a remark describing how each product's clicks are spread over the
 * area's cities.
 */
object Demo03 {
  /**
   * Builds a local Spark session, registers the `rateCity` aggregate and runs the
   * t1 -> t2 -> t3 -> final query chain, printing the result with `show()`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ClickTop3")
    // NOTE(review): the queries below read user_visit_action / product_info / city_info,
    // which must already be resolvable by this session's catalog. If they live in a Hive
    // metastore the builder presumably needs enableHiveSupport() -- confirm for your setup.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      // Expose the typed aggregator to SQL under the name used by the t2 query.
      spark.udf.register("rateCity",functions.udaf(new RateCity()))
      // t1: click rows joined with product and city information.
      spark.sql(
        """
          |select
          |u.*,
          |p.product_name,
          |c.city_name,c.area
          |from user_visit_action u
          |join product_info p on u.click_product_id = p.product_id
          |join city_info c on u.city_id = c.city_id
          |where u.click_product_id > -1
          |""".stripMargin).createOrReplaceTempView("t1")
      // t2: click count and city remark per (area, product_name).
      spark.sql(
        """
          |select
          |t1.area,
          |t1.product_name,
          |count(*) as click_num,
          |rateCity(t1.city_name) rate_city_remark
          |from
          |t1
          |group by t1.area,t1.product_name
          |""".stripMargin).createOrReplaceTempView("t2")
      // t3: rank of every product inside its area, highest click_num first.
      spark.sql(
        """
          |select
          |area,
          |product_name,
          |click_num,
          |rate_city_remark,
          |rank() over(distribute by area sort by click_num desc) rank_num
          |from
          |t2
          |""".stripMargin).createOrReplaceTempView("t3")
      // Final result: read from t3, the only view that carries rank_num.
      // rank() gives tied rows the same rank, so an area can yield more than three rows.
      spark.sql(
        """
          |select *
          |from t3
          |where rank_num <= 3
          |""".stripMargin).show()
    } finally {
      // Release the session even when one of the queries fails.
      spark.stop()
    }
  }

  /**
   * Aggregation buffer of [[RateCity]].
   *
   * @param total   number of city names folded into the buffer so far
   * @param cityMap number of occurrences per city name
   */
  case class Buff(var total: Long,cityMap: mutable.Map[String,Long])

  /**
   * Aggregator producing the city remark of a group.
   *
   *  - In: a city name (String)
   *  - Buffer: [[Buff]] (running total plus per-city counts)
   *  - Out: a remark such as `cityA21%,cityB12%,其他67%`
   */
  class RateCity extends Aggregator[String,Buff,String]{
    override def zero: Buff = Buff(0L,mutable.Map[String,Long]())

    /** Counts one more occurrence of city `a` in buffer `b`. */
    override def reduce(b: Buff, a: String): Buff = {
      b.total += 1
      b.cityMap.update(a, b.cityMap.getOrElse(a, 0L) + 1L)
      b
    }

    /** Adds the total and the per-city counts of `b2` into `b1`. */
    override def merge(b1: Buff, b2: Buff): Buff = {
      b1.total += b2.total
      b2.cityMap.foreach { case (city, count) =>
        b1.cityMap.update(city, b1.cityMap.getOrElse(city, 0L) + count)
      }
      b1
    }

    /**
     * Renders the remark: the two cities with the highest counts, each followed by its
     * share of `total` as a whole-number percentage (truncated), then -- only when the
     * group has more than two cities -- the remaining share labelled "其他".
     *
     * Groups with one or two cities list just those cities; an empty buffer yields "".
     */
    override def finish(reduction: Buff): String = {
      // Truncating percentage of `count` relative to the group's total.
      def percent(count: Long): String =
        s"${((count.toDouble / reduction.total.toDouble) * 100).toLong}%"

      val citiesByCountDesc = reduction.cityMap.toList.sortBy(_._2).reverse
      // splitAt never throws, so groups with fewer than two cities are handled too.
      val (topCities, otherCities) = citiesByCountDesc.splitAt(2)

      val topRemarks = topCities.map { case (city, count) => s"$city${percent(count)}" }
      val otherRemark =
        if (otherCities.isEmpty) Nil
        else List(s"其他${percent(reduction.total - topCities.map(_._2).sum)}")

      (topRemarks ++ otherRemark).mkString(",")
    }

    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }

}

文章作者: tzkTangXS
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 tzkTangXS !
  目录