SDP(7): Cassandra - Cassandra-Engine: Streaming
Akka provides streaming access to the Cassandra database through its alpakka toolkit. Put simply, it reads Cassandra data with a CQL statement and produces an akka-stream Source. The result is a stream that supports the reactive-streams protocol:

object CassandraSource {

  /**
   * Scala API: creates a [[CassandraSourceStage]] from a given statement.
   */
  def apply(stmt: Statement)(implicit session: Session): Source[Row, NotUsed] =
    Source.fromGraph(new CassandraSourceStage(Future.successful(stmt), session))

  /**
   * Scala API: creates a [[CassandraSourceStage]] from the result of a given Future.
   */
  def fromFuture(futStmt: Future[Statement])(implicit session: Session): Source[Row, NotUsed] =
    Source.fromGraph(new CassandraSourceStage(futStmt, session))

}

CassandraSource.apply builds a Source[Row, NotUsed], which can be wired directly to a Flow[Row, Row, NotUsed] and a Sink. We construct this Source through a CQLQueryContext:

  def cassandraStream[A](ctx: CQLQueryContext[A])(
      implicit session: Session, ec: ExecutionContextExecutor): Source[A, NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
    val params = processParameters(ctx.parameter)
    boundStmt = prepStmt.bind(params: _*)
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }

    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
  }

CQLQueryContext[A] was introduced in the previous installment:

case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
  ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](statement = stmt, extractor = converter)
}

Below is a demonstration of constructing and using such a stream:

  case class Model(
      rowid: Long,
      measureid: Long,
      state: String,
      county: String,
      year: Int,
      value: Int,
      createdAt: java.util.Date)

  //data row converter
  val toModel = (rs: Row) => Model(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    createdAt = rs.getTimestamp("CREATED")
  )

  //setup context
  val ctx = CQLQueryContext("select * from testdb.aqmrpt", toModel)
  //construct source
  val src = cassandraStream(ctx)
  //a display sink
  val snk = Sink.foreach[Model] { r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  //run on source
  src.to(snk).run()
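As a side note, the alpakka API quoted above can also be driven without CQLQueryContext. The snippet below is a minimal sketch added for illustration (not part of the original demo): it assumes the testdb.aqmrpt table used throughout this post and the implicit Session and materializer that the demo program (StreamDemo.scala below) sets up.

  import com.datastax.driver.core.SimpleStatement

  //build a plain driver Statement and hand it to CassandraSource directly
  val rawStmt = new SimpleStatement("select * from testdb.aqmrpt").setFetchSize(100)
  //every element is a driver Row; mapping it into a domain type is left to the caller
  CassandraSource(rawStmt)
    .map(row => (row.getLong("ROWID"), row.getInt("VALUE")))
    .runWith(Sink.foreach(println))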
Besides building a stream source by reading data, we can also use the stream's elements as input for database updates, because we can run execute inside a map stage:

case class CassandraActionStream[R](
    parallelism: Int = 1,
    processInOrder: Boolean = true,
    statement: String,
    prepareParams: R => Seq[Object],
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>
  def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism = parLevel)
  def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
    cas.copy(consistency = Some(_consistency))

  def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
    val prepStmt = session.prepare(statement)
    var boundStmt = prepStmt.bind()
    val params = processParameters(prepareParams(r))
    boundStmt = prepStmt.bind(params: _*)
    consistency.foreach { cons =>
      boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
    }
    session.executeAsync(boundStmt).map(_ => r)
  }

  def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
    if (processInOrder)
      Flow[R].mapAsync(parallelism)(perform)
    else
      Flow[R].mapAsyncUnordered(parallelism)(perform)
}
object CassandraActionStream {
  def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
    new CassandraActionStream[R](statement = _statement, prepareParams = params)
}

A CassandraActionStream is built from a statement and a params function. Its performOnRow method is a Flow[R, R, NotUsed]: it turns each R into a CQL statement and runs executeAsync through mapAsync, giving a batch-like processing effect. Below is a usage demonstration of CassandraActionStream:

  //pass context to construct akka-source
  val jdbcSource = jdbcAkkaStream(ctx)

  val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
    "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"
  val toPparams: DataRow => Seq[Object] = r => {
    Seq[Object](r.rowid.asInstanceOf[Object],
      r.measureid.asInstanceOf[Object],
      r.state,
      r.county,
      r.year.asInstanceOf[Object],
      r.value.asInstanceOf[Object],
      CQLDateTimeNow
    )
  }
  val actionStream = CassandraActionStream(cqlInsert, toPparams).setParallelism(2)
    .setProcessOrder(false)
  val actionFlow: Flow[DataRow, DataRow, NotUsed] = actionStream.performOnRow

  val sink = Sink.foreach[DataRow] { r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()

In the next example we use the elements of a Cassandra stream to update data in an h2 database, this time going through JDBCActionStream:

  //data row converter
  val toModel = (rs: Row) => Model(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    createdAt = rs.getTimestamp("CREATED")
  )
  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt", toModel)
  //construct source
  val src = cassandraStream(cqlCtx)
  //a display sink
  val snk = Sink.foreach[Model] { r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  //run on source
  src.to(snk).run()

  val params: Model => Seq[Any] = row => {
    Seq((row.value * 10), row.rowid)
  }
  val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?", params)
    .setParallelism(2).setProcessOrder(false)
  val jdbcActionFlow = jdbcActionStream.performOnRow

  //update rows in h2 database from data in cassandra database
  src.via(jdbcActionFlow).to(snk).run()
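The same building blocks also compose into a Cassandra-to-Cassandra pipeline. The following sketch is illustrative only (not part of the original demo); it assumes a hypothetical target table testdb.aqmrpt_bak with the same schema as testdb.aqmrpt, plus the implicit session, ExecutionContext and materializer already in scope in the demos above.

  //read from the source table with the toModel converter defined earlier
  val copySource = cassandraStream(CQLQueryContext("select * from testdb.aqmrpt", toModel))

  val cqlCopy = "insert into testdb.aqmrpt_bak(ROWID,MEASUREID,STATENAME," +
    "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"
  val copyParams: Model => Seq[Object] = m =>
    Seq[Object](m.rowid.asInstanceOf[Object], m.measureid.asInstanceOf[Object],
      m.state, m.county, m.year.asInstanceOf[Object], m.value.asInstanceOf[Object], CQLDateTimeNow)

  //write each element back out through executeAsync, unordered, 4 in flight at a time
  val copyFlow = CassandraActionStream(cqlCopy, copyParams)
    .setParallelism(4).setProcessOrder(false)
    .performOnRow

  copySource.via(copyFlow).to(Sink.ignore).run()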
where rowid = ?",params) .setParallelism(2).setProcessOrder(false) val jdbcActionFlow = jdbcActionStream.performOnRow //update rows in h2 database from data in cassandra database src.via(jdbcActionFlow).to(snk).run() 下面是本次示范的源代码: build.sbt name := "learn_cassandra" version := "0.1" scalaVersion := "2.12.4" libraryDependencies := Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0","com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0","com.typesafe.akka" %% "akka-actor" % "2.5.4","com.typesafe.akka" %% "akka-stream" % "2.5.4","com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16","org.scalikejdbc" %% "scalikejdbc" % "3.2.1","org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test","org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1","org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1","org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1","com.h2database" % "h2" % "1.4.196","mysql" % "mysql-connector-java" % "6.0.6","org.postgresql" % "postgresql" % "42.2.0","commons-dbcp" % "commons-dbcp" % "1.4","org.apache.tomcat" % "tomcat-jdbc" % "9.0.2","com.zaxxer" % "HikariCP" % "2.7.4","com.jolbox" % "bonecp" % "0.8.0.RELEASE","com.typesafe.slick" %% "slick" % "3.2.1","ch.qos.logback" % "logback-classic" % "1.2.3") resources/application.conf # JDBC settings test { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "commons-dbcp2" } } db.mysql.driver = "com.mysql.cj.jdbc.Driver" db.mysql.url = "jdbc:mysql://localhost:3306/testdb" db.mysql.user = "root" db.mysql.password = "123" db.mysql.poolInitialSize = 5 db.mysql.poolMaxSize = 7 db.mysql.poolConnectionTimeoutMillis = 1000 db.mysql.poolValidationQuery = "select 1 as one" db.mysql.poolFactoryName = "bonecp" # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 } dev { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } mysql { driver = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://localhost:3306/testdb" user = "root" password = "123" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "bonecp" } postgres { driver = "org.postgresql.Driver" url = "jdbc:postgresql://localhost:5432/testdb" user = "root" password = "123" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } } # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

CassandraEngine.scala

import com.datastax.driver.core._
import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._

object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL = 0x0000
  val ONE: CONSISTENCY_LEVEL = 0x0001
  val TWO: CONSISTENCY_LEVEL = 0x0002
  val THREE: CONSISTENCY_LEVEL = 0x0003
  val QUORUM: CONSISTENCY_LEVEL = 0x0004
  val ALL: CONSISTENCY_LEVEL = 0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL = 0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }
}

case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
  ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](statement = stmt, extractor = converter)
}

case class CQLContext(
    statements: Seq[String],
    parameters: Seq[Seq[Object]] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
  ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
}

object CQLEngine {
  import CQLContext._
  import CQLHelpers._

  def fetchResultPage[C[_] <: TraversableOnce[_], A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
      implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A]) = {

    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet, (resultSet.asScala.view.map(ctx.extractor)).to[C])
  }

  def fetchMorePages[C[_] <: TraversableOnce[_], A](resultSet: ResultSet, timeOut: Duration)(
      extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch {
        case e: Throwable => (resultSet, None)
      }
    }

  def cqlExecute(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val prepStmt = session.prepare(ctx.statements.head)
    var boundStmt = prepStmt.bind()
    if (ctx.parameters != Nil) {
      val params = processParameters(ctx.parameters.head)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }
    session.executeAsync(boundStmt).map(_.wasApplied())
  }

  def cqlMultiUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val commands: Seq[(String, Seq[Object])] = ctx.statements zip ctx.parameters
    var batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }
    ctx.consistency.foreach { consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))
    }
    session.executeAsync(batch).map(_.wasApplied())
  }

  def cassandraStream[A](ctx: CQLQueryContext[A])(
      implicit session: Session, ec: ExecutionContextExecutor): Source[A, NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
    val params = processParameters(ctx.parameter)
    boundStmt = prepStmt.bind(params: _*)
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }

    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
  }

  case class CassandraActionStream[R](
      parallelism: Int = 1,
      processInOrder: Boolean = true,
      statement: String,
      prepareParams: R => Seq[Object],
      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
      val prepStmt = session.prepare(statement)
      var boundStmt = prepStmt.bind()
      val params = processParameters(prepareParams(r))
      boundStmt = prepStmt.bind(params: _*)
      consistency.foreach { cons =>
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
      }
      session.executeAsync(boundStmt).map(_ => r)
    }

    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }
  object CassandraActionStream {
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R](statement = _statement, prepareParams = params)
  }
}
object CQLHelpers {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._

  implicit def listenableFutureToFuture[T](
      listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int, day: Int,
                         hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    import java.time.{Clock, ZoneId}
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
          Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")
        case p @ _ => p
      }
    }
  }

  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read: Int = {
      if (!buf.hasRemaining) return -1
      buf.get
    }
    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
  }
  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }

  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }
    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buf.put(bytes, off, len)
    }
  }
  object FixsizedByteBufferOutputStream {
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }

  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
    private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR

    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      val position = buf.position
      val limit = buf.limit
      val newTotal: Long = position + len
      if (newTotal > limit) {
        var capacity = (buf.capacity * increasing)
        while (capacity <= newTotal) {
          capacity = (capacity * increasing)
        }
        increase(capacity.toInt)
      }
      buf.put(b, off, len)
    }

    override def write(b: Int): Unit = {
      if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
      buf.put(b.toByte)
    }

    protected def increase(newCapacity: Int): Unit = {
      buf.limit(buf.position)
      buf.rewind
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear
      buf = newBuffer
    }

    def size: Long = buf.position
    def capacity: Long = buf.capacity
    def byteBuffer: ByteBuffer = buf
  }
  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f

    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      new ExpandingByteBufferOutputStream(buffer, onHeap)
    }
    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
    }
    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
    }
    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size, increasingBy, false)
    }
  }

  def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    val b = new Array[Byte](fis.available + 1)
    val length = b.length
    fis.read(b)
    ByteBuffer.wrap(b)
  }

  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
      implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }

  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }
}
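As a usage note for the engine above (a sketch added here, not code from the original post): two statements appended to one CQLContext are executed by cqlMultiUpdate as a single BatchStatement, and a CQLDateTimeNow placeholder among the parameters is translated by processParameters at bind time. It assumes the implicit Session and ExecutionContext from the demo program and the testdb.aqmrpt table.

  import scala.util.{Success, Failure}

  //two updates batched into one BatchStatement by cqlMultiUpdate
  val batchCtx = CQLContext()
    .setCommand("update testdb.aqmrpt set value = ? where rowid = ?",
      800.asInstanceOf[Object], 1L.asInstanceOf[Object])
    .appendCommand("update testdb.aqmrpt set created = ? where rowid = ?",
      CQLDateTimeNow, 2L.asInstanceOf[Object])
    .setConsistencyLevel(CQLContext.QUORUM)

  cqlExecute(batchCtx).onComplete {
    case Success(applied) => println(s"batch update applied: $applied")
    case Failure(err) => println(err.getMessage)
  }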
JDBCEngine.scala

package jdbccontext

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import scala.util._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL = 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

case class JDBCQueryContext[M](
    dbName: Symbol,
    statement: String,
    parameters: Seq[Any] = Nil,
    fetchSize: Int = 100,
    autoCommit: Boolean = false,
    queryTimeout: Option[Int] = None,
    extractor: WrappedResultSet => M)

case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String] = Nil,
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
    batch: Boolean = false,
    returnGeneratedKey: Seq[Option[Any]] = Nil,
    // no return: None, return by index: Some(1), by name: Some("id")
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None) { ctx =>

  //helper functions

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      batch = true
    )
  }
}
object JDBCEngine {
  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])(
      implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {

    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)

      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
      ctx: JDBCQueryContext[A])(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
    sql.collection.apply[C]()
  }

  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    } else {
      NamedDB(ctx.dbName) localTx { implicit session =>
        Try {
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})
            ddl.apply()
          }
          "SQL_EXEDDL executed successfully."
        }
      }
    }
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }
      } else {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
    }
  }

  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val Some(key) :: xs = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result).to[C]
      }
    }
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val before = ctx.preAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    }
    val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }
  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(k) => false
      }
    } else true
  }

  def noActon: PreparedStatement => Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k @ _ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
    }
  }

  case class JDBCActionStream[R](
      dbName: Symbol,
      parallelism: Int = 1,
      processInOrder: Boolean = true,
      statement: String,
      prepareParams: R => Seq[Any]) { jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R) = {
      import scala.concurrent._
      val params = prepareParams(r)
      NamedDB(dbName) autoCommit { session =>
        session.execute(statement, params: _*)
      }
      Future.successful(r)
    }

    def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }
  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
  }
}
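For the JDBC side, a correspondingly small sketch (not from the original post) runs one transactional update through JDBCContext.setUpdateCommand and jdbcTxUpdates. It assumes the 'h2 pool has been registered via ConfigDBsWithEnv("dev").setup('h2) and that the AQMRPT table exists, as in StreamDemo.scala further down.

  //build a single-statement update context against the h2 pool
  val h2Update = JDBCContext('h2)
    .setUpdateCommand(false, "update AQMRPT set VALUE = ? where ROWID = ?", 999, 1L)

  //Try[Vector[Long]] holding the updated row counts
  jdbcTxUpdates[Vector](h2Update) match {
    case Success(counts) => println(s"rows updated: ${counts.sum}")
    case Failure(err) => println(err.getMessage)
  }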
HikariConfig.scala

package configdbs

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) =
    if (c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) =
    if (c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) =
    if (c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) =
    if (c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) =
    if (c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if (c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if (c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if (cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if (cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if (v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if (c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if (c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>   // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }
  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf
  }
}

import scalikejdbc._

trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }
}

object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}
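Once the pools are registered, any scalikejdbc NamedDB call can use them by name. A small sketch (not part of the original sources), assuming the dev.db.h2 settings above and an existing AQMRPT table:

  ConfigDBsWithEnv("dev").setup('h2)

  //run an ad-hoc query against the named h2 pool
  val rowCount: Option[Long] = NamedDB('h2) readOnly { implicit session =>
    sql"select count(*) from AQMRPT".map(_.long(1)).single().apply()
  }
  println(s"AQMRPT rows: ${rowCount.getOrElse(0L)}")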
StreamDemo.scala

import com.datastax.driver.core._
import akka._
import CQLEngine._
import CQLHelpers._
import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._
import scala.util._

object cassandraStreamDemo extends App {
  //#init-mat
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster.Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  //config jdbc drivers
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val cqlCreate =
    """
      |create table testdb.AQMRPT(
      |ROWID BIGINT PRIMARY KEY,
      |MEASUREID BIGINT,
      |STATENAME TEXT,
      |COUNTYNAME TEXT,
      |REPORTYEAR INT,
      |VALUE INT,
      |CREATED TIMESTAMP)
    """.stripMargin

  val ctxCreate = CQLContext().setCommand(cqlCreate)
  cqlExecute(ctxCreate).onComplete {
    case Success(s) => println("schema created successfully!")
    case Failure(e) => println(e.getMessage)
  }

  scala.io.StdIn.readLine()

  case class DataRow(
      rowid: Long,
      measureid: Long,
      state: String,
      county: String,
      year: Int,
      value: Int)

  val toRow: WrappedResultSet => DataRow = rs => DataRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    year = rs.int("REPORTYEAR"),
    value = rs.int("VALUE")
  )

  //construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AQMRPT",
    extractor = toRow
  )

  //source from h2 database
  val jdbcSource = jdbcAkkaStream(ctx)

  //insert into cassandra database
  val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
    "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"
  val toPparams: DataRow => Seq[Object] = r => {
    Seq[Object](r.rowid.asInstanceOf[Object],
      r.measureid.asInstanceOf[Object],
      r.state,
      r.county,
      r.year.asInstanceOf[Object],
      r.value.asInstanceOf[Object],
      CQLDateTimeNow
    )
  }
  val actionStream = CassandraActionStream(cqlInsert, toPparams).setParallelism(2)
    .setProcessOrder(false)
  val actionFlow: Flow[DataRow, DataRow, NotUsed] = actionStream.performOnRow

  val sink = Sink.foreach[DataRow] { r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()

  case class Model(
      rowid: Long,
      measureid: Long,
      state: String,
      county: String,
      year: Int,
      value: Int,
      createdAt: java.util.Date)

  //data row converter
  val toModel = (rs: Row) => Model(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    createdAt = rs.getTimestamp("CREATED")
  )

  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt", toModel)
  //construct source
  val src = cassandraStream(cqlCtx)
  //a display sink
  val snk = Sink.foreach[Model] { r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
  }
  //run on source
  src.to(snk).run()

  val params: Model => Seq[Any] = row => {
    Seq((row.value * 10), row.rowid)
  }
  val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?", params)
    .setParallelism(2).setProcessOrder(false)
  val jdbcActionFlow = jdbcActionStream.performOnRow

  //update rows in h2 database from data in cassandra database
  src.via(jdbcActionFlow).to(snk).run()

  scala.io.StdIn.readLine()
  session.close()
  cluster.close()
  cqlsys.terminate()
}