加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用TCP流并将其重定向到另一个接收器(使用Akka Stream

发布时间:2020-12-16 18:17:34 所属栏目:安全 来源:网络整理
导读:我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个Sink. 该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并
我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个Sink.
该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并将其发送到接收器.

到目前为止我尝试了以下但是我特别努力解决了如何不将tcp数据包发送回发送方并正确连接接收器的部分.

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system,getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address,port) = ("127.0.0.1",6000)
    server(system,address,port)
  }

  def server(system: ActorSystem,address: String,port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4,65532,ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address,port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started,listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

更新:将此添加到build.sbt文件以获取必要的deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

解决方法

handleWith期望Flow,即具有未连接入口和未连接插座的盒子.您可以有效地提供Source,因为您使用to操作将Flow连接到Sink.

我想你可以做到以下几点:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4,ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读