加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

面对使用Scala Slick MySQL Akka Stream的问题

发布时间:2020-12-16 18:08:31 所属栏目:安全 来源:网络整理
导读:问题陈述:我们将 MySQL DB表中特定模块的用户的所有传入请求参数作为一行添加(这是一个巨大的数据).现在,我们想要设计一个进程,该进程将从该表中读取每条记录,并通过调用第三方API获得有关该用户请求的更多信息,之后它将把这个返回的元信息放在另一个表中.
问题陈述:我们将 MySQL DB表中特定模块的用户的所有传入请求参数作为一行添加(这是一个巨大的数据).现在,我们想要设计一个进程,该进程将从该表中读取每条记录,并通过调用第三方API获得有关该用户请求的更多信息,之后它将把这个返回的元信息放在另一个表中.

目前的尝试:

我正在使用Scala Slick来做到这一点.由于要读取的数据很大,我想一次一行地读取该表并进行处理.我尝试使用光滑的akka??流,但是我得到’java.util.concurrent.RejectedExecutionException’

以下是我尝试过的粗略逻辑,

implicit val system = ActorSystem("Example")
import system.dispatcher
implicit val materializer = ActorMaterializer()

val future = db.stream(SomeQuery.result)
Source.fromPublisher(future).map(row => {
        id = dataEnrichmentAPI.process(row)

}).runForeach(id => println("Processed row : "+ id))

dataEnrichmentAPI.process:此函数进行第三方REST调用,并执行一些数据库查询以获取所需数据.这个数据库查询是使用’db.run’方法完成的,它也会一直等到它完成(使用Await)

例如.,

def process(row: RequestRecord): Int = {
   // SomeQuery2 = Check if data is already there in DB
   val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result),Duration.Inf)
   if(retId.isEmpty){
         val metaData = RestCall()
         // SomeQuery3 = Store this metaData in DB
         Await.result(db.run(SomeQuery3.result),Duration.Inf)
         return metaData.id;      
   }else{
       // SomeQuery4 = Get meta data id 
      return Await.result(db.run(SomeQuery4.result),Duration.Inf)     
   }
 }

我正在使用阻止调用DB的这个异常.我不认为我是否可以摆脱它,因为后续流程需要返回值才能继续.

“阻止呼叫”是否是此异常背后的原因?
解决此类问题的最佳做法是什么?

谢谢.

解决方法

我不知道这是不是你的问题(细节太少),但你永远不应该阻止.

说到最佳实践,我们async stages代替.
在不使用Await.result的情况下,这或多或少是您的代码的样子:

def process(row: RequestRecord): Future[Int] = {
   db.run(SomeQuery2.result) flatMap { 
      case retId if retId.isEmpty =>
        // what is this? is it a sync call? if it's a rest call it should return a future
        val metaData = RestCall() 
        db.run(SomeQuery3.result).map(_ => metaData.id)

      case _ => db.run(SomeQuery4.result)
   }
 }


Source.fromPublisher(db.stream(SomeQuery.result))
  // choose your own parallelism
  .mapAsync(2)(dataEnrichmentAPI.process)
  .runForeach(id => println("Processed row : "+ id))

这样,您将明确地和惯用地处理背压和并行.

尽量不要在生产代码中调用Await.result,只能在compose futures using map,flatMap and for comprehensions调用

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读