如果出现权限问题请把url里面的地址改为ip地址,不要用localhost
App和User里面的tablename对应数据库里面的表名
执行之前手动连一下postgresql数据库查看一下数据
from sqlalchemy import Column,String,create_engine from sqlalchemy.types import CHAR,Integer,Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from flask import jsonify from sqlalchemy import func Base = declarative_base() class App(Base): __tablename__ = 'app' id = Column(Integer,primary_key=True) name = Column(String(32)) cn_name = Column(String(32)) desc = Column(Text) status = Column(Integer) class User(Base): __tablename__ = 't_user' userid = Column('userid',primary_key=True,nullable=True) username = Column('username',String(64),nullable=True) password = Column('password',nullable=True) engine = create_engine('postgresql://postgres:postgres@172.17.0.37:5432/skynet',echo=True) DBsession = sessionmaker(bind=engine) session = DBsession() query = session.query(App) print session.query(func.count(App.id)).one()[0]统计多少 for i in query.all(): #列出所有 print i.id,i.name,i.cn_name,i.desc,i.status
几种常见sqlalchemy查询: #简单查询 print(session.query(User).all()) print(session.query(User.name,User.fullname).all()) print(session.query(User,User.name).all())
#带条件查询 print(session.query(User).filter_by(name='user1').all()) print(session.query(User).filter(User.name == "user").all()) print(session.query(User).filter(User.name.like("user%")).all())
#多条件查询 print(session.query(User).filter(and_(User.name.like("user%"),User.fullname.like("first%"))).all()) print(session.query(User).filter(or_(User.name.like("user%"),User.password != None)).all())
#sql过滤 print(session.query(User).filter("id>:id").params(id=1).all())
#关联查询 print(session.query(User,Address).filter(User.id == Address.user_id).all()) print(session.query(User).join(User.addresses).all()) print(session.query(User).outerjoin(User.addresses).all())
#聚合查询 print(session.query(User.name,func.count('*').label("user_count")).group_by(User.name).all()) print(session.query(User.name,func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
#子查询 stmt = session.query(Address.user_id,func.count('*').label("address_count")).group_by(Address.user_id).subquery() print(session.query(User,stmt.c.address_count).outerjoin((stmt,User.id == stmt.c.user_id)).order_by(User.id).all())
#exists print(session.query(User).filter(exists().where(Address.user_id == User.id))) print(session.query(User).filter(User.addresses.any()))
限制返回字段查询 person = session.query(Person.name,Person.created_at,Person.updated_at).filter_by(name="zhongwei").order_by(Person.created_at).first()
记录总数查询: from sqlalchemy import func # count User records,without # using a subquery. session.query(func.count(User.id)) # return count of user "id" grouped # by "name" session.query(func.count(User.id)). group_by(User.name) from sqlalchemy import distinct # count distinct "name" values session.query(func.count(distinct(User.name))) (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|