5万字Spark全集之末尾Structured Streaming续集(一)
liebian365 2024-12-30 05:08 16 浏览 0 评论
九、Structured Streaming曲折发展史
1、Spark Streaming
Spark Streaming针对实时数据流,提供了一套可扩展、高吞吐、可容错的流式计算模型。Spark Streaming接收实时数据源的数据,切分成很多小的batches,然后被Spark Engine执行,产出同样由很多小的batchs组成的结果流。本质上,这是一种micro-batch(微批处理)的方式处理
不足在于处理延时较高(无法优化到秒以下的数量级), 无法支持基于event_time的时间窗口做聚合逻辑。
2、Structured Streaming
2.1 介绍
官网
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
简介
spark在2.0版本中发布了新的流计算的API,Structured Streaming/结构化流。
Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于event_time的时间窗口的处理逻辑。
随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用Scala、Java、Python或R中的DataSet/DataFrame API来表示流聚合、事件时间窗口、流到批连接等。此外,Structured Streaming会通过checkpoint和预写日志等机制来实现Exactly-Once语义。
简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,Structured Streaming提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑更多细节
默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端的延迟,最短可达100毫秒,并且完全可以保证一次容错。自Spark 2.3以来,引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至1毫秒的端到端延迟。也就是类似于 Flink 那样的实时流,而不是小批量处理。实际开发可以根据应用程序要求选择处理模式,但是连续处理在使用的时候仍然有很多限制,目前大部分情况还是应该采用小批量模式。
2.2 API
1.Spark Streaming 时代 -DStream-RDD
Spark Streaming 采用的数据抽象是DStream,而本质上就是时间上连续的RDD,
对数据流的操作就是针对RDD的操作
2.Structured Streaming 时代 - DataSet/DataFrame -RDD
Structured Streaming是Spark2.0新增的可扩展和高容错性的实时计算框架,它构建于Spark SQL引擎,把流式计算也统一到DataFrame/Dataset里去了。
Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步
2.3 主要优势
1.简洁的模型。Structured Streaming 的模型很简洁,易于理解。用户可以直接把一个流想象成是无限增长的表格。
2.一致的 API。由于和 Spark SQL 共用大部分 API,对 Spaprk SQL 熟悉的用户很容易上手,代码也十分简洁。font color=red>同时批处理和流处理程序还可以共用代码,不需要开发两套不同的代码,显著提高了开发效率。
3.卓越的性能。Structured Streaming 在与 Spark SQL 共用 API 的同时,也直接使用了 Spark SQL 的 Catalyst 优化器和 Tungsten,数据处理性能十分出色。此外,Structured Streaming 还可以直接从未来 Spark SQL 的各种性能优化中受益。
4.多语言支持。Structured Streaming 直接支持目前 Spark SQL 支持的语言,包括 Scala,Java,Python,R 和 SQL。用户可以选择自己喜欢的语言进行开发。
2.4 编程模型
编程模型概述
一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾。
对动态数据源进行实时查询,就是对当前的表格内容执行一次 SQL 查询。
数据查询,用户通过触发器(Trigger)设定时间(毫秒级)。也可以设定执行周期。
一个流的输出有多种模式,既可以是基于整个输入执行查询后的完整结果,也可以选择只输出与上次查询相比的差异,或者就是简单地追加最新的结果。
这个模型对于熟悉 SQL 的用户来说很容易掌握,对流的查询跟查询一个表格几乎完全一样,十分简洁,易于理解。
核心思想
Structured Streaming最核心的思想就是将实时到达的数据不断追加到unbound table无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中.这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用SQL对到来的每一行数据进行实时查询处理;(SparkSQL+SparkStreaming=StructuredStreaming)
应用场景
Structured Streaming将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;
WordCount图解
如图所示
第一行表示从socket不断接收数据,
第二行可以看成是之前提到的“unbound table",
第三行为最终的wordCounts是结果集。
当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;
该示例设置为Complete Mode(输出所有数据),因此每次都将所有数据输出到控制台;
1.在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此我们可以得到第1秒时的结果集cat=1 dog=3,并输出到控制台;
2.当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;
3.当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和"owl",执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;
这种模型跟其他很多流式计算引擎都不同。大多数流式计算引擎都需要开发人员自己来维护新数据与历史数据的整合并进行聚合操作。
然后我们就需要自己去考虑和实现容错机制、数据一致性的语义等。
然而在structured streaming的这种模式下,spark会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新result table,不需要我们去考虑这些事情。
十、Structured Streaming实战
1、创建Source
spark 2.0中初步提供了一些内置的source支持。
Socket source (for testing): 从socket连接中读取文本内容。
File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。
Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka。
2、读取Socket数据
准备工作
nc -lk 9999
hadoop spark sqoop hadoop spark hive hadoop
代码演示
package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object WordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.接收数据
val dataDF: DataFrame = spark.readStream
.option("host", "node01")
.option("port", 9999)
.format("socket")
.load()
//3.处理数据
import spark.implicits._
val dataDS: Dataset[String] = dataDF.as[String]
val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
val result: Dataset[Row] = wordDS.groupBy("value").count().sort(#34;count".desc)
//result.show()
//Queries with streaming sources must be executed with writeStream.start();
result.writeStream
.format("console")//往控制台写
.outputMode("complete")//每次将所有的数据写出
.trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快
//.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉
.start()//开启
.awaitTermination()//等待停止
}
}
3、读取目录下文本数据
spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据
Structured Streaming支持的文件类型有text,csv,json,parquet
准备工作
在people.json文件输入如下数据:
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
注意:文件必须是被移动到目录中的,且文件名不能有特殊字符
需求
使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜
代码演示
package cn.itcast.structedstreaming import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** {"name":"json","age":23,"hobby":"running"}* {"name":"charles","age":32,"hobby":"basketball"}* {"name":"tom","age":28,"hobby":"football"}* {"name":"lili","age":24,"hobby":"running"}* {"name":"bob","age":20,"hobby":"swimming"}* 统计年龄小于25岁的人群的爱好排行榜*/ object WordCount2 { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") val Schema: StructType = new StructType() .add("name","string") .add("age","integer") .add("hobby","string") //2.接收数据import spark.implicits._ // Schema must be specified when creating a streaming source DataFrame. val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data") //3.处理数据 val result: Dataset[Row] = dataDF.filter(#34;age" < 25).groupBy("hobby").count().sort(#34;count".desc) //4.输出结果 result.writeStream .format("console") .outputMode("complete") .trigger(Trigger.ProcessingTime(0)) .start() .awaitTermination() } }
4、计算操作
获得到Source之后的基本数据处理方式和之前学习的DataFrame、DataSet一致,不再赘述
5、输出
计算结果可以选择输出到多种设备并进行如下设定
1.output mode:以哪种方式将result table的数据写入sink
2.format/output sink的一些细节:数据格式、位置等。
3.query name:指定查询的标识。类似tempview的名字
4.trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据
5.checkpoint地址:一般是hdfs上的目录。注意:Socket不支持数据恢复,如果设置了,第二次启动会报错 ,Kafka支持
相关推荐
- 4万多吨豪华游轮遇险 竟是因为这个原因……
-
(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...
- “菜鸟黑客”必用兵器之“渗透测试篇二”
-
"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...
- 科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白
-
作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...
- 麦子陪你做作业(二):KEGG通路数据库的正确打开姿势
-
作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...
- 知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势
-
智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...
- 每日新闻播报(September 14)_每日新闻播报英文
-
AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...
- 香港新巴城巴开放实时到站数据 供科技界研发使用
-
中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...
- 5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper
-
本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...
- Qt动画效果展示_qt显示图片
-
今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...
- 如何从0到1设计实现一门自己的脚本语言
-
作者:dong...
- 三年级语文上册 仿写句子 需要的直接下载打印吧
-
描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...
- C++|那些一看就很简洁、优雅、经典的小代码段
-
目录0等概率随机洗牌:1大小写转换2字符串复制...
- 二年级上册语文必考句子仿写,家长打印,孩子照着练
-
二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)