ruby-on-rails – Uploading compressed data to Google BigQuery using the API
Over the past few days I have been working on improving the logstash google_bigquery connector.
So far I have been able to add features such as error handling (skipping bad lines), better connection management, and a few other things. The last and most important feature I have been working on is uploading compressed data to BigQuery, and the API documentation for this is poor.

Currently I am able to upload CSV files directly to BQ using the Jobs.insert method. The only remaining problem is doing this without going through Google Cloud Storage, since gzip compression would reduce network bandwidth and cost, and adding another hop that costs money, such as GCS, makes no sense. The error I am getting is:
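For context, this is roughly the compression step I want to feed into the upload. The sketch below is my own illustration (the `gzip_file` helper name and chunk size are not part of the connector); BigQuery load jobs can ingest gzip-compressed NEWLINE_DELIMITED_JSON directly, which is what makes skipping GCS attractive:

```ruby
require 'zlib'

# Hypothetical helper: gzip-compress a newline-delimited JSON file in
# chunks before handing the result to the upload step.
def gzip_file(src_path, dest_path)
  Zlib::GzipWriter.open(dest_path) do |gz|
    File.open(src_path, 'rb') do |f|
      while (chunk = f.read(16 * 1024))
        gz.write(chunk)
      end
    end
  end
  dest_path
end
```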
I'll cut straight to the main case with the code, and thanks in advance for your help:

    # Uploads a local file to the configured bucket.
    def upload_object(filename)
      @logger.debug("entering upload_object")
      begin
        require 'json'
        table_id = @table_prefix + "_" + get_date_pattern(filename)
        # BQ does not accept anything other than alphanumeric and _
        # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
        table_id = table_id.gsub(':', '_').gsub('-', '_')
        @logger.debug("table name has been modified")
        @logger.debug("BQ: upload object.", :filename => filename, :table_id => table_id)

        media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
        body = {
          "configuration" => {
            "load" => {
              "sourceFormat" => "NEWLINE_DELIMITED_JSON",
              "schema" => @json_schema,
              "destinationTable" => {
                "projectId" => @project_id,
                "datasetId" => @dataset,
                "tableId" => table_id
              },
              'createDisposition' => 'CREATE_IF_NEEDED',
              'writeDisposition' => 'WRITE_APPEND',
              'maxBadRecords' => 99
            }
          }
        }
        @logger.debug("Execution details: ", :body_object => body,
                      :parameters => { 'uploadType' => 'multipart', 'projectId' => @project_id },
                      :media => media)

        datasetId = @project_id + ":" + @dataset
        verify_dataset = @client.execute(
          :api_method => @bq.datasets.get,
          :parameters => { 'projectId' => @project_id, 'datasetId' => datasetId })
        status = JSON.parse(verify_dataset.response.body)["id"]
        if status != datasetId
          @logger.info("BQ: dataset doesn't exist, creating it instead")
          create_dataset = @client.execute(
            :api_method => @bq.datasets.insert,
            :parameters => { 'projectId' => @project_id },
            :body_object => { 'id' => datasetId })
          sleep 10
        end

        # send the load configuration together with the media in the multipart request
        insert_result = @client.execute(
          :api_method => @bq.jobs.insert,
          :parameters => { 'uploadType' => 'multipart', 'projectId' => @project_id },
          :body_object => body,
          :media => media)
        job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
        @logger.debug("BQ: multipart insert", :job_id => job_id)
        return job_id
      rescue => e
        @logger.error("BQ: failed to upload file", :exception => e)
        # TODO(rdc): limit retries?
        sleep 1
        if File.exist?(filename)
          retry
        end
      end
    end

Solution
The error on our side is that the file does not appear to be a valid gzip file, and the gzip library is unable to open it.
This could be a problem with how the file was generated or with how it was uploaded. If you still have access to the file, can you verify that you are able to decompress it?
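A quick local check along the lines the answer suggests: try to fully inflate the file with Ruby's Zlib before uploading it. The `valid_gzip?` helper below is illustrative, not part of the connector:

```ruby
require 'zlib'

# Illustrative check: attempt to decompress the entire file; returns false
# if the gzip header is missing or the stream is otherwise corrupt.
def valid_gzip?(path)
  Zlib::GzipReader.open(path) { |gz| gz.read }
  true
rescue Zlib::GzipFile::Error, Zlib::Error
  false
end
```

If this returns false for the file you are uploading, the problem is in how the file was generated rather than in the API call.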