加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

Akka分布式发布/订阅(Distributed Pub-Sub):Java实现不起作用

发布时间:2020-12-15 02:08:03 所属栏目:Java 来源:网络整理
导读:订阅者的主类:Application. java package com.mynamespace;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ApplicationContext;import org.
订阅者的主类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.SubscriberActor;

/**
 * Entry point of the subscriber node.
 *
 * <p>Boots the Spring context, looks up the {@link ActorSystem} bean from it,
 * creates the {@code subscriber} actor and registers that actor with the local
 * distributed pub-sub mediator.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Starts the application and registers the subscriber actor with the mediator.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // The actor system is supplied by the Spring context (bean defined elsewhere).
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef subscriber = system.actorOf(
                Props.create(SubscriberActor.class), "subscriber");
        // Put registers the actor with the mediator so that it can be addressed
        // by its path ("/user/subscriber") through DistributedPubSubMediator.Send.
        // This is NOT a topic subscription (that would be Subscribe).
        mediator.tell(new DistributedPubSubMediator.Put(subscriber), subscriber);
        System.out.println("Running.");
        // NOTE(review): main simply pauses for five seconds and then returns;
        // the purpose of this delay is not evident from this file.
        Thread.sleep(5000L);
    }
}

订阅者actor:SubscriberActor.java

package com.mynamespace.actors;

import java.util.ArrayList;
import java.util.List;

import akka.actor.UntypedActor;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

/**
 * Actor that answers {@link CategoryServiceRequest} messages with a fixed
 * list of categories.
 *
 * <p>The string {@code "init"} is acknowledged on stdout; every other message
 * is reported on stdout as unhandled.
 */
public class SubscriberActor extends UntypedActor {

    /**
     * Dispatches on the type of the incoming message.
     *
     * @param msg the message delivered to this actor
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CategoryServiceRequest) {
            System.out.println("Request received for GetCategories.");
            replyWithCategories();
            return;
        }

        if ("init".equals(msg)) {
            System.out.println("init called");
            return;
        }

        System.out
            .println("Unhandelled message received for getCategories.");
    }

    /** Builds the fixed category response and sends it back to the current sender. */
    private void replyWithCategories() {
        CategoryServiceResponse reply = new CategoryServiceResponse();
        List<String> names = new ArrayList<>();
        names.add("Food");
        names.add("Fruits");
        reply.setCatgories(names);
        getSender().tell(reply, getSelf());
    }

}

订阅者的Application.conf

# Akka settings for the subscriber node (per the article, the publisher uses the same file).
akka {
    # Log at INFO level, both through the configured loggers and on stdout.
    loglevel = INFO
    stdout-loglevel = INFO
    # Route Akka log events to SLF4J.
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    # Lists the distributed pub-sub extension so it is loaded with the actor system.
    extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
    actor {
      # Cluster-aware actor-ref provider.
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
       enabled-transports = ["akka.remote.netty.tcp"]
       netty.tcp {
         hostname = "127.0.0.1"
         # NOTE(review): presumably 0 lets Akka pick a free port, which would mean this
         # node is not one of the seed nodes listed below - confirm.
         port = 0
       }
     }

     cluster {
    # Addresses of the two seed nodes (local ports 2551 and 2552).
    # NOTE(review): presumably the actor system name in these URIs must match the
    # name of the ActorSystem bean created by the application - confirm.
    seed-nodes = [
      "akka.tcp://mynamespace-actor-system@127.0.0.1:2551","akka.tcp://mynamespace-actor-system@127.0.0.1:2552"]

    # NOTE(review): presumably marks unreachable members as down after 10 seconds - confirm.
    auto-down-unreachable-after = 10s
    }

}

发布者的主要类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.PublisherActor;

/**
 * Entry point of the publisher node.
 *
 * <p>Boots the Spring context, creates the {@code publisher} actor, registers
 * it with the local distributed pub-sub mediator and, after a five-second
 * pause, sends it the string {@code "hi"}.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Starts the application and sends the first message to the publisher actor.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws InterruptedException if the pause before the first message is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final ApplicationContext springContext = SpringApplication.run(Application.class, args);

        // The actor system is provided as a bean by the Spring context.
        final ActorSystem actorSystem = springContext.getBean(ActorSystem.class);
        final ActorRef pubSubMediator = DistributedPubSubExtension.get(actorSystem).mediator();

        final ActorRef publisherRef =
                actorSystem.actorOf(Props.create(PublisherActor.class), "publisher");
        pubSubMediator.tell(new DistributedPubSubMediator.Put(publisherRef), publisherRef);

        // Pause before sending the first message to the publisher.
        Thread.sleep(5000);
        publisherRef.tell("hi", publisherRef);

        System.out.println("Running.");
    }
}

PublisherActor.java

package com.mynamespace.actors;

import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class PublisherActor extends UntypedActor {

    // activate the extension
    ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
        .mediator();

    public void onReceive(Object msg) {
        if (msg instanceof String) {
            Timeout timeOut = new Timeout(50000l);
            mediator.tell(new DistributedPubSubMediator.Send(
                    "/user/subscriber",new CategoryServiceRequest()),getSelf());
            Future<Object> response = Patterns.ask(mediator,new DistributedPubSubMediator.Send("/user/subscriber",timeOut);
            Future<CategoryServiceResponse> finalresponse = response.map(
                    new Mapper<Object,CategoryServiceResponse>() {

                        @Override
                        public CategoryServiceResponse apply(Object parameter) {
                            CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter;
                            System.out.println("received:: list of size:: "
                                + responseFromRemote.getCatgories().size());
                            return responseFromRemote;
                        }

                    },getContext().system().dispatcher());
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            System.out.println("subscribbed.......");

        } else {
            unhandled(msg);
        }
    }
}

发布者的应用程序配置与订阅者相同.两者都在同一系统上的不同端口上运行.

我在本地系统上定义并运行了两个种子节点.不知何故,我无法通过DistributedPubSub Mediator从生产者(两者都在不同的节点上运行)ASK / TELL订阅者.

运行Subscriber然后发布者:我没有在stdout / logs中打印任何异常或任何死信引用.

是否有办法查看我的mediator(中介者)能看到哪些actor引用?

需要帮助来查找问题或可能的问题.

解决方法

我遇到了同样的问题。结合@spam的评论和我自己的实验,我的建议是:改用带分组(group)的发布/订阅(Publish/Subscribe),并设置 sendOneMessageToEachGroup = true。

Send 是否本来就只应在本地投递?如果是这样,文档并没有明确说明。不过从文档中的示例代码也能看出,这一部分显然被疏忽了(示例中改了类名,却没有调用这些新类,而是仍然调用上一个示例中的那些类)。

希望这可以帮助任何有这个问题的人,因为文档显然有点误导

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读