如何只依靠Scala的语言特性实现高逼格的依赖注入
如何只依靠Scala的语言特性实现高逼格的依赖注入
需求最近我在做【增加运算模块的列筛选条件支持】需求,其中包含两大部分, 设计看到这个需求,我们会想到需要做如下事情: 如果单纯只是实现上面的1、2、3,其实可以写一个足够简单的程序包含3个文件即可, 所以我的设计是这样的,
上面描述的前3个组件会使用Cake模式被注入到job层或launch层中。 编码接口定义首先看一下各层次的抽象接口。 数据访问层 DBPluginComponenttrait DBPluginComponent {
trait DBPlugin[R,U]{
def queryByNodeTypes(nodeTypes: Seq[NodeType]): Future[Seq[R]]
def bulkUpdateParam(workflowWithParamSeq: Seq[(R,String)]): Future[U]
}
}
trait套trait是不是看着很新鲜,的确这种写法不常用,但是这就是Cake模式的一部分: json解析层 JsonPluginComponenttrait JsonPluginComponent {
trait JsonPlugin[O] {
def fromOld(jsonStr: String): O = {
val json = Try(JSON.parseArray(jsonStr).getJSONObject(0)).getOrElse(JSON.parSEObject(jsonStr))
fromOld(json)
}
def fromOld(json: JSONObject): O = throw new UnsupportedOperationException
def toNew(oldObj: O): String
def transform(json: String): String = {
toNew(fromOld(json))
}
protected def toSelectedColumnsJson(columns: JSONArray): JSONObject = {
val json = new JSONObject()
json.put("isSpecified",true)
json.put("specifiedColumns",columns)
json
}
}
}
此处定义了将要被注入的依赖:json解析组件。 日志输出层 LoggerComponenttrait LoggerComponent {
trait Logger {
def info(s: String)
def warn(s: String)
def warn(s: String,e: Throwable)
def error(s: String)
def error(s: String,e: Throwable)
def debug(s: String)
}
}
此处定义了需要注入的依赖:日志输出组件 job执行任务层 JobComponenttrait JobComponent[R <: NodeTypeWithParam] {
this: JsonPluginComponent =>
val json: JsonPlugin[_]
def work(workflow: R): (R,String) = {
val newJson = json.transform(workflow.param)
(workflow,newJson)
}
}
此处定义了一个job组件,它告诉程序,必须将json解析组件依赖注入进来,所以这里使用这种写法, this: JsonPluginComponent =>
这种写法被称作自身类型标注(self-type annotation),它可以确保Cake模式的类型安全, val json: JsonPlugin[_]
work方法定义了任务如何依靠注入的组件干活,实现类不用关心这个方法,只需关心具体要注入什么组件。 launch加载层trait LaunchComponent[R <: NodeTypeWithParam,U] {
this: DBPluginComponent with LoggerComponent =>
val db: DBPlugin[R,U]
val logger: Logger
def launch(): Unit = {
val filterNodesConfMap: Map[NodeType,String] = ConfigurationFactory.get.getConfigList("filter.nodes").asScala.map(FilterNode.apply).toMap
val nodeJobMap = new mutable.HashMap[NodeType,JobComponent[R]]
val filterNodeTypes = filterNodesConfMap.keys.toSeq
val filterNodes = Await.result(db.queryByNodeTypes(filterNodeTypes),60 second)
logger.info(s"QueriedCount = ${filterNodes.size}")
val newParamSeq = filterNodes.map {
n =>
val job = nodeJobMap.getOrElse(n.nodeType,{
val instance = this.getClass.getClassLoader.loadClass(filterNodesConfMap(n.nodeType)).newInstance().asInstanceOf[JobComponent[R]]
nodeJobMap += (n.nodeType -> instance)
instance
})
job.work(n)
}
workResultHandler(newParamSeq)
}
def workResultHandler(seq: Seq[(R,String)]): Unit
}
此处定义了一个launch组件,它告诉程序,必须将数据访问组件和日志输出组件注入进来,同样也使用了自身类型标注的写法,确保类型安全。 val db: DBPlugin[R,U]
val logger: Logger
launch方法描述如何加载各job组件完成任务,其中会根据配置文件动态实例化需要的job层组件, 接口实现再看一下每个层次是如何实现接口的,由于模块较多,json解析层和job任务执行层只挑选其中一个模块做说明。 数据访问层mongodb组件 MongoDBComponenttrait MongoDBComponent extends DBPluginComponent {
class DB extends DBPlugin[MongoWorkFlowParam,BulkWriteResult] {
private val uri = ConfigurationFactory.get.getString("db.mongo.uri")
private val dbName = ConfigurationFactory.get.getString("db.mongo.dbName")
private val db = MongoClient(uri).getDatabase(dbName)
private val collectionName = "workflow"
import org.mongodb.scala.bson.codecs.Macros._
private implicit val codecRegistry: CodecRegistry = fromRegistries(
DEFAULT_CODEC_REGISTRY,fromCodecs(new NodeTypeCodec),fromProviders(classOf[MongoWorkFlowParam])
)
private val collection = db.getCollection[MongoWorkFlowParam](collectionName).withCodecRegistry(codecRegistry)
override def queryByNodeTypes(nodeTypes: Seq[NodeType]): Future[Seq[MongoWorkFlowParam]] = {
if (nodeTypes.isEmpty) Future(Seq.empty[MongoWorkFlowParam])
else {
collection.find(
and(in("nodeType",nodeTypes: _*),notEqual("param",""),"[]"),null),"{}"))).projection(include("id","nodeType","param")).toFuture()
}
}
override def bulkUpdateParam(workflowWithParamSeq: Seq[(MongoWorkFlowParam,String)]): Future[BulkWriteResult] = {
val writes: Seq[WriteModel[_ <: MongoWorkFlowParam]] = workflowWithParamSeq.map{
case (w,newJson) =>
UpdateOneModel(equal("_id",w._id),set("param",newJson))
}
collection.bulkWrite(writes).toFuture()
}
}
}
下游的组件没什么特别的,就是根据自身的特性,实现接口中的方法,这里是使用了”mongo-scala-driver”驱动实现的访问mongodb的方法。 数据访问层dummy测试组件 DummyDBComponenttrait DummyDBComponent extends DBPluginComponent {
class DB extends DBPlugin[MongoWorkFlowParam,fromProviders(classOf[MongoWorkFlowParam])
)
private val collection = db.getCollection[MongoWorkFlowParam](collectionName).withCodecRegistry(codecRegistry)
override def queryByNodeTypes(nodeTypes: Seq[NodeType]): Future[Seq[MongoWorkFlowParam]] = {
if (nodeTypes.isEmpty) Future(Seq.empty[MongoWorkFlowParam])
else {
collection.find(and(in("nodeType","param"))/*.limit(1)*/.toFuture()
}
}
override def bulkUpdateParam(workflowWithParamSeq: Seq[(MongoWorkFlowParam,String)]): Future[BulkWriteResult] = {
Future(BulkWriteResult.unacknowledged())
}
}
}
这个实现是为了开发过程中测试用的, json解析层列加密模块组件 ColumnEncryptJsonComponenttrait ColumnEncryptJsonComponent extends JsonPluginComponent {
class ColumnEncryptJson extends JsonPlugin[OldColumnEncryptParam] {
override def fromOld(json: JSONObject): OldColumnEncryptParam = {
val retainOldColumn = if (json.containsKey("retainOldColumn")) Some(json.getBooleanValue("retainOldColumn")) else None
val selectedArr = if (json.containsKey("selected")) Some(json.getJSONArray("selected")) else None
OldColumnEncryptParam(selectedArr,retainOldColumn)
}
override def toNew(oldObj: OldColumnEncryptParam): String = {
val rootJson = new JSONArray()
val newJson = new JSONObject()
oldObj.selected.map {
v =>
val selectedColumnsJson = toSelectedColumnsJson(v)
newJson.put("selectedColumns",selectedColumnsJson)
}
oldObj.retainOldColumn.map(v => newJson.put("retainOldColumn",v))
rootJson.add(newJson)
rootJson.toJSONString
}
}
case class OldColumnEncryptParam(selected: Option[JSONArray],retainOldColumn: Option[Boolean])
}
这个就是列加密模块json解析转换的具体实现,没啥可说的,看代码。 job执行任务层列加密模块组件 ColumnEncryptJobclass ColumnEncryptJob extends JobComponent[MongoWorkFlowParam]
with ColumnEncryptJsonComponent {
override val json = new ColumnEncryptJson
}
这个就是列加密模块job层的实现,其中注入了列加密模块的json解析组件。 launch加载层的具体实现组件 Launcherclass Launcher extends LaunchComponent[MongoWorkFlowParam,BulkWriteResult]
with MongoDBComponent with Log4jLoggerComponent {
override val db: DBPlugin[MongoWorkFlowParam,BulkWriteResult] = new DB
override val logger = new Log4jLogger(this.getClass)
override def workResultHandler(seq: Seq[(MongoWorkFlowParam,String)]): Unit = {
val start = System.currentTimeMillis()
val future = db.bulkUpdateParam(seq)
val result = Await.result(future,300 second)
val finished = System.currentTimeMillis()
logger.info(s"Elapsed time = ${finished - start}ms,MatchedCount = ${result.getMatchedCount},ModifiedCount = ${result.getModifiedCount},DeletedCount = ${result.getDeletedCount},InsertedCount = ${result.getInsertedCount}")
}
}
这个就是launch层加载器的具体实现了,其中注入了”mongo-scala-driver”驱动实现的数据访问层的组件和log4j实现的日志输出层组件, 扩展与维护上面已经完整的演示了如何只依靠Scala的语言特性实现高逼格的依赖注入。 如果以后出了一款新的mongodb驱动,性能超好,想替换现有db读取逻辑呢?
如果以后mongodb换成了mysql呢?与上面问题的解决方案基本一致,只不过是新写一个针对mysql的数据访问层组件即可 如果以后出了一款新的json解析库,性能超好,想替换现有json处理逻辑呢?
如果以后想部分模块增加一个将json转换成xml的逻辑呢
如果以后不仅想用log4j写日志,还想采用自己开发的一个模块将日志输出到hdfs呢?
总结通过上面的讲解,大家应该已经学会了如何使用Cake模式在Scala中实现高逼格的依赖注入,而且认识到其强大的解耦能力和易扩展易维护的能力, 具体代码可以从github获取:https://github.com/deanzz/json-transformer (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |