加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

用Python给你醍醐灌顶!采用并发查询mysql以及调用API灌数据!

发布时间:2020-12-17 00:27:11 所属栏目:Python 来源:网络整理
导读:执行流程如下 那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例 目标:编写Http执行POST请求的基本类方法 编写test03.py查询mysql相关对应字段数据 1# -*- coding: utf-8 -*- 2 3from tools.MysqlTools im

执行流程如下

那么根据流程所需要的功能,需要以下的实例进行支撑:

1.并发实例

2.查询数据实例

3.执行post请求实例

目标:编写Http执行POST请求的基本类方法

编写test03.py查询mysql相关对应字段数据

 1# -*- coding: utf-8 -*-
 2
 3from tools.MysqlTools import MysqldbHelper
 4import pymysql
 5from tools.PostTools import PostHelper
 6
 7if __name__ == "__main__":
 8
 9 # 定义数据库访问参数
10 config = {
11 'host': '******#注释',12 'port': 3361,13 'user': 'root',14 'passwd': '******#注释',15 'charset': 'utf8',16 'cursorclass': pymysql.cursors.DictCursor
17 }
18
19 # 初始化打开数据库连接
20 mydb = MysqldbHelper(config)
21
22 # 选择数据库
23 DB_NAME = '******#注释'
24 # mydb.createDataBase(DB_NAME)
25
26 # 选择数据库
27 print "========= 选择数据库%s ===========" % DB_NAME
28 mydb.selectDataBase(DB_NAME)
29
30 #选择表
31 TABLE_NAME = '******#注释'
32
33 # 数据查询
34 print "========= 数据查询 ==========="
35 select_fields = [
36 "id",37 ******#注释
38 "1",39 "1",40 "null",41 "NOW()",42 "last_sync_time",43 ]
44 select_order = "order by id desc limit 1"
45 select_result = mydb.select(TABLE_NAME,fields=select_fields,order=select_order)
46
47 # 拆分打印的字段
48 for row in select_result:
49 print "id=",row['id']
50 print "user_id=",row['user_id']
51 ******#注释
52 print "1=",row['1']
53 print "1=",row['1']
54 print "null=",None # 注意:查询null的需要写为None
55 print "NOW()=",row['NOW()']
56 print "last_sync_time=",row['last_sync_time']
57 print
进群:960410445  

编写model类,抽象查询的过程方法

models.py

我新建了一个core文件夹目录,然后新建一个models,专门用来处理查询以及调用API发送请求的业务处理。

models.py代码如下:

 1# -*- coding: utf-8 -*-
 2
 3from tools.MysqlTools import MysqldbHelper
 4
 5class ModelHelper(object): # 继承object类所有方法
 6
 7 # 初始化数据库连接
 8 def __init__(self,config):
 9 self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
10
11 def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
12 # 选择数据库
13 self.mydb.selectDataBase(DB_NAME)
14 # 数据查询
15 print "========= 数据查询 ==========="
16 result = self.mydb.select(TABLE_NAME,fields=fields,order=order)
17 # 返回查询的数据
18 return result

编写test04.py测试文件,执行测试一下:

 1# -*- coding: utf-8 -*-
 2
 3from tools.MysqlTools import MysqldbHelper
 4import pymysql
 5from tools.PostTools import PostHelper
 6from core.models import ModelHelper
 7
 8if __name__ == "__main__":
 9
10 # 定义数据库访问参数
11 config = {
12 'host': '#####注释####',13 'port': 3361,14 'user': 'root',15 'passwd': '#####注释####',16 'charset': 'utf8',17 'cursorclass': pymysql.cursors.DictCursor
18 }
19 # 初始化数据模型
20 model = ModelHelper(config)
21 # 设置需要查询的数据库
22 DB_NAME = '#####注释####'
23 # 设置需要查询的表明
24 TABLE_NAME = '#####注释####'
25 # 数据查询
26 select_fields = [
27 "id",28 #####注释####
29 "1",30 "1",31 "null",32 "NOW()",33 "last_sync_time",34 ]
35 select_order = "order by id desc limit 1"
36 select_result = model.selectTable(DB_NAME,select_fields,select_order)
37
38 # 拆分打印的字段
39 for row in select_result:
40 print "id=",row['id']
41 #####注释####
42 print "1=",row['1']
43 print "1=",row['1']
44 print "null=",None
45 print "NOW()=",row['NOW()']
46 print "last_sync_time=",row['last_sync_time']
47 print

测试结果如下:

1id= 1066511830261694464
2 #####注释####
31= 1
41= 1
5null= None
6NOW()= 2018-11-27 11:45:08
7last_sync_time= 2018-11-25 10:00:10

那么已经抽象了这部分查询在model中处理了,还有下一步请求API也要写入到model中自动处理。

那么下面来继续写写。

将返回的查询结果转化为字典类型数据

其中查询的旧表字段与新表的字段应该要用字典进行一一映射关联,方便后续调用。

1、定义字典存储 旧表字段 《==》新表字段的映射关系

2、获取旧表字段数据,进行数据查询

3、获取新表字段对应存储数据,再次使用API请求新表,灌入数据

 1# 设置字段映射字典: 旧表查询字段 ==》 新表的字段
 2 dict_fields = {
 3 "id": "id",4 "user_id": "user_id",5 ## 注释部分字段
 6 "1": "purpose",7 "1": "status",8 "null": "data_source",9 "NOW()": "create_time",10 "last_sync_time": "last_modify_time"
11 }
1 # 获取旧表字段数组
2 select_fields = []
3 for key,value in dict_fields.items():
4 print "key = %s,value = %s" % (key,value)
5 select_fields.append(key)
6
7 print "select_fields = "
8 print select_fields

执行结果如下:

 1# 映射字典的查询数据
 2key = user_code,value = user_code
 3key = null,value = data_source
 4###### 注释部分 #######
 5key = NOW(),value = create_time
 6key = 1,value = status
 7key = user_level,value = user_level
 8
 9# 获取生成旧表需要查询的字段
10select_fields = [
11'census_town',12###### 注释部分 #######
13 'NOW()','1',14 'user_level'
15]

1、那么下面就可以根据获取的字段数据,进行mysql数据查询

2、然后生成一个body请求体字典数据,但是此时body的请求体key是旧表的字段,请求API的时候需要新表的字段,那么就需要进行字段替换

3、再写一个字段映射字典的循环,生成请求API的new_body

 1# 此时已有查询字段的数组
 2print "select_fields = "
 3 print select_fields
 4
 5 select_order = "order by id desc limit 2"
 6 select_result = model.selectTable(DB_NAME,select_order)
 7
 8 # 循环生成每条查询数据的请求body
 9 body = {}
10 for result in select_result:
11 for field in select_fields:
12 if field == "null":
13 body[field] = None
14 else:
15 body[field] = result[field]
16
17 # 打印查看已生成的body数据
18 for field in select_fields:
19 print body[field]
20
21 print body
22
23 # 更新body的字段为新表的字段
24 new_body = {}
25 for key,value in dict_fields.items():
26 print "key = %s,value)
27 new_body[value] = body[key]
28
29 print "new_body="
30 print new_body

执行结果如下:

那么上面的过程最好写在model中,这样可以方便使用。

编写model增加生成请求API的body数据相关方法

 1# -*- coding: utf-8 -*-
 2
 3from tools.MysqlTools import MysqldbHelper
 4from tools.PostTools import PostHelper
 5
 6class ModelHelper(object): # 继承object类所有方法
 7
 8 # 初始化数据库连接
 9 def __init__(self,config):
10 self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
11
12 # 根据设置的旧表字段,查询旧库的数据库数据
13 def selectTable(self,order):
14 # 选择数据库
15 self.mydb.selectDataBase(DB_NAME)
16 # 数据查询
17 result = self.mydb.select(TABLE_NAME,order=order)
18 # 返回查询的数据
19 return result
20
21 # 根据字段映射字典获取旧表字段数组
22 def getSelectFields(self,dict_fields):
23 # 获取旧表字段数组
24 select_fields = []
25 for key,value in dict_fields.items():
26 # print "key = %s,value)
27 select_fields.append(key)
28 return select_fields
29
30 # 根据查询的结果以及字段字典,转化为请求API的body
31 def convertApiBody(self,result,dict_fields):
32 # 循环生成每条查询数据的请求body
33 body = {}
34 for result in result:
35 for field in result:
36 if field == "null":
37 body[field] = None
38 else:
39 body[field] = result[field]
40 # 更新body的字段为新表的字段
41 new_body = {}
42 for key,value in dict_fields.items():
43 # print "key = %s,value)
44 if key == "null":
45 new_body[value] = None
46 else:
47 new_body[value] = body[key]
48 return new_body

使用model方法,以及执行结果:

那么下一步就是再编写一个执行API的方法。

但是在请求API之前,需要将body序列化为json格式,这个存在datetime类型导致序列化失败的情况,下一个篇章继续。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读