Scala: read a file, process lines concurrently with Akka, and write the output with an asynchronous API (NIO.2)
Published: 2020-12-16 18:24:58 | Category: Security | Source: compiled from the web
1: I've run into a problem trying to process a large text file – about 10 GB.
My single-threaded solution is:

```scala
val writer = new PrintWriter(new File(output.getOrElse("output.txt")))
for (line <- scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines()) {
  writer.println(DigestUtils.HMAC_SHA_256(line))
}
writer.close()
```

2: I tried to process it concurrently:

```scala
val futures = scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines
  .map { s => Future { DigestUtils.HMAC_SHA_256(s) } }.to
val results = futures.map { Await.result(_, 10000 seconds) }
```

This causes a "GC overhead limit exceeded" exception (see Appendix A for the stack trace).

3: Following https://github.com/drexin/akka-io-file, I tried a combination of Akka IO and AsynchronousFileChannel. Using FileSlurp I was able to read the file in chunks of bytes, but I could not find a way to read the file line by line.

Any help would be appreciated. Thanks.

Appendix A:

```
[error] (run-main) java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.CharBuffer.wrap(Unknown Source)
    at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
    at sun.nio.cs.StreamDecoder.read(Unknown Source)
    at java.io.InputStreamReader.read(Unknown Source)
    at java.io.BufferedReader.fill(Unknown Source)
    at java.io.BufferedReader.readLine(Unknown Source)
    at java.io.BufferedReader.readLine(Unknown Source)
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:716)
    at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:692)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at com.test.Twitterhashconcurrentcli$.doConcurrent(Twitterhashconcurrentcli.scala:35)
    at com.test.Twitterhashconcurrentcli$delayedInit$body.apply(Twitterhashconcurrentcli.scala:62)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
```

Solution
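As a side note on attempt 2: the trailing `.to` call forces the whole lazy iterator, queuing one `Future` per line of a 10 GB file, which is exactly what exhausts the heap. One bounded variant is to await each batch of Futures before pulling the next batch from the iterator – a minimal sketch, with a stand-in `digest` function in place of `DigestUtils.HMAC_SHA_256`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchedDigest {
  // Stand-in for DigestUtils.HMAC_SHA_256; any per-line function works here.
  def digest(line: String): String = line.reverse

  // Pull `batchSize` lines at a time from the lazy iterator, run the batch
  // concurrently, and await it before pulling the next batch. At most one
  // batch of lines and Futures is ever resident in memory.
  def process(lines: Iterator[String], batchSize: Int = 1000): Iterator[String] =
    lines.grouped(batchSize).flatMap { batch =>
      val fut = Future.traverse(batch)(l => Future(digest(l)))
      Await.result(fut, 10.minutes)
    }

  def main(args: Array[String]): Unit = {
    val out = process(Iterator("abc", "def", "ghi"), batchSize = 2).toList
    println(out.mkString(","))   // prints cba,fed,ihg
  }
}
```

Because only one batch is in flight at a time, peak memory stays flat regardless of file size; the batch size trades scheduling overhead against peak memory.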
The trick here is to avoid reading all of the data into memory at once. If you iterate over the lines and send them to workers, you risk exactly that: because sending to an actor is asynchronous, you may read the entire file into memory while it sits in the actors' mailboxes, probably causing an OOM exception. A better high-level approach is a single master actor with a pool of child workers underneath it doing the processing. The trick is to use a lazy stream in the master (such as the Iterator returned by scala.io.Source.fromX) together with a work-pulling pattern in the workers, so their mailboxes never fill up with data. Then, when the iterator has no more lines, the master stops itself, which stops the workers (you can also use this point to shut down the actor system, if that's what you really want to do).
Here's a very rough outline. Note that I have not tested this:

```scala
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.routing.Broadcast
import scala.io.Source

object FileReadMaster {
  case class ProcessFile(filePath: String)
  case class ProcessLines(lines: List[String], last: Boolean = false)
  case class LinesProcessed(lines: List[String], last: Boolean = false)
  case object WorkAvailable
  case object GimmeeWork
}

class FileReadMaster extends Actor {
  import FileReadMaster._

  val workChunkSize = 10
  val workersCount = 10

  def receive = waitingToProcess

  def waitingToProcess: Receive = {
    case ProcessFile(path) =>
      val workers = (for (i <- 1 to workersCount)
        yield context.actorOf(Props[FileReadWorker])).toList
      val workersPool = context.actorOf(
        Props.empty.withRouter(RoundRobinRouter(routees = workers)))
      val it = Source.fromFile(path).getLines
      workersPool ! Broadcast(WorkAvailable)
      context.become(processing(it, workersPool, workers.size))

      // Set up deathwatch on all workers
      workers foreach (context watch _)
  }

  def processing(it: Iterator[String], workers: ActorRef, workersRunning: Int): Receive = {
    case ProcessFile(path) =>
      sender ! Status.Failure(new Exception("already processing!!!"))

    case GimmeeWork if it.hasNext =>
      val lines = List.fill(workChunkSize) {
        if (it.hasNext) Some(it.next) else None
      }.flatten
      sender ! ProcessLines(lines, last = !it.hasNext)

      // If there are no more lines, broadcast a poison pill
      if (!it.hasNext) workers ! Broadcast(PoisonPill)

    case GimmeeWork =>
    // Get here if there is no more work left

    case LinesProcessed(lines, last) =>
    // Do something with the processed lines

    // Termination of the last worker
    case Terminated(ref) if workersRunning == 1 =>
    // Done with all the work; do whatever cleanup is needed here

    // Termination of a non-last worker
    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))
  }
}

class FileReadWorker extends Actor {
  import FileReadMaster._

  def receive = {
    case ProcessLines(lines, last) =>
      sender ! LinesProcessed(lines.map(_.reverse), last)
      sender ! GimmeeWork

    case WorkAvailable =>
      sender ! GimmeeWork
  }
}
```

The idea is that the master iterates over the file's contents and sends chunks of work to a pool of child workers. When file processing starts, the master tells all of the children that work is available. Each child then keeps requesting work until there is no work left. When the master detects that the file has been read completely, it broadcasts a poison pill to the children, which lets them finish any outstanding work and then stop. When all of the children have stopped, the master can finish whatever cleanup is needed. Again, this is very rough, off the top of my head. If I'm off anywhere, let me know and I can amend the answer.
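One part of the question the outline leaves open is writing the output. Since every `LinesProcessed` result flows back to the single master, the master can simply append each chunk using the JDK's `java.nio.file` API with no extra synchronization – a minimal sketch (the `ResultWriter` name and the append-per-chunk policy are illustrative, not from the original answer):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.nio.file.StandardOpenOption.{APPEND, CREATE}
import scala.collection.JavaConverters._

object ResultWriter {
  // Append one processed chunk to the output file. Only the master actor
  // calls this, so writes are serialized by the actor model itself and
  // chunks are never interleaved mid-write.
  def appendLines(path: String, lines: List[String]): Unit = {
    Files.write(Paths.get(path), lines.asJava, StandardCharsets.UTF_8, CREATE, APPEND)
    ()
  }
}
```

In the master's `LinesProcessed` handler this becomes a call like `ResultWriter.appendLines(outPath, lines)`. If truly non-blocking writes are needed, the same role can be moved into a dedicated writer actor so the blocking I/O never stalls the master's message loop.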