加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

postgresql – 批处理插入时Slick 3.0中的数据库异常

发布时间:2020-12-13 15:52:52 所属栏目:百科 来源:网络整理
导读:通过在光滑3中批量插入每5秒插入数千条记录,我得到了 org.postgresql.util.PSQLException: FATAL: sorry,too many clients already 我的数据访问层看起来像: val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url,user=user,password=p
通过在光滑3中批量插入每5秒插入数千条记录,我得到了

org.postgresql.util.PSQLException: FATAL: sorry,too many clients already

我的数据访问层看起来像:

val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url,user=user,password=password,driver= jdbcDriver)



 override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
    val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover{ case ex:Throwable=> RelationalRepositoryUtility.handleBatchOperationErrors(ex)}
//db.close()
        res
      }

  override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int],NoStream,Write] = {
    query ++= (rowList)
  }

在插入批处理中关闭连接没有任何影响…它仍然给出相同的错误.

我从我的代码中调用插入批处理,如下所示:

val temp1 = list1.flatMap { li =>
        Future.sequence(li.map { trip =>
            val data = for {
              tripData <- TripDataRepository.insertQuery( trip.tripData)
              subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id))
            } yield ((tripData,subTripData))
            val res=db.run(data.transactionally)
          res
//db.close()
        })
      }

如果我在我的工作后关闭连接,你可以在评论代码中看到我得到错误:

java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 1]

在调用没有Future.sequence的方法后,像这样:

val temp1 =list.map { trip =>
          val data = for {
            tripData <- TripDataRepository.insertQuery( trip.tripData)
            subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id))
          } yield ((tripData,subTripData))
          val res=db.run(data.transactionally)
          res
      }

我仍然有太多的客户错误…

解决方法

这个问题的根源在于,您正在同时启动一个无限的Future列表,每个列表都连接到数据库 – 列表中每个条目一个.

这可以通过串行运行插入来解决,强制每个插入批处理依赖于前一个:

// Empty Future for the results. Replace Unit with the correct type - whatever
// "res" is below.
val emptyFuture = Future.successful(Seq.empty[Unit])
// This will only insert one at a time. You could use list.sliding to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture,trip) =>
  previousFuture flatMap { previous =>
    // Inner code copied from your example.
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id))
    } yield ((tripData,subTripData))
    val res = db.run(data.transactionally)
    previous :+ res
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读