scala – akka在tcp上流
发布时间:2020-12-16 18:50:19 所属栏目:安全 来源:网络整理
导读:以下是设置:我希望能够通过tcp连接将消息(jsons转换为bytestrings)从发布者流式传输到远程服务器订阅者. 理想情况下,发布者将是一个接收内部消息,排队然后然后将它们流式传输到订阅者服务器的参与者,如果当然有出色的需求.据我所知,这需要扩展ActorPublishe
|
以下是设置:我希望能够通过tcp连接将消息(jsons转换为bytestrings)从发布者流式传输到远程服务器订阅者.
理想情况下,发布者将是一个接收内部消息,排队然后然后将它们流式传输到订阅者服务器的参与者,如果当然有出色的需求.据我所知,这需要扩展ActorPublisher类,以便在需要时将消息扩展到onNext(). 我的问题是,到目前为止,我只能向服务器发送(接收和解码)一次性消息,每次都打开一个新的连接.我没有设法绕过akka doc并能够使用ActorPublisher设置正确的tcp Flow. 以下是发布商的代码: def send(message: Message): Unit = {
val system = Akka.system()
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val address = Play.current.configuration.getString("eventservice.location").getOrElse("localhost")
val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000)
/*** Try with actorPublisher ***/
//val result = Source.actorPublisher[Message] (Props[EventActor]).via(Flow[Message].map(Json.toJson(_).toString.map(ByteString(_))))
/*** Try with actorRef ***/
/*val source = Source.actorRef[Message](0,OverflowStrategy.fail).map(
m => {
Logger.info(s"Sending message: ${m.toString}")
ByteString(Json.toJson(m).toString)
}
)
val ref = Flow[ByteString].via(Tcp().outgoingConnection(address,port)).to(Sink.ignore).runWith(source)*/
val result = Source(Json.toJson(message).toString.map(ByteString(_))).
via(Tcp().outgoingConnection(address,port)).
runFold(ByteString.empty) { (acc,in) ? acc ++ in }//Handle the future
}
以及最终非常标准的演员代码: import akka.actor.Actor
import akka.stream.actor.ActorSubscriberMessage.{OnComplete,OnError}
import akka.stream.actor.{ActorPublisherMessage,ActorPublisher}
import models.events.Message
import play.api.Logger
import scala.collection.mutable
class EventActor extends Actor with ActorPublisher[Message] {
import ActorPublisherMessage._
var queue: mutable.Queue[Message] = mutable.Queue.empty
def receive = {
case m: Message =>
Logger.info(s"EventActor - message received and queued: ${m.toString}")
queue.enqueue(m)
publish()
case Request => publish()
case Cancel =>
Logger.info("EventActor - cancel message received")
context.stop(self)
case OnError(err: Exception) =>
Logger.info("EventActor - error message received")
onError(err)
context.stop(self)
case OnComplete =>
Logger.info("EventActor - onComplete message received")
onComplete()
context.stop(self)
}
def publish() = {
while (queue.nonEmpty && isActive && totalDemand > 0) {
Logger.info("EventActor - message published")
onNext(queue.dequeue())
}
}
如有必要,我可以提供订阅者的代码: def connect(system: ActorSystem,address: String,port: Int): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
Logger.info("Event server connected to: " + conn.remoteAddress)
// Get the ByteString flow and reconstruct the msg for handling and then output it back
// that is how handleWith work apparently
conn.handleWith(
Flow[ByteString].fold(ByteString.empty)((acc,b) => acc ++ b).
map(b => handleIncomingMessages(system,b.utf8String)).
map(ByteString(_))
)
}
val connections = Tcp().bind(address,port)
val binding = connections.to(handler).run()
binding.onComplete {
case Success(b) =>
Logger.info("Event server started,listening on: " + b.localAddress)
case Failure(e) =>
Logger.info(s"Event server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
提前感谢提示. 解决方法
我的第一个建议是不要编写自己的队列逻辑. Akka提供了这种开箱即用的功能.您也不需要编写自己的Actor,Akka Streams也可以提供它.
首先,我们可以创建Flow,通过Tcp将您的发布者连接到您的订阅者.在您的发布商代码中,您只需创建一次ActorSystem并连接到外部服务器一次: //this code is at top level of your application
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher
val host = Play.current.configuration.getString("eventservice.location").getOrElse("localhost")
val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000)
val publishFlow = Tcp().outgoingConnection(host,port)
publishFlow是一个 // data to subscriber ----> publishFlow ----> data returned from subscriber 下一步是发布者来源.您可以使用 //these values control the buffer val bufferSize = 1024 val overflowStrategy = akka.stream.OverflowStrategy.dropHead val messageSource = Source.actorRef[Message](bufferSize,overflowStrategy) 我们还需要一个Flow将Messages转换为ByteString val marshalFlow = Flow[Message].map(message => ByteString(Json.toJson(message).toString)) 最后,我们可以连接所有部分.由于您没有从外部订户接收任何数据,我们将忽略来自连接的任何数据: val subscriberRef : ActorRef = messageSource.via(marshalFlow)
.via(publishFlow)
.runWith(Sink.ignore)
我们现在可以将此流视为一个Actor: val message1 : Message = ??? subscriberRef ! message1 val message2 : Message = ??? subscriberRef ! message2 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- angular.element使用方法及总结
- Angular 2 Material – 在表单中使用MD的自动完成示例
- angularjs – angular-pickadate timezone – selectedDate
- angularjs – 量角器过滤器,如何在过滤元素中查找元素
- bash – 不能使用curl安装NPM
- bootstrap table 的searchParam参数传递
- twitter-bootstrap – 如何使用Twitter Bootstrap 3的非响应
- 泛型 – Scala:“结构细化中的参数类型可能不是指在该细化
- 将列和数据动态加载到Angular 2中的表中
- angular 完整实例
