scala – 使用Dataflow将PubSub流写入云存储时出错
发布时间:2020-12-16 10:01:03 所属栏目:安全 来源:网络整理
导读:使用Spotify中的 SCIO为Dataflow编写作业,按照示例 e.g1和 e.g2将PubSub流写入GCS,但是对于以下代码,请收到以下错误 错误 Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 码 object S
使用Spotify中的
SCIO为Dataflow编写作业,按照示例
e.g1和
e.g2将PubSub流写入GCS,但是对于以下代码,请收到以下错误
错误 Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 码 object StreamingPubSub { def main(cmdlineArgs: Array[String]): Unit = { // set up example wiring val (opts,args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs) val dataflowUtils = new DataflowExampleUtils(opts) dataflowUtils.setup() val sc = ScioContext(opts) sc.pubsubTopic(opts.getPubsubTopic) .timestampBy { _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong) } .withFixedWindows((Duration.standardHours(1))) .groupBy(_ => Unit) .toWindowed .toSCollection .saveAsTextFile(args("output")) val result = sc.close() // CTRL-C to cancel the streaming pipeline dataflowUtils.waitToFinish(result.internal) } } 我可能将窗口概念与Bounded PCollection混淆,有没有办法实现这一点,或者我需要应用一些转换来实现这一点,任何人都可以对此有所帮助 解决方法
我相信SCIO下面的saveAsTextFile使用Dataflow的Write转换,它只支持有界PCollections. Dataflow尚未提供直接API来向Google Cloud Storage写入无限制的PCollection,尽管这是我们正在研究的内容.
要在某处保持无限制的PCollection,请考虑,例如,BigQuery,Datastore或Bigtable.在SCIO的API中,您可以使用例如saveAsBigQuery. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容