scala – 如何在Spark-notebook中使用Accumulo 1.6创建Spark RDD
我有一个Vagrant图像,Spark Notebook,Spark,Accumulo 1.6和Hadoop都在运行.从笔记本,我可以手动创建一个扫描仪,并从我使用其中一个Accumulo示例创建的表中提取测试数据:
val instanceNameS = "accumulo" val zooServersS = "localhost:2181" val instance: Instance = new ZooKeeperInstance(instanceNameS,zooServersS) val connector: Connector = instance.getConnector( "root",new PasswordToken("password")) val auths = new Authorizations("exampleVis") val scanner = connector.createScanner("batchtest1",auths) scanner.setRange(new Range("row_0000000000","row_0000000010")) for(entry: Entry[Key,Value] <- scanner) { println(entry.getKey + " is " + entry.getValue) } 将给出前10行表数据. 当我尝试创建RDD时: val rdd2 = sparkContext.newAPIHadoopRDD ( new Configuration(),classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.data.Value] ) 由于以下错误,我得到一个RDD返回给我,我做不了多少:
考虑到我没有指定任何关于连接哪个表,auths是什么等的参数,这完全有道理. 所以我的问题是:从这里我需要做什么才能将前十行表数据放入我的RDD? 更新一个
&安培;
两者都有几乎相同的成员,除了一些方法签名不同的事实.不知道为什么两者都存在,因为没有我可以看到的弃用通知.我试图毫不高兴地实施Sietse的答案.以下是我的所作所为,以及回复: import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration val jobConf = new JobConf(new Configuration)
AbstractInputFormat.setConnectorInfo(jobConf,"root",new PasswordToken("password") AbstractInputFormat.setScanAuthorizations(jobConf,auths) AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration) val rdd2 = sparkContext.hadoopRDD ( jobConf,classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],1 )
rdd2.first
*编辑2 * re:霍尔顿的回答 – 仍然没有快乐: AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password") AbstractInputFormat.setScanAuthorizations(jobConf,auths) AbstractInputFormat.setZooKeeperInstance(jobConf,new ClientConfiguration) InputFormatBase.setInputTableName(jobConf,"batchtest1") val rddX = sparkContext.newAPIHadoopRDD( jobConf,classOf[org.apache.accumulo.core.data.Value] )
rddX.first
编辑3 – 进步! 我能够弄清楚为什么’输入INFO未设置’错误发生了.你们中间的老鹰眼无疑会看到以下代码缺少一个结束的'(‘ AbstractInputFormat.setConnectorInfo(jobConf,new PasswordToken("password") 因为我在spark-notebook中这样做,我一直点击执行按钮继续前进,因为我没有看到错误.我忘了的是,当你放弃关闭’)时,笔记本将会执行spark-shell所做的事情 – 它将永远等待你添加它.所以错误是“setConnectorInfo”方法永远不会被执行的结果. 不幸的是,我仍然无法将累积表数据推送到可用于我的RDD中.当我执行 rddX.count 我回来了
这是正确的响应 – 我指出的表中有10,000行数据.但是,当我试图抓住数据的第一个元素时: rddX.first 我收到以下错误:
关于从哪里去的任何想法? 编辑4 – 成功! 接受的答案评论是90%的方式 – 除了accumulo键/值需要被强制转换为可序列化的事实.我通过在两者上调用.toString()方法来实现这一点.我会尝试尽快发布一些完整的工作代码,以防其他任何人遇到同样的问题. 解决方法
通常使用自定义Hadoop InputFormats,使用JobConf指定信息.正如@Sietse所指出的,AccumuloInputFormat上有一些静态方法可用于配置JobConf.在这种情况下,我认为你想要做的是:
val jobConf = new JobConf() // Create a job conf // Configure the job conf with our accumulo properties AccumuloInputFormat.setConnectorInfo(jobConf,principal,token) AccumuloInputFormat.setScanAuthorizations(jobConf,authorizations) val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers) AccumuloInputFormat.setZooKeeperInstance(jobConf,clientConfig) AccumuloInputFormat.setInputTableName(jobConf,tableName) // Create an RDD using the jobConf val rdd2 = sc.newAPIHadoopRDD(jobConf,classOf[org.apache.accumulo.core.data.Value] ) 注意:在深入研究代码之后,看来配置属性的设置部分基于被调用的类(有意识地避免与其他包冲突),所以当我们去具体类中时后来它找不到配置好的标志.解决方法是不使用Abstract类.有关实施细节,请参阅https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127).如果你不能使用spark-notebook在具体的实现上调用这个方法,可能使用spark-shell或定期构建的应用程序是最简单的解决方案. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |