Basic SparkSQL Commands


Conversions among DF, DS, and RDD


// In the spark-shell, sc and spark are pre-created and spark.implicits._ is
// pre-imported; in a standalone application you must set them up yourself
// (see the sketch below)
val rdd = sc.makeRDD(List(("张三", 23), ("李四", 22)))
// RDD => DF (name the columns, otherwise they default to _1, _2)
val df = rdd.toDF("name", "age")
// RDD => DS
// A tuple RDD can be converted directly, but in practice a case class is
// usually used -- after all, the point of a DS is to work with typed objects
val ds = rdd.toDS
case class User(name: String, age: Int)
// toDS takes no type parameter; map the tuples to the case class first
val userDS = rdd.map { case (name, age) => User(name, age) }.toDS
// DF => DS (the column names must match the case class fields)
val ds2 = df.as[User]
// DS => DF
val df2 = ds2.toDF
// DS => RDD
val rdd2 = ds2.rdd
// DF => RDD (yields an RDD[Row])
val rdd3 = df.rdd
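
Outside the spark-shell, the same conversions need an explicit SparkSession and import. A minimal standalone sketch (object name and sample data are illustrative):

import org.apache.spark.sql.SparkSession

object ConversionDemo {
  // define the case class outside main so Spark can derive its encoder
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ConversionDemo")
      .getOrCreate()
    import spark.implicits._ // enables toDF / toDS / as[T]

    val rdd = spark.sparkContext.makeRDD(List(("张三", 23), ("李四", 22)))
    val ds = rdd.map { case (name, age) => User(name, age) }.toDS
    ds.show()

    spark.stop()
  }
}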

Creating Views

A view works like a table and makes it convenient to query the data with SQL.

// a temp view is scoped to the current SparkSession
df.createOrReplaceTempView("user")
// a global temp view is shared across sessions of the application
df.createOrReplaceGlobalTempView("user")
// the same methods exist on a DS
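
A global temp view is registered in the built-in global_temp database, so it must be queried with that prefix; unlike an ordinary temp view, it stays visible across SparkSessions of the same application:

spark.sql("select * from user").show()                          // session-scoped temp view
spark.sql("select * from global_temp.user").show()              // global temp view
spark.newSession().sql("select * from global_temp.user").show() // visible from a new session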

SparkSQL Development Template in IDEA

  1. Add the dependency

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
  2. Code template

    package com.tzk.sparksql.module
    
    import java.util
    
    import com.google.gson.Gson
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Dataset, SparkSession}
    
    import scala.io.{BufferedSource, Source}
    
    object Demo02 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("TestSparkSQL")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val sc = spark.sparkContext
    
        import spark.implicits._
    
        // Spark reads simple JSON (one object per line) directly
        val df = spark.read.json("G:/workspace1/tzkSpark/datas/1.json")
        df.createOrReplaceTempView("user")
        spark.sql("select * from user").show
    
        // Spark's built-in reader does not handle complex (nested,
        // pretty-printed) JSON well, so parse such files with Gson instead
        val gson = new Gson()
        val source: BufferedSource = Source.fromFile("datas/支付数据.json", "utf-8")
        val infos: Recode = gson.fromJson(source.reader, classOf[Recode])
        source.close()
    
        // convert the Java collection into a Scala collection via the
        // implicit converters
        import scala.collection.JavaConverters._
        val list: List[User] = infos.RECORDS.asScala.toList
    
        val rdd: RDD[User] = sc.makeRDD(list)
        val ds: Dataset[User] = rdd.toDS
        ds.printSchema
        ds.show
        ds.createOrReplaceTempView("user")
        spark.sql("select * from user limit 10").show
    
        spark.stop()
      }
    }
    
    // the field names must match the keys in the JSON file
    case class User(user_id: String, user_name: String, regist_channel: String,
                    regist_time: String, province: String, city: String,
                    platfrom: String, exam_name: String, exam_type_name: String,
                    order_id: String, child_order_id: String, product_name: String,
                    real_pay: String)
    
    case class Recode(RECORDS: util.List[User])
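
Gson maps the top-level RECORDS array onto the Recode wrapper, so the code above assumes 支付数据.json is shaped roughly like this (keys taken from the User case class; the values here are invented for illustration):

    {"RECORDS": [
      {"user_id": "1001", "user_name": "张三", "regist_channel": "app",
       "regist_time": "2020-01-01 10:00:00", "province": "...", "city": "...",
       "platfrom": "...", "exam_name": "...", "exam_type_name": "...",
       "order_id": "...", "child_order_id": "...", "product_name": "...",
       "real_pay": "99.0"},
      ...
    ]}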
