Akka分发pub sub:Java实现不起作用
发布时间:2020-12-15 02:08:03 所属栏目:Java 来源:网络整理
导读:订阅者的主类:Application. java package com.mynamespace;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ApplicationContext;import org.
订阅者的主类:Application.
java
package com.mynamespace; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.ComponentScan; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.contrib.pattern.DistributedPubSubExtension; import akka.contrib.pattern.DistributedPubSubMediator; import com.mynamespace.actors.SubscriberActor; @SpringBootApplication @ComponentScan(basePackages = "com.mynamespace.*") public class Application { public static void main(String[] args) throws InterruptedException { ApplicationContext ctx = SpringApplication.run(Application.class,args); // get hold of the actor system ActorSystem system = ctx.getBean(ActorSystem.class); ActorRef mediator = DistributedPubSubExtension.get(system).mediator(); ActorRef subscriber = system.actorOf( Props.create(SubscriberActor.class),"subscriber"); // subscribe to the topic named "content" mediator.tell(new DistributedPubSubMediator.Put(subscriber),subscriber); // subscriber.tell("init",null); System.out.println("Running."); Thread.sleep(5000l); } } 订阅者actor:SubscriberActor.java package com.mynamespace.actors; import java.util.ArrayList; import java.util.List; import akka.actor.UntypedActor; import com.mynamespace.message.CategoryServiceRequest; import com.mynamespace.message.CategoryServiceResponse; public class SubscriberActor extends UntypedActor { @Override public void onReceive(Object msg) throws Exception { if (msg instanceof CategoryServiceRequest) { System.out.println("Request received for GetCategories."); CategoryServiceResponse response = new CategoryServiceResponse(); List<String> categories = new ArrayList<>(); categories.add("Food"); categories.add("Fruits"); response.setCatgories(categories); getSender().tell(response,getSelf()); } else if (msg instanceof String && msg.equals("init")) { System.out.println("init called"); } else { System.out .println("Unhandelled message received for getCategories."); } } } 订阅者的Application.conf akka { loglevel = INFO stdout-loglevel = INFO loggers = ["akka.event.slf4j.Slf4jLogger"] extensions = ["akka.contrib.pattern.DistributedPubSubExtension"] actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://mynamespace-actor-system@127.0.0.1:2551","akka.tcp://mynamespace-actor-system@127.0.0.1:2552"] auto-down-unreachable-after = 10s } } 发布者的主要类:Application.java package com.mynamespace; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.ComponentScan; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.contrib.pattern.DistributedPubSubExtension; import akka.contrib.pattern.DistributedPubSubMediator; import com.mynamespace.actors.PublisherActor; @SpringBootApplication @ComponentScan(basePackages = "com.mynamespace.*") public class Application { public static void main(String[] args) throws InterruptedException { ApplicationContext ctx = SpringApplication.run(Application.class,args); // get hold of the actor system ActorSystem system = ctx.getBean(ActorSystem.class); ActorRef mediator = DistributedPubSubExtension.get(system).mediator(); ActorRef publisher = system.actorOf(Props.create(PublisherActor.class),"publisher"); mediator.tell(new DistributedPubSubMediator.Put(publisher),publisher); Thread.sleep(5000); publisher.tell("hi",publisher); System.out.println("Running."); } } PublisherActor.java package com.mynamespace.actors; import scala.concurrent.Future; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.contrib.pattern.DistributedPubSubExtension; import akka.contrib.pattern.DistributedPubSubMediator; import akka.dispatch.Mapper; import akka.pattern.Patterns; import akka.util.Timeout; import com.mynamespace.message.CategoryServiceRequest; import com.mynamespace.message.CategoryServiceResponse; public class PublisherActor extends UntypedActor { // activate the extension ActorRef mediator = DistributedPubSubExtension.get(getContext().system()) .mediator(); public void onReceive(Object msg) { if (msg instanceof String) { Timeout timeOut = new Timeout(50000l); mediator.tell(new DistributedPubSubMediator.Send( "/user/subscriber",new CategoryServiceRequest()),getSelf()); Future<Object> response = Patterns.ask(mediator,new DistributedPubSubMediator.Send("/user/subscriber",timeOut); Future<CategoryServiceResponse> finalresponse = response.map( new Mapper<Object,CategoryServiceResponse>() { @Override public CategoryServiceResponse apply(Object parameter) { CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter; System.out.println("received:: list of size:: " + responseFromRemote.getCatgories().size()); return responseFromRemote; } },getContext().system().dispatcher()); } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { System.out.println("subscribbed......."); } else { unhandled(msg); } } } 发布者的应用程序配置与订阅者相同.两者都在同一系统上的不同端口上运行. 我在本地系统上定义并运行了两个种子节点.不知何故,我无法通过DistributedPubSub Mediator从生产者(两者都在不同的节点上运行)ASK / TELL订阅者. 运行Subscriber然后发布者:我没有在stdout / logs中打印任何异常或任何死信引用. 是否有可能查看哪个actor引用了我的调解员? 需要帮助来查找问题或可能的问题. 解决方法
我遇到了同样的问题,在@spam的评论和我自己的实验后,我可以推荐的是使用发布/订阅组和sendOneMessageToEachGroup = true.
它是否应该只在本地发送?如果是这样,文档没有明确说明.但我也可以通过代码告诉我,文档的这个特定部分显然已经被忽略了(因为更改了类名,但是不调用它们,在之前的例子中调用前面的那些) 希望这可以帮助任何有这个问题的人,因为文档显然有点误导 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |