加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 从Spark错误Upsert到CosmosDB

发布时间:2020-12-16 18:10:03 所属栏目:安全 来源:网络整理
导读:我是Spark / CosmosDB / Python的新手,所以我在尝试自己创建一些东西时,会从MS站点和GitHub中获取代码示例.经过与Spark-CosmosDB连接器的长期斗争,我能够从CosmosDB集合中读取数据.现在我想反过来(upsert),但发现了另一个障碍.这是一个例子,我正在努力: Wri
我是Spark / CosmosDB / Python的新手,所以我在尝试自己创建一些东西时,会从MS站点和GitHub中获取代码示例.经过与Spark-CosmosDB连接器的长期斗争,我能够从CosmosDB集合中读取数据.现在我想反过来(upsert),但发现了另一个障碍.这是一个例子,我正在努力:
Writing to Cosmos DB section.

我能够从宇宙中读取,并对数据进行处理,但我无法插回到Cosmos.以下是我稍加修改的代码:

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector","jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar","wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar","wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar","wasb:///example/jars/1.0.0/json-20140107.jar","wasb:///example/jars/1.0.0/rxjava-1.3.0.jar","wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],"conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

# Read Configuration
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/","Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==","Database" : "DepartureDelays","preferredRegions" : "Central US;East US2","Collection" : "flights_pcoll","SamplingRatio" : "1.0","schema_samplesize" : "1000","query_pagesize" : "2147483647","query_custom" : "SELECT c.date,c.delay,c.distance,c.origin,c.destination FROM c WHERE c.origin = 'SEA'"
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

# Write configuration
writeConfig = {
 "Endpoint" : "https://doctorwho.documents.azure.com:443/","Upsert" : "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

所以,当我尝试运行它时,我得到:

An error occurred while calling o90.save.
: java.lang.UnsupportedOperationException: Writing in a non-empty collection.

快速谷歌搜索后,我尝试添加模式(“追加”)到我的最后一行:

flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

不幸的是,这给我留下了一个我无法理解的错误:

An error occurred while calling o127.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times,most recent failure: Lost task 2.3 in stage 4.0 (TID 90,wn2-MDMstr.zxmmgisclg5udfemnv0v3qva3e.ax.internal.cloudapp.net,executor 2): java.lang.NoClassDefFoundError: com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor

这是完整的堆栈跟踪:error in pastebin

有人可以帮我解决这个错误吗?在使用我自己的cosmosDB时,我也收到了完全相同的错误,而不是文档中的示例.

我正在使用带有PySpark3内核的Jupyter笔记本. Spark 2.2版,HDInsight集群3.6.

编辑
我不想只是等待回复,所以我尝试用Scala做同样的事情.你猜怎么着?相同的错误(或至少非常相似):Scala error

这是我的Scala代码:

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector","conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/","Masterkey" -> "$my_key","Database" -> "test","PreferredRegions" -> "West Europe","Collection" -> "$my_collection","SamplingRatio" -> "1.0"
))
val docs = spark.read.cosmosDB(readConfig)

docs.show()

val writeConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/","WritingBatchSize" -> "100"
))




val someData = Seq(
    Row(8,"bat"),Row(64,"mouse"),Row(-27,"test_name")
)

val someSchema = List(
  StructField("number",IntegerType,true),StructField("name",StringType,true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),StructType(someSchema)
)

someDF.show()

someDF.write.mode(SaveMode.Append).cosmosDB(writeConfig)

也许这会对故障排除有所帮助.

谢谢!

解决方法

对于使用python时的第一个问题,请注意您使用的是Azure Cosmos DB集合的doctor.这是一个演示集合,我们提供了只读密钥而不是写密钥.因此,您收到的错误是缺少对集合的写访问权限.

对于第二个问题,来自pastebin的错误看起来是一样的.这样说,一些快速观察:

>您使用的是HDI 3.6,如果您使用的是Spark 2.1,则使用的JAR适用于Spark 2.2.如果您正在使用HDI 3.7,那么它在Spark 2.2上,然后您使用正确的jar.
>您可能希望使用maven坐标来获取最新版本的JAR.请注意azure-cosmosdb-spark> Using Jupyter Notebooks获取更多信息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读