加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用Dataflow将PubSub流写入云存储时出错

发布时间:2020-12-16 10:01:03 所属栏目:安全 来源:网络整理
导读:使用Spotify中的 SCIO为Dataflow编写作业,按照示例 e.g1和 e.g2将PubSub流写入GCS,但是对于以下代码,请收到以下错误 错误 Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 码 object S
使用Spotify中的 SCIO为Dataflow编写作业,按照示例 e.g1和 e.g2将PubSub流写入GCS,但是对于以下代码,请收到以下错误

错误

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts,args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()

val sc = ScioContext(opts)


sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
  }
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))


val result = sc.close()

// CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

我可能将窗口概念与Bounded PCollection混淆,有没有办法实现这一点,或者我需要应用一些转换来实现这一点,任何人都可以对此有所帮助

解决方法

我相信SCIO下面的saveAsTextFile使用Dataflow的Write转换,它只支持有界PCollections. Dataflow尚未提供直接API来向Google Cloud Storage写入无限制的PCollection,尽管这是我们正在研究的内容.

要在某处保持无限制的PCollection,请考虑,例如,BigQuery,Datastore或Bigtable.在SCIO的API中,您可以使用例如saveAsBigQuery.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读