23、SQLAlchemy 应用
23.1、PyMySQL
MySQL基于TCP协议之上开发,但是网络连接后,传输的数据必须遵循MySQL的协议。封装好MySQL协议的包,就是驱动程序。
MySQL的驱动:
-
MySQLdb
最有名的库。对MySQL的C Client封装实现,支持Python 2,不更新了,不支持Python3
-
MySQL官方Connector
-
pymysql
语法兼容MySQLdb,使用Python写的库,支持Python 3
23.1.1、安装
$ pip install pymysql
23.1.2、创建数据库和表
CREATE DATABASE
IF
NOT EXISTS school;
SHOW DATABASES;
USE school;
CREATE TABLE `student` (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) NOT NULL,
`age` INT ( 11 ) DEFAULT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
23.1.3、连接 Connect
首先,必须建立一个传输数据通道,连接。
pymysql.connect()
方法返回的是Connections 模块下的 Connection 类实例。connect 方法传参就是给 Connection 类的 __init__
提供参数。
Connection初始化常用参数 | 说明 |
---|---|
host | 主机 |
user | 用户名 |
password | 密码 |
database | 数据库 |
port | 端口 |
Connection.ping()
方法,测试数据库服务器是否活着。有一个参数 reconnect 表示断开与服务器连接是否重连。
import pymysql
try:
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
with conn as c:
print(c.__dict__)
except Exception as e:
print(e)
23.1.4、游标 Cursor
操作数据库,必须使用游标,需要先获取一个游标对象。Connection.cursor(cursor=None)
方法返回一个新的游标对象。连接没有关闭前,游标对象可以反复使用。cursor 参数,可以指定一个 Cursor 类。如果为 None,则使用默认 Cursor 类。
23.1.5、操作数据库
数据库操作需要使用Cursor类的实例,提供 execute() 方法,执行SQL语句,成功返回影响的行数。
23.1.5.1、新增记录
使用insert into语句插入数据。
import pymysql
try:
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
with conn as c:
cursor = c.cursor()
insert_sql = "insert into student (name,age) values('tom',20)"
rows = cursor.execute(insert_sql)
print(rows)
except Exception as e:
print(e)
发现数据库中没有数据提交成功,为什么?
原因在于,在 Connection 类的 __init__
方法的注释中有这么一句话:
autocommit: Autocommit mode. None means use server default. (default: False)
那是否应该开启自动提交呢?
不用开启,一般我们需要手动管理事务。
23.1.5.2、事务管理
Connection 类有三个方法:begin 开始事务;commit 将变更提交;rollback 回滚事务。
import pymysql
try:
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
with conn as c:
cursor = c.cursor()
insert_sql = "insert into student (name,age) values('tom',20)"
rows = cursor.execute(insert_sql)
print(rows)
c.commit()
except Exception as e:
c.rollback()
print(e)
finally:
if cursor:
cursor.close()
批量增加数据:
import pymysql
try:
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
with conn as c:
cursor = c.cursor()
for i in range(10):
insert_sql = "insert into student (name,age) values('tom{0}',20 + {0})".format(i)
rows = cursor.execute(insert_sql)
c.commit()
except Exception as e:
c.rollback()
print(e)
finally:
if cursor:
cursor.close()
一般流程:
- 建立连接
- 获取游标
- 执行SQL
- 提交事务
- 释放资源
23.1.5.3、查询
Cursor 类的获取查询结果集的方法有 fetchone()、fetchmany(size=None)、fetchall()。
import pymysql
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
cursor = conn.cursor()
sql = 'select * from student'
rows = cursor.execute(sql) # 返回影响的行数
print(cursor.fetchone())
print(cursor.fetchone())
print('1 -----------')
print(cursor.fetchmany(2))
print('2 ~~~~~~~~~~~')
print(cursor.fetchmany(2))
print('3 -----------')
print(cursor.fetchall())
if cursor:
cursor.close()
if conn:
conn.close()
fetchone() 方法,获取结果集的下一行。
fetchmany(size=None) 方法,size 指定返回的行数的行,None 则返回空元组。
fetchall() 方法,获取所有行。
返回多行,如果走到末尾,就返回空元组,否则返回一个元组,其元素就是每一行的记录。
每一行的记录也封装在一个元组中。
cursor.rownumber 返回当前行号。可以修改,支持负数。
cursor.rowcount 返回的总行数。
注意:fetch 操作的是结果集,结果集是保存在客户端的,也就是说 fetch 的时候,查询已经结束了。
23.1.5.4、带列名查询
Cursor 类有一个 Mixin 的子类 DictCursor。
只需要 cursor = conn.cursor(DictCursor) 就可以了。
# 返回结果
{'name': 'tom', 'age': 20, 'id': 4}
{'name': 'tom0', 'age': 20, 'id': 5}
返回一行,是一个字典。返回多行,放在列表中,元素是字典,代表行。
23.1.5.5、SQL 注入攻击
找出用户 id 为 6 的用户信息的 SQL 语句如下:
SELECT * from student WHERE id = 6
现在,要求可以找出某个 id 对应用户的信息,代码如下:
userid = 5 # 用户id可以变
sql = 'SELECT * from student WHERE id = {}'.format(userid)
userid 可以变,例如从客户端 request 请求中获取,直接拼接到查询字符串中。
可是,如果 userid = '5 or 1=1' 呢?
sql = 'SELECT * from student WHERE id = {}'.format('5 or 1=1')
运行的结果竟然是返回了全部数据。
SQL注入攻击:
猜测后台数据库的查询语句使用拼接字符串的方式,从而经过设计为服务端传参,令其拼接出特殊字符串,返回用
户想要的结果。
永远不要相信客户端传来的数据是规范的及安全的!!!
如何解决注入攻击?
参数化查询,可以有效防止注入攻击,并提高查询的效率。
Cursor.execute(query, args=None)
args,必须是元组、列表或字典。如果查询字符串使用 %(name)s
,就必须使用字典。
import pymysql
from pymysql.cursors import DictCursor
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
cursor = conn.cursor(DictCursor)
userid = '5 or 1=1'
sql = 'SELECT * from student WHERE id = %s'
cursor.execute(sql, (userid,)) # 参数化查询
print(cursor.fetchall())
print('~~~~~~~~~~~~~~~~~')
sql = 'SELECT * from student WHERE name like %(name)s and age > %(age)s'
cursor.execute(sql, {'name': 'tom%', 'age': 25}) # 参数化查询
print(cursor.fetchall())
if cursor:
cursor.close()
if conn:
conn.close()
参数化查询为什么提高效率?
原因就是 SQL 语句缓存。数据库服务器一般会对 SQL 语句编译和缓存,编译只对 SQL 语句部分,所以参数中就算有 SQL 指令也不会被执行。
编译过程,需要词法分析、语法分析、生成 AST、优化、生成执行计划等过程,比较耗费资源。服务端会先查找是否对同一条查询语句进行了缓存,如果缓存未失效,则不需要再次编译,从而降低了编译的成
本,降低了内存消耗。
可以认为 SQL 语句字符串就是一个 key,如果使用拼接方案,每次发过去的 SQL 语句都不一样,都需要编译并缓存。大量查询的时候,首选使用参数化查询,以节省资源。开发时,应该使用参数化查询。
注意:这里说的是查询字符串的缓存,不是查询结果的缓存。
23.1.5.6、上下文支持
查看连接类和游标类的源码:
# 连接类
class Connection(object):
def __enter__(self):
"""Context manager that returns a Cursor"""
return self.cursor()
def __exit__(self, exc, value, traceback):
"""On successful exit, commit. On exception, rollback"""
if exc:
self.rollback()
else:
self.commit()
# 游标类
class Cursor(object):
def __enter__(self):
return self
def __exit__(self, *exc_info):
del exc_info
self.close()
连接类进入上下文的时候会返回一个游标对象,退出时如果没有异常会提交更改。
游标类也使用上下文,在退出时关闭游标对象。
import pymysql
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
try:
with conn.cursor() as cursor:
for i in range(3):
insert_sql = "insert into student (name,age) values('tom{0}',20+{0})".format(i)
rows = cursor.execute(insert_sql)
conn.commit()
# 如果此时使用这个关闭的cursor,会抛异常
# sql = "select * from student"
# cursor.execute(sql)
# print(cursor.fetchall())
except Exception as e:
print(e)
conn.rollback()
finally:
conn.close()
换一种写法,使用连接的上下文:
import pymysql
conn = pymysql.connect(host='192.168.136.128', user='brinnatt', password='WelC0me168!', database='school')
try:
with conn as conn:
with conn.cursor() as cursor:
for i in range(3):
insert_sql = "insert into student (name,age) values('tom{0}',20+{0})".format(i)
rows = cursor.execute(insert_sql)
conn.commit()
# 如果此时使用这个关闭的cursor,会抛异常
# sql = "select * from student"
# cursor.execute(sql)
# print(cursor.fetchall())
except Exception as e:
print(e)
conn.rollback()
连接应该不需要反反复复创建销毁,应该是多个 cursor 共享一个 conn。
23.2、元编程
元编程概念来自 LISP 和 smalltalk。
我们写程序是直接写代码,是否能够用代码来生成未来我们需要的代码?这就是元编程概念。
用来生成代码的程序称为元程序 metaprogram,编写这种程序就称为元编程 metaprogramming。
Python 语言能够通过反射实现元编程。
23.2.1、type()
动态语言和静态语言最大的不同,就是函数和类的定义,不是编译时定义的,而是运行时动态创建的。
比方说我们要定义一个 Hello
的class,就写一个 hello.py
模块:
class Hello(object):
def hello(self, name='world'):
print('Hello, %s.' % name)
当Python解释器载入hello
模块时,就会依次执行该模块的所有语句,执行结果就是动态创建出一个Hello
的class对象,测试如下:
from hello import Hello
h = Hello()
h.hello()
print(type(Hello))
print(type(h))
输出:
Hello, world.
<class 'type'>
<class 'hello.Hello'>
type()
函数可以查看对象的类型,Hello
是一个 class,它的类型就是 type
,而 h
是一个实例,它的类型就是 class Hello
。
我们说 class 的定义是运行时动态创建的,而创建 class 的方法就是使用 type()
函数。
type()
函数既可以返回一个对象的类型,又可以创建出新的类型,比如,我们可以通过 type()
函数创建出 Hello
类,而无需通过 class Hello(object)...
的定义:
def fn(self, name='world'): # 先定义函数
print(f"Hello {name}")
Hello = type('Hello', (object,), dict(hello=fn)) # 创建Hello Class
h = Hello()
h.hello()
print(type(Hello))
print(type(h))
输出:
Hello world
<class 'type'>
<class '__main__.Hello'>
要创建一个 class 对象,type()
函数依次传入 3 个参数:
- class 的名称;
- 继承的父类集合,注意 Python 支持多重继承,如果只有一个父类,别忘了 tuple 的单元素写法;
- class 的方法名称与函数绑定,这里我们把函数
fn
绑定到方法名hello
上。
通过 type()
函数创建的类和直接写 class 是完全一样的,因为 Python 解释器遇到 class 定义时,仅仅是扫描一下 class 定义的语法,然后调用 type()
函数创建出 class。
正常情况下,我们都用 class Xxx...
来定义类,但是,type()
函数也允许我们动态创建出类来,也就是说,动态语言本身支持运行期动态创建类,这和静态语言有非常大的不同,要在静态语言运行期创建类,必须构造源代码字符串再调用编译器,或者借助一些工具生成字节码实现,本质上都是动态编译,会非常复杂。
23.2.2、metaclass
除了使用 type()
动态创建类以外,要控制类的创建行为,还可以使用 metaclass。
metaclass,直译为元类,简单的解释就是:
当我们定义了类以后,就可以根据这个类创建出实例,所以:先定义类,然后创建实例。
但是如果我们想创建出类呢?那就必须根据 metaclass 创建出类,所以:先定义 metaclass,然后创建类。
连接起来就是:先定义 metaclass,就可以创建类,最后创建实例。
所以,metaclass 允许你创建类或者修改类。换句话说,你可以把类看成是 metaclass 创建出来的“实例”。
metaclass 是 Python 面向对象里最难理解,也是最难使用的魔术代码。正常情况下,你不会碰到需要使用 metaclass 的情况,所以,以下内容看不懂也没关系,因为基本上你不会用到。
我们先看一个简单的例子,这个 metaclass 可以给我们自定义的 MyList 增加一个 add
方法:
定义 ListMetaclass
,按照默认习惯,metaclass 的类名总是以 Metaclass 结尾,以便清楚地表示这是一个 metaclass:
# metaclass是类的模板,所以必须从`type`类型派生:
class ListMetaclass(type):
def __new__(cls, name, bases, attrs):
attrs['add'] = lambda self, value: self.append(value)
return type.__new__(cls, name, bases, attrs)
有了ListMetaclass,我们在定义类的时候还要指示使用ListMetaclass来定制类,传入关键字参数metaclass
:
class MyList(list, metaclass=ListMetaclass):
pass
当我们传入关键字参数 metaclass
时,魔术就生效了,它指示 Python 解释器在创建 MyList
时,要通过 ListMetaclass.__new__()
来创建,在此,我们可以修改类的定义,比如,加上新的方法,然后,返回修改后的定义。
__new__()
方法接收到的参数依次是:
- 当前准备创建的类的对象;
- 类的名字;
- 类继承的父类集合;
- 类的方法集合。
测试一下 MyList
是否可以调用 add()
方法:
# metaclass是类的模板,所以必须从`type`类型派生:
class ListMetaclass(type):
def __new__(cls, name, bases, attrs):
attrs['add'] = lambda self, value: self.append(value)
return type.__new__(cls, name, bases, attrs)
class MyList(list, metaclass=ListMetaclass):
pass
L = MyList()
L.add(1)
print(L)
输出:
[1]
而普通的 list
没有 add()
方法:
L2 = list()
L2.add(1)
输出:
Traceback (most recent call last):
File "D:\JetBrains\Projects\main.py", line 2, in <module>
L2.add(1)
AttributeError: 'list' object has no attribute 'add'
动态修改有什么意义?直接在 MyList
定义中写上 add()
方法不是更简单吗?正常情况下,确实应该直接写,通过 metaclass 修改纯属变态。
但是,总会遇到需要通过 metaclass 修改类定义的。ORM 就是一个典型的例子。
ORM 全称 “Object Relational Mapping”,即对象-关系映射,就是把关系数据库的一行映射为一个对象,也就是一个类对应一个表,这样,写代码更简单,不用直接操作 SQL 语句。
要编写一个 ORM 框架,所有的类都只能动态定义,因为只有使用者才能根据表的结构定义出对应的类来。
让我们来尝试编写一个 ORM 框架。
编写底层模块的第一步,就是先把调用接口写出来。比如,使用者如果使用这个 ORM 框架,想定义一个 User
类来操作对应的数据库表 User
,我们期待他写出这样的代码:
class User(Model):
# 定义类的属性到列的映射:
id = IntegerField('id')
name = StringField('username')
email = StringField('email')
password = StringField('password')
# 创建一个实例:
u = User(id=12345, name='Michael', email='test@orm.org', password='my-pwd')
# 保存到数据库:
u.save()
其中,父类 Model
和属性类型 StringField
、IntegerField
是由 ORM 框架提供的,剩下的魔术方法比如 save()
全部由父类 Model
自动完成。虽然 metaclass 的编写会比较复杂,但 ORM 的使用者用起来却异常简单。
现在,我们就按上面的接口来实现该 ORM。
首先来定义 Field
类,它负责保存数据库表的字段名和字段类型:
class Field(object):
def __init__(self, name, column_type):
self.name = name
self.column_type = column_type
def __str__(self):
return '<%s:%s>' % (self.__class__.__name__, self.name)
在 Field
的基础上,进一步定义各种类型的 Field
,比如 StringField
,IntegerField
等等:
class StringField(Field):
def __init__(self, name):
super(StringField, self).__init__(name, 'varchar(100)')
class IntegerField(Field):
def __init__(self, name):
super(IntegerField, self).__init__(name, 'bigint')
下一步,就是编写最复杂的 ModelMetaclass
了:
class ModelMetaclass(type):
def __new__(cls, name, bases, attrs):
if name=='Model':
return type.__new__(cls, name, bases, attrs)
print('Found model: %s' % name)
mappings = dict()
for k, v in attrs.items():
if isinstance(v, Field):
print('Found mapping: %s ==> %s' % (k, v))
mappings[k] = v
for k in mappings.keys():
attrs.pop(k)
attrs['__mappings__'] = mappings # 保存属性和列的映射关系
attrs['__table__'] = name # 假设表名和类名一致
return type.__new__(cls, name, bases, attrs)
以及基类 Model
:
class Model(dict, metaclass=ModelMetaclass):
def __init__(self, **kw):
super(Model, self).__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Model' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
def save(self):
fields = []
params = []
args = []
for k, v in self.__mappings__.items():
fields.append(v.name)
params.append('?')
args.append(getattr(self, k, None))
sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))
print('SQL: %s' % sql)
print('ARGS: %s' % str(args))
当用户定义一个 class User(Model)
时,Python 解释器首先在当前类 User
的定义中查找 metaclass
,如果没有找到,就继续在父类 Model
中查找 metaclass
,找到了,就使用 Model
中定义的 metaclass
的 ModelMetaclass
来创建 User
类,也就是说,metaclass 可以隐式地继承到子类,但子类自己却感觉不到。
在 ModelMetaclass
中,一共做了几件事情:
- 排除掉对
Model
类的修改; - 在当前类(比如
User
)中查找定义的类的所有属性,如果找到一个 Field 属性,就把它保存到一个__mappings__
的 dict 中,同时从类属性中删除该 Field 属性,否则,容易造成运行时错误(实例的属性会遮盖类的同名属性); - 把表名保存到
__table__
中,这里简化为表名默认为类名。
在 Model
类中,就可以定义各种操作数据库的方法,比如 save()
,delete()
,find()
,update
等等。
我们实现了 save()
方法,把一个实例保存到数据库中。因为有表名,属性到字段的映射和属性值的集合,就可以构造出 INSERT
语句。
编写代码试试:
u = User(id=12345, name='Michael', email='test@orm.org', password='my-pwd')
u.save()
输出如下:
Found model: User
Found mapping: id ==> <IntegerField:id>
Found mapping: name ==> <StringField:username>
Found mapping: email ==> <StringField:email>
Found mapping: password ==> <StringField:password>
SQL: insert into User (id,username,email,password) values (?,?,?,?)
ARGS: [12345, 'Michael', 'test@orm.org', 'my-pwd']
可以看到,save()
方法已经打印出了可执行的 SQL 语句,以及参数列表,只需要真正连接到数据库,执行该 SQL 语句,就可以完成真正的功能。
不到 100 行代码,我们就通过 metaclass 实现了一个精简的 ORM 框架,是不是非常简单?
metaclass 是 Python 中非常具有魔术性的对象,它可以改变类创建时的行为。这种强大的功能使用起来务必小心。
23.3、SQLAlchemy
官方文档:http://docs.sqlalchemy.org/en/latest/
SQLAlchemy 是一个 ORM 框架,大量使用了元编程。
23.3.1、安装
$ pip install sqlalchemy
查看版本:
import sqlalchemy
print(sqlalchemy.__version__)
SQLAlchemy 内部使用了连接池。
23.3.2、创建连接
数据库连接的事情,交给引擎:
mysqldb的连接:
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
engine = sqlalchemy.create_engine("mysql+mysqldb://brinnatt:brinnatt@192.168.136.128:3306/school")
pymysql的连接:
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
engine = sqlalchemy.create_engine("mysql+pymysql://brinnatt:brinnatt@192.168.136.128:3306/school")
engine = sqlalchemy.create_engine("mysql+pymysql://brinnatt:brinnatt@192.168.136.128:3306/school",
echo=True)
echo=True
引擎是否打印执行的语句,调试的时候打开很方便。
23.3.3、Declare a Mapping 创建映射
23.3.3.1、创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建基类,便于实体类继承
Base = declarative_base()
23.3.3.2、创建实体类
student 表:
CREATE TABLE student (
id INTEGER NOT NULL AUTO_INCREMENT,
NAME VARCHAR ( 64 ) NOT NULL,
age INTEGER,
PRIMARY KEY ( id )
)
# 创建实体类
class Student(Base):
# 指定表名
__tablename__ = 'student'
# 定义属性对应字段
id = Column(Integer, primary_key=True)
name = Column(String(64))
age = Column(Integer)
# 第一参数是字段名,如果和属性名不一致,一定要指定
# age = Column('age', Integer)
def __repr__(self):
return "{} id={} name={} age={}".format(
self.__class__.__name__, self.id, self.name, self.age)
__tablename__
指定表名。Column 类指定对应的字段,必须指定。
23.3.3.3、实例化
s = Student(name='tom')
print(s.name)
s.age = 20
print(s.age)
23.3.3.4、创建表
可以使用SQLAlchemy来创建、删除表:
# 删除继承自Base的所有表
Base.metadata.drop_all(engine)
# 创建继承自Base的所有表
Base.metadata.create_all(engine)
生产环境很少这样创建表,都是系统上线的时候由脚本生成。
生成环境很少删除表,宁可废弃都不能删除。
23.3.3.5、创建会话session
在一个会话中操作数据库,会话建立在连接上,连接被引擎管理。
# 创建session
Session = sessionmaker(bind=engine) # 返回类
session = Session() # 实例化
session 对象线程不安全。所以不同线程使用不用的 session 对象。
Session 类和 engine 都是线程安全的,有一个就行了。
23.3.4、CRUD 操作
23.3.4.1、增
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
host = '192.168.136.128'
user = 'brinnatt'
password = 'WelC0me168!'
port = 3306
database = 'school'
conn_str = "mysql+pymysql://{}:{}@{}:{}/{}".format(
user, password, host, port, database
)
engine = sqlalchemy.create_engine(conn_str, echo=True)
# 创建基类
Base = declarative_base()
# 创建实体类
class Student(Base):
# 指定表名
__tablename__ = 'student'
# 定义属性对应字段
id = Column(Integer, primary_key=True)
name = Column(String(64), nullable=False)
age = Column(Integer)
# 第一参数是字段名,如果和属性名不一致,一定要指定
# age = Column('age', Integer)
def __repr__(self):
return "{} id={} name={} age={}".format(
self.__class__.__name__, self.id, self.name, self.age)
s = Student(name='tom') # 构造的时候传入
print(1, '-->', s.name)
s.age = 20 # 属性赋值
print(2, '-->', s.age)
# # 删除继承自Base的所有表
# Base.metadata.drop_all(engine)
# # 创建继承自Base的所有表
# Base.metadata.create_all(engine)
# 创建 session
Session = sessionmaker(bind=engine)
session = Session()
session.add(s)
print(3, '-->', s)
session.commit()
print(4, '-->', s)
print('-' * 80)
try:
session.add_all([s])
print(5, '-->', s)
session.commit() # 提交能够成功吗
print(6, '-->', s)
except:
session.rollback()
raise
add_all() 方法不会提交成功的,不是因为它不对,而是 s,s 成功提交后,s 的主键就有了值,所以,只要 s 没有修改过,就认为没有改动。如下,s 变化了,就可以提交修改了。
s.name = 'jerry' # 修改
session.add_all([s])
s 主键没有值,就是新增;主键有值,就是找到主键对应的记录修改。
23.3.4.2、简单查询
使用 query() 方法,返回一个 Query 对象:
students = session.query(Student) # 无条件
for student in students:
print(student)
print('~~~~~~~~~~~~~')
student = session.query(Student).get(2) # 通过主键查询
print(student)
query 方法将实体类传入,返回类的对象可迭代对象,这时候并不查询。迭代它就执行 SQL 来查询数据库,封装数据到指定类的实例。
get 方法使用主键查询,返回一条传入类的一个实例。
23.3.4.3、改
student = session.query(Student).get(2)
print(student)
student.name = 'sam'
student.age = 30
print(student)
session.add(student)
session.commit()
先查回来,修改后,再提交更改。
23.3.4.4、删除
先看下数据库,表中有:
1 tom 20
2 sam 30
3 jerry 20
4 ben 20
5 ben 20
编写如下程序来删除数据,会发生什么?
try:
student = Student(id=2, name="sam", age=30)
session.delete(student)
session.commit()
except Exception as e:
session.rollback()
print('~~~~~~~~')
print(e)
会产生一个异常 Instance '<Student at 0x3e654e0>' is not persisted
未持久的异常!
23.3.4.5、状态
每一个实体,都有一个状态属性 _sa_instance_state
,其类型是 sqlalchemy.orm.state.InstanceState,可以使用 sqlalchemy.inspect(entity)
函数查看状态。
常见的状态值有 transient、pending、persistent、deleted、detached。
状态 | 说明 |
---|---|
transient | 实体类尚未加入到session中,同时并没有保存到数据库中。 |
pending | transient的实体被add()到session中,状态切换到pending,但它还没有flush到数据库中。 |
persistent | session中的实体对象对应着数据库中的真实记录。pending状态在提交成功后可以变成persistent状态,或者查询成功返回的实体也是persistent状态。 |
deleted | 实体被删除且已经 flush 但未 commit 完成。事务提交成功了,实体变成 detached,事务失败,返回 persistent 状态。 |
detached | 删除成功的实体进入这个状态。 |
新建一个实体,状态是 transient 临时的。一旦 add() 后从 transient 变成 pending 状态。成功 commit() 后从 pending 变成 persistent 状态。
成功查询返回的实体对象,也是 persistent 状态。persistent 状态的实体,修改依然是 persistent 状态。
persistent 状态的实体,删除后,flush 后但没有 commit,就变成 deteled 状态,成功提交,变为 detached 状态,提交失败,还原到 persistent 状态。flush 方法,主动把改变应用到数据库中去。
删除、修改操作,需要对应一个真实的记录,所以要求实体对象是 persistent 状态。
import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.state import InstanceState
host = '192.168.136.128'
user = 'brinnatt'
password = 'WelC0me168!'
port = 3306
database = 'school'
conn_str = "mysql+pymysql://{}:{}@{}:{}/{}".format(
user, password, host, port, database
)
engine = sqlalchemy.create_engine(conn_str, echo=True)
Base = declarative_base()
# 创建实体类
class Student(Base):
# 指定表名
__tablename__ = 'student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), nullable=False)
age = Column(Integer)
# 第一参数是字段名,如果和属性名不一致,则一定要指定
# age = Column('age', Integer)
def __repr__(self):
return f"{self.__class__.__name__} id={self.id} name={self.name} age={self.age}"
Session = sessionmaker(bind=engine)
session = Session()
def getstate(entity, i):
insp = sqlalchemy.inspect(entity)
state = (
f"sessionid={insp.session_id}",
f"attached={insp._attached}",
f"transient={insp.transient}",
f"persistent={insp.persistent}",
f"pending={insp.pending}",
f"deleted={insp.deleted}",
f"detached={insp.detached}"
)
print(i, '-->', state)
print(i, '-->', insp.key)
print('-' * 80)
student = session.query(Student).get(2)
getstate(student, 1)
try:
student = Student(id=2, name="sam", age=30)
getstate(student, 2) # transit
student = Student(name="sammy", age=30)
getstate(student, 3) # transit
session.add(student) # add后变成pending
getstate(student, 4) # pending
# session.delete(student) # 删除的前提是persistent,否则抛异常
# getstate(student, 5)
session.commit()
getstate(student, 6) # persistent
except Exception as e:
session.rollback()
print('-' * 80)
print(e)
输出:
2023-03-29 13:44:28,453 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-03-29 13:44:28,453 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:44:28,456 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-03-29 13:44:28,456 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:44:28,457 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-03-29 13:44:28,457 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:44:28,458 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-03-29 13:44:28,460 INFO sqlalchemy.engine.Engine SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age
FROM student
WHERE student.id = %(pk_1)s
2023-03-29 13:44:28,460 INFO sqlalchemy.engine.Engine [generated in 0.00019s] {'pk_1': 2}
1 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=True', 'pending=False', 'deleted=False', 'detached=False')
1 --> (<class '__main__.Student'>, (2,), None)
--------------------------------------------------------------------------------
2 --> ('sessionid=None', 'attached=False', 'transient=True', 'persistent=False', 'pending=False', 'deleted=False', 'detached=False')
2 --> None
--------------------------------------------------------------------------------
3 --> ('sessionid=None', 'attached=False', 'transient=True', 'persistent=False', 'pending=False', 'deleted=False', 'detached=False')
3 --> None
--------------------------------------------------------------------------------
4 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=False', 'pending=True', 'deleted=False', 'detached=False')
4 --> None
--------------------------------------------------------------------------------
2023-03-29 13:44:28,477 INFO sqlalchemy.engine.Engine INSERT INTO student (name, age) VALUES (%(name)s, %(age)s)
2023-03-29 13:44:28,477 INFO sqlalchemy.engine.Engine [generated in 0.00017s] {'name': 'sammy', 'age': 30}
2023-03-29 13:44:28,478 INFO sqlalchemy.engine.Engine COMMIT
6 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=True', 'pending=False', 'deleted=False', 'detached=False')
6 --> (<class '__main__.Student'>, (51,), None)
--------------------------------------------------------------------------------
student = session.query(Student).get(2)
getstate(student, 10) # persistent
try:
session.delete(student) # 删除的前提是persistent
getstate(student, 11) # persistent
session.flush()
getstate(student, 12) # deleted
session.commit()
getstate(student, 13) # detached
except Exception as e:
session.rollback()
print('-' * 80)
print(e)
输出:
2023-03-29 13:49:51,711 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-03-29 13:49:51,712 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:49:51,716 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-03-29 13:49:51,716 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:49:51,718 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-03-29 13:49:51,718 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 13:49:51,720 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-03-29 13:49:51,722 INFO sqlalchemy.engine.Engine SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age
FROM student
WHERE student.id = %(pk_1)s
2023-03-29 13:49:51,722 INFO sqlalchemy.engine.Engine [generated in 0.00022s] {'pk_1': 2}
10 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=True', 'pending=False', 'deleted=False', 'detached=False')
10 --> (<class '__main__.Student'>, (2,), None)
--------------------------------------------------------------------------------
11 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=True', 'pending=False', 'deleted=False', 'detached=False')
11 --> (<class '__main__.Student'>, (2,), None)
--------------------------------------------------------------------------------
2023-03-29 13:49:51,738 INFO sqlalchemy.engine.Engine DELETE FROM student WHERE student.id = %(id)s
2023-03-29 13:49:51,738 INFO sqlalchemy.engine.Engine [generated in 0.00018s] {'id': 2}
12 --> ('sessionid=1', 'attached=True', 'transient=False', 'persistent=False', 'pending=False', 'deleted=True', 'detached=False')
12 --> (<class '__main__.Student'>, (2,), None)
--------------------------------------------------------------------------------
2023-03-29 13:49:51,740 INFO sqlalchemy.engine.Engine COMMIT
13 --> ('sessionid=None', 'attached=False', 'transient=False', 'persistent=False', 'pending=False', 'deleted=False', 'detached=True')
13 --> (<class '__main__.Student'>, (2,), None)
--------------------------------------------------------------------------------
23.3.5、复杂查询
将 test.sql 导入数据库中:
$ mysql -ubrinnatt -pWelC0me168! < test.sql
对应实体类:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Date, Enum, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker
import enum
host = '192.168.136.128'
user = 'brinnatt'
password = 'WelC0me168!'
port = 3306
database = 'test'
conn_str = "mysql+pymysql://{}:{}@{}:{}/{}".format(
user, password, host, port, database
)
Base = declarative_base()
engine = create_engine(conn_str, echo=True)
Session = sessionmaker(bind=engine)
session = Session()
class MyEnum(enum.Enum):
M = 'M'
F = 'F'
class Employee(Base):
# 指定表名
__tablename__ = 'employees'
# 定义属性对应字段
emp_no = Column(Integer, primary_key=True)
birth_date = Column(Date, nullable=False)
first_name = Column(String(14), nullable=False)
last_name = Column(String(16), nullable=False)
gender = Column(Enum(MyEnum), nullable=False)
hire_date = Column(Date, nullable=False)
def __repr__(self):
return "{} no={} name={} {} gender={}".format(
self.__class__.__name__,
self.emp_no,
self.first_name,
self.last_name,
self.gender.value
)
# 打印函数
def show(emps, msg):
print('-' * 40, msg, '-' * 40)
for x in emps:
print("(-->", x, "<--)")
23.3.5.1、条件查询
emps = session.query(Employee).filter(Employee.emp_no > 10015)
show(emps, "简单条件查询")
输出:
---------------------------------------- 简单条件查询 ----------------------------------------
2023-03-29 14:49:57,599 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-03-29 14:49:57,599 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 14:49:57,600 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-03-29 14:49:57,600 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 14:49:57,601 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-03-29 14:49:57,601 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-03-29 14:49:57,602 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-03-29 14:49:57,617 INFO sqlalchemy.engine.Engine SELECT employees.emp_no AS employees_emp_no, employees.birth_date AS employees_birth_date, employees.first_name AS employees_first_name, employees.last_name AS employees_last_name, employees.gender AS employees_gender, employees.hire_date AS employees_hire_date
FROM employees
WHERE employees.emp_no > %(emp_no_1)s
2023-03-29 14:49:57,617 INFO sqlalchemy.engine.Engine [generated in 0.00020s] {'emp_no_1': 10015}
Employee no=10016 name=Kazuhito Cappelletti gender=M
Employee no=10017 name=Cristinel Bouloucos gender=F
Employee no=10018 name=Kazuhide Peha gender=F
Employee no=10019 name=Lillian Haddadi gender=M
Employee no=10020 name=Mayuko Warwick gender=M
23.3.5.2、与或非
from sqlalchemy import or_, and_, not_
emps = session.query(Employee).filter(Employee.emp_no > 10015).filter(Employee.gender == MyEnum.F)
show(emps, "and 条件")
emps = session.query(Employee).filter(and_(Employee.emp_no > 10015, Employee.gender == MyEnum.M))
show(emps, "and 条件")
emps = session.query(Employee).filter((Employee.emp_no > 10015) & (Employee.gender == MyEnum.M))
show(emps, "一定要注意 & 符号两边表达式都要加括号")
from sqlalchemy import or_, and_, not_
emps = session.query(Employee).filter((Employee.emp_no > 10018) | (Employee.emp_no < 10003))
show(emps, "OR 条件")
emps = session.query(Employee).filter(or_(Employee.emp_no > 10018, Employee.emp_no < 10003))
show(emps, "OR 条件")
from sqlalchemy import or_, and_, not_
# 总之,与或非的运算符&、|、~,一定要在表达式上加上括号
emps = session.query(Employee).filter(not_(Employee.emp_no < 10018))
show(emps, "Not")
emps = session.query(Employee).filter(~(Employee.emp_no < 10018))
show(emps, "一定注意要加括号")
# in
emplist = [10010, 10015, 10018]
emps = session.query(Employee).filter(Employee.emp_no.in_(emplist))
show(emps, "in")
# not in
emps = session.query(Employee).filter(~Employee.emp_no.in_(emplist))
show(emps, 'not in')
# like
# ilike可以忽略大小写匹配
emps = session.query(Employee).filter(Employee.last_name.like('P%'))
show(emps, "like")
23.3.5.3、排序
# 升序
emps = session.query(Employee).filter(Employee.emp_no > 10010).order_by(Employee.emp_no)
emps = session.query(Employee).filter(Employee.emp_no > 10010).order_by(Employee.emp_no.asc())
show(emps, "升序")
# 降序
emps = session.query(Employee).filter(Employee.emp_no > 10010).order_by(Employee.emp_no.desc())
show(emps, "降序")
# 多列排序
emps = session.query(Employee).filter(Employee.emp_no > 10010).order_by(Employee.last_name).order_by(
Employee.emp_no.desc())
show(emps, "多列排序")
23.3.5.4、分页
# 分页
emps = session.query(Employee).limit(4)
show(emps, "分页")
emps = session.query(Employee).limit(4).offset(18)
show(emps, "分页")
23.3.5.5、消费者方法
消费者方法调用后,Query 对象(可迭代)就转换成了一个容器。
# 总行数
emps = session.query(Employee)
print(1, '-->', len(list(emps))) # 返回大量的结果集,然后转换list
print(2, '-->', emps.count()) # 聚合函数count(*)的查询
# 取所有数据
print(3, '-->', emps.all()) # 返回列表,查不到返回空列表
# 取首行
print(4, '-->', emps.first()) # 返回首行,查不到返回None
# 有且只能有一行
# print(emps.one()) #如果查询结果是多行抛异常
print(5, '-->', emps.limit(1).one())
# 删除 delete by query
# session.query(Employee).filter(Employee.emp_no > 10018).delete()
# session.commit() # 提交则删除
23.3.5.6、聚合、分组
# 聚合函数
# count
from sqlalchemy import func
query = session.query(func.count(Employee.emp_no))
print(1, '-->', query.one()) # 只能有一行结果
print(2, '-->', query.scalar()) # 取one()返回元组的第一个元素
# max/min/avg
print(3, '-->', session.query(func.max(Employee.emp_no)).scalar())
print(4, '-->', session.query(func.min(Employee.emp_no)).scalar())
print(5, '-->', session.query(func.avg(Employee.emp_no)).scalar())
# 分组
print(6, '-->', session.query(Employee.gender,func.count(Employee.emp_no)).group_by(Employee.gender).all())
23.3.5.7、关联查询
CREATE TABLE `departments` (
`dept_no` char(4) NOT NULL,
`dept_name` varchar(40) NOT NULL,
PRIMARY KEY (`dept_no`),
UNIQUE KEY `dept_name` (`dept_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
CREATE TABLE `employees` (
`emp_no` int NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
CREATE TABLE `dept_emp` (
`emp_no` int NOT NULL,
`dept_no` char(4) NOT NULL,
`from_date` date NOT NULL,
`to_date` date NOT NULL,
PRIMARY KEY (`emp_no`,`dept_no`),
KEY `dept_no` (`dept_no`),
CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE,
CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
从语句看出员工、部门之间的关系是多对多关系。先把这些表的 Model 类和字段属性建立起来。
# 创建实体类
class Employee(Base):
# 表名
__tablename__ = 'employees'
# 定义属性对应字段
emp_no = Column(Integer, primary_key=True)
birth_date = Column(Date, nullable=False)
first_name = Column(String(14), nullable=False)
last_name = Column(String(16), nullable=False)
gender = Column(Enum(MyEnum), nullable=False)
hire_date = Column(Date, nullable=False)
# 第一参数是字段名,如果和属性名不一致,一定要指定
# age = Column('age', Integer)
def __repr__(self):
return "{} no={} name={} {} gender={}".format(
self.__class__.__name__, self.emp_no, self.first_name, self.last_name,
self.gender.value
)
class Department(Base):
__tablename__ = 'departments'
dept_no = Column(String(4), primary_key=True)
dept_name = Column(String(40), nullable=False, unique=True)
def __repr__(self):
return "{} no={} name={}".format(
type(self).__name__, self.dept_no, self.dept_name)
class Dept_emp(Base):
__tablename__ = "dept_emp"
emp_no = Column(Integer, ForeignKey('employees.emp_no', ondelete='CASCADE'), primary_key=True)
dept_no = Column(String(4), ForeignKey('departments.dept_no', ondelete='CASCADE'), primary_key=True)
from_date = Column(Date, nullable=False)
to_date = Column(Date, nullable=False)
def __repr__(self):
return "{} empno={} deptno={}".format(
type(self).__name__, self.emp_no, self.dept_no)
ForeignKey('employees.emp_no', ondelete='CASCADE')
定义外键约束。
需求:
查询10010员工的所在的部门编号
1、使用隐式内连接
# 查询10010员工所在的部门编号
results = session.query(Employee, Dept_emp).filter(Employee.emp_no == Dept_emp.emp_no).filter(
Employee.emp_no == 10010).all()
show(results, "查询10010员工所在的部门编号")
# 查询结果
(Employee no=10010 name=Duangkaew Piveteau gender=F, Dept_emp empno=10010 deptno=d004)
(Employee no=10010 name=Duangkaew Piveteau gender=F, Dept_emp empno=10010 deptno=d006)
这种方式产生隐式连接的语句
SELECT * FROM employees, dept_emp WHERE employees.emp_no = dept_emp.emp_no AND employees.emp_no = %( emp_no_1 )s
2、使用 join
# 查询10010员工所在的部门编号
# 第一种写法
results = session.query(Employee).join(Dept_emp).filter(Employee.emp_no == 10010).all()
show(results, "第一种写法")
# sqlalchemy 会转化成:
SELECT
employees.emp_no AS employees_emp_no,
employees.birth_date AS employees_birth_date,
employees.first_name AS employees_first_name,
employees.last_name AS employees_last_name,
employees.gender AS employees_gender,
employees.hire_date AS employees_hire_date
FROM
employees
INNER JOIN dept_emp ON employees.emp_no = dept_emp.emp_no
# 第二种写法
results = session.query(Employee).join(Dept_emp, Employee.emp_no == Dept_emp.emp_no).filter(
Employee.emp_no == 10010).all()
show(results, "第二种写法")
# sqlalchemy 会转化成:
SELECT
employees.emp_no AS employees_emp_no,
employees.birth_date AS employees_birth_date,
employees.first_name AS employees_first_name,
employees.last_name AS employees_last_name,
employees.gender AS employees_gender,
employees.hire_date AS employees_hire_date
FROM
employees
INNER JOIN dept_emp ON employees.emp_no = dept_emp.emp_no
发现两种写法最后 SQLAlchemy 转化成了一样的查询语句,这说明 SQLAlchemy 内部有隐式处理方式,默认有等值条件。最好显式写出条件。
但是这两种写法,返回都只有一行数据,为什么?
原因在于 query(Employee)
这个只能返回一个实体对象中去,为了解决这个问题,需要修改实体类 Employee,
增加属性用来存放部门信息。
sqlalchemy.orm.relationship(实体类名字符串)
class Employee(Base):
# 表名
__tablename__ = 'employees'
# 定义属性对应字段
emp_no = Column(Integer, primary_key=True)
birth_date = Column(Date, nullable=False)
first_name = Column(String(14), nullable=False)
last_name = Column(String(16), nullable=False)
gender = Column(Enum(MyEnum), nullable=False)
hire_date = Column(Date, nullable=False)
dept_emps = relationship('Dept_emp')
def __repr__(self):
return "{} no={} name={} {} gender={} dept_emps={}".format(
self.__class__.__name__, self.emp_no, self.first_name, self.last_name,
self.gender.value, self.dept_emps
)
查询信息:
# 查询10010员工所在的部门编号
# 第一种
results = session.query(Employee).join(Dept_emp).filter(Employee.emp_no ==
Dept_emp.emp_no).filter(Employee.emp_no == 10010)
# 第二种
results = session.query(Employee).join(Dept_emp, Employee.emp_no ==
Dept_emp.emp_no).filter(Employee.emp_no == 10010)
# 第三种
results = session.query(Employee).join(Dept_emp, (Employee.emp_no == Dept_emp.emp_no) &
(Employee.emp_no == 10010))
show(results.all(), "第三种")
第一种方法 join(Dept_emp) 中没有等值条件,会自动生成一个等值条件,如果后面有 filter,哪怕是 filter(Employee.emp_no == Dept_emp.emp_no),这个条件会在 where 中出现。第一种这种自动增加 join 的等值条件的方式不好,不要这么写。
第二种方法在 join 中增加等值条件,阻止了自动的等值条件的生成。这种方式推荐。
第三种方法就是第二种,这种方式也可以。
总结:
在开发中,一般都会采用 ORM 框架,这样就可以使用对象操作表了。
定义表映射的类,使用 Column 的描述器定义类属性,使用 ForeignKey 来定义外键约束。
如果在一个对象中,想查看其它表对应的对象的内容,就要使用 relationship 来定义关系。
是否使用外键?
1、力挺派:能使数据保证完整性一致性。
2、嫌弃派:开发难度增加,大数据的时候影响插入、修改、删除的效率。在业务层保证数据的一致性。