scala – How to join two Spark SQL streams
Published: 2020-12-16 08:46:08 | Category: Security | Source: compiled from the web
ENV:
Scala, Spark version: 2.1.1

Here are my streams (read from Kafka):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("JoinStreams")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // The types of dst, src, srcPort, len and l were lost in this copy of the question;
    // the ones below are assumptions.
    val schema = StructType(
      List(
        StructField("t", DataTypes.StringType),
        StructField("dst", DataTypes.StringType),
        StructField("dstPort", DataTypes.IntegerType),
        StructField("src", DataTypes.StringType),
        StructField("srcPort", DataTypes.IntegerType),
        StructField("ts", DataTypes.LongType),
        StructField("len", DataTypes.IntegerType),
        StructField("cpu", DataTypes.DoubleType),
        StructField("l", DataTypes.StringType),
        StructField("headers", DataTypes.createArrayType(DataTypes.StringType))
      )
    )

    val baseDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "topic") // a Scala string needs double quotes
      .load()
      .selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select($"data.*")

    val requestsDataFrame = baseDataFrame
      .filter("t = 'REQUEST'")
      .repartition($"dst")
      .withColumn("rowId", monotonically_increasing_id())

    val responseDataFrame = baseDataFrame
      .filter("t = 'RESPONSE'")
      .repartition($"src")
      .withColumn("rowId", monotonically_increasing_id())

    responseDataFrame.createOrReplaceTempView("responses")
    requestsDataFrame.createOrReplaceTempView("requests")

    val dataFrame = spark.sql("select * from requests left join responses ON requests.rowId = responses.rowId")

When I start the application, I get this error:

    org.apache.spark.sql.AnalysisException: Left outer/semi/anti joins with a streaming DataFrame/Dataset on the right is not supported;;

How can I join these two streams?

Solution
It looks like you need Spark 2.3:
"In Spark 2.3, we have added support for stream-stream joins…"
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins

(Editor: 李大同)
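For illustration, here is a rough sketch of what a stream-stream left outer join could look like on Spark 2.3 or later. It rests on assumptions that are not in the question: `ts` holds epoch milliseconds; a response is matched to its request by swapped source/destination address and port within one minute (a hypothetical rule, used because a `rowId` from `monotonically_increasing_id()` is generated separately on each stream and would not reliably pair a request with its response); the column types are guesses; `host:port` and `topic` are the question's placeholders; and the `spark-sql-kafka-0-10` package is on the classpath. The helper names (`readEvents`, `withEventTime`, `joinRequestsAndResponses`) are hypothetical. Per the Structured Streaming guide, a left outer stream-stream join needs a watermark on the right input plus an event-time range condition, and the query must run in append output mode.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object JoinStreams {

  // Assumed subset of the question's schema; the types of src, dst and len are guesses.
  val schema: StructType = StructType(List(
    StructField("t", StringType),
    StructField("src", StringType),
    StructField("srcPort", IntegerType),
    StructField("dst", StringType),
    StructField("dstPort", IntegerType),
    StructField("ts", LongType), // assumed: epoch milliseconds
    StructField("len", IntegerType)))

  // Hypothetical helper: adds an event-time column (seconds since epoch cast to
  // a timestamp), because watermarks and time-range conditions need one.
  private def withEventTime(df: DataFrame): DataFrame =
    df.withColumn("eventTime", (col("ts") / 1000).cast(TimestampType))

  // Hypothetical matching rule: a RESPONSE belongs to a REQUEST when it flows in
  // the opposite direction on the same ports and arrives within 1 minute.
  def joinRequestsAndResponses(requestEvents: DataFrame, responseEvents: DataFrame): DataFrame = {
    val requests = withEventTime(requestEvents)
      .filter(col("t") === "REQUEST")
      .select(
        col("src").as("reqSrc"), col("srcPort").as("reqSrcPort"),
        col("dst").as("reqDst"), col("dstPort").as("reqDstPort"),
        col("eventTime").as("reqTime"))
      .withWatermark("reqTime", "2 minutes") // lets Spark clean up left-side state

    val responses = withEventTime(responseEvents)
      .filter(col("t") === "RESPONSE")
      .select(
        col("src").as("respSrc"), col("srcPort").as("respSrcPort"),
        col("dst").as("respDst"), col("dstPort").as("respDstPort"),
        col("eventTime").as("respTime"), col("len").as("respLen"))
      .withWatermark("respTime", "2 minutes") // required on the right side of a left outer join

    // Equality keys plus an event-time range; the range bounds how long an
    // unmatched request is buffered before it is emitted with null response columns.
    requests.join(
      responses,
      expr("""
        respSrc = reqDst AND respSrcPort = reqDstPort AND
        respDst = reqSrc AND respDstPort = reqSrcPort AND
        respTime >= reqTime AND respTime <= reqTime + interval 1 minute
      """),
      "leftOuter")
  }

  // Hypothetical helper: one Kafka reader per call; "host:port" and "topic" are placeholders.
  private def readEvents(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "topic")
      .load()
      .select(from_json(col("value").cast(StringType), schema).as("data"))
      .select("data.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinStreams")
      .getOrCreate()

    joinRequestsAndResponses(readEvents(spark), readEvents(spark))
      .writeStream
      .outputMode("append") // stream-stream joins support only append mode in Spark 2.3
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Each side reads the topic through its own Kafka reader, so the two join inputs have separate sources; splitting one shared DataFrame with two filters, as the question does, is the other option. Unmatched requests appear with null response columns only after the watermark passes the end of their one-minute window, so expect some delay in that part of the output.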