scala-akka实现简单的分布式RPC通信框架(主从监听,消息发送)
发布时间:2020-12-16 09:48:56 所属栏目:安全 来源:网络整理
导读:简介:模拟用akka实现一个RPC分布式通信框架,实现从机向主机发送心跳,主机判断从机是否挂机,以及消息通信的简单功能。 开发平台:IntelliJ IDEA ? ? Maven架构 项目结构及依赖: 代码2个样例类: case class Workinfo (val id:String) extends Serializab
简介:模拟用akka实现一个RPC分布式通信框架,实现从机向主机发送心跳,主机判断从机是否挂机,以及消息通信的简单功能。 开发平台:IntelliJ IDEA ? ? Maven架构 项目结构及依赖: 代码2个样例类: case class Workinfo (val id:String) extends Serializable{ } case class SendHearBeat (val id:String) extends Serializable{ } Master类: import akka.actor.{Actor,ActorSystem,Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.collection.mutable class Master extends Actor{ //保存worker id val workerMessage=new mutable.HashMap[String,Long] //worker超时时间 val WORKEER_TIMEOUT=10 * 1000 override def preStart(): Unit ={ //导入隐式转换,用于启动定时器 import context.dispatcher //启动定时 context.system.scheduler.schedule(0 millis,5000 millis,self,"CheckOfTimeOutWorker") } override def receive: Receive = { //注册woker把workerid 和当前时间存起来 case Workinfo(id)=>{ if(!workerMessage.contains(id)) { workerMessage.put(id,System.currentTimeMillis()) } println("注册worker:"+id) sender ! "MasterReply" } //接收worker心跳,更新接收时间 case SendHearBeat(id)=>{ workerMessage(id)=System.currentTimeMillis() println("接收到worker:"+id+"的心跳报告。") } //Master自己向自己发送的定期检查超时Worker的消息 case "CheckOfTimeOutWorker" =>{ val currentTime = System.currentTimeMillis() //过滤出超时的worker val outTimeWorker=workerMessage.filter(m=>currentTime-m._2>WORKEER_TIMEOUT).toArray for(worker<-outTimeWorker){ workerMessage.remove(worker._1) } println("woker数量:"+outTimeWorker.size+"个,挂了") } } } object Master{ def main(args: Array[String]): Unit = { //接受参数 val host="169.254.189.184" val port="8888" //配置信息 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config=ConfigFactory.parseString(configStr) //ActorSystem,辅助创建和监控下面的Actor,单例 val actorSystem=ActorSystem("MasterSystem",config) //执行mastor生命周期方法 val master= actorSystem.actorOf(Props(new Master),"Master") actorSystem.awaitTermination() } } Worker类: import java.util.UUID import akka.actor.{Actor,ActorSelection,Props} import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory class Worker(val masterHost:String,val masterPort:String) extends Actor{ var master:ActorSelection=_ //生成uuid作为worker标识 val id=UUID.randomUUID().toString //建立连接,链接Master override def preStart(): Unit ={ //Master就是你前面给master起的名字 master=context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //id和其他信息封装到一个类中,把这个类发送过去,注意这个类要序列化 master ! Workinfo(id) } override def receive: Receive = { //接收master返回来的信息后启动定时 case "MasterReply"=>{ println("a reply from master") //导入隐式转换,用于启动定时器 import context.dispatcher //启动定时任务,通过case回调SendHearBeat向master发送心跳(id标识) context.system.scheduler.schedule(0 millis,"SendHeartBeat") } case "SendHearBeat"=>{ println("worker send heartbeat") master ! SendHearBeat(id) } } } object Worker{ def main(args: Array[String]): Unit = { //接受参数 val workerHost="169.254.189.184" val workerPort="9999" val masterHost="169.254.189.184" val masterPort="8888" //配置信息 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$workerHost" |akka.remote.netty.tcp.port = "$workerPort" """.stripMargin val config=ConfigFactory.parseString(configStr) //ActorSystem,单例 val actorSystem=ActorSystem("WorkerSystem",config) //启动Actor,woker会被实例化,生命周期方法会被调用 actorSystem.actorOf(Props(new Worker(masterHost,masterPort)),"Worker") } } 启动Master控制台打印: "C:Program Files (x86)Javajdk1.7.0_01binjava" "-javaagent:D:Program Files (x86)IntelliJ IDEA 2017.3.2libidea_rt.jar=54133:D:Program Files (x86)IntelliJ IDEA 2017.3.2bin" -Dfile.encoding=UTF-8 -classpath "C:Program Files (x86)Javajdk1.7.0_01jrelibcharsets.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibdeploy.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextdnsns.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextlocaledata.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextsunec.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextsunjce_provider.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextsunmscapi.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextsunpkcs11.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibextzipfs.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibjavaws.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibjce.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibjsse.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibmanagement-agent.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibplugin.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibresources.jar;C:Program Files (x86)Javajdk1.7.0_01jrelibrt.jar;D:Program Files (x86)IntelliJ IDEA 2017.3.2scalaspacerpcScalatargetclasses;C:UsersAdministrator.m2repositoryorgscala-langscala-library2.10.6scala-library-2.10.6.jar;C:UsersAdministrator.m2repositorycomtypesafeakkaakka-actor_2.102.3.14akka-actor_2.10-2.3.14.jar;C:UsersAdministrator.m2repositorycomtypesafeconfig1.2.1config-1.2.1.jar;C:UsersAdministrator.m2repositorycomtypesafeakkaakka-remote_2.102.3.14akka-remote_2.10-2.3.14.jar;C:UsersAdministrator.m2repositoryionettynetty3.8.0.Finalnetty-3.8.0.Final.jar;C:UsersAdministrator.m2repositorycomgoogleprotobufprotobuf-java2.5.0protobuf-java-2.5.0.jar;C:UsersAdministrator.m2repositoryorguncommonsmathsuncommons-maths1.2.2auncommons-maths-1.2.2a.jar" Master [INFO] [01/06/2018 13:44:54.602] [main] [Remoting] Starting remoting [INFO] [01/06/2018 13:44:55.338] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://MasterSystem@169.254.189.184:8888] [INFO] [01/06/2018 13:44:55.341] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://MasterSystem@169.254.189.184:8888] woker数量:0个,挂了 woker数量:0个,挂了 woker数量:0个,挂了 woker数量:0个,挂了 启动Woker后Master控制台打印:(看见worker注册了) woker数量:0个,挂了 woker数量:0个,挂了 注册worker:9e8ec9e7-6014-491f-816e-94d0f65eb068 woker数量:0个,挂了 woker数量:0个,挂了worker控制台打印: a reply from master 关掉Worker后Master控制台打印: woker数量:0个,挂了 woker数量:0个,挂了 woker数量:1个,挂了 woker数量:0个,挂了可以看到关掉worker后Master监控到有一台worker挂掉了。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |