加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark-notebook中使用Accumulo 1.6创建Spark RDD

发布时间:2020-12-16 18:34:02 所属栏目:安全 来源:网络整理
导读:我有一个Vagrant图像,Spark Notebook,Spark,Accumulo 1.6和Hadoop都在运行.从笔记本,我可以手动创建一个扫描仪,并从我使用其中一个Accumulo示例创建的表中提取测试数据: val instanceNameS = "accumulo"val zooServersS = "localhost:2181"val instance: In
我有一个Vagrant图像,Spark Notebook,Spark,Accumulo 1.6和Hadoop都在运行.从笔记本,我可以手动创建一个扫描仪,并从我使用其中一个Accumulo示例创建的表中提取测试数据:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS,zooServersS)
val connector: Connector = instance.getConnector( "root",new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1",auths)

scanner.setRange(new Range("row_0000000000","row_0000000010"))

for(entry: Entry[Key,Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

将给出前10行表数据.

当我尝试创建RDD时:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(),classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.data.Value]
  )

由于以下错误,我得到一个RDD返回给我,我做不了多少:

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at
org.apache.spark.rdd.RDD.count(RDD.scala:927)

考虑到我没有指定任何关于连接哪个表,auths是什么等的参数,这完全有道理.

所以我的问题是:从这里我需要做什么才能将前十行表数据放入我的RDD?

更新一个
仍然不起作用,但我确实发现了一些东西.原来有两个几乎相同的包,

org.apache.accumulo.core.client.mapreduce

&安培;

org.apache.accumulo.core.client.mapred

两者都有几乎相同的成员,除了一些方法签名不同的事实.不知道为什么两者都存在,因为没有我可以看到的弃用通知.我试图毫不高兴地实施Sietse的答案.以下是我的所作所为,以及回复:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf import
org.apache.hadoop.conf.Configuration jobConf:
org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml,
core-site.xml,mapred-default.xml,mapred-site.xml,yarn-default.xml,
yarn-site.xml

Configuration: core-default.xml,core-site.xml,
mapred-site.xml,yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf,"root",new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf,auths)

AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf,classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at
:62

rdd2.first

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69)
at…

*编辑2 *

re:霍尔顿的回答 – 仍然没有快乐:

AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf,auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf,"batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf,classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at
newAPIHadoopRDD at :58

Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58

rddX.first

java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
at

编辑3 – 进步!

我能够弄清楚为什么’输入INFO未设置’错误发生了.你们中间的老鹰眼无疑会看到以下代码缺少一个结束的'(‘

AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password")

因为我在spark-notebook中这样做,我一直点击执行按钮继续前进,因为我没有看到错误.我忘了的是,当你放弃关闭’)时,笔记本将会执行spark-shell所做的事情 – 它将永远等待你添加它.所以错误是“setConnectorInfo”方法永远不会被执行的结果.

不幸的是,我仍然无法将累积表数据推送到可用于我的RDD中.当我执行

rddX.count

我回来了

res15: Long = 10000

这是正确的响应 – 我指出的表中有10,000行数据.但是,当我试图抓住数据的第一个元素时:

rddX.first

我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.accumulo.core.data.Key

关于从哪里去的任何想法?

编辑4 – 成功!

接受的答案评论是90%的方式 – 除了accumulo键/值需要被强制转换为可序列化的事实.我通过在两者上调用.toString()方法来实现这一点.我会尝试尽快发布一些完整的工作代码,以防其他任何人遇到同样的问题.

解决方法

通常使用自定义Hadoop InputFormats,使用JobConf指定信息.正如@Sietse所指出的,AccumuloInputFormat上有一些静态方法可用于配置JobConf.在这种情况下,我认为你想要做的是:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf,principal,token)
AccumuloInputFormat.setScanAuthorizations(jobConf,authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf,clientConfig)
AccumuloInputFormat.setInputTableName(jobConf,tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,classOf[org.apache.accumulo.core.data.Value]
)

注意:在深入研究代码之后,看来配置属性的设置部分基于被调用的类(有意识地避免与其他包冲突),所以当我们去具体类中时后来它找不到配置好的标志.解决方法是不使用Abstract类.有关实施细节,请参阅https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127).如果你不能使用spark-notebook在具体的实现上调用这个方法,可能使用spark-shell或定期构建的应用程序是最简单的解决方案.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读