spark之master与worker通信模型讲解
通信模型架构图
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名注册、网站空间、营销软件、网站建设、和顺网站维护、网站推广。
master 端代码 import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory // 需要导入这2个包 封装一些属性。 class MasterActor extends Actor { //在开始之前调用一次 override def preStart(): Unit = { } //用于接收消息 override def receive: Receive = { case "started" => { println("Master has been started!") //进入这个分支,说明这个Master线程已经启动完成 } case "connecting" => { println("Master has been get connect from Worker!") println("a Worker Node has been register!") //返回消息给Worker sender() ! "connected" Thread.sleep(1000) } case "stoped" => { } } } object Demo01MasterActor { def main(args: Array[String]) { //设置MasterIP和端口 val masterHost = "localhost" val masterPort = "1234" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$masterHost" |akka.remote.netty.tcp.port = "$masterPort" """.stripMargin val config = ConfigFactory.parseString(conStr) val masterActorSystem = ActorSystem("MasterActorSystem", config) val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor") masterActor ! "started" masterActorSystem.awaitTermination(); } } worker端代码 import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory class WorkerActor extends Actor { var masterURL: ActorSelection = null //启动Actor之前执行,做初始化工作 override def preStart(): Unit = { //配置访问Master的URL //MasterIP:localhost //MasterPort:8888(根据Master配置) //Master的 ActorSystem对象:MasterActorSystem、MasterActor masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor") } override def receive: Receive = { case "started" => { println("Worker has been started!") //进入这个分支,说明这个Worker线程已经启动完成 //可以去向Master注册 //请求和Master建立连接 masterURL ! "connecting" } case "connected" => { println("Worker 收到来自Master确认信息!") } case "stoped" => { } } } object Demo01WorkerActor { def main(args: Array[String]) { //初始化MastereIP和端口、WorkerIP和端口 // val masterHost = args(0) // val masterPort = args(1) // val workerHost = args(2) // val workePort = args(3) val masterHost = "localhost" val masterPort = "8888" val workerHost = "localhost" val workePort = "8889" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$workerHost" |akka.remote.netty.tcp.port = "$workePort" """.stripMargin val config = ConfigFactory.parseString(conStr) val workerActorSystem = ActorSystem("WorkerActorSystem", config) val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor") workerActor ! "started" workerActorSystem.awaitTermination(); } }
当前题目:spark之master与worker通信模型讲解
网站地址:http://scyanting.com/article/jpgeed.html