scala – Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set
I am using Spark 1.6 and run into the above exception when I execute the following code:
// Imports
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.Properties
import scala.concurrent.Future

// Set up Spark locally with 2 threads
val conf = new SparkConf().setMaster("local[2]").setAppName("app")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)

// Create a fake dataframe (three columns, matching the 3-tuple)
import sqlCtx.implicits._
var df = sc.parallelize(1 to 50000).map { i => (i, i, i) }.toDF("a", "b", "c").repartition(2)

// Write it as a parquet file, then read it back
df.write.parquet("/tmp/parquet1")
df = sqlCtx.read.parquet("/tmp/parquet1")

// JDBC connection
val url = s"jdbc:postgresql://localhost:5432/tempdb"
val prop = new Properties()
prop.setProperty("user", "admin")
prop.setProperty("password", "")

// 4 futures - at least one of them has been consistently failing for me
val x1 = Future { df.write.jdbc(url, "temp1", prop) }
val x2 = Future { df.write.jdbc(url, "temp2", prop) }
val x3 = Future { df.write.jdbc(url, "tem3", prop) }
val x4 = Future { df.write.jdbc(url, "temp4", prop) }

Here is the GitHub gist: https://gist.github.com/karanveerm/27d852bf311e39f05491

The error I get is:

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na]
at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na]

Is this a Spark bug, or am I doing something wrong? Is there any workaround?

Solution
After trying a few things, I found that one of the threads created by the global ForkJoinPool had its spark.sql.execution.id property set to a random value.
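For what it's worth, spark.sql.execution.id is an ordinary SparkContext local property, so one way to observe this is to log what each future's thread carries just before the write. A minimal diagnostic sketch, reusing the sc, df, url and prop from the question (a non-null value is what makes the write blow up):

// Log the spark.sql.execution.id local property seen by the thread
// that runs the JDBC write; a non-null value here triggers the
// IllegalArgumentException from SQLExecution.withNewExecutionId.
val probe = Future {
  val execId = sc.getLocalProperty("spark.sql.execution.id")
  println(s"${Thread.currentThread().getName}: spark.sql.execution.id = $execId")
  df.write.jdbc(url, "temp1", prop)
}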
I could not identify the process that actually sets it, but I was able to work around the problem by using my own ExecutionContext:

import java.util.concurrent.Executors
import concurrent.ExecutionContext

val executorService = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(executorService)

I used code from http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html. Perhaps the ForkJoinPool clones thread properties when it creates new threads, and if that happens within the context of a SQL execution, the new thread inherits the non-null value; a FixedThreadPool, by contrast, creates its threads at instantiation time.
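Wired into the original example, the workaround might look like the sketch below. It reuses the df, url and prop defined in the question; awaiting the futures and shutting the pool down afterwards are housekeeping assumptions of mine, not part of the original answer.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Threads are created up front, before any SQL execution can tag
// them with a spark.sql.execution.id local property.
val executorService = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(executorService)

// The same four JDBC writes as in the question, now scheduled on
// the fixed pool instead of the global ForkJoinPool.
val writes = Seq("temp1", "temp2", "temp3", "temp4").map { table =>
  Future { df.write.jdbc(url, table, prop) }
}

// Wait for all writes to finish, then release the pool.
Await.result(Future.sequence(writes), Duration.Inf)
executorService.shutdown()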