scala – 在Play 2.5中编写BodyParser
|
给定具有此签名的函数:
def parser[A](otherParser: BodyParser[A]): BodyParser[A] 如何在将请求体传递给otherParser之前对请求体进行检查和验证的方式来编写函数? 为简单起见,我想说我想验证标题(“Some-Header”,或许)有一个与主体完全匹配的值.所以如果我有这个动作: def post(): Action(parser(parse.tolerantText)) { request =>
Ok(request.body)
}
当我提出像curl -H“Some-Header:hello”-d“hello”http:// localhost:9000 / post这样的请求时,它应该在状态为200的响应正文中返回“hello”.如果我的请求是curl -H“Some-Header:hello”-d“hi”http:// localhost:9000 / post它应该返回400没有正文. 这是我尝试过的. 这个没有编译,因为otherParser(request).through(flow)期望flow输出ByteString.这里的想法是流程可以通过Either输出通知累加器是否继续处理.我不知道如何让累加器知道上一步的状态. def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString,Either[Result,ByteString],NotUsed] = Flow[ByteString].map { bytes =>
if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
Right(bytes)
} else {
Left(BadRequest)
}
}
val acc: Accumulator[ByteString,A]] = otherParser(request)
// This fails to compile because flow needs to output a ByteString
acc.through(flow)
}
我也试图使用过滤器.这个编译并且写入的响应主体是正确的.但是它总是返回200 Ok响应状态. def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString,ByteString,akka.NotUsed] = Flow[ByteString].filter { bytes =>
request.headers.get("Some-Header").contains(bytes.utf8String)
}
val acc: Accumulator[ByteString,A]] = otherParser(request)
acc.through(flow)
}
解决方法
我想出了一个使用GraphStageWithMaterializedValue的解决方案.这个概念是从
Play’s
maxLength body parser借用的.我在我的问题中第一次尝试(不编译)的主要区别在于,我应该使用物化值来传达有关处理状态的信息,而不是试图改变流.虽然我创建了一个Flow [ByteString,[Result,NotUsed],但事实证明我需要的是Flow [ByteString,Future [Boolean]].
因此,我的解析器函数最终看起来像这样: def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString,Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString,Future[Either[Result,A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean],resultFuture: Future[Either[Result,A]]) =>
statusFuture.flatMap { success =>
if (success) {
resultFuture.map {
case Left(result) => Left(result)
case Right(a) => Right(a)
}
} else {
Future.successful(Left(BadRequest))
}
}
})
}
关键是这一个: val flow: Flow[ByteString,Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
一旦您能够创建此流程,其余类型就会落实到位.不幸的是BodyValidator非常冗长,感觉有点泛滥.在任何情况下,它都非常容易阅读. GraphStageWithMaterializedValue期望你实现def形状:S(这里是S的FlowShape [ByteString,ByteString])来指定该图的输入类型和输出类型.它还希望你能够创建def createLogicAndMaterializedValue(inheritedAttributes:Attributes):( GraphStageLogic,M)(M是这里的Future [Boolean])来定义图形实际应该做什么.这是BodyValidator的完整代码(我将在下面详细解释): class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString,Future[Boolean]] {
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString,ByteString] = FlowShape.of(in,out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic,Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out,new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in,new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out,chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic,status.future)
}
}
您首先要创建一个入口和出口来设置图形的输入和输出 val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
然后使用它们来定义形状. def shape: FlowShape[ByteString,out) 在createLogicAndMaterializedValue中,您需要初始化要实现的值.在这里,我使用了一个可以在我从流中获取完整数据时可以解决的承诺.我还创建了一个ByteStringBuilder来跟踪迭代之间的数据. val status = Promise[Boolean]() val bodyBuffer = new ByteStringBuilder() 然后我创建一个GraphStageLogic来实际设置此图在每个处理点的作用.正在设置两个处理程序.一个是InHandler,用于处理来自上游源的数据.另一个是OutHandler,用于处理要向下游发送的数据.在OutHandler中没有什么真正有趣的,所以我会在这里忽略它,除了说它是必要的bobo板以避免IllegalStateException.在InHandler中重写了三种方法:onPush,onUpstreamFinish和onUpstreamFailure.从上游准备好新数据时调用onPush.在这个方法中,我只需抓住下一个数据块,将其写入bodyBuffer并将数据推送到下游. def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out,chunk)
}
当上游完成(惊讶)时调用onUpstreamFinish.这是将主体与标题进行比较的业务逻辑发生的地方. override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
实现onUpstreamFailure,以便在出现问题时,我也可以将物化的未来标记为失败. override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
然后我只返回我创建的GraphStageLogic和status.future作为元组. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
