scala – 使用TCP流并将其重定向到另一个接收器(使用Akka Stream
发布时间:2020-12-16 18:17:34 所属栏目:安全 来源:网络整理
导读:我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个Sink. 该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并
我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个Sink.
该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并将其发送到接收器. 到目前为止我尝试了以下但是我特别努力解决了如何不将tcp数据包发送回发送方并正确连接接收器的部分. import scala.util.Failure import scala.util.Success import akka.actor.ActorSystem import akka.event.Logging import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Tcp import akka.stream.scaladsl.Framing import akka.util.ByteString import java.nio.ByteOrder import akka.stream.scaladsl.Flow object TcpConsumeOnlyStreamToSink { implicit val system = ActorSystem("stream-system") private val log = Logging(system,getClass.getName) //The Sink //In reality this is of course a real Sink doing some useful things :-) //The Sink accept types of "SomethingMySinkUnderstand" val mySink = Sink.ignore; def main(args: Array[String]): Unit = { //our sender is not interested in getting replies from us //so we just want to consume the tcp stream and never send back anything to the sender val (address,port) = ("127.0.0.1",6000) server(system,address,port) } def server(system: ActorSystem,address: String,port: Int): Unit = { implicit val sys = system import system.dispatcher implicit val materializer = ActorMaterializer() val handler = Sink.foreach[Tcp.IncomingConnection] { conn => println("Client connected from: " + conn.remoteAddress) conn handleWith Flow[ByteString] //this is neccessary since we use a self developed tcp wire protocol .via(Framing.lengthField(4,65532,ByteOrder.BIG_ENDIAN)) //here we want to map the raw bytes into something our Sink understands .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) //here we like to connect our Sink to the Tcp Source .to(mySink) //<------ NOT COMPILING } val tcpSource = Tcp().bind(address,port) val binding = tcpSource.to(handler).run() binding.onComplete { case Success(b) => println("Server started,listening on: " + b.localAddress) case Failure(e) => println(s"Server could not bind to $address:$port: ${e.getMessage}") system.terminate() } } class SomethingMySinkUnderstand(x:String) { } } 更新:将此添加到build.sbt文件以获取必要的deps libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3" 解决方法
handleWith期望Flow,即具有未连接入口和未连接插座的盒子.您可以有效地提供Source,因为您使用to操作将Flow连接到Sink.
我想你可以做到以下几点: conn.handleWith( Flow[ByteString] .via(Framing.lengthField(4,ByteOrder.BIG_ENDIAN)) .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) .alsoTo(mySink) .map(_ => ByteString.empty) .filter(_ => false) // Prevents sending anything back ) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |