这样进行Spark的解析
这样进行Spark的解析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
成都创新互联公司专注为客户提供全方位的互联网综合服务,包含不限于做网站、成都网站设计、华池网络推广、成都小程序开发、华池网络营销、华池企业策划、华池品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;成都创新互联公司为所有大学生创业者提供华池建站搭建服务,24小时服务热线:18982081108,官方网址:www.cdcxhl.com
Spark场景
Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多, 所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小 由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web 爬虫和索引。就是对于那种增量修改的应用模型不适合 数据量不是特别大,但是要求实时统计分析需求
Spark Master模式(Url)
1、local:这种方式是在本地启动一个线程来运行作业; 2、local[N]:也是本地模式,但是启动了N个线程; 3、local[*]:还是本地模式,但是用了系统中所有的核; 4、local[N,M]:这里有两个参数,第一个代表的是用到的核个数;第二个参数代表的是容许该作业失败M次; 5、local-cluster[N, cores, memory] :本地伪集群模式; 6、spark:// :这是用到了 Spark 的Standalone模; 7、(mesos|zk):// :这是Mesos模式; 8、yarn\yarn-cluster\yarn-client :这是YARN模式。前面两种代表的是集群模式;后面代表的是客户端模式; 9、simr:// :simr其实是Spark In MapReduce的缩写
Spark deploy模式
1、Local模式 local模式出了伪集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl类。LocalBackend接收来自TaskSchedulerImpl的receiveOffers()调用,并根据运行Application传进来的CPU核生成WorkerOffer,并调用scheduler.resourceOffers(offers)生成Task,最后通过 executor.launchTask来执行这些Task。 2、Standalone Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是继承自CoarseGrainedSchedulerBackend类,并重写了其中的一些方法。 CoarseGrainedSchedulerBackend是一个粗粒度的资源调度类,在Spark job运行的整个期间,它会保存所有的Executor,在task运行完的时候,并不释放该Executor,也不向Scheduler申请一个新的Executor。Executor的启动方式有很多中,需要根据Application提交的Master URL进行判断。在CoarseGrainedSchedulerBackend中封装了一个DriverActor类,它接受Executor注册(RegisterExecutor)、状态更新(StatusUpdate)、响应Scheduler的ReviveOffers请求、杀死Task等等。 在本模式中将会启动一个或者多个CoarseGrainedExecutorBackend。具体是通过AppClient类向Master请求注册Application。当注册成功之后,Master会向Client进行反馈,并调用schedule启动Driver和CoarseGrainedExecutorBackend,启动的Executor会向DriverActor进行注册。然后CoarseGrainedExecutorBackend通过aunchTask方法启动已经提交的Task。 3、yarn-cluster yarn-cluster集群模式涉及到的类有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同样是继承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler继承自TaskSchedulerImpl,它只是简单地对TaskSchedulerImpl进行封装,并重写了getRackForHost和postStartHook方法。 Client类通过YarnClient在Hadoop集群上启动一个Container,并在其中运行ApplicationMaster,并通过Yarn提供的接口在集群中启动多个Container用于运行CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。 4、yarn-client yarn-cluster集群模式涉及到的类有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler继承自TaskSchedulerImpl,并对其中的getRackForHost方法进行了重写。Yarn-client模式下,会在集群外面启动一个ExecutorLauncher来作为driver,并想集群申请Container,来启动CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。 5、Mesos Mesos模式调度方式有两种:粗粒度和细粒度。粗粒度涉及到的类有CoarseMesosSchedulerBackend和TaskSchedulerImpl类;而细粒度涉及到的类有MesosSchedulerBackend和TaskSchedulerImpl类。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都继承了MScheduler(其实是Mesos的Scheduler),便于注册到Mesos资源调度的框架中。选择哪种模式可以通过spark.mesos.coarse参数配置。默认的是MesosSchedulerBackend。
上面涉及到Spark的许多部署模式,究竟哪种模式好这个很难说,需要根据需求,如果只是测试Spark Application,可以选择local模式。而如果数据量不是很多,Standalone 是个不错的选择。当你需要统一管理集群资源(Hadoop、Spark等)那么可以选择Yarn,但是这样维护成本就会变高。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果需要用于生产环境,那么请选择yarn-cluster;而如果仅仅是Debug程序,可以选择yarn-client。
Spark Jar/File Url格式
file:/ 文件绝对路径,并且file:/URI是通过驱动器的HTTP文件服务器来下载的,每个执行器都从驱动器的HTTP server拉取这些文件。 hdfs:/http:/https:/ftp: Spark将会从指定的URI位置下载所需的文件和jar包。 local:/ 指定在每个工作节点上都能访问到的本地或共享文件。这意味着,不会占用网络IO,特别是对一些大文件或jar包,最好使用这种方式,当需要把文件推送到每个工作节点上可以通过NFS和GlusterFS共享文件。
Spark执行模型
Dependency Dependency代表了RDD之间的依赖关系,即血缘 NarrowDependency代表窄依赖,即父RDD的分区,最多被子RDD的一个分区使用。所以支持并行计算。 OneToOneDependency表示父RDD和子RDD的分区依赖是一对一的 RangeDependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionId,传进去之后返回的是Nil Shuffle代表宽依赖针对的RDD是KV形式的,需要一个partitioner指定分区方式,需要一个序列化工具类 Partition Partition具体表示RDD每个数据分区。 Partitioner Partitioner决定KV形式的RDD如何根据key进行partition 默认Partitioner Partitioner的伴生对象提供defaultPartitioner方法,逻辑为: 传入的RDD(至少两个)中,遍历(顺序是partition数目从大到小)RDD,如果已经有Partitioner了,就使用。如果RDD们都没有Partitioner,则使用默认的HashPartitioner。而HashPartitioner的初始化partition数目,取决于是否设置了Spark.default.parallelism,如果没有的话就取RDD中partition数目最大的值 HashPartitioner基于Java的Object.hashCode。会有个问题是Java的Array有自己的hashCode,不基于Array里的内容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner会有问题。 RangePartitioner处理的KV RDD要求Key是可排序的,即满足Scala的Ordered[K]类型 Persist/Unpersist 默认cache()过程是将RDD persist在内存里,persist()操作可以为RDD重新指定StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication), persist并不是action,并不会触发任何计算 Checkpoint RDD Actions api里提供了checkpoint()方法,会把本RDD save到SparkContext CheckpointDir 目录下。建议该RDD已经persist在内存中,否则需要recomputation。 Transformations RDD transformation 见下 Actions RDD action 见下 Job->Stage->Task->Transformations/Action 一个Spark的Job分为多个stage,最后一个stage会包括一个或多个ResultTask,前面的stages会包括一个或多个ShuffleMapTasks。 ResultTask执行并将结果返回给driver application。 ShuffleMapTask将task的output根据task的partition分离到多个buckets里。一个ShuffleMapTask对应一个ShuffleDependency的partition,而总partition数同并行度、reduce数目是一致的 DAGScheduler 面向stage的调度层,为job生成以stage组成的DAG,以stage为单位,提交TaskSet给TaskScheduler执行。 每一个Stage内,都是独立的tasks,他们共同执行同一个compute function,享有相同的shuffledependencies。DAG在切分stage的时候是依照出现shuffle为界限的。 DAGSchedulerEvent TaskScheduler TaskScheduler接收task、接收分到的资源和executor、维护信息、与backend打交道、分配任务 SchedulableBuilder FIFO和Fair两种实现, addTaskSetManager会把TaskSetManager加到pool里。FIFO的话只有一个pool。Fair有多个pool,Pool也分FIFO和Fair两种模式 TaskSet,即Stage 封装一个stage的所有的tasks, 以提交给TaskScheduler ResultTask 对应于Result Stage直接产生结果 ShuffleMapTask 对应于ShuffleMap Stage, 产生的结果作为其他stage的输入 TaskSetManager 负责这批Tasks的启动,失败重试,感知本地化等事情。每次reourseOffer方法会寻找合适(符合条件execId, host, locality)的Task并启动它 TaskResultGetter 维护一个线程池,用来反序列化和从远端获取task结果 BlockManagerMaster/BlockManagerWorker TaskResult里包含BolckId, BlockManagerMaster通过这个blockId的获取bolck的locations,BlockManagerWorker通过这些locations来获得(反序列化)block的数据
Spark RDD
RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。 和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。 Spark应用程序所做把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果 一个RDD对象,包含如下5个核心属性。 一个分区列表,每个分区里是RDD的部分数据(或称数据块)。 一个依赖列表,存储依赖的其他RDD。 一个名为compute的计算函数(由子类实现),用于计算RDD各分区的值。 一个分区器(可选),用于键/值类型的RDD,比如某个RDD是按散列来分区。 一个计算各分区时优先的位置列表(可选),比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。
Work with RDD
object array -> object list -> object rdd object array -> object list -> Row list -> Row rdd + StructType schema -> object df object arrays -> object lists -> object rdds -> object rdd queue -> object dstream
RDD Transformer
map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集 keyBy(f: T => K) filter(func): 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD flatMap(func):和map差不多,但是flatMap生成的是多个结果 mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index sample(withReplacement,faction,seed):抽样 union(otherDataset):并集, 返回一个新的dataset,包含源dataset和给定dataset的元素的集合 intersection(otherDataset):交集 subtract(otherDataset):差集 distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数 sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型 join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数 cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数 cartesian(otherDataset):笛卡尔积就是m*n pipe(command: String) 把RDD数据通过ProcessBuilder创建额外的进程输出走 zip(RDD[U]): RDD[(T, U)] 两个RDD分区数目一致,且每个分区数据条数一致
RDD Action
reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的 fold(zeroValue: T)(op: (T, T) => T) 特殊的reduce,带初始值,函数式语义的fold aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U):带初始值、reduce聚合、merge聚合三个完整条件的聚合方法。rdd的做法是把函数传入分区里去做计算,最后汇总各分区的结果再一次combOp计算 subtract(RDD[T]):rdd实现为map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys 与求交类似 collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组 count():返回的是dataset中的element的个数 first():返回的是dataset中的第一个元素 top(n)(ordering):每个分区内传入top的处理函数,得到分区的堆,使用rdd.reduce(),把每个分区的堆合起来,排序,取前n个 take(n):返回前n个elements,这个士driverprogram返回的 takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中 saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统 countByKey():返回的是key对应的个数的一个map,作用于一个RDD countByValue(): Map[T, Long] rdd实现为map(value => (value, null)).countByKey():本质上是一次简单的combineByKey,返回Map,会全load进driver的内存里,需要数据集规模较小 foreach(func):对dataset中的每个元素都使用func max()/min() 特殊的reduce,传入max/min比较函数 PairRDDFunctions ..... DoubleRDDFunctions sum() rdd实现是reduce(_ + _) stats() rdd实现是mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) StatCounter在一次遍历里统计出中位数、方差、count三个值,merge()是他内部的方法 mean() rdd实现是stats().mean variance()/sampleVariance() rdd实现是stats().variance stdev()/sampleStdev() rdd实现是stats().stdev 求标准差 meanApprox()/sumApprox() 调用runApproximateJob histogram() 比较复杂的计算,rdd实现是先mapPartitions再reduce,包含几次递归
Spark Core
提供了有向无环图(DAG)的分布式并行计算框架,并提供Cache机制来支持多次迭代计算或者数据共享,大大减少 迭代计算之间读取数据局的开销,这对于需要进行多次迭代的数据挖掘和分析性能有很大提升 在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合, 这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”对它们进行重建,保证了数据的高容错性; 移动计算而非移动数据,RDD Partition可以就近读取分布式文件系统中的数据块到各个节点内存中进行计算 使用多线程池模型来减少task启动开稍 采用容错的、高可伸缩性的akka作为通讯框架
Spark SQL
引入了新的RDD类型SchemaRDD,可以象传统数据库定义表一样来定义SchemaRDD,SchemaRDD由定义了列数据类型的行对象构成。SchemaRDD可以从RDD转换过来,也可以从Parquet文件读入,也可以使用HiveQL从Hive中获取。 内嵌了Catalyst查询优化框架,在把SQL解析成逻辑执行计划之后,利用Catalyst包里的一些类和接口,执行了一些简单的执行计划优化,最后变成RDD的计算 在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。 内存列存储(In-Memory Columnar Storage),sparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储; 字节码生成技术(Bytecode Generation),Spark1.1.0在Catalyst模块的expressions增加了codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译。另外对SQL表达式都作了CG优化, CG优化的实现主要还是依靠Scala2.10的运行时放射机制(runtime reflection); Scala代码优化 SparkSQL在使用Scala编写代码的时候,尽量避免低效的、容易GC的代码;尽管增加了编写代码的难度,但对于用户来说接口统一。
Spark MLlib
MLBase 是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。 ML Optimizer 会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果; MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台; MLlib 是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充; MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。
Spark GraphX
GraphX是Spark中用于图(e.g., Web-Graphs and Social Networks)和图并行计算(e.g., PageRank and Collaborative Filtering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式图计算框架相比,GraphX最大的贡献是,在Spark之上提供一栈式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。GraphX最先是伯克利AMPLAB的一个分布式图计算框架项目,后来整合到Spark中成为一个核心组件。 GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。如同Spark,GraphX的代码非常简洁。GraphX的核心代码只有3千多行,而在此之上实现的Pregel模型,只要短短的20多行。GraphX的代码结构整体下图所示,其中大部分的实现,都是围绕Partition的优化进行的。这在某种程度上说明了点分割的存储和相应的计算优化的确是图计算框架的重点和难点。 GraphX的底层设计有以下几个关键点。 1.对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程。因此,Graph最终具备了RDD的3个关键特性:Immutable、Distributed和Fault-Tolerant。其中最关键的是Immutable(不变性)。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明。 2.两种视图底层共用的物理数据,由RDD[VertexPartition]和RDD[EdgePartition]这两个RDD组成。点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度。不变的索引结构在RDD转换过程中是共用的,降低了计算和存储开销。 3.图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略(PartitionStrategy)。划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本。划分策略的不同会影响到所需要缓存的Ghost副本数量,以及每个EdgePartition分配的边的均衡程度,需要根据图的结构特征选取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut这四种策略。在淘宝大部分场景下,EdgePartition2d效果最好。
Spark Streaming
SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。 计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。 容错性:对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。 实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。 扩展性与吞吐量:Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s。
SparkR
SparkR是AMPLab发布的一个R开发包,使得R摆脱单机运行的命运,可以作为Spark的job运行在集群上,极大得扩展了R的数据处理能力。 SparkR的几个特性: 提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行Spark job。 支持序化闭包功能,可以将用户定义函数中所引用到的变量自动序化发送到集群中其他的机器上。 SparkR还可以很容易地调用R开发包,只需要在集群上执行操作前用includePackage读取R开发包就可以了,当然集群上要安装R开发包。
SparkPython
Spark Python
pom.xml
org.scala-lang scala-library 2.11.8 org.apache.spark spark-mllib_2.11 2.0.0 com.typesafe.play play-json_2.11 2.3.9 net.alchim31.maven scala-maven-plugin 3.1.3 scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile maven_central http://central.maven.org/maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/snapshots typesafe http://repo.typesafe.com/typesafe/releases/ maven_central http://central.maven.org/maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/releases/
Tests.scala
def listRdd(){ var sc = new SparkContext("local[1]", "spdb") var sqlContext = new SQLContext(sc) var listStr1 = """zm,zn,zq""" var list = listStr1.split(",").toList var rdd = sc.parallelize(list, 2) var max = rdd.max() println(max) }
看完上述内容,你们掌握这样进行Spark的解析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
名称栏目:这样进行Spark的解析
转载来于:http://scyanting.com/article/ggjggj.html