Spark经典例题



解题方案

package com.tzk.sparksql.module

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable

object Demo03 {

  /**
   * Finds, for every area, the top 3 products by click count. Each result row also
   * carries a remark describing how that product's clicks are spread across cities.
   *
   * Reads `datas/city_info.txt` (tab-separated: city id, city name, area) and registers
   * it as the `city_info` view used by the join below.
   *
   * NOTE(review): `user_visit_action` and `product_info` are queried but never
   * registered in this method. They must already exist in the session catalog when
   * this runs — confirm how they are provided.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ClickTop3")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    try {
      // Data preparation: parse the city file and expose it to SQL as `city_info`.
      val cityDS = spark.sparkContext.textFile("datas/city_info.txt").map { line =>
        val datas = line.split("\t")
        City(datas(0).toLong, datas(1), datas(2))
      }.toDS()
      cityDS.createOrReplaceTempView("city_info")
      cityDS.show()

      spark.udf.register("rateCity", functions.udaf(new RateCity()))

      // t1: every product click (click_product_id > -1) joined with product and city info.
      spark.sql(
        """
          |select
          |u.*,
          |p.product_name,
          |c.city_name,c.area
          |from user_visit_action u
          |join product_info p on u.click_product_id = p.product_id
          |join city_info c on u.city_id = c.city_id
          |where u.click_product_id > -1
          |""".stripMargin).createOrReplaceTempView("t1")

      // t2: click count and city-distribution remark per (area, product).
      spark.sql(
        """
          |select
          |t1.area,
          |t1.product_name,
          |count(*) as click_num,
          |rateCity(t1.city_name) rate_city_remark
          |from
          |t1
          |group by t1.area,t1.product_name
          |""".stripMargin).createOrReplaceTempView("t2")

      // t3: rank products within each area by descending click count.
      spark.sql(
        """
          |select
          |area,
          |product_name,
          |click_num,
          |rate_city_remark,
          |rank() over(partition by area order by click_num desc) rank_num
          |from
          |t2
          |""".stripMargin).createOrReplaceTempView("t3")

      // Keep the top 3 per area. rank() gives ties the same rank, so an area can yield
      // more than 3 rows.
      spark.sql(
        """
          |select *
          |from t3
          |where rank_num <= 3
          |""".stripMargin).show()
    } finally {
      // Always release the session, even when one of the queries above fails.
      spark.stop()
    }
  }

  /** One row of `city_info.txt`: city id, city name and the area the city belongs to. */
  case class City(city_id:BigInt,city_name:String,area:String)

  /**
   * Aggregation buffer for [[RateCity]].
   *
   * @param total   number of clicks aggregated so far
   * @param cityMap click count per city name
   */
  case class Buff(var total: Long,cityMap: mutable.Map[String,Long])

  /**
   * Aggregator that turns the city names of a group's clicks into a remark such as
   * `北京21%,天津13%,其他66%`.
   *
   * The remark lists the (at most) two cities with the most clicks, each followed by
   * its share of the group's clicks. When more cities exist, a final `其他` ("other")
   * entry gives their combined share. Shares are integer percentages, truncated.
   *
   * In: city name of a single click. Buffer: [[Buff]]. Out: the remark string.
   */
  class RateCity extends Aggregator[String,Buff,String]{
    override def zero: Buff = Buff(0L,mutable.Map[String,Long]())

    /** Counts one more click for city `a`. */
    override def reduce(b: Buff, a: String): Buff = {
      b.total += 1
      b.cityMap.update(a, b.cityMap.getOrElse(a, 0L) + 1L)
      b
    }

    /** Folds the counts of `b2` into `b1`. */
    override def merge(b1: Buff, b2: Buff): Buff = {
      b1.total += b2.total
      b2.cityMap.foreach { case (city, count) =>
        b1.cityMap.update(city, b1.cityMap.getOrElse(city, 0L) + count)
      }
      b1
    }

    /** Builds the remark; safe for groups with any number of distinct cities. */
    override def finish(reduction: Buff): String = {
      val total = reduction.total
      if (total == 0L) {
        ""
      } else {
        // Exact integer percentage (truncated). Doubles can misround, e.g. 29/100*100
        // evaluates to 28.999... and would print 28%.
        def percent(count: Long): Long = count * 100 / total

        val ranked = reduction.cityMap.toList.sortBy { case (_, count) => -count }
        val (top, rest) = ranked.splitAt(2)
        val topRemarks = top.map { case (city, count) => s"$city${percent(count)}%" }
        // Only mention the remaining cities when there are any.
        val otherRemark =
          if (rest.isEmpty) Nil
          else List(s"其他${percent(rest.map(_._2).sum)}%")
        (topRemarks ++ otherRemark).mkString(",")
      }
    }

    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }

}

文章作者: tzkTangXS
版权声明：本博客所有文章除特别声明外，均采用 CC BY 4.0 许可协议。转载请注明来源 tzkTangXS！
  目录