欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

初见akka-02:rpc框架

发布时间:2025/3/15 编程问答 37 豆豆
生活随笔 收集整理的这篇文章主要介绍了 初见akka-02:rpc框架 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

  1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka

   来简单的实现了

   rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,

   注册以及一些问题,

   模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全

   这个里面一共有有四个文件:

   Master.scala

   RemoteMessage.scala

   Worker.scala

   WorkerInfo

  

   Master.scala

package cn.wj.rpcimport akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props import scala.concurrent.duration._import scala.collection.mutable/*** Created by WJ on 2016/12/23.*/ class Master(val host:String,val port:Int ) extends Actor {// workerId -> WorkerInfoval idToWorker = new mutable.HashMap[String,WorkerInfo]()val workers = new mutable.HashSet[WorkerInfo]()//时间间隔时间,超时检测的间隔val CHECK_INTERVAL = 15000//用于接收消息override def receive: Receive = {case RegisterWorker(id,memory,cores) => { // println("a client connected") // sender ! "reply" //往发送给他消息的人回复一个消息//判断一下是不是已经注册过了if(!(idToWorker.contains(id))){//把Worker的信息封装以前,保存到内存当中val workerInfo = new WorkerInfo(id,memory,cores)idToWorker(id) = workerInfo //这个应该是scala的特定版本workers += workerInfosender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")}}case Heartbeat(id) =>{if(idToWorker.contains(id)) {val workerInfo = idToWorker(id)//报活//得到系统当前时间val currentTime = System.currentTimeMillis()workerInfo.lastHeartbeatTime = currentTime}}case CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)for(w <- toRemove){workers -= widToWorker -= w.id}println(workers.size)}}override def preStart(): Unit = {println("prestart invoked")//导入隐式转换的功能import context.dispatchercontext.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)} }object Master{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toInt// 准备配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的val actorSystem = ActorSystem("MasterSystem",config )//创建Actorval master = actorSystem.actorOf(Props(new Master(host,port)),"Master")actorSystem.awaitTermination()} }

  Worker.scala

package cn.wj.rpcimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /*** Created by WJ on 2016/12/23.*/ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {var master : ActorSelection = _val workerId = UUID.randomUUID().toStringval HEART_INTERVAL = 10000//preStart执行方法的时机:构造器之后,receive之前//与Master(Actor)建立连接override def preStart(): Unit = {//master已经是别的Master的引用了 ,这是跟master建立连接master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")//向Master发送注册消息master ! RegisterWorker(workerId,memory,cores)}override def receive: Receive = {case RegisteredWorker(masterUrl) => {println(masterUrl)//启动定时器发送心跳import context.dispatchercontext.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)}case SendHeartbeat =>{println("send heartbeat to master")master ! Heartbeat(workerId)}} }object Worker{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toIntval masterHost = args(2)val masterPort = args(3).toIntval memory = args(4).toIntval cores = args(5).toInt// 准备配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的val actorSystem = ActorSystem("WorkerSystem",config )//创建Actor,此时调用该(Actor)的prestart以及receive方法actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")actorSystem.awaitTermination()} }

  RemoteMessage.scala

package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ trait RemoteMessage extends Serializable//Worker->Master(这个表明当master接受这个worker时的信息,是receive)case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage//Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的) case class RegisteredWorker(masterUrl:String) extends RemoteMessage//这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable //Worker -> Worker(self) case object SendHeartbeat//这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳 //Worker -> Master case class Heartbeat(id:String) extends RemoteMessage//Master -> self case object CheckTimeOutWorker

  WorkerInfo.scala

package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {//TODO 上一次心跳var lastHeartbeatTime:Long = _ }

  这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,

  当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像

  是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker

  的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册

  成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的

  作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔

  过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些

  分布式和这个原理差不多。

  下面奉上原理图一张:

  

  其中的receive是用于接受信息,因为继承Actor,

  prestart这个方法是执行实在类实例之后,receive的方法之后

2.RPC的大概流程

 

  首先定义了一个worker,一个master,master首先启动了,

  然后它在prestart()的方法里面
  检测超时的worker,那么在这个里面启动了一个定时器,

  那么我们自己是不是自己可以手写一个定时器,
  比如我们可以用线程来搞定时器,但是我们的akka

  里面提供了一个超级简单的定时器,
  context.system.schedular.schedule

  (0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
  其中第一个参数:延迟多少秒
  第二个参数:时间间隔
  第三个参数:把这个消息发给谁
  第四个参数:发送什么消息
  虽然它起了消息,但是他不能一下子就把消息发送出去
  ,它只能把消息先发送给自己的receive接收到这个消息,
  然后在发送给我们master,这个里面有一个检测,
  检测worker有多长时间没有向我发送心跳了,
  如果这个时间大过了我规定的范围,
  这样,Master启动完成检测心跳,worker启动完成后
  ,首先向master建立连接,然后发送注册消息
  ,master接受到这个注册消息,
  把worker的信息保存到内存当中,然后向worker反馈一个消息,
  说你注册成功了,然后worker启动一个定时器,
  定时的向master发送心跳,就是这样的流程

  

 

转载于:https://www.cnblogs.com/wnbahmbb/p/6220528.html

总结

以上是生活随笔为你收集整理的初见akka-02:rpc框架的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。