大数据分析工程师入门9-Spark SQL
liebian365 2024-12-30 05:07 13 浏览 0 评论
本文为《大数据分析师入门课程》系列的第9篇,在本系列的第8篇-Spark基础中,已经对Spark做了一个入门介绍,在此基础上本篇拎出Spark SQL,主要站在使用者的角度来进行讲解,需要注意的是本文中的例子的代码均使用Scala语言。
主要包括以下内容:
- 你该了解的Spark SQL
- 简单入门操作
- 不得不说的数据源
一你该了解的Spark SQL
1.什么是Spark SQL?
Spark SQL是Spark专门用来处理结构化数据的模块,是Spark的核心组件,在1.0时发布。
SparkSQL替代的是HIVE的查询引擎,HIVE的默认引擎查询效率低是由于其基于MapReduce实现SQL查询,而MapReduce的shuffle是基于磁盘的。
2.Spark SQL特性
其实最初Spark团队推出的是Shark-基于Hive对内存管理、物理计划、执行做了优化,底层使用Spark基于内存的计算引擎,对比Hive性能提升一个数量级。
即便如此高的性能提升,但是由于Shark底层依赖Hive的语法解析器、查询优化器等组件制约其性能的进一步提升。最终Spark团队放弃了Shark,推出了Spark SQL项目,其具备以下特性:
- 标准的数据连接,支持多种数据源
- 多种性能优化技术
- 组件的可扩展性
- 支持多语言开发:Scala、Java、Python、R
- 兼容Hive
3.Spark SQL可以做什么?
- 大数据处理
使用SQL进行大数据处理,使传统的RDBMS人员也可以进行大数据处理,不需要掌握像mapreduce的编程方法。
- 使用高级API进行开发
SparkSQL支持SQL API,DataFrame和Dataset API多种API,使用这些高级API进行编程和采用Sparkcore的RDD API 进行编程有很大的不同。
使用RDD进行编程时,开发人员在采用不同的编程语言和不同的方式开发应用程序时,其应用程序的性能千差万别,但如果使用DataFrame和Dataset进行开发时,资深开发人员和初级开发人员开发的程序性能差异很小,这是因为SparkSQL 内部使用Catalyst optimizer 对执行计划做了很好的优化。
二简 单 入 门 操 作
1.构建入口
Spark SQL中所有功能的入口点是SparkSession类-Spark 2.0引入的新概念,它为用户提供统一的切入点。
早期Spark的切入点是SparkContext,通过它来创建和操作数据集,对于不同的API需要不同的context。
比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封装了SparkContext和SQLContext。
要创建一个 SparkSession使用SparkSession.builder():
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
2.创建DataFrame
在一个SparkSession中,应用程序可以从结构化的数据文件、Hive的table、外部数据库和RDD中创建一个DataFrame。
举个例子, 下面就是基于一个JSON文件创建一个DataFrame:
val df =spark.read.json("examples/src/main/resources/people.json")
// 显示出DataFrame的内容
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
3.执行SQL查询
// 将DataFrame注册成一个临时视图
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19|Justin|
// +----+-------+
SparkSession的SQL函数可以让应用程序以编程的方式运行SQL查询,并将结果作为一个 DataFrame返回。
例子中createOrReplaceTempView创建的临时视图是session级别的,也就是会随着session的消失而消失。如果你想让一个临时视图在所有session中相互传递并且可用,直到Spark 应用退出,你可以建立一个全局的临时视图,全局的临时视图存在于系统数据库global_temp中,我们必须加上库名去引用它。
// 将一个DataFrame注册成一个全局临时视图
df.createGlobalTempView("people")
// 注意这里的global_temp
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19|Justin|
// +----+-------+
// 新的session同样可以访问
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19|Justin|
// +----+-------+
4.DataFrame操作示例
import spark.implicits._ //导入隐式转换的包
//打印schema
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
//选择一列进行打印
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
//年龄加1
df.select(#34;name", #34;age" +1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
//选取年龄大于21的
df.filter(#34;age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
//聚合操作
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
5.创建DataSet
Dataset和RDD比较类似,与RDD不同的是实现序列化和反序列化的方式,RDD是使用Java serialization或者Kryo,而Dataset是使用Encoder。
Encoder的动态特性使得Spark可以在执行filtering、sorting和hashing等许多操作时无需把字节反序列化为对象。
// 一个简单的Seq转成DataSet,会有默认的schema
val primitiveDS = Seq(1, 2, 3).toDS().show
// +-----+
// |value|
// +-----+
// | 1|
// | 2|
// | 3|
// +-----+
case class Person(name: String, age: Long)
// 通过反射转换为DataSet
val caseClassDS = Seq(Person("Andy",32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// DataFrame指定一个类则为DataSet
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
通过上述的代码可以看出创建DataSet的代码很简单,一个toDs就可以自动推断出schema的类型,读取json这种结构化的数据得到的是一个DataFrame,再指定它的类则为DataSet。
6.RDD的互操作性
RDD的互操作性指的是RDD和DataFrame的相互转换,DataFrame转RDD很简单,复杂的是RDD转DataFrame。
目前Spark SQL有两种方法:
- 反射推断
Spark SQL 的 Scala 接口支持自动转换一个包含 Case Class的 RDD 为DataFrame。Case Class 定义了表的Schema。Case class 的参数名使用反射读取并且成为了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型,这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。
// 开启隐式转换
import spark.implicits._
// 读入文本文件并最终转化成DataFrame
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// 将DataFrame注册成表
peopleDF.createOrReplaceTempView("people")
// 执行一条sql查询
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// 通过map操作后得到的是RDD
teenagersDF.map(teenager => "Name: " +teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
另一种更加简单的操作是将RDD中每一行类型变为tuple类型,然后使用toDF依次赋予字段名,需要注意的是使用tuple最高可以支持22个字段。
// 开启隐式转换
import spark.implicits._
// 读入文本文件并最终转化成DataFrame
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => (attributes(0), attributes(1).trim.toInt))
.toDF("name","age")
//peopleDF: org.apache.spark.sql.DataFrame = [name:string, age: int]
- 构造Schema
在无法提前定义schema的情况下,RDD转DataFrame或者DataSet需要构造Schema。
构建一个Schema并将它应用到一个已存在的RDD编程接口需要以下四个步骤:
a.从原始的RDD创建一个tuple或者列表类型的RDD
b.创建一个StructType来匹配RDD中的结构
c.将生成的RDD转换成Row类型的RDD
d.通过createDataFrame方法将Schema应用到RDD
//需要导入类型相关的包
import org.apache.spark.sql.types._
//读取hdfs上的文本文件,保存到rdd中
val peopleRDD =spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// 这里的schema是一个字符串,可以来自于其他未知内容的文件,需要注意的是-这里明确写出来只是为了演示,并不代表提前知道schema信息。
val schemaString = "name age"
// 将有schema信息的字符串转变为StructField类型
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable =true))
//通过StructType方法读入schema
val schema = StructType(fields)
// 将RDD转换成Row类型的RDD
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// 应用schema信息到Row类型的RDD
val peopleDF = spark.createDataFrame(rowRDD,schema)
三不得不说的数据源
在工作中使用Spark SQL进行处理数据的第一步就是读取数据,Spark SQL通过统一的接口去读取和写入数据。主要是read和write操作,不同的数据源相应的Option(附加设置)会有所不同,下面通过例子来具体说明。
1.数据读取
- parquet
1)读取Parquet文件
parquet文件自带schema,读取后是DataFrame格式。
val usersDF =spark.read.load("examples/src/main/resources/users.parquet")
//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]
2)解析分区信息
parquet文件中如果带有分区信息,那么SparkSQL会自动解析分区信息。比如,这样一份人口数据按照gender和country进行分区存储,目录结构如下:
test
└── spark-sql
└── test
├──gender=male
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└──gender=female
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通过spark.read.load读取该目录下的文件SparkSQL将自动解析分区信息,返回的DataFrame的Schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
目前自动解析分区支持数值类型和字符串类型。
自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。可以关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不会再进行类型解析。
3)Schema合并
如果读取的多个parquet文件中的Schema信息不一致,Spark SQL可以设置参数进行合并,但是Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。
可以通过下面两种方式开启该功能:
a.读取文件的时候,开启合并功能,只对本次读取文件进行合并Schema操作
b.设置全局SQL选项spark.sql.parquet.mergeSchema为true,每次读取文件都会进行合并Schema操作
具体请看下面的例子:
// sqlContext是之前例子中生成的
// 导入隐式转换
import sqlContext.implicits._
// 创建一个简单的DataFrame并保存
val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建另一个DataFrame,注意字段名
val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取这两个parquet文件,增加开启合并Schema的设置
val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")
df3.printSchema()
// 不同名称的字段都保留下来了
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
关于schema合并,有一点需要特别关注,那就是当不同parquet文件的schema有冲突时,合并会失败,如同名的字段,其类型不一致的情况。这时如果你读取的是hive数据源,可能会出现读取失败或者读取字段值全部为NULL的情况。如果大家遇到类型场景,可以考虑是否是这个因素导致。
- json
json文件和parquet文件一样也是带有schema信息,不过需要指明是json文件,才能准确的读取。
val peopleDF =spark.read.format("json").load("examples/src/main/resources/people.json")
//peopleDF: org.apache.spark.sql.DataFrame = [age:bigint, name: string]
- MySQL
读取MySQL中的数据是通过jdbc的方式,需要知道要访问的MySQL数据库、表等信息,具体请看下面的代码:
//MySQL数据的访问ip、端口号和数据库名
val url ="jdbc:mysql://192.168.100.101:3306/testdb"
//要访问的表名
val table = "test"
//建立一个配置变量
val properties = new Properties()
//将用户名存入配置变量
properties.setProperty("user","root")
//将密码存入配置变量
properties.setProperty("password","root")
//需要传入Mysql的URL、表名、配置变量
val df = sqlContext.read.jdbc(url,table,properties)
这里要注意的一个点是,读取MySQL需要运行作业时,classpath下有MySQL的驱动jar,或者通过--jars添加驱动jar。
- hive
读取hive数据的前提是要进行相关的配置,需要将hive-site.xml、core-site.xml、hdfs-site.xml以及hive的lib依赖放入spark的classpath下,或者在提交作业时通过--files和--jars来指定这些配置文件和jar包。之后,就可以很方便的使用hive的数据表了,示例代码如下:
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
// 数仓地址指向默认设置
val warehouseLocation = newFile("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport() //增加支持hive特性
.getOrCreate()
import spark.implicits._
import spark.sql
//使用sql创建一个表,并将hdfs中的文件导入到表中
sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 使用sql直接指向sql查询
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
2.数据保存
- write
保存用write方法,先看一个简单的例子,将一个DataFrame保存到parquet文件中。
//选取DataFrame中的两列保存到parquet文件中
usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
- format
format可以指定保存文件的格式,支持json、csv、orc等
//选取DataFrame中的两列保存到json文件中
usersDF.select("name", "favorite_color").write.format("json").save("namesAndFavColors.json")
- mode
在保存数据的时候,要不要考虑数据存不存在?是覆盖还是追加呢?通过mode可以进行设置。
//选取DataFrame中的两列保并追加到parquet文件中
usersDF.select("name","favorite_color").write.mode(SaveMode.append)
.save("namesAndFavColors.parquet")
除了append还有下列选项:
总结
本文通过什么是Spark SQL,有哪些特性,可以做什么让读者对Spark SQL有个整体的了解,然后着重讲解了如何进行入门操作和多种数据源操作。
掌握了以上技能,大数据分析工程师面对Spark SQL相关的工作一定可以游刃有余。
以上就是今天的内容了,如果对你有帮助,希望你能够关注、点赞、转发一键三连支持一下。需要完整学习线路和配套课堂笔记,请回复111。
相关推荐
- 4万多吨豪华游轮遇险 竟是因为这个原因……
-
(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...
- “菜鸟黑客”必用兵器之“渗透测试篇二”
-
"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...
- 科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白
-
作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...
- 麦子陪你做作业(二):KEGG通路数据库的正确打开姿势
-
作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...
- 知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势
-
智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...
- 每日新闻播报(September 14)_每日新闻播报英文
-
AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...
- 香港新巴城巴开放实时到站数据 供科技界研发使用
-
中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...
- 5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper
-
本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...
- Qt动画效果展示_qt显示图片
-
今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...
- 如何从0到1设计实现一门自己的脚本语言
-
作者:dong...
- 三年级语文上册 仿写句子 需要的直接下载打印吧
-
描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...
- C++|那些一看就很简洁、优雅、经典的小代码段
-
目录0等概率随机洗牌:1大小写转换2字符串复制...
- 二年级上册语文必考句子仿写,家长打印,孩子照着练
-
二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)