scala – 按照RDD值从Cassandra表中过滤
我想根据我在RDD中的值来查询Cassandra的一些数据.我的方法如下:
val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e ) val t = sc.cassandraTable("keyspace","users").select("userid","user_name") val userNames = userIds.flatMap { userId => t.where("userid = ?",userId).take(1) } userNames.take(1) 虽然Cassandra查询在Spark shell中有效,但是当我在flatMap中使用它时会引发异常: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times,most recent failure: Lost task 0.0 in stage 2.0 (TID 2,localhost): java.lang.NullPointerException: org.apache.spark.rdd.RDD.<init>(RDD.scala:125) com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49) com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94) 我的理解是我无法在另一个RDD中生成RDD(Cassandra结果). 我在网上找到的例子读取了RDD中的整个Cassandra表并加入了RDD 但是我该如何处理这个问题呢? 解决方法
Spark 1.2或更高版本
Spark 1.2引入了joinWithCassandraTable val userids = sc.textFile("file:///Users/russellspitzer/users.list") userids .map(Tuple1(_)) .joinWithCassandraTable("keyspace","table") 此代码最终将执行与下面的解决方案相同的工作 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable 我认为你真正想要做的是在两个数据源上进行内连接.这应该比flatmap方法更快,并且有一些内部智能散列. scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list") scala> userids.take(5) res19: Array[String] = Array(3,2) scala> sc.cassandraTable("test","users").collect res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3,username: Jacek},CassandraRow{userid: 1,username: Russ},CassandraRow{userid: 2,username: Helena}) scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect res18: Array[(Int,(Boolean,String))] = Array((2,(true,Helena)),(3,Jacek))) 如果您实际上只想对C *数据库执行一堆主键查询,那么最好只使用普通的驱动程序路径执行它们而不使用spark. Spark解决方案与直接驱动程序调用集成 import com.datastax.spark.connector.cql.CassandraConnector import collection.JavaConversions._ val cc = CassandraConnector(sc.getConf) val select = s"SELECT * FROM cctest.users where userid=?" val ids = sc.parallelize(1 to 10) ids.flatMap(id => cc.withSessionDo(session => session.execute(select,id.toInt: java.lang.Integer).iterator.toList.map(row => (row.getInt("userid"),row.getString("username"))))).collect (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- 20.1 Shell脚本介绍;20.2 Shell脚本结构和执行;20.3 date
- 如何确保在map()期间保留自定义Scala集合的动态类型?
- 目前在Vim中使用OCaml的设置是什么?
- /etc/profile,/etc/bashrc,~/.bash_profile,~/.bashrc
- macos – 构建并推送多拱形泊坞窗图像
- typescript – TypeError:self.parent.parent.parent.cont
- 怎样提高WebService性能(大数据量网络传输处理)
- angular – 检索与TemplateRef关联的变量的名称
- scala – Spark2.1.0兼容杰克逊版本2.7.6
- shell脚本双引号、大括号、if语句注意事项