postgresql – 批处理插入时Slick 3.0中的数据库异常
发布时间:2020-12-13 15:52:52 所属栏目:百科 来源:网络整理
导读:通过在光滑3中批量插入每5秒插入数千条记录,我得到了 org.postgresql.util.PSQLException: FATAL: sorry,too many clients already 我的数据访问层看起来像: val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url,user=user,password=p
通过在光滑3中批量插入每5秒插入数千条记录,我得到了
org.postgresql.util.PSQLException: FATAL: sorry,too many clients already 我的数据访问层看起来像: val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url,user=user,password=password,driver= jdbcDriver) override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = { val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover{ case ex:Throwable=> RelationalRepositoryUtility.handleBatchOperationErrors(ex)} //db.close() res } override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int],NoStream,Write] = { query ++= (rowList) } 在插入批处理中关闭连接没有任何影响…它仍然给出相同的错误. 我从我的代码中调用插入批处理,如下所示: val temp1 = list1.flatMap { li => Future.sequence(li.map { trip => val data = for { tripData <- TripDataRepository.insertQuery( trip.tripData) subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id)) } yield ((tripData,subTripData)) val res=db.run(data.transactionally) res //db.close() }) } 如果我在我的工作后关闭连接,你可以在评论代码中看到我得到错误: java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 1] 在调用没有Future.sequence的方法后,像这样: val temp1 =list.map { trip => val data = for { tripData <- TripDataRepository.insertQuery( trip.tripData) subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id)) } yield ((tripData,subTripData)) val res=db.run(data.transactionally) res } 我仍然有太多的客户错误… 解决方法
这个问题的根源在于,您正在同时启动一个无限的Future列表,每个列表都连接到数据库 – 列表中每个条目一个.
这可以通过串行运行插入来解决,强制每个插入批处理依赖于前一个: // Empty Future for the results. Replace Unit with the correct type - whatever // "res" is below. val emptyFuture = Future.successful(Seq.empty[Unit]) // This will only insert one at a time. You could use list.sliding to batch the // inserts if that was important. val temp1 = list.foldLeft(emptyFuture) { (previousFuture,trip) => previousFuture flatMap { previous => // Inner code copied from your example. val data = for { tripData <- TripDataRepository.insertQuery(trip.tripData) subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id)) } yield ((tripData,subTripData)) val res = db.run(data.transactionally) previous :+ res } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |