scala – 使用spark cassandra连接器更新Cassandra表
发布时间:2020-12-16 18:05:50 所属栏目:安全 来源:网络整理
导读:我在更新键空间中的表时遇到了 scala上的spark cassandra连接器问题 这是我的一段代码 val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE + " SET a= a + " + b + " WHERE x=" + x + " AND y=" + y + " AND z=" + xprintln(query)val
我在更新键空间中的表时遇到了
scala上的spark cassandra连接器问题
这是我的一段代码 val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE + " SET a= a + " + b + " WHERE x=" + x + " AND y=" + y + " AND z=" + x println(query) val KeySpace = new CassandraSQLContext(sparkContext) KeySpace.setKeyspace(KEYSPACE) hourUniqueKeySpace.sql(query) 当我执行此代码时,我收到这样的错误 Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found 知道为什么会这样吗? 解决方法
通过spark-cassandra-connector可以更新具有
counter column的表.您必须使用DataFrames和
DataFrameWriter方法保存模式“append”(或
SaveMode.Append,如果您愿意).检查代码
DataFrameWriter.scala.
例如,给出一个表: cqlsh:test> SELECT * FROM name_counter ; name | surname | count ---------+---------+------- John | Smith | 100 Zhang | Wei | 1000 Angelos | Papas | 10 代码应该如下所示: val updateRdd = sc.parallelize(Seq(Row("John","Smith",1L),Row("Zhang","Wei",2L),Row("Angelos","Papas",3L))) val tblStruct = new StructType( Array(StructField("name",StringType,nullable = false),StructField("surname",StructField("count",LongType,nullable = false))) val updateDf = sqlContext.createDataFrame(updateRdd,tblStruct) updateDf.write.format("org.apache.spark.sql.cassandra") .options(Map("keyspace" -> "test","table" -> "name_counter")) .mode("append") .save() 更新后: name | surname | count ---------+---------+------- John | Smith | 101 Zhang | Wei | 1002 Angelos | Papas | 13 通过隐式转换RDD to a DataFrame:import sqlContext.implicits._并使用.toDF(),DataFrame转换可以更简单. 检查此玩具应用程序的完整代码: 由于版本在这里非常重要,以上内容适用于Scala 2.11.7,Spark 1.5.1,spark-cassandra-connector 1.5.0-RC1-s_2.11,Cassandra 3.0.5.自@since 1.4.0以来,DataFrameWriter被指定为@Experimental. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- Bootstrap 4: 对比普通网格和 Flexbox 网格
- 【数据结构】图Graph的邻接矩阵,邻接表及深度、广度遍历
- 【数据结构】用模版实现大小堆、实现优先级队列,以及堆排序
- Scala中的多个flatMaps
- twitter-bootstrap-3 – 什么是Bootstrap 3中的“data-targ
- [原创]webservice 学习笔记,axis2传递简单对象,非stub方式
- 如何使用带有来自scala的python -c命令的args运行python代码
- 如何在Scala中创建一个“n到m”受限制的int变体?
- angular引用echarts插件
- IIS 无法启动:发生意外错误0x8ffe2740 的原因