
scala – Error when accessing a field in a Tuple2

I was trying to access a field in a Tuple2 and the compiler returned an error. The software pushes a case class onto a Kafka topic; I then want to recover it with Spark Streaming so that I can feed a machine learning algorithm and save the results in a Mongo instance.
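For context, the Tuple2 in question is the (key, value) pair that the Kafka direct stream delivers for every record: the JSON payload pushed by the producer sits in the second element, so it has to be read with _._2 (or a pattern match) before it can be parsed. A minimal sketch of that access pattern, assuming ssc, kafkaParams and topicsSet are set up as in the full code further down:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Every record from createDirectStream is a Tuple2: (message key, message value)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

messages.foreachRDD { rdd =>
  // The JSON string sent by the producer is the second element of the tuple
  val jsonValues = rdd.map(_._2) // same as rdd.map { case (_, value) => value }
  jsonValues.take(3).foreach(println) // peek at a few raw JSON payloads
}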

Solved!

I finally solved my problem; I am posting the final solution:

Here is the GitHub project:

https://github.com/alonsoir/awesome-recommendation-engine/tree/develop

build.sbt

name := "my-recommendation-spark-engine"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.4"

val sparkVersion = "1.6.1"

val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.10" % "0.8.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"), //not working play module!! check
  //jdbc, //anorm, //cache,
  // HTTP client
  "net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
  // HTML parser
  "org.jodd" % "jodd-lagarto" % "3.5.2",
  "com.typesafe" % "config" % "1.2.1",
  "com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
  "org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
  "org.twitter4j" % "twitter4j-core" % "4.0.2",
  "org.twitter4j" % "twitter4j-stream" % "4.0.2",
  "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
  "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
  "com.google.code.gson" % "gson" % "2.6.2",
  "commons-cli" % "commons-cli" % "1.3.1",
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
  // Akka
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
  // MongoDB
  "org.reactivemongo" %% "reactivemongo" % "0.10.0"
)

 packAutoSettings

 //play.Project.playScalaSettings

Kafka producer

package example.producer

import play.api.libs.json._
import example.utils._
import scala.concurrent.Future
import example.model.{AmazonProductAndRating, AmazonProduct, AmazonRating}
import example.utils.AmazonPageParser
import scala.concurrent.ExecutionContext.Implicits.global


/**
args(0) : productId
args(1) : userId
args(2) : rating

Usage: ./amazon-producer-example 0981531679 someUserId 3.0
*/
object AmazonProducerExample {

def main(args: Array[String]): Unit = {

val productId = args(0).toString
val userId = args(1).toString
val rating = args(2).toDouble
val topicName = "amazonRatingsTopic"

val producer = Producer[String](topicName)

//0981531679 is Scala Puzzlers...
AmazonPageParser.parse(productId, userId, rating).onSuccess { case amazonRating =>
  //Is this the correct way? the best performance? possibly not, what about using avro or parquet? How can I push data in avro or parquet format?
  //You can see that I am pushing a json String to the kafka topic, not a raw String, but is there any difference?
  //of course there are differences...
  producer.send(Json.toJson(amazonRating).toString)
  //producer.send(amazonRating.toString)
  println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
  System.exit(0)
}

}
}
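The Producer[String] used above is defined elsewhere in the project and is not shown in this post. A rough sketch of what such a wrapper over the Kafka 0.8 Scala producer API could look like (the broker address, encoder and class shape below are assumptions for illustration, not the project's actual code):

package example.producer

import java.util.Properties
import kafka.producer.{KeyedMessage, ProducerConfig, Producer => KafkaScalaProducer}

// Hypothetical thin wrapper: sends plain String messages to a single topic
case class Producer[A](topic: String) {
  private val props = new Properties()
  props.put("metadata.broker.list", "127.0.0.1:9092")             // assumed broker address
  props.put("serializer.class", "kafka.serializer.StringEncoder") // encode values as Strings

  private val producer = new KafkaScalaProducer[String, String](new ProducerConfig(props))

  def send(message: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, message))

  def close(): Unit = producer.close()
}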

Here are the definitions of the necessary case classes (UPDATED); the file is named models.scala:

package example.model

import play.api.libs.json.Json
import reactivemongo.bson.Macros

case class AmazonProduct(itemId: String, title: String, url: String, img: String, description: String)
case class AmazonRating(userId: String, productId: String, rating: Double)

case class AmazonProductAndRating(product: AmazonProduct, rating: AmazonRating)

// For MongoDB
object AmazonRating {
implicit val amazonRatingHandler = Macros.handler[AmazonRating]
implicit val amazonRatingFormat = Json.format[AmazonRating]
//added using @Yuval tip
lazy val empty: AmazonRating = AmazonRating("-1","-1",-1d)
}
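With Json.format[AmazonRating] in implicit scope, both ends of the pipeline can round-trip the case class through JSON strings. A quick sketch of that round trip (the values are made up for illustration):

import play.api.libs.json.Json
import example.model.AmazonRating

val rating = AmazonRating("someUserId", "0981531679", 3.0)

// What the producer pushes to Kafka: producer.send(Json.toJson(rating).toString)
val jsonString: String = Json.toJson(rating).toString
// => {"userId":"someUserId","productId":"0981531679","rating":3.0}

// Reading it back (an alternative to extracting the fields through a DataFrame)
val parsed: AmazonRating = Json.parse(jsonString).as[AmazonRating]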

Here is the complete code of the Spark Streaming process:

package example.spark

import java.io.File
import java.util.Date

import play.api.libs.json._
import com.google.gson.{Gson, GsonBuilder, JsonParser}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

import com.mongodb.casbah.Imports._
import com.mongodb.QueryBuilder
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject}

import reactivemongo.api.MongoDriver
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import example.model._

import example.utils.Recommender

/**
* Collect at least the specified number of json amazon products in order to feed the recommendation system, and feed the mongo instance with the results.

Usage: ./amazon-kafka-connector 127.0.0.1:9092 amazonRatingsTopic

on mongo shell:

use alonsodb;
db.amazonRatings.find();
*/
object AmazonKafkaConnector {

private var numAmazonProductCollected = 0L
private var partNum = 0
private val numAmazonProductToCollect = 10000000

//these settings must be in reference.conf
private val Database = "alonsodb"
private val ratingCollection = "amazonRatings"
private val MongoHost = "127.0.0.1"
private val MongoPort = 27017
private val MongoProvider = "com.stratio.datasource.mongodb"

private val jsonParser = new JsonParser()
private val gson = new GsonBuilder().setPrettyPrinting().create()

private def prepareMongoEnvironment(): MongoClient = {
  val mongoClient = MongoClient(MongoHost,MongoPort)
  mongoClient
}

private def closeMongoEnviroment(mongoClient : MongoClient) = {
  mongoClient.close()
  println("mongoclient closed!")
}

private def cleanMongoEnvironment(mongoClient: MongoClient) = {
  cleanMongoData(mongoClient)
  mongoClient.close()
}

private def cleanMongoData(client: MongoClient): Unit = {
  val collection = client(Database)(ratingCollection)
  collection.dropCollection()
}

def main(args: Array[String]) {
// Process program arguments and set properties

if (args.length < 2) {
  System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>")
  System.exit(1)
}

val Array(brokers,topics) = args

println("Initializing Streaming Spark Context and kafka connector...")
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
                               .setMaster("local[4]")
                               .set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpoint dir should be in a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
println("Initialized Streaming Spark Context and kafka connector...")

//create recommendation module
println("Creating rating recommender module...")
val ratingFile = "ratings.csv"
val recommender = new Recommender(sc, ratingFile)
println("Initialized rating recommender module...")
//THIS IS THE MOST INTERESTING PART AND WHAT I NEED!
//THE SOLUTION IS PROBABLY NOT THE MOST EFFICIENT, BECAUSE I HAD TO
//USE DATAFRAMES, ARRAYs AND SEQs, BUT IT IS FUNCTIONAL!
try {
  messages.foreachRDD(rdd => {
    val count = rdd.count()
    if (count > 0) {
      val json = rdd.map(_._2)
      val dataFrame = sqlContext.read.json(json) //converts json to DF
      val myRow = dataFrame.select(dataFrame("userId"), dataFrame("productId"), dataFrame("rating")).take(count.toInt)
      println("myRow is: " + myRow)

      val myAmazonRating = AmazonRating(myRow(0).getString(0), myRow(0).getString(1), myRow(0).getDouble(2))
      println("myAmazonRating is: " + myAmazonRating.toString)
      val arrayAmazonRating = Array(myAmazonRating)
      //this method needs a Seq[AmazonRating]
      recommender.predictWithALS(arrayAmazonRating.toSeq)
    } //if
  })
} catch {
  case e: IllegalArgumentException => println("illegal arg. exception")
  case e: IllegalStateException    => println("illegal state exception")
  case e: ClassCastException       => println("ClassCastException")
  case e: Exception                => println(" Generic Exception")
} finally {
  println("Finished taking data from kafka topic...")
}

ssc.start()
ssc.awaitTermination()

println("Finished!")
}
}

Thank you all, @Yuval, @Emecas and @Riccardo.cardin.

The signature of the Recommender.predict method is as follows:

def predict(ratings: Seq[AmazonRating]) = {
  // train model
  val myRatings = ratings.map(toSparkRating)
  val myRatingRDD = sc.parallelize(myRatings)

  val startAls = DateTime.now
  val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)

  val myProducts = myRatings.map(_.product).toSet
  val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))

  // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
  val myUserId = userDict.getIndex(MyUsername)
  val recommendations = model.predict(candidates.map((myUserId, _))).collect
  val endAls = DateTime.now
  val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
  val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds

  println(s"ALS Time: $alsTime seconds")
  result
}

//I think I have made myself clear all along; tell me if you need anything else, and thank you for your patience in teaching me, @Yuval

Solution

Diagnosis

An IllegalStateException indicates that you are operating over a StreamingContext that is already ACTIVE or STOPPED. See details here (lines 218-231)

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
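In other words, the exception is raised when the DStream graph is modified after ssc.start() has been called (or after the context has been stopped). Reduced to its essentials, with the ssc and messages from the code above, the offending ordering looks like this:

// WRONG: registering an output operation after the context has started
ssc.start()
messages.foreachRDD(rdd => println(rdd.count())) // throws IllegalStateException
ssc.awaitTermination()

// RIGHT: declare every transformation and output operation first, then start
messages.foreachRDD(rdd => println(rdd.count()))
ssc.start()
ssc.awaitTermination()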

Code review

Looking at your AmazonKafkaConnector code, you are doing map, filter and foreachRDD into another foreachRDD over the same DirectStream object named messages.

General recommendation:

Be modular, my friend: split your logic into small pieces, one for each task you want to perform:

> Streaming
> ML recommendation
> Persistence
> etc.

This will help you understand and debug the Spark pipeline you want to implement much more easily.
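A sketch of what that split could look like for this pipeline (the function names, signatures and Mongo mapping below are only illustrative, not taken from the project):

import com.mongodb.casbah.MongoCollection
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.streaming.dstream.DStream
import play.api.libs.json.Json
import example.model.AmazonRating
import example.utils.Recommender

// Hypothetical decomposition: each step can be understood, tested and debugged on its own
object RecommendationPipeline {

  // 1. Streaming: turn the raw (key, value) Kafka records into domain objects
  def parseRatings(stream: DStream[(String, String)]): DStream[AmazonRating] =
    stream.map { case (_, json) => Json.parse(json).as[AmazonRating] }

  // 2. ML recommendation: delegate to the existing recommender
  def recommend(recommender: Recommender, ratings: Seq[AmazonRating]) =
    recommender.predictWithALS(ratings)

  // 3. Persistence: write the resulting ratings to the Mongo collection
  def persist(collection: MongoCollection, ratings: Seq[AmazonRating]): Unit =
    ratings.foreach { r =>
      collection.insert(MongoDBObject("userId" -> r.userId, "productId" -> r.productId, "rating" -> r.rating))
    }
}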
