初见akka-02:rpc框架
1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka
来简单的实现了
rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,
注册以及一些问题,
模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全
这个里面一共有有四个文件:
Master.scala
RemoteMessage.scala
Worker.scala
WorkerInfo
Master.scala
package cn.wj.rpcimport akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props import scala.concurrent.duration._import scala.collection.mutable/*** Created by WJ on 2016/12/23.*/ class Master(val host:String,val port:Int ) extends Actor {// workerId -> WorkerInfoval idToWorker = new mutable.HashMap[String,WorkerInfo]()val workers = new mutable.HashSet[WorkerInfo]()//时间间隔时间,超时检测的间隔val CHECK_INTERVAL = 15000//用于接收消息override def receive: Receive = {case RegisterWorker(id,memory,cores) => { // println("a client connected") // sender ! "reply" //往发送给他消息的人回复一个消息//判断一下是不是已经注册过了if(!(idToWorker.contains(id))){//把Worker的信息封装以前,保存到内存当中val workerInfo = new WorkerInfo(id,memory,cores)idToWorker(id) = workerInfo //这个应该是scala的特定版本workers += workerInfosender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")}}case Heartbeat(id) =>{if(idToWorker.contains(id)) {val workerInfo = idToWorker(id)//报活//得到系统当前时间val currentTime = System.currentTimeMillis()workerInfo.lastHeartbeatTime = currentTime}}case CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)for(w <- toRemove){workers -= widToWorker -= w.id}println(workers.size)}}override def preStart(): Unit = {println("prestart invoked")//导入隐式转换的功能import context.dispatchercontext.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)} }object Master{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toInt// 准备配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的val actorSystem = ActorSystem("MasterSystem",config )//创建Actorval master = actorSystem.actorOf(Props(new Master(host,port)),"Master")actorSystem.awaitTermination()} }Worker.scala
package cn.wj.rpcimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /*** Created by WJ on 2016/12/23.*/ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {var master : ActorSelection = _val workerId = UUID.randomUUID().toStringval HEART_INTERVAL = 10000//preStart执行方法的时机:构造器之后,receive之前//与Master(Actor)建立连接override def preStart(): Unit = {//master已经是别的Master的引用了 ,这是跟master建立连接master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")//向Master发送注册消息master ! RegisterWorker(workerId,memory,cores)}override def receive: Receive = {case RegisteredWorker(masterUrl) => {println(masterUrl)//启动定时器发送心跳import context.dispatchercontext.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)}case SendHeartbeat =>{println("send heartbeat to master")master ! Heartbeat(workerId)}} }object Worker{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toIntval masterHost = args(2)val masterPort = args(3).toIntval memory = args(4).toIntval cores = args(5).toInt// 准备配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的val actorSystem = ActorSystem("WorkerSystem",config )//创建Actor,此时调用该(Actor)的prestart以及receive方法actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")actorSystem.awaitTermination()} }RemoteMessage.scala
package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ trait RemoteMessage extends Serializable//Worker->Master(这个表明当master接受这个worker时的信息,是receive)case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage//Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的) case class RegisteredWorker(masterUrl:String) extends RemoteMessage//这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable //Worker -> Worker(self) case object SendHeartbeat//这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳 //Worker -> Master case class Heartbeat(id:String) extends RemoteMessage//Master -> self case object CheckTimeOutWorkerWorkerInfo.scala
package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {//TODO 上一次心跳var lastHeartbeatTime:Long = _ }这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,
当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像
是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker
的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册
成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的
作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔
过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些
分布式和这个原理差不多。
下面奉上原理图一张:
其中的receive是用于接受信息,因为继承Actor,
prestart这个方法是执行实在类实例之后,receive的方法之后
2.RPC的大概流程
首先定义了一个worker,一个master,master首先启动了,
然后它在prestart()的方法里面
检测超时的worker,那么在这个里面启动了一个定时器,
那么我们自己是不是自己可以手写一个定时器,
比如我们可以用线程来搞定时器,但是我们的akka
里面提供了一个超级简单的定时器,
context.system.schedular.schedule
(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
其中第一个参数:延迟多少秒
第二个参数:时间间隔
第三个参数:把这个消息发给谁
第四个参数:发送什么消息
虽然它起了消息,但是他不能一下子就把消息发送出去
,它只能把消息先发送给自己的receive接收到这个消息,
然后在发送给我们master,这个里面有一个检测,
检测worker有多长时间没有向我发送心跳了,
如果这个时间大过了我规定的范围,
这样,Master启动完成检测心跳,worker启动完成后
,首先向master建立连接,然后发送注册消息
,master接受到这个注册消息,
把worker的信息保存到内存当中,然后向worker反馈一个消息,
说你注册成功了,然后worker启动一个定时器,
定时的向master发送心跳,就是这样的流程
转载于:https://www.cnblogs.com/wnbahmbb/p/6220528.html
总结
以上是生活随笔为你收集整理的初见akka-02:rpc框架的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: nginx subrequest演示示例
- 下一篇: jquery中的ajax方法参数——$.