播放2 Scala – 用Iteratee上传大型CSV文件以反应性处理每一行的
发布时间:2020-12-16 21:31:01 所属栏目:安全 来源:网络整理
导读:我想使用Play2在弹性搜索上上传非常大的CSV文件(数百行).我写了以下代码,工作正常. 我不满意我在第一个组块中跳过http响应头的方式.应该有一种方式来链接这个迭代,第一个迭代器跳过http头并直接切换到完成状态,但是我还没有找到. 如果有人可以帮忙 object Re
我想使用Play2在弹性搜索上上传非常大的CSV文件(数百行).我写了以下代码,工作正常.
我不满意我在第一个组块中跳过http响应头的方式.应该有一种方式来链接这个迭代,第一个迭代器跳过http头并直接切换到完成状态,但是我还没有找到. 如果有人可以帮忙 object ReactiveFileUpload extends Controller { def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) { request => Ok("File Processed") } } case class CsvIteratee(state: Symbol = 'Cont,input: Input[Array[Byte]] = Empty,lastChunk: String = "",isFirst: Boolean = false) extends Iteratee[Array[Byte],Either[Result,String]] { def fold[B]( done: (Either[Result,String],Input[Array[Byte]]) => Promise[B],cont: (Input[Array[Byte]] => Iteratee[Array[Byte],String]]) => Promise[B],error: (String,Input[Array[Byte]]) => Promise[B] ): Promise[B] = state match { case 'Done => done(Right(lastChunk),Input.Empty) case 'Cont => cont(in => in match { case in: El[Array[Byte]] => { // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk val content = lastChunk + new String(in.e) val csvBody = if (isFirst) // Skip http header if it is the first chunk content.drop(content.indexOf("rnrn") + 4) else content val csv = new CSVReader(new StringReader(csvBody),';') val lines = csv.readAll // Process all lines excepted the last one since it is cut by the chunk for (line <- lines.init) processLine(line) // Put forward the part that has not been processed val last = lines.last.toList.mkString(";") copy(input = in,lastChunk = last,isFirst = false) } case Empty => copy(input = in,isFirst = false) case EOF => copy(state = 'Done,input = in,isFirst = false) case _ => copy(state = 'Error,isFirst = false) }) case _ => error("Unexpected state",input) } def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post( toJson( Map( "date" -> toJson(line(0)),"trig" -> toJson(line(1)),"code" -> toJson(line(2)),"nbjours" -> toJson(line(3).toDouble) ) ) ) } 解决方法
要将两个迭代链接在一起,请使用flatMap.
val combinedIteratee = firstIteratee.flatMap(firstResult => secondIteratee) 或者,用于理解: val combinedIteratee = for { firstResult <- firstIteratee secondResult <- secondIteratee } yield secondResult 您可以使用flatMap按照您喜欢的顺序排列多个迭代. 你可能想做一些像: val headerAndCsvIteratee = for { headerResult <- headerIteratee csvResult <- csvIteratee } yield csvResult (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |