python-气流DAG-如何首先检查BQ(如有必要,请删除),然后运行数据
我正在使用云编写器来协调ETL,以获取到达BigCS的GCS中到达的文件.我有一个云函数,当文件到达时会触发dag,而云函数会将文件名/位置传递给DAG.在我的DAG中,我有2个任务: 1)使用DataflowPythonOperator运行一个数据流作业,该作业从GCS中的文本读取数据并将其转换并将其输入到BQ中,以及2)根据该文件是失败还是成功将文件移动到失败/成功存储段. 我调查了其他气流运算符,但在运行数据流作业之前希望在DAG中有2个任务: >根据文件名获取文件ID(现在,我有一个bigquery表映射文件名->文件ID,但我也可以引入一个用作地图的json,我想这是否更容易) 在完成数据流作业之后,理想情况下,在将文件移至成功/失败文件夹之前,我想附加到一些“记录”表中,以表明此时已输入了该游戏.这将是我查看发生的所有插入的方式. 谢谢,我非常感谢大家的帮助,如果您不清楚您的意思,我们深表歉意.有关气流的文档非常强大,但是鉴于云作曲家和bigquery相对较新,因此很难彻底学习如何做一些GCP特定任务. 最佳答案
听起来有点复杂.很高兴,几乎所有的GCP服务都有操作符.另一件事是何时触发DAG执行.你知道了吗?您希望每次GCS存储桶中有新文件进入时触发Google Cloud Function运行.
>触发您的DAG 要触发DAG,您需要使用依赖于Object Finalize或Metadata Update触发器的Google Cloud Function来调用它. >将数据加载到BigQuery 如果您的文件已经是GCS格式,并且已经采用JSON或CSV格式,那么使用数据流作业就显得过头了.您可以使用GoogleCloudStorageToBigQueryOperator将文件加载到BQ. >跟踪文件ID 计算文件ID的最好方法是使用Airflow的Bash或Python运算符.您可以直接从文件名中派生它吗? 如果是这样,那么您可以在GoogleCloudStorageObjectSensor上游使用一个Python运算符来检查文件是否在成功目录中. 如果是这样,则可以使用BigQueryOperator在BQ上运行删除查询. 之后,您运行GoogleCloudStorageToBigQueryOperator. >移动文件 如果要将文件从GCS移到GCS位置,则GoogleCloudStorageToGoogleCloudStorageOperator应该可以满足您的需要.如果您的BQ加载运算符失败,则移至失败的文件位置,如果成功,则移至成功的作业位置. >记录任务日志 可能需要跟踪插入的所有操作都是将任务信息记录到GCS.签出how to log task information to GCS 有帮助吗? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |