加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

使用ES的bulk接口导入批量数据(python进行格式化,curl提交)

发布时间:2020-12-17 17:22:09 所属栏目:Python 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 #!/bin/env python# -*- coding: utf-8 -*-'''修改:2015/9/25 ver.2原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交

以下代码由PHP站长网 52php.cn收集自互联网

现在PHP站长网小编把它分享给大家,仅供参考

#!/bin/env python
# -*- coding: utf-8 -*-
'''
修改:2015/9/25 ver.2
原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交
修改:2015/9/30 ver.3
原因:封装成class,方便调用
'''

import sys
import os
from optparse import OptionParser
from datetime import datetime
import subprocess as sub
import json

class loadDataToES:
    def __init__(self,field_desc,data_file,host='127.0.0.1',port='9200',index='test',doctype='others',delimeter=',',tmp_file='/dev/shm/_tmp_data_to_es',cut_off=10000):
        self.host = host
        self.port = port
        self.index = index
        self.doctype = doctype
        self.delimeter = delimeter
        self.tmp_file = tmp_file
        self.field_desc = field_desc
        self.data_file = data_file
        self.header = '{"index":{"_index":"%s","_type":"%s"}}' %(self.index,self.doctype)
        self.cut_off = cut_off
        self.url = 'http://%s:%s/_bulk' %(self.host,self.port)

    def load_data(self):
        '''
        expample data from the file:
        2015-09-24 09:17:29,memory_11601,123988
        '''
        self.body_list = []
        self.bulk = ''
        self.line_num = 0
        self.pretty_print('INFO: loadding data to es,host: %s,index: %s' %(self.host,self.index))
        self.parse_field()
        with open(self.data_file,'r') as f_desc:
            for line in f_desc:
                self.do_line(line)
                self.line_num += 1
                if self.line_num >= self.cut_off:
                    self.bulk_content = 'n'.join(self.body_list)
                    self._load_data()
                    self.body_list = []
                    self.bulk = ''
                    self.line_num = 0
            if self.line_num > 0:
                self.bulk_content = 'n'.join(self.body_list)
                self._load_data()
        self.pretty_print('INFO: all lines parsed.')

    def do_line(self,line):
        fields = line.strip().split(self.delimeter)
        if len(fields) != self.field_len:
            self.pretty_print("ERROR: line %d not match fields" % line_num)
            return
        body_tmp = str(self.get_body(fields,self.fields_list))
        self.body_list.append(self.header)
        self.body_list.append(body_tmp.replace("'",'"'))


    def parse_field(self):
        fields_list = []
        fields_desc = self.field_desc.strip().split(self.delimeter)
        for item in fields_desc:
            items = item.split('|')
            fields_list.append([items[0],items[1]])
        self.fields_list = fields_list
        self.field_len = len(fields_list)

    def _load_data(self):
        open(self.tmp_file,'w').write(self.bulk_content)
        p = sub.Popen(['curl','-s','-XPOST',self.url,'--data-binary',"@" + self.tmp_file ],stdout=sub.PIPE)
        for line in iter(p.stdout.readline,b''):
            ret_dict = json.loads(line)
            if not ret_dict['errors']:
                self.pretty_print("INFO: %6s lines parseed with no errors,total cost %d ms." %(len(ret_dict['items']),ret_dict['took']))
            else:
                self.pretty_print("ERROR: %6s lines parseed with some errors,ret_dict['took']))

    def pretty_print(self,str):
        print('%s %s' %(datetime.now(),str))

    def get_body(self,fields,fields_list):
        counter = 0
        body = {}
        while (counter < len(fields)):
            # if the data type is 'date',we will translate the value str to date
            # type
            if fields_list[counter][1] == 'date':
                body[fields_list[counter][0]] = self.translate_str_to_date(
                    fields[counter])
            # and if the data type is 'int',we translate it to int
            elif fields_list[counter][1] == 'int':
                body[fields_list[counter][0]] = self.translate_str_to_int(
                    fields[counter])
            elif fields_list[counter][1] == 'float':
                body[fields_list[counter][0]] = self.translate_str_to_float(
                    fields[counter])
            # other is defalut to be str
            else:
                body[fields_list[counter][0]] = fields[counter]
            counter += 1
        # print(my_body)
        return body

    def translate_str_to_date(self,date_str):
        try:
            date = datetime.strptime(date_str,'%Y-%m-%d %H:%M:%S')
            return date.isoformat()
        except:
            self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0]))
            self.pretty_print("Failed to translate '%s' to date." % (date_str))
        return False


    def translate_str_to_int(self,num_str):
        try:
            return int(num_str)
        except:
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
        return False


    def translate_str_to_float(self,num_str):
        try:
            return float(num_str)
        except:
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
        return False

if __name__ == '__main__':
    '''
    example fields_desc:@timestamp|date,process|str,mem|int
    example lines in file:
    2015-09-24 09:17:29,203532
    2015-09-24 09:17:29,memory_11602,223112
    '''
    loader = loadDataToES(field_desc='@timestamp|date,mem|int',data_file='/root/scripts/in.data',host='10.21.102.60',index = 'test')
    loader.load_data()

以上内容由PHP站长网【52php.cn】收集整理供大家参考研究

如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读