加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

SDP(7):Cassandra- Cassandra-Engine:Streaming

发布时间:2020-12-16 09:44:44 所属栏目:安全 来源:网络整理
导读:? akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra数据并产生akka-stream的Source。这是一个支持reactive-stream协议的流: object CassandraSource { /** * Scala API: creates a [[Cassandra

akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra数据并产生akka-stream的Source。这是一个支持reactive-stream协议的流:

object CassandraSource {

  /**
   * Scala API: creates a [[CassandraSourceStage]] from the result of a given Future.
   *
   * The stage is handed the future statement together with the implicit session.
   */
  def fromFuture(futStmt: Future[Statement])(implicit session: Session): Source[Row,NotUsed] = {
    val stage = new CassandraSourceStage(futStmt, session)
    Source.fromGraph(stage)
  }

  /**
   * Scala API: creates a [[CassandraSourceStage]] from a given statement.
   *
   * The statement is wrapped in an already completed Future before it is passed to the stage.
   */
  def apply(stmt: Statement)(implicit session: Session): Source[Row,NotUsed] = {
    val readyStmt = Future.successful(stmt)
    val stage = new CassandraSourceStage(readyStmt, session)
    Source.fromGraph(stage)
  }

}

CassandraSource.apply构建Source[Row,NotUsed]。可以直接接通Flow[Row,Row,NotUsed]和Sink来使用。我们是通过CQLQueryContext来构建这个Source的:

/**
 * Builds an akka-stream Source for the query described by `ctx`.
 *
 * The statement is prepared on the implicit session, bound with the context's
 * parameters (after `processParameters`), given the optional consistency level
 * and the context's fetch size, and every returned Row is mapped through
 * `ctx.extractor`.
 *
 * @param ctx     query description: CQL text, parameters, extractor, consistency, fetch size
 * @param session Cassandra session used to prepare and run the statement
 * @param ec      not used by this body; kept so existing call sites keep compiling
 * @return a Source emitting one extracted `A` per result row
 */
def cassandraStream[A](ctx: CQLQueryContext[A])
       (implicit session: Session,ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    // Bind exactly once. An empty parameter list expands to `bind()`, so the
    // former throw-away `bind()` followed by a `var` re-assignment is not needed.
    val boundStmt = prepStmt.bind(processParameters(ctx.parameter): _*)
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }

    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
   }

CQLQueryContext[A]在上期介绍过:

/**
 * Immutable description of a CQL query.
 *
 * @param statement   CQL text
 * @param extractor   converts a result Row into an `M`
 * @param parameter   values for the statement's bind markers (empty by default)
 * @param consistency optional consistency level code from `CQLContext`
 * @param fetchSize   page size (100 by default)
 */
case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
) { self =>

  /** Returns a copy carrying the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = {
    val level = Some(_consistency)
    self.copy(consistency = level)
  }

  /** Returns a copy carrying the given fetch size. */
  def setFetchSize(pageSize: Int): CQLQueryContext[M] = {
    self.copy(fetchSize = pageSize)
  }
}

object CQLQueryContext {
  /** Shorthand constructor: statement and row converter only, everything else defaulted. */
  def apply[M](stmt: String,converter: Row => M): CQLQueryContext[M] = {
    new CQLQueryContext[M](stmt, converter)
  }
}

下面是一个流的构建和使用示范:

// One row of testdb.aqmrpt as read by the demo below.
case class Model (
    rowid: Long,
    measureid: Long,
    state: String,
    county: String,
    year: Int,
    value: Int,
    createdAt: java.util.Date
)
  // data row converter: reads each column by name from the Cassandra Row
  val toModel = (row: Row) => Model(
    rowid = row.getLong("ROWID"),
    measureid = row.getLong("MEASUREID"),
    state = row.getString("STATENAME"),
    county = row.getString("COUNTYNAME"),
    year = row.getInt("REPORTYEAR"),
    value = row.getInt("VALUE"),
    createdAt = row.getTimestamp("CREATED")
  )

  // setup context
  val ctx = CQLQueryContext("select * from testdb.aqmrpt",toModel)
  // construct source
  val src = cassandraStream(ctx)
  // a display sink
  val snk = Sink.foreach[Model] { m =>
    println(s"${m.rowid} ${m.state} ${m.county} ${m.year} ${m.value}")
  }
  // run on source
  src.to(snk).run()

除了通过读取数据构成stream source之外,我们还可以以流元素为数据进行数据库更新操作,因为我们可以用map来运行execute:

/**
 * Describes a CQL update that is executed once per stream element.
 *
 * @param parallelism    maximum number of concurrent executions used by `performOnRow`
 * @param processInOrder true: `mapAsync` (keeps element order); false: `mapAsyncUnordered`
 * @param statement      CQL text with bind markers
 * @param prepareParams  turns an element into the values for the bind markers
 * @param consistency    optional consistency level code from `CQLContext`
 */
case class CassandraActionStream[R](parallelism: Int = 1,processInOrder: Boolean = true,statement: String,prepareParams: R => Seq[Object],consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    // Binds the element's parameters to an already prepared statement and runs it.
    // The resulting future completes with the element itself.
    private def execute(prepStmt: PreparedStatement,r: R)(
      implicit session: Session,ec: ExecutionContext): Future[R] = {
      val boundStmt = prepStmt.bind(processParameters(prepareParams(r)): _*)
      consistency.foreach { cons =>
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
      }
      session.executeAsync(boundStmt).map(_ => r)
    }

    /** Prepares the statement, binds `r`'s parameters and executes it; yields `r` when done. */
    def perform(r: R)(implicit session: Session,ec: ExecutionContext) =
      execute(session.prepare(statement),r)

    /**
     * A Flow that executes the statement for every element and passes the element on.
     *
     * The statement is prepared once, on first use, and re-bound per element
     * instead of being prepared again for every element.
     */
    def performOnRow(implicit session: Session,ec: ExecutionContext): Flow[R,R,NotUsed] = {
      // lazy: preparation (and any failure of it) still happens inside the running stream
      lazy val prepStmt = session.prepare(statement)
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(r => execute(prepStmt,r))
      else
        Flow[R].mapAsyncUnordered(parallelism)(r => execute(prepStmt,r))
    }

  }
  object CassandraActionStream {
    /** Shorthand constructor: statement and parameter builder only, everything else defaulted. */
    def apply[R](_statement: String,params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R]( statement=_statement,prepareParams = params)
  }

CassandraActionStream可以用statement,params构建。它的一个函数performOnRow是一个Flow[R,R,NotUsed],可以把每个R转换成一条CQL后通过mapAsync来运行executeAsync,造成一种批次运算效果。下面是CassandraActionStream的使用示范:

//pass context to construct akka-source
  val jdbcSource = jdbcAkkaStream(ctx)

  // Seven columns are listed and toPparams supplies seven values, so the
  // statement needs seven bind markers (the earlier text had only three).
  val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
    "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"

  // Builds the bind values for one row, in column order; primitives are boxed to Object.
  val toPparams: DataRow => Seq[Object] = r => {
    Seq[Object](r.rowid.asInstanceOf[Object],r.measureid.asInstanceOf[Object],r.state,r.county,r.year.asInstanceOf[Object],r.value.asInstanceOf[Object],CQLDateTimeNow
    )
  }

  // two concurrent inserts, completion order not preserved
  val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2)
    .setProcessOrder(false)
  val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow

  val sink = Sink.foreach[DataRow]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  // copy the first 100 JDBC rows into Cassandra, printing each one afterwards
  val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()

下面的例子里我们用CassandraStream的流元素更新h2数据库中的数据,调用了JDBCActionStream:

//data row converter
  // Model has seven fields without defaults, so all of them must be supplied;
  // the column names match the converter shown earlier in the article.
  val toModel = (rs: Row) => Model(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    createdAt = rs.getTimestamp("CREATED")
  )

  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel)
  //construct source
  val src = cassandraStream(cqlCtx)
  //a display sink
  val snk = Sink.foreach[Model]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  //run on source
  src.to(snk).run()

  // parameters for the update below: new total first, then the row key
  val params: Model => Seq[Any] = row => {
    Seq((row.value * 10),row.rowid) }

  val jdbcActionStream = JDBCActionStream('h2,"update AQMRPT set total = ? where rowid = ?",params)
      .setParallelism(2).setProcessOrder(false)
  val jdbcActionFlow = jdbcActionStream.performOnRow

  //update rows in h2 database from data in cassandra database
  src.via(jdbcActionFlow).to(snk).run()

下面是本次示范的源代码:

build.sbt

name := "learn_cassandra"

version := "0.1"

scalaVersion := "2.12.4"

// One dependency per line; same artifacts and versions as before.
libraryDependencies := Seq(
  // Cassandra driver
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  // akka / alpakka
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
  // scalikejdbc
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  // JDBC drivers
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  // connection pools
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  // misc
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

CassandraEngine.scala

import com.datastax.driver.core._

import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback,Futures,ListenableFuture}

import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._

/**
 * Companion of [[CQLContext]]: integer codes for consistency levels, their
 * translation to the driver's `ConsistencyLevel`, and an empty-context factory.
 */
object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
  val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C

  /** A context with no statements. */
  def apply(): CQLContext = CQLContext(statements = Nil)

  /**
   * Translates one of the integer codes above into the driver's `ConsistencyLevel`.
   *
   * Every declared code is covered, including LOCAL_QUORUM, which previously
   * had no case and therefore failed with a MatchError.
   *
   * @throws IllegalArgumentException for a code that is not declared above
   */
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = {
    case ANY => ConsistencyLevel.ANY
    case ONE => ConsistencyLevel.ONE
    case TWO => ConsistencyLevel.TWO
    case THREE => ConsistencyLevel.THREE
    case QUORUM => ConsistencyLevel.QUORUM
    case ALL => ConsistencyLevel.ALL
    case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
    case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
    case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
    case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    case SERIAL => ConsistencyLevel.SERIAL
    case other =>
      throw new IllegalArgumentException(s"Unknown consistency level code: $other")
  }

}
/**
 * Immutable description of a CQL query.
 *
 * Restored from the intact listing earlier in the article; this copy had the
 * case class and its companion fused into one invalid fragment.
 *
 * @param statement   CQL text
 * @param extractor   converts a result Row into an `M`
 * @param parameter   values for the statement's bind markers (empty by default)
 * @param consistency optional consistency level code from `CQLContext`
 * @param fetchSize   page size (100 by default)
 */
case class CQLQueryContext[M](
                               statement: String,
                               extractor: Row => M,
                               parameter: Seq[Object] = Nil,
                               consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                               fetchSize: Int = 100
                             ) { ctx =>
  /** Returns a copy carrying the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    ctx.copy(consistency = Some(_consistency))
  /** Returns a copy carrying the given fetch size. */
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {
  /** Shorthand constructor: statement and row converter only, everything else defaulted. */
  def apply[M](stmt: String,converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](statement = stmt,extractor = converter)
}

/**
 * Immutable description of one or more CQL update statements.
 *
 * @param statements  CQL texts, in execution order
 * @param parameters  one parameter list per statement, in the same order
 * @param consistency optional consistency level code
 */
case class CQLContext(
    statements: Seq[String],
    parameters: Seq[Seq[Object]] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
) { self =>

  /** Returns a copy carrying the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = {
    val level = Some(_consistency)
    self.copy(consistency = level)
  }

  /** Returns a copy holding only this statement and its parameters. */
  def setCommand(_statement: String,_parameters: Object*): CQLContext = {
    val onlyStatement = Seq(_statement)
    val onlyParameters = Seq(_parameters)
    self.copy(statements = onlyStatement, parameters = onlyParameters)
  }

  /** Returns a copy with this statement and its parameters added at the end. */
  def appendCommand(_statement: String,_parameters: Object*): CQLContext = {
    val allStatements = self.statements :+ _statement
    val allParameters = self.parameters :+ _parameters
    self.copy(statements = allStatements, parameters = allParameters)
  }
}

object CQLEngine {
  import CQLContext._
  import CQLHelpers._

  /**
   * Runs the query in `ctx` synchronously and converts the rows into a `C[A]`.
   *
   * @param ctx      query description
   * @param pageSize fetch size set on the statement (the context's own fetchSize is not read here)
   * @return the driver's ResultSet together with the extracted rows
   */
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A],pageSize: Int = 100)(
    implicit session: Session,cbf: CanBuildFrom[Nothing,A,C[A]]): (ResultSet,C[A])= {

    val prepared = session.prepare(ctx.statement)

    // bind with parameters only when some were supplied
    val bound =
      if (ctx.parameter != Nil) prepared.bind(processParameters(ctx.parameter): _*)
      else prepared.bind()

    ctx.consistency.foreach(level => bound.setConsistencyLevel(consistencyLevel(level)))

    val rows = session.execute(bound.setFetchSize(pageSize))
    val extracted = rows.asScala.view.map(ctx.extractor).to[C]
    (rows, extracted)
  }
  /**
   * Fetches further results for `resultSet` and converts them with `extractor`.
   *
   * The implicit parameter and result type are restored to match
   * `fetchResultPage`; this copy of the listing had them fused together.
   *
   * @param resultSet result set returned by an earlier fetch
   * @param timeOut   maximum time to wait for the additional fetch
   * @param extractor converts a Row into an `A`
   * @return `(resultSet, None)` when everything is already fetched or the fetch
   *         fails with a non-fatal error (including a timeout); otherwise the
   *         fetched result set and the extracted rows
   */
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet,timeOut: Duration)(
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing,A,C[A]]): (ResultSet,Option[C[A]]) = {
    import scala.util.control.NonFatal
    if (resultSet.isFullyFetched) {
      (resultSet,None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(),timeOut)
        (result,Some((result.asScala.view.map(extractor)).to[C]))
      } catch {
        // NonFatal instead of Throwable: VM errors and interrupts must propagate
        case NonFatal(_) => (resultSet,None)
      }
    }
  }
  /**
   * Executes the context's statements: a single statement goes through
   * `cqlSingleUpdate`, any other count through `cqlMultiUpdate`.
   */
  def cqlExecute(ctx: CQLContext)(
    implicit session: Session,ec: ExecutionContext): Future[Boolean] =
    ctx.statements.size match {
      case 1 => cqlSingleUpdate(ctx)
      case _ => cqlMultiUpdate(ctx)
    }
  /**
   * Prepares, binds and asynchronously executes the first statement of `ctx`.
   *
   * @return a future holding the driver's `wasApplied` flag
   */
  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session,ec: ExecutionContext): Future[Boolean] = {

    val prepared = session.prepare(ctx.statements.head)

    // bind with the first parameter list only when parameters were supplied
    val bound =
      if (ctx.parameters != Nil) prepared.bind(processParameters(ctx.parameters.head): _*)
      else prepared.bind()

    ctx.consistency.foreach(level => bound.setConsistencyLevel(consistencyLevel(level)))
    session.executeAsync(bound).map(_.wasApplied())
  }
  /**
   * Executes all statements of `ctx` as one asynchronous batch.
   *
   * Statements without a matching parameter list are bound without parameters.
   * Previously `zip` dropped them, so a context built with statements only
   * produced an empty batch.
   *
   * @return a future holding the driver's `wasApplied` flag for the batch
   */
  def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session,ec: ExecutionContext): Future[Boolean] = {
    // pad so every statement has a (possibly empty) parameter list
    val paddedParams: Seq[Seq[Object]] = ctx.parameters.padTo(ctx.statements.size, Nil)
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip paddedParams
    val batch = new BatchStatement()
    commands.foreach { case (stm,params) =>
      val prepStmt = session.prepare(stm)
      // an empty parameter list expands to a plain bind()
      batch.add(prepStmt.bind(processParameters(params): _*))
    }
    ctx.consistency.foreach {consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))}
    session.executeAsync(batch).map(_.wasApplied())
  }

  /**
   * Builds an akka-stream Source for the query described by `ctx`.
   *
   * The signature is restored from the intact copy earlier in the article;
   * this copy had the implicit parameter list and result type fused together.
   *
   * @param ctx     query description: CQL text, parameters, extractor, consistency, fetch size
   * @param session Cassandra session used to prepare and run the statement
   * @param ec      not used by this body; kept to match the article's signature
   * @return a Source emitting one extracted `A` per result row
   */
  def cassandraStream[A](ctx: CQLQueryContext[A])
       (implicit session: Session,ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    // Bind exactly once; an empty parameter list expands to a plain bind().
    val boundStmt = prepStmt.bind(processParameters(ctx.parameter): _*)
    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
   }

  /**
   * Describes a CQL update that is executed once per stream element.
   *
   * Restored from the intact copy earlier in the article; this copy had lost
   * several constructor fields, the method bodies and the companion header.
   *
   * @param parallelism    maximum number of concurrent executions used by `performOnRow`
   * @param processInOrder true: `mapAsync` (keeps element order); false: `mapAsyncUnordered`
   * @param statement      CQL text with bind markers
   * @param prepareParams  turns an element into the values for the bind markers
   * @param consistency    optional consistency level code from `CQLContext`
   */
  case class CassandraActionStream[R](parallelism: Int = 1,processInOrder: Boolean = true,statement: String,prepareParams: R => Seq[Object],consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    // Binds the element's parameters to an already prepared statement and runs it.
    // The resulting future completes with the element itself.
    private def execute(prepStmt: PreparedStatement,r: R)(
      implicit session: Session,ec: ExecutionContext): Future[R] = {
      val boundStmt = prepStmt.bind(processParameters(prepareParams(r)): _*)
      consistency.foreach { cons =>
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
      }
      session.executeAsync(boundStmt).map(_ => r)
    }

    // Prepares the statement, binds `r`'s parameters and executes it.
    private def perform(r: R)(implicit session: Session,ec: ExecutionContext) =
      execute(session.prepare(statement),r)

    /**
     * A Flow that executes the statement for every element and passes the element on.
     * The statement is prepared once, on first use, and re-bound per element.
     */
    def performOnRow(implicit session: Session,ec: ExecutionContext): Flow[R,R,NotUsed] = {
      // lazy: preparation (and any failure of it) still happens inside the running stream
      lazy val prepStmt = session.prepare(statement)
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(r => execute(prepStmt,r))
      else
        Flow[R].mapAsyncUnordered(parallelism)(r => execute(prepStmt,r))
    }

  }
  object CassandraActionStream {
    /** Shorthand constructor: statement and parameter builder only, everything else defaulted. */
    def apply[R](_statement: String,params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R]( statement=_statement,prepareParams = params)
  }

}
object CQLHelpers {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._


  /**
   * Implicit bridge from a Guava `ListenableFuture` to a Scala `Future`.
   *
   * A callback registered on the Guava future completes a Promise with the
   * result or the error; the Promise's future is returned.
   */
  implicit def listenableFutureToFuture[T](
                                            listenableFuture: ListenableFuture[T]): Future[T] = {
    val completion = Promise[T]()
    val bridge = new FutureCallback[T] {
      def onSuccess(result: T): Unit = {
        completion.success(result)
        ()
      }
      def onFailure(error: Throwable): Unit = {
        completion.failure(error)
        ()
      }
    }
    Futures.addCallback(listenableFuture,bridge)
    completion.future
  }

  /** A calendar date given as year, month and day. */
  case class CQLDate(
    year: Int,
    month: Int,
    day: Int
  )

  /** Marker object standing for today's date. */
  case object CQLTodayDate

  /** A date and time given field by field; milliseconds default to 0. */
  case class CQLDateTime(
    year: Int,
    Month: Int,
    day: Int,
    hour: Int,
    minute: Int,
    second: Int,
    millisec: Int = 0
  )

  /** Marker object standing for the current instant. */
  case object CQLDateTimeNow

  /**
   * Replaces the date/time markers in a parameter list with values the driver
   * can bind; every other parameter is passed through unchanged.
   *
   *  - `CQLDate(y, m, d)`  -> driver `LocalDate` for that day
   *  - `CQLTodayDate`      -> driver `LocalDate` for today (system default zone)
   *  - `CQLDateTimeNow`    -> `Instant` of the current moment
   *  - `CQLDateTime(...)`  -> `Instant` built from the given fields
   *
   * Fixes: the month is passed on for `CQLDate`, all seven `CQLDateTime`
   * fields are bound, and the instant is built directly instead of through a
   * space-padded string without zone that `Instant.parse` cannot read.
   *
   * @param params raw parameters, possibly containing the markers above
   * @return the parameters with markers replaced
   */
  def processParameters(params: Seq[Object]): Seq[Object] = {
    import java.time.{LocalDateTime,ZoneOffset}
    params.map {
      case CQLDate(yy,mm,dd) => LocalDate.fromYearMonthDay(yy,mm,dd)
      case CQLTodayDate =>
        val today = java.time.LocalDate.now()
        LocalDate.fromYearMonthDay(today.getYear,today.getMonth.getValue,today.getDayOfMonth)
      // An Instant carries no zone, so the clock's zone never affected this value.
      case CQLDateTimeNow => Instant.now()
      case CQLDateTime(yy,mm,dd,hr,ms,sc,mi) =>
        // NOTE(review): fields are interpreted as UTC -- confirm this is the intended zone.
        LocalDateTime.of(yy,mm,dd,hr,ms,sc,mi * 1000000).toInstant(ZoneOffset.UTC)
      case p => p
    }
  }
  /**
   * An InputStream view over a ByteBuffer. Reading advances the buffer's position.
   *
   * Follows the `java.io.InputStream` contract: `read()` yields 0..255 or -1
   * at the end, and the bulk `read` yields -1 (not 0) once nothing is left.
   */
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    /** Next byte as an unsigned value 0..255, or -1 when the buffer is exhausted. */
    override def read(): Int =
      // mask: a raw byte >= 0x80 is negative and 0xFF would look like end-of-stream
      if (buf.hasRemaining) buf.get() & 0xFF else -1

    /**
     * Copies up to `len` bytes into `bytes` starting at `off`.
     *
     * @return number of bytes copied, 0 when `len` is 0, or -1 when the buffer is exhausted
     */
    override def read(bytes: Array[Byte],off: Int,len: Int): Int =
      if (len == 0) 0
      else if (!buf.hasRemaining) -1
      else {
        val length: Int = Math.min(len,buf.remaining)
        buf.get(bytes,off,length)
        length
      }
  }
  object ByteBufferInputStream {
    /** Wraps `buf` in a [[ByteBufferInputStream]]. */
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }
  /**
   * An OutputStream that writes into a fixed ByteBuffer.
   * The buffer is never grown here; `ByteBuffer.put` decides what happens when it is full.
   */
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {

    /** Writes the low-order byte of `b`. */
    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }

    /**
     * Writes `len` bytes of `bytes` starting at `off`.
     * The `off` parameter is restored so this really overrides
     * `OutputStream.write(byte[], int, int)`.
     */
    override def write(bytes: Array[Byte],off: Int,len: Int): Unit = {
      buf.put(bytes,off,len)
    }
  }
  object FixsizedByteBufferOutputStream {
    /** Wraps `buf` in a [[FixsizedByteBufferOutputStream]]. */
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }
  /**
   * An OutputStream backed by a ByteBuffer that is replaced by a larger one
   * whenever a write does not fit.
   *
   * @param buf        initial buffer; swapped for a bigger buffer on growth
   * @param onHeap     true: grow with `ByteBuffer.allocate`; false: `allocateDirect`
   * @param increasing growth factor applied until the pending write fits
   *                   (new optional parameter, defaults to the previous fixed factor)
   */
  class ExpandingByteBufferOutputStream(
      var buf: ByteBuffer,
      onHeap: Boolean,
      increasing: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
  ) extends OutputStream {

    /**
     * Writes `len` bytes of `b` starting at `off`, growing the buffer first if needed.
     * The `off` parameter is restored so this really overrides
     * `OutputStream.write(byte[], int, int)`.
     */
    override def write(b: Array[Byte],off: Int,len: Int): Unit = {
      // widen before adding so position + len cannot overflow Int
      val newTotal: Long = buf.position().toLong + len
      if (newTotal > buf.limit()) increase(grownCapacity(newTotal))
      buf.put(b,off,len)
    }

    /** Writes the low-order byte of `b`, growing the buffer first if it is full. */
    override def write(b: Int): Unit= {
      if (!buf.hasRemaining) increase(grownCapacity(buf.position().toLong + 1))
      buf.put(b.toByte)
    }

    // Smallest capacity, reached by repeated multiplication with `increasing`,
    // that holds `required` bytes. Starts from at least 1 so a zero-capacity
    // buffer still grows, and guarantees progress for tiny buffers where
    // capacity * factor would round back down to the same size.
    private def grownCapacity(required: Long): Int = {
      var capacity = math.max(buf.capacity,1).toDouble * increasing
      while (capacity <= required) {
        capacity = capacity * increasing
      }
      math.min(capacity,Int.MaxValue.toDouble).toInt
    }

    /** Replaces `buf` with a buffer of `newCapacity` holding the bytes written so far. */
    protected def increase(newCapacity: Int): Unit = {
      // expose exactly the written bytes [0, position) for copying
      buf.flip()
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else  ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear()
      buf = newBuffer
    }
    /** Number of bytes written so far (the buffer's position). */
    def size: Long = buf.position()
    /** Capacity of the current buffer. */
    def capacity: Long = buf.capacity
    /** The current backing buffer. */
    def byteBuffer: ByteBuffer = buf
  }
  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f

    /**
     * Creates a stream with an initial buffer of `size` bytes.
     *
     * @param increasingBy growth factor, must be greater than 1 (now actually passed to the stream)
     * @param onHeap       true for a heap buffer, false for a direct buffer
     * @throws IllegalArgumentException when `increasingBy` is not greater than 1
     */
    def apply(size: Int,increasingBy: Float,onHeap: Boolean): ExpandingByteBufferOutputStream = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      new ExpandingByteBufferOutputStream(buffer,onHeap,increasingBy)
    }

    /** Direct buffer with the default growth factor. */
    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size,ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR,false)
    }

    /** Default growth factor. Delegates to the three-argument overload; it used to call itself. */
    def apply(size: Int,onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size,ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR,onHeap)
    }

    /** Direct buffer with the given growth factor. */
    def apply(size: Int,increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size,increasingBy,false)
    }

  }
  /**
   * Reads a whole file into a ByteBuffer.
   *
   * Uses `Files.readAllBytes`, which closes the file and returns exactly the
   * file's content. The previous version never closed its stream, sized the
   * array as `available + 1` (one extra trailing zero byte) and relied on a
   * single `read` call filling the array.
   *
   * @param fileName path of the file to read
   * @return a buffer wrapping the file's bytes
   */
  def cqlFileToBytes(fileName: String): ByteBuffer = {
    ByteBuffer.wrap(Files.readAllBytes(Paths.get(fileName)))
  }
  /**
   * Streams the remaining content of `bytes` into the file `fileName`.
   *
   * The buffer is read through a `ByteBufferInputStream`, which advances the
   * buffer's position as it is consumed.
   *
   * @return the IOResult future produced by the file sink
   */
  def cqlBytesToFile(bytes: ByteBuffer,fileName: String)(
    implicit mat: Materializer): Future[IOResult] = {
    val target = Paths.get(fileName)
    StreamConverters
      .fromInputStream(() => ByteBufferInputStream(bytes))
      .runWith(FileIO.toPath(target))
  }
  /**
   * Formats `date` with the `SimpleDateFormat` pattern `fmt`.
   *
   * @param date the date to format
   * @param fmt  a `java.text.SimpleDateFormat` pattern
   */
  def cqlDateTimeString(date: java.util.Date,fmt: String): String =
    new java.text.SimpleDateFormat(fmt).format(date)
  /** Registers the driver-extras `InstantCodec` with the cluster's codec registry. */
  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    val registry = cluster.getConfiguration().getCodecRegistry()
    registry.register(InstantCodec.instance)
  }
}

JDBCEngine.scala

package jdbccontext
import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed

import scala.util._
import scalikejdbc.TxBoundary.Try._

import scala.concurrent.ExecutionContextExecutor

/** Constants used when building a `JDBCContext`. */
object JDBCContext {
  /** Integer tag for the kind of SQL a context carries. */
  type SQLTYPE = Int

  // statement kinds
  val SQL_EXEDDL: SQLTYPE = 1
  val SQL_UPDATE: SQLTYPE = 2

  // readable names for the "return generated key" flag
  val RETURN_GENERATED_KEYVALUE: Boolean = true
  val RETURN_UPDATED_COUNT: Boolean = false

}

/**
 * Immutable description of a JDBC query.
 *
 * The `statement` field is restored: `JDBCEngine` reads `ctx.statement`, but
 * this copy of the listing had lost it.
 * NOTE(review): its position (right after `dbName`) is an assumption -- confirm
 * against call sites that pass arguments positionally.
 *
 * @param dbName       name of the configured database
 * @param statement    SQL text
 * @param parameters   values for the statement's placeholders
 * @param fetchSize    JDBC fetch size
 * @param autoCommit   auto-commit setting applied by the streaming query
 * @param queryTimeout optional query timeout
 * @param extractor    converts a result row into an `M`
 */
case class JDBCQueryContext[M](
                                dbName: Symbol,
                                statement: String,
                                parameters: Seq[Any] = Nil,
                                fetchSize: Int = 100,
                                autoCommit: Boolean = false,
                                queryTimeout: Option[Int] = None,
                                extractor: WrappedResultSet => M)


/**
 * Immutable description of one or more JDBC DDL/update statements.
 *
 * Restorations and fixes in this copy:
 *  - `fetchSize` and `queryTimeout` fields are back (the setters below and
 *    `JDBCEngine` read them). NOTE(review): their position in the parameter
 *    list is an assumption -- confirm against positional call sites.
 *  - `setUpdateCommand` is rebuilt by analogy with `setDDLCommand` and
 *    `appendUpdateCommand`.
 *  - `appendDDLCommand` no longer wraps the parameters in an extra Seq.
 *  - `setPostAction` reports "postAction" in its error message.
 *  - `setBatchCommand` also sets `sqlType` to SQL_UPDATE, which
 *    `appendBatchParameters` requires.
 *
 * @param dbName             name of the configured database
 * @param statements         SQL texts, in execution order
 * @param parameters         one parameter list per statement (or per batch row)
 * @param fetchSize          JDBC fetch size
 * @param queryTimeout       optional query timeout
 * @param queryTags          tags attached to the statements
 * @param sqlType            `JDBCContext.SQL_EXEDDL` or `JDBCContext.SQL_UPDATE`
 * @param batch              whether this is a batch update
 * @param returnGeneratedKey per statement: None = no key, Some(1) = by index, Some("id") = by name
 * @param preAction          optional hook run on the PreparedStatement before execution
 * @param postAction         optional hook run on the PreparedStatement after execution
 */
case class JDBCContext(
                        dbName: Symbol,
                        statements: Seq[String] = Nil,
                        parameters: Seq[Seq[Any]] = Nil,
                        fetchSize: Int = 100,
                        queryTimeout: Option[Int] = None,
                        queryTags: Seq[String] = Nil,
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                        batch: Boolean = false,
                        returnGeneratedKey: Seq[Option[Any]] = Nil,
                        // no return: None,return by index: Some(1),by name: Some("id")
                        preAction: Option[PreparedStatement => Unit] = None,
                        postAction: Option[PreparedStatement => Unit] = None) {

  ctx =>

  //helper functions

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  /** Sets the pre-execution hook; only allowed for a single non-batch update. */
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  /** Sets the post-execution hook; only allowed for a single non-batch update. */
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
  }

  /** Appends a DDL statement; only allowed when the context is already of DDL type. */
  def appendDDLCommand(_statement: String,_parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        // one parameter list per statement, like every other append/set method
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  /** Appends a non-batch update statement together with its generated-key option. */
  def appendUpdateCommand(_returnGeneratedKey: Boolean,_statement: String,_parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  /** Appends one row of batch parameters; its size must match the first row's. */
  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

    // with no rows yet anything matches; otherwise compare with the first row
    val sizesMatch = ctx.parameters.headOption.forall(_.size == _parameters.size)
    if (sizesMatch) {
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  /** Chooses whether a batch update returns generated keys. */
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  /** Replaces the content with a single DDL statement. */
  def setDDLCommand(_statement: String,_parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  /** Replaces the content with a single non-batch update statement. */
  def setUpdateCommand(_returnGeneratedKey: Boolean,_statement: String,_parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  /** Replaces the statement with a batch update; rows are added with appendBatchParameters. */
  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy (
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }
}

object JDBCEngine {

  import JDBCContext._

  // Placeholder extractor: whatever row it receives, it throws
  // IllegalStateException carrying `message`.
  private def noExtractor(message: String): WrappedResultSet => Nothing =
    _ => throw new IllegalStateException(message)

  /**
   * Builds an akka-stream Source for the query described by `ctx`.
   *
   * The query runs as a read-only stream on the database named by
   * `ctx.dbName`. Previously `'h2` was hard-coded, so the context's database
   * name was ignored.
   *
   * @param ctx query description: database, SQL, parameters, extractor, fetch size,
   *            auto-commit and optional timeout
   * @return a Source emitting one extracted `A` per result row
   */
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A,NoExtractor](ctx.statement,ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A,HasExtractor] = rawSql.map(ctx.extractor)

      sql.iterator
        .withDBSessionForceAdjuster(session => {
          // apply the context's connection settings to the streaming session
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }


  /**
   * Runs the query described by `ctx` and collects the extracted rows into a `C[A]`.
   *
   * The `CanBuildFrom` element type and the `SQLToCollectionImpl` construction
   * are restored to the form used in `jdbcAkkaStream`; this copy of the
   * listing had both truncated.
   *
   * @param ctx query description
   * @return the extracted rows
   */
  def jdbcQueryResult[C[_] <: TraversableOnce[_],A](
                                                      ctx: JDBCQueryContext[A])(
                                                      implicit cbf: CanBuildFrom[Nothing,A,C[A]]): C[A] = {

    val rawSql = new SQLToCollectionImpl[A,NoExtractor](ctx.statement,ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A,HasExtractor] = rawSql.map(ctx.extractor)
    sql.collection.apply[C]()

  }

  /**
   * Executes every statement of a DDL context inside one local transaction.
   *
   * @return Success with a confirmation message, or Failure when the context
   *         is not of type SQL_EXEDDL or a statement fails
   */
  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] =
    if (ctx.sqlType == SQL_EXEDDL)
      NamedDB(ctx.dbName) localTx { implicit session =>
        Try {
          for (stm <- ctx.statements) {
            // DDL is run without parameters and with no-op before/after hooks
            val ddl = new SQLExecution(statement = stm,parameters = Nil)(
              before = _ => {})(
              after = _ => {})

            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    else
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))

  /**
   * Executes a batch update context inside one local transaction.
   *
   * Without generated keys the batch runs and an empty collection is
   * returned; with generated keys the keys are returned.
   *
   * @throws IllegalStateException when the context has no statements
   * @return Failure when the context is not a batch SQL_UPDATE or execution fails
   */
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")

    if (ctx.sqlType != SQL_UPDATE)
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    else if (!ctx.batch)
      Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
    else if (noReturnKey(ctx)) {
      // plain batch: the per-row results are discarded
      val batchSql = SQL(ctx.statements.head)
        .tags(ctx.queryTags: _*)
        .batch(ctx.parameters: _*)
      Try {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          batchSql.apply[Seq]()
          Seq.empty[Long].to[C]
        }
      }
    } else {
      // batch returning generated keys
      val keyedBatchSql = new SQLBatchWithGeneratedKey(ctx.statements.head,ctx.parameters,ctx.queryTags)(None)
      Try {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          keyedBatchSql.apply[C]()
        }
      }
    }
  }
  // Runs the context's single update in a local transaction and returns the
  // generated key. The CanBuildFrom element type is restored (it was
  // truncated), and the key is read with headOption so a non-List Seq or a
  // missing key yields a Failure instead of a MatchError.
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    ctx.returnGeneratedKey.headOption.flatten match {
      case None =>
        Failure(new IllegalStateException("JDBCContex setting error: no generated key requested!"))
      case Some(key) =>
        // first parameter list, or none when no parameters were supplied
        val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head,params,ctx.queryTags)(key)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result).to[C]
          }
        }
    }
  }

  // Runs the context's single update in a local transaction and returns the
  // update count. The CanBuildFrom element type is restored, and `params` is
  // passed to SQLUpdate again (it was computed but dropped in this copy).
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    // first parameter list, or none when no parameters were supplied
    val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
    // fall back to no-op hooks when the context defines none
    val before: PreparedStatement => Unit = ctx.preAction.getOrElse((_: PreparedStatement) => ())
    val after: PreparedStatement => Unit = ctx.postAction.getOrElse((_: PreparedStatement) => ())
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
    Try {
      NamedDB(ctx.dbName) localTx {implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }

  }

  // Dispatches a single-statement update to the variant with or without a
  // generated key. The CanBuildFrom element type is restored (it was truncated).
  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  // True when the context requests no generated key for its first statement:
  // either no option is recorded or the first option is None.
  // headOption works for any Seq; the former `k :: xs` pattern matched only
  // List and threw MatchError otherwise.
  private def noReturnKey(ctx: JDBCContext): Boolean =
    ctx.returnGeneratedKey.headOption.flatten.isEmpty

  /** A PreparedStatement hook that does nothing. */
  def noActon: PreparedStatement=>Unit = _ => ()

  /**
   * Executes several update statements inside one local transaction.
   *
   * Each statement runs with its own parameter list. When the context records
   * no generated-key options, none is requested for any statement.
   * The CanBuildFrom element type is restored, and the generated-key branch
   * passes `param` again (it was dropped in this copy).
   *
   * @return per statement the update count or the generated key
   */
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k@_ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm,param),key) =>
          key match {
            case None =>
              new SQLUpdate(stm,param,Nil)(noActon)(noActon).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm,param,Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }


  /**
   * Executes a non-batch update context transactionally: one statement goes
   * through `singleTxUpdate`, several through `multiTxUpdates`.
   * The CanBuildFrom element type is restored (it was truncated).
   *
   * @throws IllegalStateException when the context has no statements
   * @return Failure when the context is not SQL_UPDATE or is a batch context
   */
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing,Long,C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))

    }
  }

  /**
   * Describes a JDBC statement that is executed once per stream element.
   *
   * Restored pieces: the `processInOrder` and `statement` fields (both are
   * read below), the `Flow[R,R,NotUsed]` result type, and the companion's
   * `_statement` parameter (the article calls it with three arguments).
   * NOTE(review): field order follows CassandraActionStream -- confirm.
   *
   * @param dbName         name of the configured database
   * @param parallelism    parallelism passed to mapAsync / mapAsyncUnordered
   * @param processInOrder true: `mapAsync` (keeps element order); false: `mapAsyncUnordered`
   * @param statement      SQL text with placeholders
   * @param prepareParams  turns an element into the placeholder values
   */
  case class JDBCActionStream[R](dbName: Symbol,parallelism: Int = 1,processInOrder: Boolean = true,statement: String,prepareParams: R => Seq[Any]) {jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName=db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    // Executes the statement for one element in auto-commit mode and hands the
    // element back. The JDBC call finishes before the (already completed)
    // future is created, so it runs on the calling thread.
    private def perform(r: R) = {
      import scala.concurrent._
      val params = prepareParams(r)
      NamedDB(dbName) autoCommit { session =>
        session.execute(statement,params: _*)
      }
      Future.successful(r)
    }

    /** A Flow that executes the statement for every element and passes the element on. */
    def performOnRow(implicit session: DBSession): Flow[R,R,NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

  }
  object JDBCActionStream {
    /** Shorthand constructor: database, statement and parameter builder; the rest defaulted. */
    def apply[R](_dbName: Symbol,_statement: String,params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName,statement=_statement,prepareParams = params)
  }


}

HikariConfig.scala

package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/**
 * Value-class wrapper around a Typesafe [[Config]] that adds accessors returning a
 * caller-supplied default (`getXxxOr`) or an `Option` (`getXxxOpt`) when a path is absent.
 */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  /** Boolean stored at `path`, or `default` when the path is not present. */
  def getBooleanOr(path: String, default: => Boolean = false): Boolean =
    if (!c.hasPath(path)) default else c.getBoolean(path)

  /** Int stored at `path`, or `default` when the path is not present. */
  def getIntOr(path: String, default: => Int = 0): Int =
    if (!c.hasPath(path)) default else c.getInt(path)

  /** String stored at `path`, or `default` (null unless overridden) when the path is not present. */
  def getStringOr(path: String, default: => String = null): String =
    if (!c.hasPath(path)) default else c.getString(path)

  /** Sub-configuration at `path`, or `default` (an empty config unless overridden). */
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()): Config =
    if (!c.hasPath(path)) default else c.getConfig(path)

  /** Duration at `path` expressed in milliseconds, or `default` when the path is not present. */
  def getMillisecondsOr(path: String, default: => Long = 0L): Long =
    if (!c.hasPath(path)) default else c.getDuration(path, TimeUnit.MILLISECONDS)

  /** Duration at `path` (read with millisecond resolution), or `default` when the path is not present. */
  def getDurationOr(path: String, default: => Duration = Duration.Zero): Duration =
    if (!c.hasPath(path)) default
    else {
      val millis = c.getDuration(path, TimeUnit.MILLISECONDS)
      Duration(millis, TimeUnit.MILLISECONDS)
    }

  /** Sub-configuration at `path` converted to `Properties`, or `default` (null unless overridden). */
  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if (!c.hasPath(path)) default
    else new ConfigExtensionMethods(c.getConfig(path)).toProperties

  /**
   * Converts the whole configuration into `Properties`. Nested objects become nested
   * `Properties` values, other values are stored via `toString`, and null values are skipped.
   */
  def toProperties: Properties = {
    def convert(entries: mutable.Map[String, ConfigValue]): Properties = {
      val out = new Properties(null)
      for ((key, value) <- entries) {
        val converted: AnyRef =
          if (value.valueType() == ConfigValueType.OBJECT)
            convert(value.asInstanceOf[ConfigObject].asScala)
          else
            Option(value.unwrapped).map(_.toString).orNull
        if (converted ne null) out.put(key, converted)
      }
      out
    }
    convert(c.root.asScala)
  }

  /** `Some(boolean)` when `path` is present, `None` otherwise. */
  def getBooleanOpt(path: String): Option[Boolean] =
    if (!c.hasPath(path)) None else Some(c.getBoolean(path))

  /** `Some(int)` when `path` is present, `None` otherwise. */
  def getIntOpt(path: String): Option[Int] =
    if (!c.hasPath(path)) None else Some(c.getInt(path))

  /** `Option` over [[getStringOr]] with its null default. */
  def getStringOpt(path: String): Option[String] = Option(getStringOr(path))

  /** `Option` over [[getPropertiesOr]] with its null default. */
  def getPropertiesOpt(path: String): Option[Properties] = Option(getPropertiesOr(path))
}

/** Holds the implicit conversion that wraps a [[Config]] in [[ConfigExtensionMethods]]. */
object ConfigExtensionMethods {
  @inline
  implicit def configExtensionMethods(c: Config): ConfigExtensionMethods =
    new ConfigExtensionMethods(c)
}

/**
 * Reads the `<envPrefix>db.<dbName>` section of the Typesafe configuration and turns it
 * into the pool factory name and a populated [[HikariConfig]].
 */
trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>

  import ConfigExtensionMethods.configExtensionMethods

  /**
   * Name of the connection-pool factory configured for `dbName`
   * (key `poolFactoryName`), falling back to the commons-dbcp factory name.
   */
  def getFactoryName(dbName: Symbol): String = {
    val dbSection: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")
    dbSection.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  /**
   * Builds a [[HikariConfig]] from the configuration section of `dbName`.
   * Keys that are absent fall back to the defaults written inline below.
   */
  def hikariCPConfig(dbName: Symbol): HikariConfig = {
    val hikari = new HikariConfig()
    val settings: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")

    // Connection settings: an explicit data source class wins over a driver class name.
    settings.getStringOpt("dataSourceClass") match {
      case Some(dataSourceClass) =>
        hikari.setDataSourceClassName(dataSourceClass)
      case None =>
        val driverName = settings.getStringOr("driverClassName", settings.getStringOr("driver"))
        Option(driverName).foreach(name => hikari.setDriverClassName(name))
    }
    hikari.setJdbcUrl(settings.getStringOr("url", null))
    settings.getStringOpt("user").foreach(u => hikari.setUsername(u))
    settings.getStringOpt("password").foreach(p => hikari.setPassword(p))
    settings.getPropertiesOpt("properties").foreach(props => hikari.setDataSourceProperties(props))

    // Pool configuration
    hikari.setConnectionTimeout(settings.getMillisecondsOr("connectionTimeout", 1000))
    hikari.setValidationTimeout(settings.getMillisecondsOr("validationTimeout", 1000))
    hikari.setIdleTimeout(settings.getMillisecondsOr("idleTimeout", 600000))
    hikari.setMaxLifetime(settings.getMillisecondsOr("maxLifetime", 1800000))
    hikari.setLeakDetectionThreshold(settings.getMillisecondsOr("leakDetectionThreshold", 0))
    hikari.setInitializationFailFast(settings.getBooleanOr("initializationFailFast", false))
    settings.getStringOpt("connectionTestQuery").foreach(q => hikari.setConnectionTestQuery(q))
    settings.getStringOpt("connectionInitSql").foreach(sql => hikari.setConnectionInitSql(sql))
    // numThreads only feeds the defaults of the two pool-size settings below.
    val threadCount = settings.getIntOr("numThreads", 20)
    hikari.setMaximumPoolSize(settings.getIntOr("maxConnections", threadCount * 5))
    hikari.setMinimumIdle(settings.getIntOr("minConnections", threadCount))
    hikari.setPoolName(settings.getStringOr("poolName", dbName.name))
    hikari.setRegisterMbeans(settings.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hikari.setReadOnly(settings.getBooleanOr("readOnly", false))
    for (level <- settings.getStringOpt("isolation"))
      hikari.setTransactionIsolation("TRANSACTION_" + level)
    hikari.setCatalog(settings.getStringOr("catalog", null))

    hikari
  }
}

import scalikejdbc._
/**
 * Registers and closes ScalikeJDBC connection pools based on the Typesafe configuration.
 * The pool implementation is chosen per database from the configured factory name:
 * `"hikaricp"` builds a HikariCP data source, anything else goes through the regular
 * `ConnectionPool.add` path with the settings read from the configuration.
 */
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  /**
   * Creates and registers the connection pool for `dbName`.
   *
   * @param dbName pool name; defaults to `ConnectionPool.DEFAULT_NAME`
   */
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" =>
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        // Load the driver class only when one is actually configured.
        Option(hconf.getDriverClassName)
          .filter(_.trim.nonEmpty)
          .foreach(name => Class.forName(name))
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))

      case _ =>
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        Option(driver)
          .filter(_.trim.nonEmpty)
          .foreach(name => Class.forName(name))
        // The credentials read above must be handed to the pool; they were previously
        // dropped from this call, leaving it without user and password arguments.
        ConnectionPool.add(dbName, url, user, password, cpSettings)
    }
  }

  /** Loads the global settings and then sets up a pool for every configured database name. */
  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  /**
   * Closes the pool registered under `dbName`.
   *
   * @param dbName pool name; defaults to `ConnectionPool.DEFAULT_NAME`
   */
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  /** Closes every registered pool. */
  def closeAll(): Unit = {
    ConnectionPool.closeAll()
  }

}


/** [[ConfigDBs]] instance mixing in the standard Typesafe config and the Hikari reader, without an environment prefix trait. */
object ConfigDBs
  extends ConfigDBs with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader

/**
 * [[ConfigDBs]] variant that mixes in `EnvPrefix` and supplies `envValue` as its `env`.
 *
 * @param envValue environment name wrapped into `env`; a null value yields `None`
 */
case class ConfigDBsWithEnv(envValue: String)
  extends ConfigDBs
  with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader with EnvPrefix {

  override val env: Option[String] = Option(envValue)
}

StreamDemo.scala

import com.datastax.driver.core._
import akka._
import CQLEngine._
import CQLHelpers._
import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._
import scala.util._
/**
 * Demo application wiring a JDBC (h2) source and Cassandra together with akka-streams.
 *
 * Visible steps: start an actor system and materializer, connect to Cassandra on
 * localhost:9042, set up the `'h2` JDBC pool for environment "dev", create the
 * `testdb.AQMRPT` table, stream rows read from h2 through `actionFlow` into a printing
 * sink, then run a second stream through `jdbcActionFlow`, and finally shut everything
 * down after the user presses Enter.
 *
 * NOTE(review): two regions of this listing were truncated when the article was
 * extracted (marked below). They are kept exactly as found because the missing text
 * cannot be recovered from this file; the object will not compile until they are
 * restored from the original source.
 */
object cassandraStreamDemo extends App {
  //#init-mat
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster.Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  //config jdbc drivers
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()


  // One column per margin line: with every `|` collapsed onto a single line,
  // stripMargin removed only the first one and left the rest inside the CQL text.
  val cqlCreate =
    """
      |create table testdb.AQMRPT(
      |ROWID BIGINT PRIMARY KEY,
      |MEASUREID BIGINT,
      |STATENAME TEXT,
      |COUNTYNAME TEXT,
      |REPORTYEAR INT,
      |VALUE INT,
      |CREATED TIMESTAMP)
    """.stripMargin
  val ctxCreate = CQLContext().setCommand(cqlCreate)
  cqlExecute(ctxCreate).onComplete{
    case Success(_) => println("schema created successfully!")
    case Failure(e) => println(e.getMessage)
  }

  // Wait for Enter before continuing with the streams.
  scala.io.StdIn.readLine()

  // Fields restored from the named arguments used by `toRow` and the fields printed by `sink`.
  case class DataRow (
                  rowid: Long,
                  measureid: Long,
                  state: String,
                  county: String,
                  year: Int,
                  value: Int
                  )
  val toRow: WrappedResultSet => DataRow = rs => DataRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    year = rs.int("REPORTYEAR"),
    value = rs.int("VALUE")
  )

  //construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,statement = "select * from AQMRPT",extractor = toRow
  )

  //source from h2 database
  val jdbcSource = jdbcAkkaStream(ctx)

  //insert into cassandra database
  // NOTE(review): the next line is truncated in this listing - the rest of the insert
  // statement and the definitions leading up to `actionFlow` are missing. Restore from
  // the original source.
  val cqlInsert = "insert into testdb.AQMRPT(ROWID,NotUsed] = actionStream.performOnRow

  val sink = Sink.foreach[DataRow]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()

  // NOTE(review): the next three lines are truncated in this listing - the remaining
  // `Model` fields and the definitions of `src`, `snk` and `jdbcActionStream` are
  // missing. Restore from the original source.
  case class Model (
                       rowid: Long,params)
      .setParallelism(2).setProcessOrder(false)
  val jdbcActionFlow = jdbcActionStream.performOnRow

  //update rows in h2 database from data in cassandra database
  src.via(jdbcActionFlow).to(snk).run()


  // Wait for Enter, then release the Cassandra session, the cluster and the actor system.
  scala.io.StdIn.readLine()
  session.close()
  cluster.close()

  cqlsys.terminate()

}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!

    推荐文章
      热点阅读