使用ES的bulk接口导入批量数据(python进行格式化,curl提交)
发布时间:2020-12-17 17:22:09 所属栏目:Python 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 #!/bin/env python# -*- coding: utf-8 -*-'''修改:2015/9/25 ver.2原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交
以下代码由PHP站长网 52php.cn收集自互联网 现在PHP站长网小编把它分享给大家,仅供参考 #!/bin/env python # -*- coding: utf-8 -*- ''' 修改:2015/9/25 ver.2 原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交 修改:2015/9/30 ver.3 原因:封装成class,方便调用 ''' import sys import os from optparse import OptionParser from datetime import datetime import subprocess as sub import json class loadDataToES: def __init__(self,field_desc,data_file,host='127.0.0.1',port='9200',index='test',doctype='others',delimeter=',',tmp_file='/dev/shm/_tmp_data_to_es',cut_off=10000): self.host = host self.port = port self.index = index self.doctype = doctype self.delimeter = delimeter self.tmp_file = tmp_file self.field_desc = field_desc self.data_file = data_file self.header = '{"index":{"_index":"%s","_type":"%s"}}' %(self.index,self.doctype) self.cut_off = cut_off self.url = 'http://%s:%s/_bulk' %(self.host,self.port) def load_data(self): ''' expample data from the file: 2015-09-24 09:17:29,memory_11601,123988 ''' self.body_list = [] self.bulk = '' self.line_num = 0 self.pretty_print('INFO: loadding data to es,host: %s,index: %s' %(self.host,self.index)) self.parse_field() with open(self.data_file,'r') as f_desc: for line in f_desc: self.do_line(line) self.line_num += 1 if self.line_num >= self.cut_off: self.bulk_content = 'n'.join(self.body_list) self._load_data() self.body_list = [] self.bulk = '' self.line_num = 0 if self.line_num > 0: self.bulk_content = 'n'.join(self.body_list) self._load_data() self.pretty_print('INFO: all lines parsed.') def do_line(self,line): fields = line.strip().split(self.delimeter) if len(fields) != self.field_len: self.pretty_print("ERROR: line %d not match fields" % line_num) return body_tmp = str(self.get_body(fields,self.fields_list)) self.body_list.append(self.header) self.body_list.append(body_tmp.replace("'",'"')) def parse_field(self): fields_list = [] fields_desc = self.field_desc.strip().split(self.delimeter) for item in fields_desc: items = item.split('|') fields_list.append([items[0],items[1]]) self.fields_list = fields_list self.field_len = len(fields_list) def _load_data(self): open(self.tmp_file,'w').write(self.bulk_content) p = sub.Popen(['curl','-s','-XPOST',self.url,'--data-binary',"@" + self.tmp_file ],stdout=sub.PIPE) for line in iter(p.stdout.readline,b''): ret_dict = json.loads(line) if not ret_dict['errors']: self.pretty_print("INFO: %6s lines parseed with no errors,total cost %d ms." %(len(ret_dict['items']),ret_dict['took'])) else: self.pretty_print("ERROR: %6s lines parseed with some errors,ret_dict['took'])) def pretty_print(self,str): print('%s %s' %(datetime.now(),str)) def get_body(self,fields,fields_list): counter = 0 body = {} while (counter < len(fields)): # if the data type is 'date',we will translate the value str to date # type if fields_list[counter][1] == 'date': body[fields_list[counter][0]] = self.translate_str_to_date( fields[counter]) # and if the data type is 'int',we translate it to int elif fields_list[counter][1] == 'int': body[fields_list[counter][0]] = self.translate_str_to_int( fields[counter]) elif fields_list[counter][1] == 'float': body[fields_list[counter][0]] = self.translate_str_to_float( fields[counter]) # other is defalut to be str else: body[fields_list[counter][0]] = fields[counter] counter += 1 # print(my_body) return body def translate_str_to_date(self,date_str): try: date = datetime.strptime(date_str,'%Y-%m-%d %H:%M:%S') return date.isoformat() except: self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0])) self.pretty_print("Failed to translate '%s' to date." % (date_str)) return False def translate_str_to_int(self,num_str): try: return int(num_str) except: self.pretty_print("Failed to translate '%s' to int." % (num_str)) return False def translate_str_to_float(self,num_str): try: return float(num_str) except: self.pretty_print("Failed to translate '%s' to int." % (num_str)) return False if __name__ == '__main__': ''' example fields_desc:@timestamp|date,process|str,mem|int example lines in file: 2015-09-24 09:17:29,203532 2015-09-24 09:17:29,memory_11602,223112 ''' loader = loadDataToES(field_desc='@timestamp|date,mem|int',data_file='/root/scripts/in.data',host='10.21.102.60',index = 'test') loader.load_data() 以上内容由PHP站长网【52php.cn】收集整理供大家参考研究 如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |