SQLAlchemy

一个使用 Python 编写的 SQL 的 ORM(Object-orient-map)工具包,支持 MySQL、SQL Server、sqlite、Oracle

安装

pip install sqlalchemy

若使用的是 MySQL,需要安装pip install mysqlclient

https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqldb

连接

连接 sqlite

1
2
3
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test.db', echo=True)
connection = engine.connect()

连接 MySQL

1
2
3
from sqlalchemy import create_engine
engine = create_engine('mysql://user:pwd@localhost/tsetdb', echo=True)
connection = engine.connect()
  • echo:是用于显示数据库执行的操作

先创建一个数据库中的表

1
2
3
4
5
6
7
create table user(
id int not null primary key auto_increment,
name varchar(20),
age int,
gender char(1)
)engine=innodb default charset=utf8;

往里面插入一些数据然后:

1
2
3
4
5
6
7
8
9
10
11
12
import sqlalchemy

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8')
conn = engine.connect()

query = sqlalchemy.text("select * from user")
result = conn.execute(query)

for row in result:
print(row)
conn.close()
engine.dispose()

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import sqlalchemy

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)

meta_data = sqlalchemy.MetaData()

students = sqlalchemy.Table(
'students', meta_data,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('name', sqlalchemy.String(64), unique=True, nullable=False),
sqlalchemy.Column('age', sqlalchemy.Integer, nullable=False),
sqlalchemy.Column('gender', sqlalchemy.String(8), nullable=False),
sqlalchemy.Column('birthday', sqlalchemy.Date, nullable=False),
)

meta_data.create_all(engine)
  • metaData,用于创建映射表的元数据,为了复用 所以提前使用 sqlalchemy 的 MetaDate()声明一个变量

插入数据

  • 普通 SQL 的插入 INSERT INTO table_name values()
  • 一次插入一条数据conn.execute(table_name.insert().values())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
student_insert = students.insert()
insert_value1 = student_insert.values(
name='zachary',
age=18,
gender='male',
birthday='2000-01-01',
)

insert_value2 = student_insert.values(
name='curry',
age=36,
gender='male',
birthday='1980-03-01',
)

with engine.connect() as conn:
conn.execute(insert_value1)
res = conn.execute(insert_value2)
print(f"插入的values2的主键为:{res.inserted_primary_key}")
conn.commit()
  • 一次插入多条数据conn.execute(table_name.insert(), student_list)
1
2
3
4
5
6
7
8
9
student_list = [
{"name": "jack", "age": 18, "gender": "male", "birthday": "2000-01-01"},
{"name": "sandy", "age": 25, "gender": "female", "birthday": "1990-05-04"},
{"name": "lucy", "age": 35, "gender": "female", "birthday": "2002-08-13"},
]
student_insert = students.insert()
with engine.connect() as conn:
conn.execute(student_insert, student_list)
conn.commit()

查询数据

查询所有记录

  • 普通 SQL 的查询 SELECT * FROM table_name
  • 函数查询 table_name.select()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
with engine.connect() as conn:
query = students.select()
# 方法一
result = conn.execute(query)
for row in result:
print(row[0],end=" ")
print(row.name)

# 方法二
result = conn.execute(query)
for row in result:
print(row)

# 方法三
result = conn.execute(query)
result_set = result.fetchall()
print(result_set)
for i in result_set:
print(i)

查询一条记录

1
2
3
4
5
with engine.connect() as conn:
query = students.select()
result = conn.execute(query)
result_raw = result.fetchone()
print(result_raw)

条件查询

  • 普通 SQL 的条件查询 SELECT * FROM table_name WHERE 条件
  • 函数条件查询 table_name.select().where(条件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
with engine.connect() as conn:
# 单条件查询
query = students.select().where(students.c.gender == 'male')
result = conn.execute(query)
result_raw = result.fetchall()
for item in result_raw:
print(item)

# 多条件查询
query = students.select().where(students.c.gender == 'male').where(students.c.age > 20).where(students.c.birthday < '2000-01-01')
result = conn.execute(query)
result_raw = result.fetchall()
for item in result_raw:
print(item)
  • 更复杂的条件查询 引入and_or_( from sqlalchemy.sql import and_, or_ )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy.sql import and_, or_
with engine.connect() as conn:
# 复杂条件查询
query = students.select().where(
or_(
students.c.id > 3,
and_(
students.c.gender == 'male',
students.c.age > 30
)
)
)
result = conn.execute(query)
result_set = result.fetchall()
for row in result_set:
print(row)

更新数据

  • 普通 SQL 的更新数据 UPDATE table_name SET col1=value1, col2=value2 WHERE 条件
  • 函数更新数据
    • 更新所有数据
      • table_name.update().values()
    • 更新部分数据
      • table_name.update().where(条件).values()
      • update(table_name).where(条件).values()
1
2
3
4
5
6
7
8
9
with engine.connect() as conn:
update = students.update().where(students.c.id == 1).values(
name='zacharyBlock',
age=18,
gender='male',
birthday='2000-01-01',
)
conn.execute(update)
conn.commit()

删除数据

  • 普通 SQL 的删除数据 DELETE FROM table_name WHERE 条件
  • 函数删除数据
    • 删除所有数据
      • table_name.delete()
    • 删除部分数据
      • table_name.delete().where(条件)
1
2
3
4
with engine.connect() as conn:
delete = students.delete().where(students.c.id == 5)
conn.execute(delete)
conn.commit()

关联表

定义

这里举例一个简单的一对多关联表

画板

删掉之前的 students 表重新建立一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import sqlalchemy

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)

meta_data = sqlalchemy.MetaData()

students = sqlalchemy.Table(
'students', meta_data,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('name', sqlalchemy.String(64), unique=True, nullable=False),
sqlalchemy.Column('age', sqlalchemy.Integer, nullable=False),
sqlalchemy.Column('gender', sqlalchemy.String(8), nullable=False),
sqlalchemy.Column('class_id', sqlalchemy.Integer, sqlalchemy.ForeignKey('classes.id'), nullable=False),
)

classes = sqlalchemy.Table(
'classes', meta_data,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('name', sqlalchemy.String(64), unique=True, nullable=False),
)

meta_data.create_all(engine)

插入一些数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# insert data
with engine.connect() as conn:
conn.execute(classes.insert(), [
{"name": "Python"},
{"name": "Java"},
{"name": "Go"},
])
conn.execute(students.insert(), [
{"name": "zachary", "age": 18, "gender": "male", "class_id": 1},
{"name": "curry", "age": 36, "gender": "male", "class_id": 1},
{"name": "jack", "age": 18, "gender": "male", "class_id": 2},
{"name": "sandy", "age": 25, "gender": "female", "class_id": 2},
{"name": "lucy", "age": 35, "gender": "female", "class_id": 3},
])

conn.commit()

查询

  • 查询 Python 班级的所有学生的信息及班级信息
1
2
3
4
5
6
with engine.connect() as conn:
join = students.join(classes, students.c.class_id == classes.c.id)
query = sqlalchemy.select(join).where(classes.c.name == "Python")
result = conn.execute(query)
for row in result:
print(row)
  • 查询 Python 班级的所有学生的信息
1
2
3
4
5
6
7
with engine.connect() as conn:
join = students.join(classes, students.c.class_id == classes.c.id)
# query = sqlalchemy.select(students).select_from(join).where(classes.c.name == 'Python')
query = students.select().select_from(join).where(classes.c.name == 'Python')
result = conn.execute(query)
for row in result:
print(row)
  • 查找 curry 所在的班级
1
2
3
4
5
6
with engine.connect() as conn:
join = students.join(classes, students.c.class_id == classes.c.id)
# query = sqlalchemy.select(classes).select_from(join).where(students.c.name == 'curry')
query = classes.select().select_from(join).where(students.c.name == 'curry')
result = conn.execute(query)
print(result.fetchall())

映射类(这块如果讲的话 暂时不要带上 join)

在 Python 中定义一个类,其中的属性对应着数据表中的字段,通过类对象进行操作从而达到操作数据表的目的

定义

  • 映射类基类

通过使用from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

还是得先删除掉原先创建的 students 表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()


class Student(Base):
__tablename__ = 'students'
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
name = sqlalchemy.Column(sqlalchemy.String(64), unique=True, nullable=False)
age = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
gender = sqlalchemy.Column(sqlalchemy.String(8), nullable=False)


# 若没有创建表 使用Base的metadata创建表
Base.metadata.create_all(engine)

插入

  • 通过session添加记录,代替了之前的 connection
  • 添加一条数据 session.add(obj)
1
2
3
4
5
6
7
8
9
from sqlalchemy.orm import sessionmaker

# 给Session绑定engine
Session = sessionmaker(bind=engine)

session = Session()
student = Student(name='Tony', age=18, gender='male')
session.add(student)
session.commit()
  • 添加多条数据 session.add_all(list)
1
2
3
4
5
6
7
8
9
10
11
12
# 给Session绑定engine
Session = sessionmaker(bind=engine)

session = Session()
# 添加多条记录
student_list = [
Student(name='stack', age=36, gender='male'),
Student(name='jeff', age=29, gender='male'),
Student(name='Monica', age=18, gender='female'),
]
session.add_all(student_list)
session.commit()

查询

  • 查询所有记录 session().query(cls).all()
  • 带条件的查询 session().query(cls).filter(cls.attr == "value")
  • 多条件查询 session().query(cls).filter(and_(条件1, 条件2))
  • 单条记录的查询
    • .first() 调用多条记录中的第一条,若无记录,则返回 None
    • .one() 结果集若为一条记录,则返回,否则抛出异常
    • .scalar() 与 one()相似,区别在于 当结果集是 None 时,不抛出异常,返回 None
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 若没有创建表 使用Base的metadata创建表
Base.metadata.create_all(engine)

# 给Session绑定engine
Session = sessionmaker(bind=engine)

session = Session()
# 查找所有学生信息
students = session.query(Student).all()
for student in students:
print(
f'id:{student.id}, name:{student.name}, age:{student.age}, gender:{student.gender}')

# 查找学生ID大于2 且 为女性的学生信息
students = session.query(Student).filter(and_(Student.id > 2, Student.gender == 'female'))
for student in students:
print(student)

# 查询学生ID大于3的第一条记录
student = session.query(Student).filter(Student.id > 3).first()
print(student)

# 使用one()获取结果集为1条数据的记录,否则会抛出异常
student = session.query(Student).filter(Student.id == 3).one()
print(student)

# 使用scalar()获取的结果集为1条记录,若为空则返回None,若为多条记录抛出异常
student = session.query(Student).filter(Student.id == 9).scalar()
print(student)

更新

  • 更新数据
    • 使用obj.attr="value",依据one()等 查询到某条数据后,直接通过对象属性的方式直接赋值
    • session().query(cls).filter(cls.id == "value").update({cls.attr == "value" })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 给Session绑定engine
Session = sessionmaker(bind=engine)

session = Session()
# 学生id为4 的年龄+1
student = session.query(Student).filter(Student.id == 4).one()
student.age += 1
session.commit()

student = session.query(Student).filter(Student.id == 4).update({Student.age: 18})
print(student)
session.commit()

# 所有人年龄+1
students = session.query(Student).update({Student.age: Student.age + 1})
session.commit()

Mapped 映射

由 SQLAlchemy2.0 版本提供的新映射方式

  • Mappedmapped_column
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()


class Student(Base):
__tablename__ = 'students'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(sqlalchemy.String(64), unique=True, nullable=False)
age: Mapped[int] = mapped_column(nullable=False)
gender: Mapped[str] = mapped_column(sqlalchemy.String(8), nullable=False)

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()
students = session.query(Student).all()
for student in students:
print(student)
  • Annotated使 Mapped 更加便捷和得以复用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding:utf-8 -*-
# Author: Zachary
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column
from typing_extensions import Annotated

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
int_age = Annotated[int, mapped_column(nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]


class Student(Base):
__tablename__ = 'students'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]

def __repr__(self):

return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()
students = session.query(Student).all()
for student in students:
print(student)

  • 使用 sql 的内置函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import datetime

import sqlalchemy
from sqlalchemy import func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column
from typing_extensions import Annotated

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
int_age = Annotated[int, mapped_column(nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]
timestamp_now = Annotated[datetime.datetime, mapped_column(nullable=False, server_default=func.now())]


class Student(Base):
__tablename__ = 'students'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]
create_time: Mapped[timestamp_now]

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}, create_time:{self.create_time}"

ORM 关联表

一对多

  • ForeignKey定义外键
  • relationship创建关系字段
    • lazy=False使得在查询关系字段的时候自动执行,提高查询效率(讲的时候可以试试给 T 和 F 运行效果看看)
    • backref="关系字段名" 给关系字段对应的类 指定关联关系字段 实现双向关联,虽然便捷 但是不推荐,因为很难直接从另一个类中看出有这个关系字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
int_age = Annotated[int, mapped_column(nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]
class_id_fk = Annotated[int, mapped_column(sqlalchemy.ForeignKey('classes.id'), nullable=False)]
timestamp_now = Annotated[datetime.datetime, mapped_column(nullable=False, server_default=func.now())]


class Class(Base):
__tablename__ = 'classes'
id: Mapped[int_pk]
name: Mapped[str_name]
create_time: Mapped[timestamp_now]

def __repr__(self):
return f"id:{self.id}, name:{self.name}, create_time:{self.create_time}"


class Student(Base):
__tablename__ = 'students'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]
class_id: Mapped[class_id_fk]
create_time: Mapped[timestamp_now]

# 创建一个关系字段,这并不是一个数据库字段
classes: Mapped[Class] = relationship(lazy=False, backref="students")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}, class_id:{self.class_id}, create_time:{self.create_time}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()

# 插入操作
# 班级对象
class_python = Class(name="Python")
# 学生对象 插入的时候没有使用class_id, 直接插入了一个classes关系对象
student = Student(name="zachary", age=18, gender="male", classes=class_python)
# 在添加数据的时候发现Class表中 还没有python班级,依据关系字段,会自动执行insert语句,因此不需要给class表插入数据
session.add(student)
session.commit()

# 查询操作
student = session.query(Student).filter(Student.gender == "female").first()
print(student)
# 如果是lazy=True 则会再查询一次,否则一次就查询出结果
print(student.classes)

# 通过backref实现双向关联
classes = session.query(Class).filter(Class.name == "Python").one()
print(classes)
print(classes.students)




- `back_populates="关联的关系字段名"` 建议使用该方法进行关联关系的双向映射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import datetime
from typing import List

import sqlalchemy
from sqlalchemy import func
from sqlalchemy.orm import declarative_base, mapped_column, Mapped, sessionmaker, relationship
from typing_extensions import Annotated

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
int_age = Annotated[int, mapped_column(nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]
class_id_fk = Annotated[int, mapped_column(sqlalchemy.ForeignKey('classes.id'), nullable=False)]
timestamp_now = Annotated[datetime.datetime, mapped_column(nullable=False, server_default=func.now())]


class Class(Base):
__tablename__ = 'classes'
id: Mapped[int_pk]
name: Mapped[str_name]
create_time: Mapped[timestamp_now]

# 创建一个关系字段
students: Mapped[List["Student"]] = relationship(lazy=False, back_populates="classes")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, create_time:{self.create_time}"


class Student(Base):
__tablename__ = 'students'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]
class_id: Mapped[class_id_fk]
create_time: Mapped[timestamp_now]

# 创建一个关系字段,这并不是一个数据库字段
classes: Mapped[Class] = relationship(lazy=False, back_populates="students")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}, class_id:{self.class_id}, create_time:{self.create_time}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()

# 通过back_populates实现双向关联
student = session.query(Student).filter(Student.gender == "female").first()
print(student)
print(student.classes)

classes = session.query(Class).filter(Class.name == "Python").one()
print(classes)
print(classes.students)

多对多

画板

在用户登录注册的业务中,用户可以分为很多角色,同时用户可能拥有多重角色身份,这就需要使用多对多的关联

在这样的多对多关系中,通常需要一张中间表,因此在实现的时候首先需要定义这张中间表,同时这张中间表不是使用 class 进行的定义,而是使用方法定义

1
2
3
4
5
6
7
8
engine = create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

relation_table = Table(
'user_role', Base.Metadata,
Column('user_id', ForeignKey('users.id'), primary_key=True),
Column('role_id', ForeignKey('roles.id'), primary_key=True)
)

创建 role 表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import sqlalchemy
from sqlalchemy import Table, create_engine, Column, ForeignKey
from sqlalchemy.orm import declarative_base, mapped_column, Mapped
from typing_extensions import Annotated

engine = create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]

relation_table = Table(
'user_role', Base.Metadata,
Column('user_id', ForeignKey('users.id'), primary_key=True),
Column('role_id', ForeignKey('roles.id'), primary_key=True)
)


class Role(Base):
__tablename__ = 'roles'
id: Mapped[int_pk]
name: Mapped[str_name]

def __repr__(self):
return f"id:{self.id}, name:{self.name}"

之后创建 user 表,需要创建单向关联字段

  • 使用 relationship()中的secondary="中间表"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from typing import List

import sqlalchemy
from sqlalchemy import Table, create_engine, Column, ForeignKey
from sqlalchemy.orm import declarative_base, mapped_column, Mapped, relationship
from typing_extensions import Annotated

engine = create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_unique_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
str_pwd = Annotated[str, mapped_column(sqlalchemy.String(64), nullable=False)]

relation_table = Table(
'user_role', Base.Metadata,
Column('user_id', ForeignKey('users.id'), primary_key=True),
Column('role_id', ForeignKey('roles.id'), primary_key=True)
)


class Role(Base):
__tablename__ = 'roles'
id: Mapped[int_pk]
role_name: Mapped[str_unique_name]

def __repr__(self):
return f"id:{self.id}, name:{self.name}"


class User(Base):
__tablename__ = 'users'
id: Mapped[int_pk]
account: Mapped[str_unique_name]
password: Mapped[str_pwd]

roles: Mapped[List["Role"]] = relationship(lazy=False, secondary=relation_table)

def __repr__(self):
return f"id:{self.id}, name:{self.name}"

Base.Metadata.create_all(engine)
Session = sessionmaker(bind=engine)

尝试插入一些数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 添加数据
session = Session()

role_1 = Role(role_name="admin")
role_2 = Role(role_name="user")
role_3 = Role(role_name="guest")

adminUser = User(account="Curry", password="1234")
adminUser.roles.add(role_1)
adminUser.roles.add(role_2)

normalUser = User(account="Jordan", password="1234")
normalUser.roles.add(role_2)

guestUser = User(account="Harden", password="1234")
guestUser.roles.add(role_3)

session.add_all([adminUser, normalUser, guestUser])
session.commit()

查询一下数据

1
2
3
4
user_data = session.query(User).all()
for item in user_data:
print(item)
print(item.roles)

至此 单向的关联关系是没问题的,照例实现一下从 role 关联至 user

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# -*- coding:utf-8 -*-
# Author: Zachary
from typing import Set

import sqlalchemy
from sqlalchemy import Table, create_engine, Column, ForeignKey
from sqlalchemy.orm import declarative_base, mapped_column, Mapped, relationship, sessionmaker
from typing_extensions import Annotated

engine = create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_unique_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
str_pwd = Annotated[str, mapped_column(sqlalchemy.String(64), nullable=False)]

relation_table = Table(
'user_role', Base.metadata,
Column('user_id', ForeignKey('users.id'), primary_key=True),
Column('role_id', ForeignKey('roles.id'), primary_key=True)
)


class Role(Base):
__tablename__ = 'roles'
id: Mapped[int_pk]
role_name: Mapped[str_unique_name]

users: Mapped[Set["User"]] = relationship(lazy=False, secondary=relation_table, back_populates="roles")

def __repr__(self):
return f"id:{self.id}, role_name:{self.role_name}"


class User(Base):
__tablename__ = 'users'
id: Mapped[int_pk]
account: Mapped[str_unique_name]
password: Mapped[str_pwd]

roles: Mapped[Set["Role"]] = relationship(lazy=False, secondary=relation_table, back_populates="users")

def __repr__(self):
return f"id:{self.id}, account:{self.account}, password:{self.password}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

# 添加数据
session = Session()

user_data = session.query(User).all()
for item in user_data:
print(item)
print(item.roles)

role_data = session.query(Role).all()
for item in role_data:
print(item)
print(item.users)

一对一

画板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# -*- coding:utf-8 -*-
# Author: Zachary
import datetime

import sqlalchemy
from sqlalchemy import ForeignKey
from sqlalchemy.orm import declarative_base, Mapped, mapped_column, relationship, sessionmaker
from typing_extensions import Annotated

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=False, nullable=False)]
int_age = Annotated[int, mapped_column(sqlalchemy.SmallInteger, nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]
date_exp_license = Annotated[
datetime.datetime, mapped_column(sqlalchemy.Date, default=datetime.datetime.now() + datetime.timedelta(days=365))]


class Driver(Base):
__tablename__ = 'drivers'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]
license_id: Mapped[str] = mapped_column(ForeignKey('licenses.id'), nullable=False)

license = relationship("License", lazy=False, back_populates="driver")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}, license_id:{self.license_id}"


class License(Base):
__tablename__ = 'licenses'
id: Mapped[int_pk]
l_class: Mapped[str_name]
expiration_date: Mapped[date_exp_license]

driver = relationship("Driver", lazy=False, back_populates="license")

def __repr__(self):
return f"id:{self.id}, l_class:{self.l_class}, expiration_date:{self.expiration_date}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

尝试插入数据

1
2
3
4
5
6
7
8
9
10
license_1 = License(l_class="A")
license_2 = License(l_class="B")
license_3 = License(l_class="A")

driver_1 = Driver(name="Tom", age=18, gender="male", license=license_1)
driver_2 = Driver(name="Jerry", age=20, gender="female", license=license_2)
driver_3 = Driver(name="Jack", age=22, gender="male", license=license_3)

session.add_all([driver_1, driver_2, driver_3])
session.commit()

查询数据

1
2
3
4
5
6
7
8
9
10
license_1 = session.query(License).filter(License.l_class == "A").all()
for item in license_1:
print(item)
print(item.driver)

driver = session.query(Driver).all()
for item in driver:
print(item)
print(item.license)
print("=============================")

更新数据

1
2
3
4
5
6
7
8
9
10
# driver id为3 的license_id 清空
session.query(Driver).filter(Driver.id == 3).update({Driver.license_id: None})
session.commit()

# 重新赋给driver id=3的license_id
license_item = session.query(License).filter(License.id == 3).first()
driver = session.query(Driver).filter(Driver.id == 3).first()
if license_item and driver:
driver.license = license_item
session.commit()

ORM 操作

查询

这里仍旧以一对多为例,沿用之前的学生 班级表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import datetime
from typing import List

import sqlalchemy
from sqlalchemy import func
from sqlalchemy.orm import declarative_base, mapped_column, Mapped, sessionmaker, relationship
from typing_extensions import Annotated

engine = sqlalchemy.create_engine('mysql://root:980226@localhost:3306/testSql?charset=utf8', echo=True)
Base = declarative_base()

int_pk = Annotated[int, mapped_column(primary_key=True)]
str_name = Annotated[str, mapped_column(sqlalchemy.String(64), unique=True, nullable=False)]
int_age = Annotated[int, mapped_column(nullable=False)]
str_gender = Annotated[str, mapped_column(sqlalchemy.String(8), nullable=False)]
class_id_fk = Annotated[int, mapped_column(sqlalchemy.ForeignKey('classes.id'), nullable=False)]
timestamp_now = Annotated[datetime.datetime, mapped_column(nullable=False, server_default=func.now())]


class Class(Base):
__tablename__ = 'classes'
id: Mapped[int_pk]
name: Mapped[str_name]
create_time: Mapped[timestamp_now]

# 创建一个关系字段
students: Mapped[List["Student"]] = relationship(back_populates="classes")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, create_time:{self.create_time}"


class Student(Base):
__tablename__ = 'students'
id: Mapped[int_pk]
name: Mapped[str_name]
age: Mapped[int_age]
gender: Mapped[str_gender]
class_id: Mapped[class_id_fk]
create_time: Mapped[timestamp_now]

# 创建一个关系字段,这并不是一个数据库字段
classes: Mapped[Class] = relationship(back_populates="students")

def __repr__(self):
return f"id:{self.id}, name:{self.name}, age:{self.age}, gender:{self.gender}, class_id:{self.class_id}, create_time:{self.create_time}"


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
  • 查询的一些实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 查询所有学生信息,以姓名排序
query = select(Student).order_by(Student.name)
result = session.execute(query)
for item in result:
print(item)

# 查询一个表中的某几个字段 id name
query = select(Student.id, Student.name).select_from(Student)
execute_query(query)

# 使用join函数做联合查询 默认是inner join
query = select(Student,Class).join(Student.classes)
execute_query(query)

query = select(Class, Student).join(Student.classes)
execute_query(query)

# 查询多个表中的某几个字段 stu.name cls.name
query = select(Student.name, Class.name).join_from(Student, Class)
execute_query(query)

# 使用outer join函数做联合查询
query = select(Student, Class).outerjoin(Class.students)
execute_query(query)

query = select(Student, Class).select_from(outerjoin(Class, Student))
execute_query(query)

query = select(Student, Class).outerjoin(Student.classes)
execute_query(query)

query = select(Student, Class).select_from(outerjoin(Student, Class))
execute_query(query)

# 使用join函数实现outer join
query = select(Student, Class).join(Student.classes, isouter=True)
execute_query(query)

query = select(Class, Student).join(Student.classes, isouter=True)
execute_query(query)

# where 条件查询
query = select(Student).where(Student.age > 18, Student.gender == 'male')
execute_query(query)
query = select(Student).where(and_(Student.age > 18, Student.gender == 'male'))
execute_query(query)

obj_stu = session.get(Student, 1) # 可以看看这个源码
query = select(Student).where(Student.name == obj_stu.name)
execute_query(query)

query = select(Student).where(Student.age != obj_stu.age)
execute_query(query)

query = select(Class).where(Class.students.contains(obj_stu))
execute_query(query)

插入

  • insert()函数插入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 使用insert()实现插入

# 给classes插入数据
cls_1 = {"name": "Golang"}
cls_2 = {"name": "C++"}
cls_list = [cls_1, cls_2]
session.execute(insert(Class).values(cls_list))
session.commit()


# 给students插入数据
student_1 = {"name": "Tom", "age": 18, "gender": "male", "class_id": select(Class.id).where(Class.name == "Golang")}
student_2 = {"name": "Rose", "age": 20, "gender": "female", "class_id": select(Class.id).where(Class.name == "C++")}
student_list = [student_1, student_2]
session.execute(insert(Student).values(student_list))
session.commit()



更新

  • update()函数更新
1
2
3
4
5
6
7
8
9
10
# 使用update()实现更新

# id为3的学生想转到Golang班 id为4的学生想转到C++班
update_1 = {"id": 3, "class_id": session.execute(select(Class).where(Class.name == "Golang")).scalar_one().id}
update_2 = {"id": 4, "class_id": session.execute(select(Class).where(Class.name == "C++")).scalar_one().id}
update_list = [update_1, update_2]
session.execute(update(Student), update_list)
session.commit()


删除

  • delete()函数删除
1
2
3
4
5
# 使用 delete() 实现删除

# 删除id在8, 9, 10 的学生
session.execute(delete(Student).where(Student.id.in_([8, 9, 10])))
session.commit()

Session 与事务

  • Session 默认启用事务
1
2
3
4
5
6
with Session(engine) as session: # 事务的开始
...
session.commit()

# 在session.commit()之前发生任何异常,都会rollback
# 若无异常,提交后会关闭session
  • 自动提交事务
1
2
3
4
5
6
7
8
9
10
with Session(engine) as session:
with session.begin():
...
# 到这块无异常,正常自动提交
# 自动关闭session

with Session(engine) as session, session.begin():
...
# 到这块无异常,正常自动提交
# 自动关闭session

更新: 2024-02-21 20:01:12
原文: https://www.yuque.com/zacharyblock/cx2om6/uigosxw6s0n5b7k8