加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用Async HTTP调用的Spark作业

发布时间:2020-12-16 19:15:37 所属栏目:安全 来源:网络整理
导读:我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据. 在进行其他计算之前我需要所有结果. 理想情况下,我需要在不同节点上进行http调用以进行缩放考虑. 我做了这样的事情: //init sparkval sparkContext = new SparkContext(conf)val datas = Seq
我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据.
在进行其他计算之前我需要所有结果.
理想情况下,我需要在不同节点上进行http调用以进行缩放考虑.

我做了这样的事情:

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1","url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall return Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r,10.seconds))

//print responses
response.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

这项工作,但Spark工作永远不会完成!

所以我想知道使用Spark(或Future [RDD])处理Future的最佳实践是什么.

我认为这个用例看起来很常见,但还没有找到任何答案.

最好的祝福

解决方法

this use case looks pretty common

不是真的,因为它根本无法正常工作(可能).由于每个任务都在标准的Scala迭代器上运行,因此这些操作将被压缩在一起.这意味着所有操作都将在实践中阻塞.假设您有三个URL [“x”,“y”,“z”],您的代码将按以下顺序执行:

Await.result(httpCall("x",10.seconds))
Await.result(httpCall("y",10.seconds))
Await.result(httpCall("z",10.seconds))

您可以轻松地在本地重现相同的行为.如果要异步执行代码,则应使用mapPartitions显式处理:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

但这比较棘手.无法保证给定分区的所有数据都适合内存,因此您可能也需要一些批处理机制.

所有这一切都说我无法重现你所描述的问题可能是一些配置问题或httpCall本身的问题.

在旁注上允许单个超时终止整个任务看起来不是一个好主意.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读