加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何使用ValueMapper使用Scala更改Kafka Streams 10.2中的值类型

发布时间:2020-12-16 18:09:38 所属栏目:安全 来源:网络整理
导读:我正在尝试将基于 Scala的Kafka Streams应用程序从0.10.0.0升级到0.10.2.1,我无法弄清楚如何让应用程序进行编译. 我在documentation中找到的示例使用mapValue但它不会更改值类型.我按照this使用Scala 2.11和-Xexperimental编译器标志. 码 class MyStream() {
我正在尝试将基于 Scala的Kafka Streams应用程序从0.10.0.0升级到0.10.2.1,我无法弄清楚如何让应用程序进行编译.

我在documentation中找到的示例使用mapValue但它不会更改值类型.我按照this使用Scala 2.11和-Xexperimental编译器标志.

class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder
    val kStream = kStreamBuilder.stream("myTopic")

    kStream.mapValues(new ValueMapper[AnyRef,Double]() {
      override def apply(value: Any) = 6.3
    })

    val kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties)
    kafkaStreams.start()
  }
}

编译错误

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: VR])org.apache.kafka.streams.kstream.KStream[Nothing,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{})
 --- because ---
argument expression's type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
  required: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: ?VR]
Note: AnyRef <: Any,but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
    kStream.mapValues(new ValueMapper[AnyRef,Double]() {
            ^
type mismatch;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
  required: org.apache.kafka.streams.kstream.ValueMapper[_,_ <: VR]
    kStream.mapValues(new ValueMapper[AnyRef,Double]() {
                      ^
two errors found

表示为java类,编译很好.

public class MyStream {
    public void startMyStream() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream kStream = kStreamBuilder.stream("myTopic");

        kStream.mapValues(new ValueMapper<Object,Double>() {
            @Override
            public Double apply(Object value) {
                return 6.3;
            }
        });

        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties());
        kafkaStreams.start();
    }
}

如何编译Scala版本?

解决方案基于此answer

三种工作方式,两种方式不工作.

class MyStream() {
  def startMyStream(): Unit = {
    val kStreamBuilder = new KStreamBuilder

    // Explicit tying here is not required to compile and run. 
    val kStream: KStream[Array[Byte],String] = kStreamBuilder.stream("myTopic")

    // Does not compile
    kStream.mapValues(new ValueMapper[AnyRef,Double]() {
      override def apply(value: AnyRef) = 6.3
    })

    // Does not compile
    kStream.mapValues(_ => 6.3)

    // Works
    kStream.mapValues[Double](new ValueMapper[AnyRef,Double]() {
      override def apply(value: AnyRef) = 6.3
    })

    // Works,requires compiler option -Xexperimental
    kStream.mapValues[Double](_ => 6.3)

    // Works,requires compiler option -Xexperimental
    kStream.mapValues[Double](convert)
    def convert(string: String): Double = 6.3

    val kafkaStreams = new KafkaStreams(kStreamBuilder,new Properties)
    kafkaStreams.start()
  }
}

更新:无效的解决方案

尝试1根据建议将显式类型添加到kStream(val kStream:KStream [Array [Byte],String] = kStreamBuilder.stream(“myTopic”))仍然无法编译并导致此错误.

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR])org.apache.kafka.streams.kstream.KStream[Array[Byte],Double]{})
--- because ---
argument expression's type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(new ValueMapper[AnyRef,Double]() {
  ^
  type mismatch;
  found   : org.apache.kafka.streams.kstream.ValueMapper[AnyRef,Double]{}
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR]
  kStream.mapValues(new ValueMapper[AnyRef,Double]() {

尝试2添加上述内容并使用SAM转换“以避免显式写出匿名类的实例化”(kStream.mapValues(_ => 6.3))导致此编译器错误.

no type parameters for method mapValues: (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable)
--- because ---
argument expression's type is not compatible with formal parameter type;
  found   : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
  required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: ?VR]
Note: String <: Any (and org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable <: org.apache.kafka.streams.kstream.ValueMapper[String,Double]),but Java-defined trait ValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
kStream.mapValues(_ => 6.3)
        ^
type mismatch;
found   : org.apache.kafka.streams.kstream.ValueMapper[String,Double] with Serializable
required: org.apache.kafka.streams.kstream.ValueMapper[_ >: String,_ <: VR]
kStream.mapValues(_ => 6.3)
                    ^

解决方法

这里有两个问题.首先,您不指定流的类型:

val kStream: KStream[Array[Byte],String] = kStreamBuilder.stream("myTopic")

这实际上是您看到错误的原因 – 因为您没有指定类型,Scala会将它们推断为某些默认值,这几乎总是不是您想要的.

其次,由于您已启用-Xexperimental,因此您可以依赖SAM转换来避免显式写出匿名类的实例化:

kStream.mapValues(_ => 6.3)

更新:似乎由于某种原因,Scala编译器无法正确推断匿名函数/ SAM实例的输出类型.我能够通过以下小调整成功编译代码:

kStream.mapValues[Double](_ => 6.3)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读