scala – 使用Async HTTP调用的Spark作业
发布时间:2020-12-16 19:15:37 所属栏目:安全 来源:网络整理
导读:我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据. 在进行其他计算之前我需要所有结果. 理想情况下,我需要在不同节点上进行http调用以进行缩放考虑. 我做了这样的事情: //init sparkval sparkContext = new SparkContext(conf)val datas = Seq
我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据.
在进行其他计算之前我需要所有结果. 理想情况下,我需要在不同节点上进行http调用以进行缩放考虑. 我做了这样的事情: //init spark val sparkContext = new SparkContext(conf) val datas = Seq[String]("url1","url2") //create rdd val rdd = sparkContext.parallelize[String](datas) //httpCall return Future[String] val requests = rdd.map((url: String) => httpCall(url)) //await all results (Future.sequence may be better) val responses = requests.map(r => Await.result(r,10.seconds)) //print responses response.collect().foreach((s: String) => println(s)) //stop spark sparkContext.stop() 这项工作,但Spark工作永远不会完成! 所以我想知道使用Spark(或Future [RDD])处理Future的最佳实践是什么. 我认为这个用例看起来很常见,但还没有找到任何答案. 最好的祝福 解决方法
不是真的,因为它根本无法正常工作(可能).由于每个任务都在标准的Scala迭代器上运行,因此这些操作将被压缩在一起.这意味着所有操作都将在实践中阻塞.假设您有三个URL [“x”,“y”,“z”],您的代码将按以下顺序执行: Await.result(httpCall("x",10.seconds)) Await.result(httpCall("y",10.seconds)) Await.result(httpCall("z",10.seconds)) 您可以轻松地在本地重现相同的行为.如果要异步执行代码,则应使用mapPartitions显式处理: rdd.mapPartitions(iter => { ??? // Submit requests ??? // Wait until all requests completed and return Iterator of results }) 但这比较棘手.无法保证给定分区的所有数据都适合内存,因此您可能也需要一些批处理机制. 所有这一切都说我无法重现你所描述的问题可能是一些配置问题或httpCall本身的问题. 在旁注上允许单个超时终止整个任务看起来不是一个好主意. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |