scala – 如何将多个传入的TCP连接表示为Akka流的流?
发布时间:2020-12-16 19:23:37 所属栏目:安全 来源:网络整理
导读:我正在使用Akka Streams对网络服务器进行原型设计,该服务器将侦听端口,接受传入连接,并连续读取每个连接的数据.每个连接的客户端只会发送数据,并且不希望从服务器接收任何有用的内容. 从概念上讲,我认为将传入事件建模为一个只偶然碰巧通过多个TCP连接传递的
我正在使用Akka Streams对网络服务器进行原型设计,该服务器将侦听端口,接受传入连接,并连续读取每个连接的数据.每个连接的客户端只会发送数据,并且不希望从服务器接收任何有用的内容.
从概念上讲,我认为将传入事件建模为一个只偶然碰巧通过多个TCP连接传递的单个流是合适的.因此,假设我有一个表示每个数据消息的案例类Msg(msg:String),我想要的是将整个传入数据表示为Source [Msg,_].这对我的用例很有意义,因为我可以非常简单地连接流和&沉到这个源头. 这是我为实现我的想法而编写的代码: import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.SourceShape import akka.stream.scaladsl._ import akka.util.ByteString import akka.NotUsed import scala.concurrent.{ Await,Future } import scala.concurrent.duration._ case class Msg(msg: String) object tcp { val N = 2 def main(argv: Array[String]) { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val connections = Tcp().bind("0.0.0.0",65432) val delim = Framing.delimiter( ByteString("n"),maximumFrameLength = 256,allowTruncation = true ) val parser = Flow[ByteString].via(delim).map(_.utf8String).map(Msg(_)) val messages: Source[Msg,Future[Tcp.ServerBinding]] = connections.flatMapMerge(N,{ connection => println(s"client connected: ${connection.remoteAddress}") Source.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val F = builder.add(connection.flow.via(parser)) val nothing = builder.add(Source.tick( initialDelay = 1.second,interval = 1.second,tick = ByteString.empty )) F.in <~ nothing.out SourceShape(F.out) }) }) import scala.concurrent.ExecutionContext.Implicits.global Await.ready(for { _ <- messages.runWith(Sink.foreach { msg => println(s"${System.currentTimeMillis} $msg") }) _ <- system.terminate() } yield (),Duration.Inf) } } 此代码按预期工作,但请注意val N = 2,它将传递给flatMapMerge调用,该调用最终将传入的数据流合并为一个.在实践中,这意味着我一次只能从那么多的流中读取. 我不知道在任何给定时间将与此服务器建立多少连接.理想情况下,我希望尽可能多地支持,但硬编码上限似乎不是正确的事情. 我的问题,最后一个问题是:我如何获得或创建一个可以同时读取超过固定数量连接的flatMapMerge阶段? 解决方法
如Viktor Klang的评论所示,我认为这不可能在1个流中.但是,我认为可以创建一个
can receive messages after materialization的流,并将其用作来自TCP连接的消息的“接收器”.
首先创建“接收器”流: val sinkRef = Source .actorRef[Msg](Int.MaxValue,fail) .to(Sink foreach {m => println(s"${System.currentTimeMillis} $m")}) .run() 每个Connection都可以使用此sinkRef来接收消息: connections foreach { conn => Source .empty[ByteString] .via(conn.flow) .via(parser) .runForeach(msg => sinkRef ! msg) } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |