加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用spark cassandra连接器更新Cassandra表

发布时间:2020-12-16 18:05:50 所属栏目:安全 来源:网络整理
导读:我在更新键空间中的表时遇到了 scala上的spark cassandra连接器问题 这是我的一段代码 val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE + " SET a= a + " + b + " WHERE x=" + x + " AND y=" + y + " AND z=" + xprintln(query)val
我在更新键空间中的表时遇到了 scala上的spark cassandra连接器问题

这是我的一段代码

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)

当我执行此代码时,我收到这样的错误

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

知道为什么会这样吗?
我怎样才能解决这个问题?

解决方法

通过spark-cassandra-connector可以更新具有 counter column的表.您必须使用DataFrames和 DataFrameWriter方法保存模式“append”(或 SaveMode.Append,如果您愿意).检查代码 DataFrameWriter.scala.

例如,给出一个表:

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

代码应该如下所示:

val updateRdd = sc.parallelize(Seq(Row("John","Smith",1L),Row("Zhang","Wei",2L),Row("Angelos","Papas",3L)))

val tblStruct = new StructType(
    Array(StructField("name",StringType,nullable = false),StructField("surname",StructField("count",LongType,nullable = false)))

val updateDf  = sqlContext.createDataFrame(updateRdd,tblStruct)

updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test","table" -> "name_counter"))
.mode("append")
.save()

更新后:

name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

通过隐式转换RDD to a DataFrame:import sqlContext.implicits._并使用.toDF(),DataFrame转换可以更简单.

检查此玩具应用程序的完整代码:
https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

由于版本在这里非常重要,以上内容适用于Scala 2.11.7,Spark 1.5.1,spark-cassandra-connector 1.5.0-RC1-s_2.11,Cassandra 3.0.5.自@since 1.4.0以来,DataFrameWriter被指定为@Experimental.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读