[bigdata-086] python3+neo4j 从mysql数据库读取记录然后创建节
发布时间:2020-12-14 03:09:58 所属栏目:大数据 来源:网络整理
导读:1. 测试 1.1web界面 ?http://tz211:7474/browser/ 在这里执行 MATCH (n) RETURN n,能看到节点和相互关系 一共是3个节点,6个相互关系 1.2 在211执行cypher-shell ./cypher-shell -u neo4j -p 123456 1.3 执行convert-test.py 上述1.1和1.2和1.3是一致的。 1
1. 测试
1.1web界面 ?http://tz211:7474/browser/ 在这里执行 MATCH (n) RETURN n,能看到节点和相互关系 一共是3个节点,6个相互关系 1.2 在211执行cypher-shell ./cypher-shell -u neo4j -p 123456 1.3 执行convert-test.py 上述1.1和1.2和1.3是一致的。 1.4 在web节点执行删除所有节点命令 然后再找查询节点,发现没有节点了。符合预期。 在cypher-shell执行查询结果,也没有节点,符合预期。 在convert-test执行查询结果,也没有节点,符合预期。 1.5 在cypher创建节点 CREATE (:MP {num:"18717917632"}); 在三处都能看到节点,正确。 MERGE (:MP {num:"18717917632"}); 三处只有一个节点,正确。 MERGE (:MP {num:"18717917666"}); 三处均增加了一个节点,正确。 再次执行 MERGE (:MP {num:"18717917666"}); 三处没有变化,正确。 2. 代码测试 2.1 参考文档 《The Neo4j Developer Manual v3.2》python版 2.2 session session是一个容器,它存储一系列的transaction事务。session从一个池里获取到连接。session不是线程安全的。 要把session放在一个context block,那么,当这个block执行完毕,session就可以正确地关闭。代码形如: ----------------------------- def add_person(self,name): ? ? session = self._driver.session() ? ? session.run("CREATE (a:Person {name: $name})",{"name": name}) ----------------------------- session在add_person这个context里,如果add_persion执行结束,session就会正确关闭。 2.3 transaction事务 事务是一个原子单元,它包括一个或者多个cypher语句。每个事务都必然在一个session里执行。 执行一个cypher语句,需要两个信息:cyphter语句模板; cyphper语句参数。也可以执行无参数cypher,但有参数可以更好的调优。 有三种事务: A. 自动提交事务 ?auto-commit transactions B. 事务函数 Transaction functions C. 显式事务 Explicit transactions? 注意:只有B在事务出错的时候可以自动重新执行。 2.3.1 auto-commit transactions 这种事务,是简单的,形式有限,这种事务只有一条语句,事务失败后不能自动重放,不能放入causal chain执行。 这种事务,在session.run函数执行,形如: -------------------------- def add_person(self,name): ? ? session = self._driver.session()? ? ? session.run( "CREATE (a:Person {name: $name})",{"name": name} ) -------------------------- 这种事务会立刻发送到数据库进行处理。 不要在生产环境使用这种事务。如果担心性能和稳定性,也不要使用这种事务。 2.3.2 事务函数 Transaction functions 这是推荐的事务类型。优先使用这种事务。举例如下: -------------------------- def add_person(self,name): ? ? with self._driver.session() as session: ? ? ? ? session.write_transaction(lambda tx: self.create_person_node(tx,name)) def create_person_node(self,tx,name): ? ? tx.run("CREATE (a:Person {name: $name})",{"name": name}) ? ? return 1 -------------------------- 2.3.3 显式事务 这类事务,是2.3.2的详细版本,也就是说,显式生命BEGIN、COMMIT、ROOBACK操作。 这块需要检查python的源码,具体细节在官方文档里没有。 3. 一个完整的将数据以transaction functions的方式写入到neo4j的demo 从mysql数据库取记录,然后创建节点,然后创建关系。 --------------------------- #!/usr/bin/env python3 #!-*- coding:utf-8 -*- import pymysql from neo4j.v1 import GraphDatabase uri = "bolt://10.xx.xx.xx:7687" driver = GraphDatabase.driver(uri,auth=("neo4j","888888")) print("先删除所有节点和关系") with driver.session() as session: ? ? session.write_transaction(lambda tx: tx.run("MATCH (n) DETACH DELETE n")) print("检查是否为空") with driver.session() as session: ? ? session.write_transaction(lambda tx: tx.run('MATCH (n) RETURN n')) conn = pymysql.connect(host='xxxxdb.mysql.rds.aliyuncs.com', ? ? ? ? ? ? ? ? ? ? ? ?port=3306,user='userxxx',passwd='pswdxxxx', ? ? ? ? ? ? ? ? ? ? ? ?db='ust',charset='utf8') c_t = conn.cursor() sql = 'select query_idcard,query_mobile,query_name,query_bankcard ' ? ? ? 'from id34_record ' ? ? ? 'where result_code='00' or result_code='000';' c_t.execute(sql) r_t = c_t.fetchall() nnn = 0 for i in r_t: ? ? # print(i) ? ? if i[0] == None or i[1] == None or i[2] == None or i[3] == None: ? ? ? ? #暂时不考虑三要素 ? ? ? ? continue ? ? if (nnn % 100 == 0): ? ? ? ? print('-'*10) ? ? ? ? print(nnn) ? ? # print(nnn) ? ? nnn += 1 ? ? id = i[0] ? ? mp = i[1] ? ? name = i[2] #暂时不考虑姓名 ? ? bankcard = i[3] ? ? #不重复地创建节点 ? ? node_type = 'MP' ? ? with driver.session() as session: ? ? ? ? session.write_transaction( ? ? ? ? ? ? lambda tx: tx.run("MERGE (:MP {val:$val})",{'val':mp})) ? ? node_type = 'ID' ? ? with driver.session() as session: ? ? ? ? session.write_transaction( ? ? ? ? ? ? lambda tx: tx.run("MERGE (:ID {val:$val})",{'val':id})) ? ? node_type = 'BANKCARD' ? ? with driver.session() as session: ? ? ? ? session.write_transaction( ? ? ? ? ? ? lambda tx: tx.run("MERGE (:BANKCARD {val:$val})",{'val':bankcard})) ? ? #创建关系 ? ? cmd = "MATCH (n:MP) WHERE n.val="%s" n" ? ? ? ? ? "MATCH (m:ID) WHERE m.val="%s" n" ? ? ? ? ? "MATCH (p:BANKCARD) WHERE p.val="%s" n" ? ? ? ? ? "MERGE (n)-[:BIND]->(m) n" ? ? ? ? ? "MERGE (m)-[:BIND]->(n) n" ? ? ? ? ? "MERGE (n)-[:BIND]->(p) n" ? ? ? ? ? "MERGE (p)-[:BIND]->(n) n" ? ? ? ? ? "MERGE (m)-[:BIND]->(p) n" ? ? ? ? ? "MERGE (p)-[:BIND]->(m) n" ? ? ? ? ? %(mp,id,bankcard) ? ? with driver.session() as session: ? ? ? ? session.write_transaction( ? ? ? ? ? ? lambda tx: tx.run(cmd)) --------------------------- 4. 建立索引 CREATE INDEX ON :MP(val) CREATE INDEX ON :ID(val) CREATE INDEX ON :BANKCARD(val) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |