如何使用ValueMapper使用Scala更改Kafka Streams 10.2中的值类型
我正在尝试将基于
Scala的Kafka Streams应用程序从0.10.0.0升级到0.10.2.1,我无法弄清楚如何让应用程序进行编译.
我在documentation中找到的示例使用mapValue但它不会更改值类型.我按照this使用Scala 2.11和-Xexperimental编译器标志. 码 class MyStream() { def startMyStream(): Unit = { val kStreamBuilder = new KStreamBuilder val kStream = kStreamBuilder.stream("myTopic") kStream.mapValues(new ValueMapper[AnyRef,Double]() { override def apply(value: Any) = 6.3 }) val kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties) kafkaStreams.start() } } 编译错误 no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: VR])org.apache.kafka.streams.kstream.KStream[Nothing,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}) --- because --- argument expression's type is not compatible with formal parameter type; found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double] required: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: ?VR] Note: AnyRef <: Any,but Java-defined trait ValueMapper is invariant in type V. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) kStream.mapValues(new ValueMapper[AnyRef,Double]() { ^ type mismatch; found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{} required: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: VR] kStream.mapValues(new ValueMapper[AnyRef,Double]() { ^ two errors found 表示为java类,编译很好. public class MyStream { public void startMyStream() { KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream kStream = kStreamBuilder.stream("myTopic"); kStream.mapValues(new ValueMapper<Object,Double>() { @Override public Double apply(Object value) { return 6.3; } }); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties()); kafkaStreams.start(); } } 如何编译Scala版本? 解决方案基于此answer 三种工作方式,两种方式不工作. class MyStream() { def startMyStream(): Unit = { val kStreamBuilder = new KStreamBuilder // Explicit tying here is not required to compile and run. val kStream: KStream[Array[Byte],String] = kStreamBuilder.stream("myTopic") // Does not compile kStream.mapValues(new ValueMapper[AnyRef,Double]() { override def apply(value: AnyRef) = 6.3 }) // Does not compile kStream.mapValues(_ => 6.3) // Works kStream.mapValues[Double](new ValueMapper[AnyRef,Double]() { override def apply(value: AnyRef) = 6.3 }) // Works,requires compiler option -Xexperimental kStream.mapValues[Double](_ => 6.3) // Works,requires compiler option -Xexperimental kStream.mapValues[Double](convert) def convert(string: String): Double = 6.3 val kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties) kafkaStreams.start() } } 更新:无效的解决方案 尝试1根据建议将显式类型添加到kStream(val kStream:KStream [Array [Byte],String] = kStreamBuilder.stream(“myTopic”))仍然无法编译并导致此错误. no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],Double]{}) --- because --- argument expression's type is not compatible with formal parameter type; found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double] required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,but Java-defined trait ValueMapper is invariant in type V. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) kStream.mapValues(new ValueMapper[AnyRef,Double]() { ^ type mismatch; found : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{} required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR] kStream.mapValues(new ValueMapper[AnyRef,Double]() { 尝试2添加上述内容并使用SAM转换“以避免显式写出匿名类的实例化”(kStream.mapValues(_ => 6.3))导致此编译器错误. no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable) --- because --- argument expression's type is not compatible with formal parameter type; found : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: ?VR] Note: String <: Any (and org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable <: org.apache.kafka.streams.kstream.ValueMapper[String,Double]),but Java-defined trait ValueMapper is invariant in type V. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) kStream.mapValues(_ => 6.3) ^ type mismatch; found : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR] kStream.mapValues(_ => 6.3) ^ 解决方法
这里有两个问题.首先,您不指定流的类型:
val kStream: KStream[Array[Byte],String] = kStreamBuilder.stream("myTopic") 这实际上是您看到错误的原因 – 因为您没有指定类型,Scala会将它们推断为某些默认值,这几乎总是不是您想要的. 其次,由于您已启用-Xexperimental,因此您可以依赖SAM转换来避免显式写出匿名类的实例化: kStream.mapValues(_ => 6.3) 更新:似乎由于某种原因,Scala编译器无法正确推断匿名函数/ SAM实例的输出类型.我能够通过以下小调整成功编译代码: kStream.mapValues[Double](_ => 6.3) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |