博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
2.sparkSQL--DataFrames与RDDs的相互转换
阅读量:7240 次
发布时间:2019-06-29

本文共 3986 字,大约阅读时间需要 13 分钟。

Spark SQL支持两种RDDs转换为DataFrames的方式
使用反射获取RDD内的Schema
    当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
    通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
    这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。

原文和作者一起讨论:

微信:intsmaze

使用反射获取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext}import org.apache.spark.{SparkConf, SparkContext}object InferringSchema {  def main(args: Array[String]) {    //创建SparkConf()并设置App名称    val conf = new SparkConf().setAppName("SQL-intsmaze")    //SQLContext要依赖SparkContext    val sc = new SparkContext(conf)    //创建SQLContext    val sqlContext = new SQLContext(sc)    //从指定的地址创建RDD    val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))    //创建case class    //将RDD和case class关联    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))    //导入隐式转换,如果不导入无法将RDD转换成DataFrame    //将RDD转换成DataFrame    import sqlContext.implicits._    val personDF = personRDD.toDF    //注册表    personDF.registerTempTable("intsmaze")    //传入SQL    val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")    //将结果以JSON的方式存储到指定位置    df.write.json("hdfs://192.168.19.131:9000/personresult")    //停止Spark Context    sc.stop()  }}//case class一定要放到外面case class Person(id: Int, name: String, age: Int)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \--master yarn \--deploy-mode cluster \--driver-memory 512m \--executor-memory 512m \--executor-cores 2 \--queue default \/home/hadoop/sparksql-1.0-SNAPSHOT.jar

 

通过编程接口指定Schema(Programmatically Specifying the Schema)

当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

从原来的RDD创建一个Row格式的RDD.

创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.

通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.

 

import org.apache.spark.sql.{Row, SQLContext}import org.apache.spark.sql.types._import org.apache.spark.{SparkContext, SparkConf}object SpecifyingSchema {  def main(args: Array[String]) {    //创建SparkConf()并设置App名称    val conf = new SparkConf().setAppName("SQL-intsmaze")    //SQLContext要依赖SparkContext    val sc = new SparkContext(conf)    //创建SQLContext    val sqlContext = new SQLContext(sc)    //从指定的地址创建RDD    val personRDD = sc.textFile(args(0)).map(_.split(","))    //通过StructType直接指定每个字段的schema    val schema = StructType(      List(        StructField("id", IntegerType, true),        StructField("name", StringType, true),        StructField("age", IntegerType, true)      )    )    //将RDD映射到rowRDD    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))    //将schema信息应用到rowRDD上    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)    //注册表    personDataFrame.registerTempTable("intsmaze")    //执行SQL    val df = sqlContext.sql("select * from intsmaze order by age desc ")    //将结果以JSON的方式存储到指定位置    df.write.json(args(1))    //停止Spark Context    sc.stop()  }}
将程序打成jar包,上传到spark集群,提交Spark任务
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 2 \ --queue default \ /home/hadoop/sparksql-1.0-SNAPSHOT.jar \ hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

 

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \--master yarn \--deploy-mode client \--driver-memory 512m \--executor-memory 512m \--executor-cores 2 \--queue default \/home/hadoop/sparksql-1.0-SNAPSHOT.jar \hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

maven项目的pom.xml中添加Spark SQL的依赖

  
org.apache.spark
  
spark-sql_2.10
  
1.6.2

 

作者:
出处:
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
你可能感兴趣的文章
MBR扇区故障及修复
查看>>
获取jar包路径,遍历
查看>>
【VMware vSAN 6.6】5.1.基于存储策略的管理:vSAN硬件服务器解决方案
查看>>
ISTP论文发表 SCI论文发表 EI论文发表常识
查看>>
怎样轻松几步将视频转换成GIF
查看>>
ECS应用管理最佳实践
查看>>
12.throw和throws是的区别
查看>>
福建海峡银行使用ManageEngine统一管控业务应用系统
查看>>
ssh访问与控制
查看>>
皆大欢喜!iPhone不再耗电,续航增加就靠它
查看>>
编写脚本 sumid.sh,计算/etc/passwd文件中的第10个用户和第20用户的 ID之和
查看>>
宇宙沸腾SCCM 2012 R2系列(10)OSD操作系统部署(二)- 添加和分发系统映像包
查看>>
grub加密
查看>>
vim中高级技巧
查看>>
centos7+VMware Workstation创建共享文件夹
查看>>
1.10版的Django对应的后台编辑器DjangoUeditor
查看>>
蓝桥杯 【基础练习】 十六进制转十进制
查看>>
关于四则运算表达式分析思路
查看>>
OC基础第三讲
查看>>
数据库发布订阅:发送邮件
查看>>