在Python程序和Flask框架中使用SQLAlchemy的教程
ORM 江湖 ORM 的出现,让畏惧SQL的开发者,在坑里看见了爬出去的绳索,仿佛天空并不是那么黑暗,至少再暗,我们也有了眼睛。顾名思义,ORM 对象关系映射,简而言之,就是把数据库的一个个table(表),映射为编程语言的class(类)。 python中比较著名的ORM框架有很多,大名顶顶的 SQLAlchemy 是python世界里当仁不让的ORM框架。江湖中peewee,strom, pyorm,SQLObject 各领风骚,可是最终还是SQLAlchemy 傲视群雄。 SQLAlchemy 简介
本文先探讨 SQLAlchemy的 sql expresstion 部分的用法。主要还是跟着官方的 SQL Expression Language Tutorial.介绍 为什么要学习 sql expresstion ,而不直接上 ORM?因为后面这个两个是 orm 的基础。并且,即是不使用orm,后面这两个也能很好的完成工作,并且代码的可读性更好。纯粹把SQLAlchemy当成dbapi使用。首先SQLAlchemy 内建数据库连接池,解决了连接操作相关繁琐的处理。其次,提供方便的强大的log功能,最后,复杂的查询语句,依靠单纯的ORM比较难实现。 实战 from sqlalchemy import create_engine engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8",echo=True) create_engine 方法进行数据库连接,返回一个 db 对象。里面的参数表示 数据库类型://用户名:密码(没有密码则为空,不填)@数据库主机地址/数据库名?编码 直接使用engine的execute执行sql的方式,叫做connnectionless执行, # -*- coding: utf-8 -*- __author__ = 'ghost' from sqlalchemy import create_engine,Table,Column,Integer,String,MetaData,ForeignKey # 连接数据库 engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",echo=True) # 获取元数据 metadata = MetaData() # 定义表 user = Table('user',metadata,Column('id',primary_key=True),Column('name',String(20)),Column('fullname',String(40)),) address = Table('address',Column('user_id',None,ForeignKey('user.id')),Column('email',String(60),nullable=False) ) # 创建数据表,如果数据表存在,则忽视 metadata.create_all(engine) # 获取数据库连接 conn = engine.connect() 插入 insert >>> i = user.insert() # 使用查询 >>> i <sqlalchemy.sql.dml.Insert object at 0x0000000002637748> >>> print i # 内部构件的sql语句 INSERT INTO "user" (id,name,fullname) VALUES (:id,:name,:fullname) >>> u = dict(name='jack',fullname='jack Jone') >>> r = conn.execute(i,**u) # 执行查询,第一个为查询对象,第二个参数为一个插入数据字典,如果插入的是多个对象,就把对象字典放在列表里面 >>> r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390> >>> r.inserted_primary_key # 返回插入行 主键 id [4L] >>> addresses [{'user_id': 1,'email': 'jack@yahoo.com'},{'user_id': 1,'email': 'jack@msn.com'},{'user_id': 2,'email': 'www@www.org'},'email': 'wendy@aol.com'}] >>> i = address.insert() >>> r = conn.execute(i,addresses) # 插入多条记录 >>> r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080> >>> r.rowcount #返回影响的行数 4L >>> i = user.insert().values(name='tom',fullname='tom Jim') >>> i.compile() <sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390> >>> print i.compile() INSERT INTO "user" (name,fullname) VALUES (:name,:fullname) >>> print i.compile().params {'fullname': 'tom Jim','name': 'tom'} >>> r = conn.execute(i) >>> r.rowcount 1L 查询 select >>> s = select([user]) # 查询 user表 >>> s <sqlalchemy.sql.selectable.Select at 0x25a7748; Select object> >>> print s SELECT "user".id,"user".name,"user".fullname FROM "user" 如果需要查询自定义的字段,可是使用 user 的cloumn 对象,例如 >>> user.c # 表 user 的字段column对象 <sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8> >>> print user.c ['user.id','user.name','user.fullname'] >>> s = select([user.c.name,user.c.fullname]) >>> r = conn.execute(s) >>> r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748> >>> r.rowcount # 影响的行数 5L >>> ru = r.fetchall() >>> ru [(u'hello',u'hello world'),(u'Jack',u'Jack Jone'),(u'jack',u'jack Jone'),(u'tom',u'tom Jim')] >>> r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748> >>> r.closed # 只要 r.fetchall() 之后,就会自动关闭 ResultProxy 对象 True 同时查询两个表 >>> s = select([user.c.name,address.c.user_id]).where(user.c.id==address.c.user_id) # 使用了字段和字段比较的条件 >>> s <sqlalchemy.sql.selectable.Select at 0x2f03390; Select object> >>> print s SELECT "user".name,address.user_id FROM "user",address WHERE "user".id = address.user_id 操作符 >>> print user.c.id == address.c.user_id # 返回一个编译的字符串 "user".id = address.user_id >>> print user.c.id == 7 "user".id = :id_1 # 编译成为带参数的sql 语句片段字符串 >>> print user.c.id != 7 "user".id != :id_1 >>> print user.c.id > 7 "user".id > :id_1 >>> print user.c.id == None "user".id IS NULL >>> print user.c.id + address.c.id # 使用两个整形的变成 + "user".id + address.id >>> print user.c.name + address.c.email # 使用两个字符串 变成 || "user".name || address.email 操作连接 >>> print and_( user.c.name.like('j%'),user.c.id == address.c.user_id,or_( address.c.email == 'wendy@aol.com',address.c.email == 'jack@yahoo.com' ),not_(user.c.id>5)) "user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1 >>> 得到的结果为 编译的sql语句片段,下面看一个完整的例子 >>> se_sql = [(user.c.fullname +"," + address.c.email).label('title')] >>> wh_sql = and_( user.c.id == address.c.user_id,user.c.name.between('m','z'),or_( address.c.email.like('%@aol.com'),address.c.email.like('%@msn.com') ) ) >>> print wh_sql "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) >>> s = select(se_sql).where(wh_sql) >>> print s SELECT "user".fullname || :fullname_1 || address.email AS title FROM "user",address WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) >>> r = conn.execute(s) >>> r.fetchall() 使用 raw sql 方式 遇到负责的sql语句的时候,可以使用 sqlalchemy.sql 下面的 text 函数。将字符串的sql语句包装编译成为 execute执行需要的sql对象。例如:、 >>> text_sql = "SELECT id,fullname FROM user WHERE id=:id" # 原始sql语句,参数用( :value)表示 >>> s = text(text_sql) >>> print s SELECT id,fullname FROM user WHERE id=:id >>> s <sqlalchemy.sql.elements.TextClause object at 0x0000000002587668> >>> conn.execute(s,id=3).fetchall() # id=3 传递:id参数 [(3L,u'Jack',u'Jack Jone')] 连接 join >>> print user.join(address) "user" JOIN address ON "user".id = address.user_id # 因为开启了外键 ,所以join 能只能识别 on 条件 >>> print user.join(address,address.c.user_id==user.c.id) # 手动指定 on 条件 "user" JOIN address ON address.user_id = "user".id >>> s = select([user.c.name,address.c.email]).select_from(user.join(address,user.c.id==address.c.user_id)) # 被jion的sql语句需要用 select_from方法配合 >>> s <sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object> >>> print s SELECT "user".name,address.email FROM "user" JOIN address ON "user".id = address.user_id >>> conn.execute(s).fetchall() [(u'hello',u'jack@yahoo.com'),(u'hello',u'jack@msn.com'),u'www@www.org'),u'wendy@aol.com'),u'wendy@aol.com')] 排序 分组 分页 >>> s = select([user.c.name]).order_by(user.c.name) # order_by >>> print s SELECT "user".name FROM "user" ORDER BY "user".name >>> s = select([user]).order_by(user.c.name.desc()) >>> print s SELECT "user".id,"user".fullname FROM "user" ORDER BY "user".name DESC >>> s = select([user]).group_by(user.c.name) # group_by >>> print s SELECT "user".id,"user".fullname FROM "user" GROUP BY "user".name >>> s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3) # limit(1).offset(3) >>> print s SELECT "user".id,"user".fullname FROM "user" ORDER BY "user".name DESC LIMIT :param_1 OFFSET :param_2 [(4L,u'jack',u'jack Jone')] 更新 update >>> s = user.update() >>> print s UPDATE "user" SET id=:id,name=:name,fullname=:fullname >>> s = user.update().values(fullname=user.c.name) # values 指定了更新的字段 >>> print s UPDATE "user" SET fullname="user".name >>> s = user.update().where(user.c.name == 'jack').values(name='ed') # where 进行选择过滤 >>> print s UPDATE "user" SET name=:name WHERE "user".name = :name_1 >>> r = conn.execute(s) >>> print r.rowcount # 影响行数 3 还有一个高级用法,就是一次命令执行多个记录的更新,需要用到 bindparam 方法 >>> s = user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname')) # oldname 与下面的传入的从拿书进行绑定,newname也一样 >>> print s UPDATE "user" SET name=:newname WHERE "user".name = :oldname >>> u = [{'oldname':'hello','newname':'edd'},{'oldname':'ed','newname':'mary'},{'oldname':'tom','newname':'jake'}] >>> r = conn.execute(s,u) >>> r.rowcount 5L 删除 delete >>> r = conn.execute(address.delete()) # 清空表 >>> print r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550> >>> r.rowcount 8L >>> r = conn.execute(users.delete().where(users.c.name > 'm')) # 删除记录 >>> r.rowcount 3L
安装flask-sqlalchemy pip install flask-sqlalchemy 初始化sqlalchemy from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) # dialect+driver://username:password@host:port/database?charset=utf8 # 配置 sqlalchemy 数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:@localhost:3306/sqlalchemy?charset=utf8' # 初始化 db = SQLAlchemy(app) 定义model class User(db.Model): """ 定义了三个字段, 数据库表名为model名小写 """ id = db.Column(db.Integer,primary_key=True) username = db.Column(db.String(80),unique=True) email = db.Column(db.String(120),unique=True) def __init__(self,username,email): self.username = username self.email = email def __repr__(self): return '<User %r>' % self.username def save(self): db.session.add(self) db.session.commit() 创建数据表 >>> from yourapp import db,User >>> u = User(username='admin',email='admin@example.com') # 创建实例 >>> db.session.add(u) # 添加session >>> db.session.commit() # 提交查询 >>> users = User.query.all() # 查询 需要注意的是,如果要插入中文,必须插入 unicode字符串 >>> u = User(username=u'人世间',email='rsj@example.com') >>> u.save() 定义关系 one to many: class Category(db.Model): id = db.Column(db.Integer,primary_key=True) name = db.Column(db.String(50)) def __init__(self,name): self.name = name def __repr__(self): return '<Category %r>' % self.name class Post(db.Model): """ 定义了五个字段,分别是 id,title,body,pub_date,category_id """ id = db.Column(db.Integer,primary_key=True) title = db.Column(db.String(80)) body = db.Column(db.Text) pub_date = db.Column(db.String(20)) # 用于外键的字段 category_id = db.Column(db.Integer,db.ForeignKey('category.id')) # 外键对象,不会生成数据库实际字段 # backref指反向引用,也就是外键Category通过backref(post_set)查询Post category = db.relationship('Category',backref=db.backref('post_set',lazy='dynamic')) def __init__(self,title,body,category,pub_date=None): self.title = title self.body = body if pub_date is None: pub_date = time.time() self.pub_date = pub_date self.category = category def __repr__(self): return '<Post %r>' % self.title def save(self): db.session.add(self) db.session.commit() 如何使用查询呢? >>> c = Category(name='Python') >>> c <Category 'Python'> >>> c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B58F60> >>> c.post_set.all() [] >>> p = Post(title='hello python',body='python is cool',category=c) >>> p.save() >>> c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B73710> >>> c.post_set.all() # 反向查询 [<Post u'hello python'>] >>> p <Post u'hello python'> >>> p.category <Category u'Python'> # 也可以使用category_id 字段来添加 >>> p = Post(title='hello flask',body='flask is cool',category_id=1) >>> p.save() many to many (评论已经指出,这样的做法无法关联删除,简书没有删除线格式,多多对例子作废,在此提示,以免被误导) post_tag = db.Table('post_tag',db.Column('post_id',db.Integer,db.ForeignKey('post.id')),db.Column('tag_id',db.ForeignKey('tag.id')) ) class Post(db.Model): id = db.Column(db.Integer,primary_key=True) # ... 省略 # 定义一个反向引用,tag可以通过 post_set查询到 post的集合 tags = db.relationship('Tag',secondary=post_tag,lazy='dynamic')) class Tag(db.Model): id = db.Column(db.Integer,primary_key=True) content = db.Column(db.String(10),unique=True) # 定义反向查询 posts = db.relationship('Post',backref=db.backref('tag_set',lazy='dynamic')) def __init__(self,content): self.content = content def save(self): db.session.add(self) db.session.commit() 查询: >>> tag_list = [] >>> tags = ['python','flask','ruby','rails'] >>> for tag in tags: t = Tag(tag) tag_list.append(t) >>> tag_list [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>,<f_sqlalchemy.Tag object at 0x0000000003B7CF98>,<f_sqlalchemy.Tag object at 0x0000000003B7CEB8>,<f_sqlalchemy.Tag object at 0x0000000003B7CE80>] >>> p <Post u'hello python'> >>> p.tags [] >>> p.tags = tag_list # 添加多对多的数据 >>> p.save() >>> p.tags [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>,<f_sqlalchemy.Tag object at 0x0000000003B7CE80>] >>> p.tag_set # 反向查询 <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C080> >>> p.tag_set.all() [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>,<f_sqlalchemy.Tag object at 0x0000000003B7CE80>] >>> t = Tag.query.all()[1] >>> t <f_sqlalchemy.Tag object at 0x0000000003B7CF28> >>> t.content u'python' >>> t.posts [<Post u'hello python'>] >>> t.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C358> >>> t.post_set.all() [<Post u'hello python'>] self one to one 自身一对一也是常用的需求,比如无限分级栏目 class Category(db.Model): id = db.Column(db.Integer,primary_key=True) name = db.Column(db.String(50)) # 父级 id pid = db.Column(db.Integer,db.ForeignKey('category.id')) # 父栏目对象 pcategory = db.relationship('Category',uselist=False,remote_side=[id],backref=db.backref('scategory',uselist=False)) def __init__(self,pcategory=None): self.name = name self.pcategory = pcategory def __repr__(self): return '<Category %r>' % self.name def save(self): db.session.add(self) db.session.commit() 查询: >>> p = Category('Python') >>> p <Category 'Python'> >>> p.pid >>> p.pcategory # 查询父栏目 >>> p.scategory # 查询子栏目 >>> f = Category('Flask',p) >>> f.save() >>> f <Category u'Flask'> >>> f.pid 1L >>> f.pcategory # 查询父栏目 <Category u'Python'> >>> f.scategory # 查询父栏目 >>> p.scategory # 查询子栏目 <Category u'Flask'> 关于 flask-sqlalchemy 定义models的简单应用就这么多,更多的技巧在于如何查询。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |