Spark on YARN in Practice
The Hadoop environment and Spark are already deployed on the cluster used here.
The environment is as follows:
192.168.1.2 master
[hadoop@master ~]$ jps
2298 SecondaryNameNode
2131 NameNode
2593 JobHistoryServer
4363 Jps
3550 HistoryServer
2481 ResourceManager
3362 Master
192.168.1.3 slave1
[hadoop@slave1 ~]$ jps
2919 Jps
2464 Worker
1993 DataNode
2109 NodeManager
192.168.1.4 slave2
[hadoop@slave2 ~]$ jps
2762 Jps
2113 NodeManager
1998 DataNode
2452 Worker
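The example here is the Python pi-estimation program that ships with Spark. It is first submitted to the Spark standalone master (spark://master:7077).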
[hadoop@master ~]$ cd spark
[hadoop@master spark]$ find . -name "pi.py"
[hadoop@master spark]$ cat ./examples/src/main/python/pi.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
from random import random
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    """
        Usage: pi [slices]
    """
    sc = SparkContext(appName="PythonPi")
    slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * slices

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)
[hadoop@master spark]$ cd ./examples/src/main/python/
# Edit pi.py and append sc.stop() at the end of the file
[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py
# If the error below appears, map 127.0.0.1 to localhost in the hosts file (/etc/hosts)
Traceback (most recent call last):
  File "/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py", line 29, in <module>
    sc = SparkContext(appName="PythonPi")
  File "/home/hadoop/spark/python/pyspark/context.py", line 138, in __init__
    self._accumulatorServer = accumulators._start_update_server()
  File "/home/hadoop/spark/python/pyspark/accumulators.py", line 224, in _start_update_server
    server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
  File "/usr/lib64/python2.6/SocketServer.py", line 402, in __init__
    self.server_bind()
  File "/usr/lib64/python2.6/SocketServer.py", line 413, in server_bind
    self.socket.bind(self.server_address)
  File "<string>", line 1, in bind
socket.gaierror: [Errno -3] Temporary failure in name resolution
# A normal run looks like this
[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 12:18:27 INFO spark.SecurityManager: Changing view acls to: hadoop
15/03/25 12:18:27 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
15/03/25 12:18:28 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/25 12:18:28 INFO Remoting: Starting remoting
15/03/25 12:18:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/25 12:18:29 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/25 12:18:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150325121829-88cd
15/03/25 12:18:29 INFO storage.MemoryStore: MemoryStore started with capacity 116.0 MB.
15/03/25 12:18:30 INFO network.ConnectionManager: Bound socket to port 48556 with id = ConnectionManagerId(master,48556)
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/03/25 12:18:30 INFO storage.BlockManagerInfo: Registering block manager master:48556 with 116.0 MB RAM
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Registered BlockManager
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48872
15/03/25 12:18:30 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.1.2:48872
15/03/25 12:18:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e2d76bbd-d2f6-4b2f-a018-f2d795a488aa
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43148
15/03/25 12:18:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 12:18:31 INFO ui.SparkUI: Started SparkUI at http://master:4040
15/03/25 12:18:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 12:18:35 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/spark/log/pythonpi-1427311113352
15/03/25 12:18:35 INFO util.Utils: Copying /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py to /tmp/spark-b66e65a9-91dc-479c-8938-14314fd1febb/pi.py
15/03/25 12:18:36 INFO spark.SparkContext: Added file file:/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py at http://192.168.1.2:43148/files/pi.py with timestamp 1427311115935
15/03/25 12:18:36 INFO client.AppClient$ClientActor: Connecting to master spark://master:7077...
15/03/25 12:18:38 INFO spark.SparkContext: Starting job: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Got job 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) with 2 output partitions (allowLocal=false)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37), which has no missing parents
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37)
15/03/25 12:18:38 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150325121838-0001
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/0 on worker-20150325114825-slave1-50832 (slave1:50832) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/0 on hostPort slave1:50832 with 1 cores, 200.0 MB RAM
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/1 on worker-20150325114823-slave2-56888 (slave2:56888) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/1 on hostPort slave2:56888 with 1 cores, 200.0 MB RAM
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/0 is now RUNNING
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/1 is now RUNNING
15/03/25 12:18:43 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave1:35398/user/Executor#765391125] with ID 0
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: slave1 (PROCESS_LOCAL)
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 374986 bytes in 12 ms
15/03/25 12:18:44 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave2:37669/user/Executor#2076348799] with ID 1
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: slave2 (PROCESS_LOCAL)
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 502789 bytes in 4 ms
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave1:47192 with 116.0 MB RAM
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave2:42313 with 116.0 MB RAM
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 0 in 2534 ms on slave1 (progress: 1/2)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 1 in 2234 ms on slave2 (progress: 2/2)
15/03/25 12:18:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stage 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) finished in 7.867 s
15/03/25 12:18:46 INFO spark.SparkContext: Job finished: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38, took 8.181053565 s
Pi is roughly 3.147220
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
15/03/25 12:18:46 INFO ui.SparkUI: Stopped Spark web UI at http://master:4040
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
15/03/25 12:18:47 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/25 12:18:47 INFO network.ConnectionManager: Selector thread was interrupted!
15/03/25 12:18:47 INFO network.ConnectionManager: ConnectionManager stopped
15/03/25 12:18:47 INFO storage.MemoryStore: MemoryStore cleared
15/03/25 12:18:47 INFO storage.BlockManager: BlockManager stopped
15/03/25 12:18:47 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
15/03/25 12:18:47 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/03/25 12:18:47 INFO Remoting: Remoting shut down
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/03/25 12:18:48 INFO spark.SparkContext: Successfully stopped SparkContext
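The name-resolution traceback shown before the successful run is raised when PySpark binds a TCP server to ("localhost", 0). Below is a minimal sketch, using only the Python standard library, of a check that could be run on the master before submitting. The function name can_bind_localhost is hypothetical and is not part of Spark.

```python
import socket


def can_bind_localhost(host="localhost"):
    """Return True if a TCP socket can bind to (host, 0).

    This mirrors the bind that fails in the traceback; it is an
    illustrative helper, not a Spark API.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, 0))  # port 0 lets the OS choose any free port
        return True
    except socket.error:  # socket.gaierror is a subclass of socket.error
        return False
    finally:
        sock.close()


if __name__ == "__main__":
    if can_bind_localhost():
        print("localhost resolves and can be bound")
    else:
        print("localhost cannot be bound; check the 127.0.0.1 entry in the hosts file")
```

If the check fails, the hosts-file fix described with the traceback is the first thing to try.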
View job monitoring at http://192.168.1.2:8080/
View worker information at http://192.168.1.3:8081/
Spark on YARN in practice
[hadoop@master ~]$ cd spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@master examples]$ spark-submit --master yarn-cluster \
> --class org.apache.spark.examples.SparkPi \
> --driver-memory 400m \
> --executor-memory 400m \
> --executor-cores 1 \
> --num-executors 2 \
> /home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2
# If the submission reports an error, edit yarn-site.xml and set the property below.
# A value of 800 or more is enough (the run below uses 800). Then restart YARN.
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>800</value>
</property>
# A normal run looks like this:
[hadoop@master sbin]$ spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 400m --executor-memory 400m --executor-cores 1 --num-executors 2 /home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 13:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 13:06:09 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:09 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 2
15/03/25 13:06:09 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
15/03/25 13:06:09 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 800
15/03/25 13:06:09 INFO yarn.Client: Preparing Local resources
15/03/25 13:06:10 INFO yarn.Client: Uploading file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_1427313904247_0001/spark-examples-1.0.2-hadoop2.2.0.jar
15/03/25 13:06:13 INFO yarn.Client: Uploading file:/home/hadoop/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_1427313904247_0001/spark-assembly-1.0.2-hadoop2.2.0.jar
15/03/25 13:06:25 INFO yarn.Client: Setting up the launch environment
15/03/25 13:06:25 INFO yarn.Client: Setting up container launch context
15/03/25 13:06:25 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx400m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.app.name=\"org.apache.spark.examples.SparkPi\", -Dspark.eventLog.enabled=\"true\", -Dspark.eventLog.dir=\"hdfs://master:9000/spark/log\", -Dspark.yarn.historyServer.address=\"master:18080\", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar, --args '2' , --executor-memory, 400, --executor-cores, 1, --num-executors , 2, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
15/03/25 13:06:25 INFO yarn.Client: Submitting application to ASM
15/03/25 13:06:25 INFO impl.YarnClientImpl: Submitted application application_1427313904247_0001 to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:26 INFO yarn.Client: Application report from ASM:
    application identifier: application_1427313904247_0001
    appId: 1
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: N/A
    appQueue: default
    appMasterRpcPort: 0
    appStartTime: 1427313985731
    yarnAppState: ACCEPTED
    distributedFinalState: UNDEFINED
    appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
    appUser: hadoop
15/03/25 13:06:27 INFO yarn.Client: Application report from ASM:
    application identifier: application_1427313904247_0001
    appId: 1
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: N/A
    appQueue: default
    appMasterRpcPort: 0
    appStartTime: 1427313985731
    yarnAppState: ACCEPTED
    distributedFinalState: UNDEFINED
    appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
    appUser: hadoop
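The yarn-site.xml change before the run matters because a container request larger than yarn.scheduler.maximum-allocation-mb cannot be granted. The sketch below illustrates the arithmetic only. It assumes Spark adds a fixed 384 MB overhead to the requested memory, which is an assumption about the Spark 1.0.x default and is not confirmed by the log above. The names fits_in_yarn and OVERHEAD_MB are hypothetical.

```python
# Assumed per-container memory overhead in MB (an assumption about
# Spark 1.0.x on YARN; the log in this article does not state it).
OVERHEAD_MB = 384


def fits_in_yarn(requested_mb, max_allocation_mb, overhead_mb=OVERHEAD_MB):
    """Return True if requested memory plus overhead fits in one YARN container."""
    return requested_mb + overhead_mb <= max_allocation_mb


if __name__ == "__main__":
    # 400 MB comes from --driver-memory/--executor-memory in the command,
    # 800 MB is the cluster maximum reported in the log.
    print(fits_in_yarn(400, 800))
    # A smaller maximum would not fit under the same assumption.
    print(fits_in_yarn(400, 700))
```

Under this assumption, 400 MB plus overhead stays just below the 800 MB limit, which would explain why a value of about 800 is sufficient for this job.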
View the YARN monitoring page at http://192.168.1.2:8088/cluster
It shows that the application ran on slave2.
Open http://192.168.1.4:8042/node
Log in to slave2 and check the container output:
[hadoop@slave2 ~]$ cd /home/hadoop/hadoop/logs/userlogs/application_1427313904247_0001/container_1427313904247_0001_01_000001
[hadoop@slave2 container_1427313904247_0001_01_000001]$ ls
stderr  stdout
[hadoop@slave2 container_1427313904247_0001_01_000001]$ cat stdout
Pi is roughly 3.13774
[hadoop@slave2 ~]$ cd /home/hadoop/spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@slave2 examples]$ cat SparkPi.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
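Both pi.py and SparkPi.scala use the same Monte Carlo estimate: sample points in the square [-1, 1] x [-1, 1] and count how many fall inside the unit circle. As a rough illustration, here is the same computation in plain Python without Spark. The function name estimate_pi and the optional rng parameter are introduced here for illustration and do not appear in the Spark examples.

```python
import random


def estimate_pi(n, rng=None):
    """Estimate pi from n uniform samples in the square [-1, 1] x [-1, 1]."""
    if rng is None:
        rng = random.Random()
    inside = 0
    for _ in range(n):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y < 1:
            inside += 1
    # The unit circle covers pi/4 of the square, so scale the hit ratio by 4.
    return 4.0 * inside / n


if __name__ == "__main__":
    # 200000 samples matches n = 100000 * slices with the argument 2 used above.
    print("Pi is roughly %f" % estimate_pi(200000))
```

Spark distributes the sampling loop across partitions with parallelize and map, then sums the hits with reduce. The estimate varies from run to run, which is why the two runs in this article print slightly different values.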
Using spark-shell on YARN
[hadoop@master ~]$ spark-shell --master yarn-client