加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

[bigdata-086] python3+neo4j 从mysql数据库读取记录然后创建节

发布时间:2020-12-14 03:09:59 所属栏目:大数据 来源:网络整理
导读:1. 测试 1.1web界面 ?http://tz211:7474/browser/ 在这里执行 MATCH (n) RETURN n,能看到节点和相互关系 一共是3个节点,6个相互关系 1.2 在211执行cypher-shell ./cypher-shell -u neo4j -p 123456 1.3 执行convert-test.py 上述1.1和1.2和1.3是一致的。 1
1. 测试
1.1web界面 ?http://tz211:7474/browser/
在这里执行 MATCH (n) RETURN n,能看到节点和相互关系
一共是3个节点,6个相互关系


1.2 在211执行cypher-shell
./cypher-shell -u neo4j -p 123456


1.3 执行convert-test.py


上述1.1和1.2和1.3是一致的。


1.4 在web节点执行删除所有节点命令
然后再找查询节点,发现没有节点了。符合预期。
在cypher-shell执行查询结果,也没有节点,符合预期。
在convert-test执行查询结果,也没有节点,符合预期。




1.5 在cypher创建节点
CREATE (:MP {num:"18717917632"});
在三处都能看到节点,正确。


MERGE (:MP {num:"18717917632"});
三处只有一个节点,正确。


MERGE (:MP {num:"18717917666"});
三处均增加了一个节点,正确。


再次执行
MERGE (:MP {num:"18717917666"});
三处没有变化,正确。


2. 代码测试
2.1 参考文档
《The Neo4j Developer Manual v3.2》python版


2.2 session
session是一个容器,它存储一系列的transaction事务。session从一个池里获取到连接。session不是线程安全的。
要把session放在一个context block,那么,当这个block执行完毕,session就可以正确地关闭。代码形如:
-----------------------------
def add_person(self,name):
? ? session = self._driver.session()
? ? session.run("CREATE (a:Person {name: $name})",{"name": name})
-----------------------------
session在add_person这个context里,如果add_persion执行结束,session就会正确关闭。


2.3 transaction事务
事务是一个原子单元,它包括一个或者多个cypher语句。每个事务都必然在一个session里执行。
执行一个cypher语句,需要两个信息:cyphter语句模板; cyphper语句参数。也可以执行无参数cypher,但有参数可以更好的调优。
有三种事务:
A. 自动提交事务 ?auto-commit transactions
B. 事务函数 Transaction functions
C. 显式事务 Explicit transactions?


注意:只有B在事务出错的时候可以自动重新执行。


2.3.1 auto-commit transactions
这种事务,是简单的,形式有限,这种事务只有一条语句,事务失败后不能自动重放,不能放入causal chain执行。
这种事务,在session.run函数执行,形如:
--------------------------
def add_person(self,name):
? ? session = self._driver.session()?
? ? session.run( "CREATE (a:Person {name: $name})",{"name": name} )
--------------------------
这种事务会立刻发送到数据库进行处理。
不要在生产环境使用这种事务。如果担心性能和稳定性,也不要使用这种事务。


2.3.2 事务函数 Transaction functions
这是推荐的事务类型。优先使用这种事务。举例如下:
--------------------------
def add_person(self,name):
? ? with self._driver.session() as session:
? ? ? ? session.write_transaction(lambda tx: self.create_person_node(tx,name))


def create_person_node(self,tx,name):
? ? tx.run("CREATE (a:Person {name: $name})",{"name": name})
? ? return 1
--------------------------




2.3.3 显式事务
这类事务,是2.3.2的详细版本,也就是说,显式生命BEGIN、COMMIT、ROOBACK操作。
这块需要检查python的源码,具体细节在官方文档里没有。


3. 一个完整的将数据以transaction functions的方式写入到neo4j的demo
从mysql数据库取记录,然后创建节点,然后创建关系。
---------------------------
#!/usr/bin/env python3
#!-*- coding:utf-8 -*-


import pymysql
from neo4j.v1 import GraphDatabase


uri = "bolt://10.xx.xx.xx:7687"
driver = GraphDatabase.driver(uri,auth=("neo4j","888888"))


print("先删除所有节点和关系")
with driver.session() as session:
? ? session.write_transaction(lambda tx: tx.run("MATCH (n) DETACH DELETE n"))


print("检查是否为空")
with driver.session() as session:
? ? session.write_transaction(lambda tx: tx.run('MATCH (n) RETURN n'))


conn = pymysql.connect(host='xxxxdb.mysql.rds.aliyuncs.com',
? ? ? ? ? ? ? ? ? ? ? ?port=3306,user='userxxx',passwd='pswdxxxx',
? ? ? ? ? ? ? ? ? ? ? ?db='ust',charset='utf8')
c_t = conn.cursor()
sql = 'select query_idcard,query_mobile,query_name,query_bankcard '
? ? ? 'from id34_record '
? ? ? 'where result_code='00' or result_code='000';'
c_t.execute(sql)
r_t = c_t.fetchall()
nnn = 0
for i in r_t:
? ? # print(i)
? ? if i[0] == None or i[1] == None or i[2] == None or i[3] == None:
? ? ? ? #暂时不考虑三要素
? ? ? ? continue


? ? if (nnn % 100 == 0):
? ? ? ? print('-'*10)
? ? ? ? print(nnn)
? ? # print(nnn)
? ? nnn += 1


? ? id = i[0]
? ? mp = i[1]
? ? name = i[2] #暂时不考虑姓名
? ? bankcard = i[3]


? ? #不重复地创建节点
? ? node_type = 'MP'
? ? with driver.session() as session:
? ? ? ? session.write_transaction(
? ? ? ? ? ? lambda tx: tx.run("MERGE (:MP {val:$val})",{'val':mp}))
? ? node_type = 'ID'
? ? with driver.session() as session:
? ? ? ? session.write_transaction(
? ? ? ? ? ? lambda tx: tx.run("MERGE (:ID {val:$val})",{'val':id}))


? ? node_type = 'BANKCARD'
? ? with driver.session() as session:
? ? ? ? session.write_transaction(
? ? ? ? ? ? lambda tx: tx.run("MERGE (:BANKCARD {val:$val})",{'val':bankcard}))


? ? #创建关系
? ? cmd = "MATCH (n:MP) WHERE n.val="%s" n"
? ? ? ? ? "MATCH (m:ID) WHERE m.val="%s" n"
? ? ? ? ? "MATCH (p:BANKCARD) WHERE p.val="%s" n"
? ? ? ? ? "MERGE (n)-[:BIND]->(m) n"
? ? ? ? ? "MERGE (m)-[:BIND]->(n) n"
? ? ? ? ? "MERGE (n)-[:BIND]->(p) n"
? ? ? ? ? "MERGE (p)-[:BIND]->(n) n"
? ? ? ? ? "MERGE (m)-[:BIND]->(p) n"
? ? ? ? ? "MERGE (p)-[:BIND]->(m) n"
? ? ? ? ? %(mp,id,bankcard)
? ? with driver.session() as session:
? ? ? ? session.write_transaction(
? ? ? ? ? ? lambda tx: tx.run(cmd))

---------------------------


4. 建立索引

CREATE INDEX ON :MP(val) CREATE INDEX ON :ID(val) CREATE INDEX ON :BANKCARD(val)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读