加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何将多个传入的TCP连接表示为Akka流的流?

发布时间:2020-12-16 19:23:37 所属栏目:安全 来源:网络整理
导读:我正在使用Akka Streams对网络服务器进行原型设计,该服务器将侦听端口,接受传入连接,并连续读取每个连接的数据.每个连接的客户端只会发送数据,并且不希望从服务器接收任何有用的内容. 从概念上讲,我认为将传入事件建模为一个只偶然碰巧通过多个TCP连接传递的
我正在使用Akka Streams对网络服务器进行原型设计,该服务器将侦听端口,接受传入连接,并连续读取每个连接的数据.每个连接的客户端只会发送数据,并且不希望从服务器接收任何有用的内容.

从概念上讲,我认为将传入事件建模为一个只偶然碰巧通过多个TCP连接传递的单个流是合适的.因此,假设我有一个表示每个数据消息的案例类Msg(msg:String),我想要的是将整个传入数据表示为Source [Msg,_].这对我的用例很有意义,因为我可以非常简单地连接流和&沉到这个源头.

这是我为实现我的想法而编写的代码:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.SourceShape
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.NotUsed
import scala.concurrent.{ Await,Future }
import scala.concurrent.duration._

case class Msg(msg: String)

object tcp {
  val N = 2
  def main(argv: Array[String]) {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    val connections = Tcp().bind("0.0.0.0",65432)
    val delim = Framing.delimiter(
      ByteString("n"),maximumFrameLength = 256,allowTruncation = true
    )
    val parser = Flow[ByteString].via(delim).map(_.utf8String).map(Msg(_))
    val messages: Source[Msg,Future[Tcp.ServerBinding]] =
      connections.flatMapMerge(N,{
        connection =>
          println(s"client connected: ${connection.remoteAddress}")
          Source.fromGraph(GraphDSL.create() { implicit builder =>
            import GraphDSL.Implicits._
            val F = builder.add(connection.flow.via(parser))
            val nothing = builder.add(Source.tick(
              initialDelay = 1.second,interval = 1.second,tick = ByteString.empty
            ))
            F.in <~ nothing.out
            SourceShape(F.out)
          })
      })
    import scala.concurrent.ExecutionContext.Implicits.global
    Await.ready(for {
      _ <- messages.runWith(Sink.foreach {
        msg => println(s"${System.currentTimeMillis} $msg")
      })
      _ <- system.terminate()
    } yield (),Duration.Inf)
  }
}

此代码按预期工作,但请注意val N = 2,它将传递给flatMapMerge调用,该调用最终将传入的数据流合并为一个.在实践中,这意味着我一次只能从那么多的流中读取.

我不知道在任何给定时间将与此服务器建立多少连接.理想情况下,我希望尽可能多地支持,但硬编码上限似乎不是正确的事情.

我的问题,最后一个问题是:我如何获得或创建一个可以同时读取超过固定数量连接的flatMapMerge阶段?

解决方法

如Viktor Klang的评论所示,我认为这不可能在1个流中.但是,我认为可以创建一个 can receive messages after materialization的流,并将其用作来自TCP连接的消息的“接收器”.

首先创建“接收器”流:

val sinkRef = 
  Source
    .actorRef[Msg](Int.MaxValue,fail)
    .to(Sink foreach {m =>  println(s"${System.currentTimeMillis} $m")})
    .run()

每个Connection都可以使用此sinkRef来接收消息:

connections foreach { conn =>
  Source
    .empty[ByteString]
    .via(conn.flow)
    .via(parser)
    .runForeach(msg => sinkRef ! msg)
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读