API 鉴权

为了不允许,任何人都可以调用后端的 api 接口,现在给 api 接口加上权限校验,只有登录过的才能调用后端的 api 接口,之前在auth.py中实现了一个校验,通过依赖注入给 api 直接加上即可

依赖注入

比如给courseApi.py的所有接口加上 token 校验的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*- coding:utf-8 -*-
# Author: Zachary
from typing import Optional

from fastapi import APIRouter, Query, Depends

from api import app
from common.auth import auth_handler
from common.pageHelper import PageHelper
from common.result import ResultModel, Result
from model import Session, get_db_session
from model.course import CourseSearch, CourseCreate, CourseUpdate
from service.courseService import CourseService

course_router = APIRouter(prefix="/course")


@course_router.get("/selectPage", response_model=ResultModel, dependencies=[Depends(auth_handler.auth_required)])
async def select_page(page: int = Query(1, ge=1, alias="pageNum", description="Page number"),
size: int = Query(5, gt=0, le=100, alias="pageSize", description="Page size"),
name: Optional[str] = Query(None, description="Course name"),
number: Optional[str] = Query(None, description="Course number"),
teacher: Optional[str] = Query(None, description="Course teacher"),
db_session: Session = Depends(get_db_session)):
pageInfo = PageHelper.startPage(page, size)
course_search = CourseSearch(name=name, number=number, teacher=teacher)
course_list = CourseService.select_page(course_search, db_session)
result = Result.success(pageInfo.of(course_list))
return result


@course_router.post("/add", response_model=ResultModel, dependencies=[Depends(auth_handler.auth_required)])
async def add(course: CourseCreate, db_session: Session = Depends(get_db_session)):
CourseService.add_course(course, db_session)
result = Result.success()
return result


@course_router.put("/update", response_model=ResultModel, dependencies=[Depends(auth_handler.auth_required)])
async def update(course: CourseUpdate, db_session: Session = Depends(get_db_session)):
CourseService.update_by_id(course, db_session)
result = Result.success()
return result


@course_router.delete("/delete/{id}", response_model=ResultModel, dependencies=[Depends(auth_handler.auth_required)])
async def delete(id: int, db_session: Session = Depends(get_db_session)):
CourseService.delete_by_id(id, db_session)
result = Result.success()
return result


app.include_router(course_router)

或者给整个/course 下的 api 都注入依赖,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# -*- coding:utf-8 -*-
# Author: Zachary
from typing import Optional

from fastapi import APIRouter, Depends, Query

from api import app
from common.auth import auth_handler
from common.pageHelper import PageHelper
from common.result import ResultModel, Result
from model import Session, get_db_session
from model.course import CourseSearch, CourseCreate, CourseUpdate
from service.courseService import CourseService

course_router = APIRouter(prefix='/course', dependencies=[Depends(auth_handler.auth_required)])


@course_router.get("/selectPage", response_model=ResultModel)
async def select_page(page: int = Query(1, ge=1, alias="pageNum", description="Page number"),
size: int = Query(5, gt=0, le=100, alias="pageSize", description="Page size"),
name: Optional[str] = Query(None, description="Course name"),
number: Optional[str] = Query(None, description="Course number"),
teacher: Optional[str] = Query(None, description="Course teacher"),
db_session: Session = Depends(get_db_session)):
pageInfo = PageHelper.startPage(page, size)
course_search = CourseSearch(name=name, number=number, teacher=teacher)
course_list = CourseService.select_page(course_search, db_session)
result = Result.success(pageInfo.of(course_list))
return result


@course_router.post("/add", response_model=ResultModel)
async def add(course: CourseCreate, db_session: Session = Depends(get_db_session)):
CourseService.add_course(course, db_session)
result = Result.success()
return result


@course_router.put("/update", response_model=ResultModel)
async def update(course: CourseUpdate, db_session: Session = Depends(get_db_session)):
CourseService.update_by_id(course, db_session)
result = Result.success()
return result


@course_router.delete("/delete/{id}", response_model=ResultModel)
async def delete(id: int, db_session: Session = Depends(get_db_session)):
CourseService.delete_by_id(id, db_session)
result = Result.success()
return result


app.include_router(course_router)

这样如果没有登录的情况下,直接用 postman 是无法调用接口的

登录 token

虽然现在前端页面上已经登录了,但由于请求头没有带上 token,所以无法获取数据

为了让浏览器拿到后端生成的这个 token,需要改变一下登录后的返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# -*- coding:utf-8 -*-
# Author: Zachary
from sqlalchemy import select

from common.auth import auth_handler
from exception.customException import UserNotFoundException, PasswordNotMatchException
from model import Session
from model.admin import Admin, AdminModel, AdminLoginResponse


class AdminService:
@staticmethod
def login(admin: AdminModel, db_session: Session) -> Admin:
query = select(Admin).where(Admin.username == admin.username)
exist_admin: Admin = db_session.execute(query).scalars().first()
if not exist_admin:
raise UserNotFoundException("用户不存在")
if not auth_handler.verify_password(admin.password, exist_admin.password):
raise PasswordNotMatchException("身份验证未通过")
admin_login_response = AdminLoginResponse(id=exist_admin.id, username=exist_admin.username,
token=auth_handler.encode_token(exist_admin.id))
return admin_login_response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# -*- coding:utf-8 -*-
# Author: Zachary
import bcrypt
from pydantic import BaseModel
from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, mapped_column

from model import Base


class Admin(Base):
__tablename__ = "admin"
id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
username: Mapped[str] = mapped_column(String(255), nullable=False)
password: Mapped[str] = mapped_column(String(255), nullable=False)


class AdminModel(BaseModel):
username: str
password: str


class AdminLoginResponse(BaseModel):
id: int
username: str
token: str

这样子在重新登录之后就可以获得后端返回的 token,前端 axios 会在发起请求之前,将 token 放到请求头中,再向后端请求 api 接口

登录后会返回 token:

能正确获取数据:

token 超时失效

为了让前端的 token 失效之后,自动回到 Login 页面,需要做些调整

当请求 api 接口的时候返回的状态值为 401 的时候,移除存储的 student-user,这样就会自动跳转回 login 页面重新进行登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
<template>
<div>
<div class="card" style="margin-bottom: 10px">
<el-input
@input="load"
style="width: 260px"
v-model="data.name"
placeholder="请输入要查询的课程名称"
:prefix-icon="Search"
/>
<el-input
@input="load"
style="margin-left:10px; width: 260px"
v-model="data.number"
placeholder="请输入课程编号"
:prefix-icon="Search"
/>
<el-input
@input="load"
style="margin-left:10px; width: 260px"
v-model="data.teacher"
placeholder="请输入任课老师"
:prefix-icon="Search"
/>
<el-button type="info" plain style="margin: 0 0 0 10px" @click="reset"
>重置</el-button
>
</div>

<div class="card" style="margin-bottom: 10px">
<div>
<el-button
type="primary"
style="margin-bottom: 5px"
plain
@click="handleAdd"
>新增</el-button
>
</div>
<div>
<el-table :data="data.tableData" stripe style="width: 100%">
<el-table-column prop="id" label="ID" width="80" />
<el-table-column prop="name" label="课程名称" width="180" />
<el-table-column prop="number" label="课程编号" width="180" />
<el-table-column prop="description" label="课程描述" width="240" />
<el-table-column prop="periods" label="课时" width="180" />
<el-table-column prop="teacher" label="任课教师" />
<el-table-column label="操作" align="center">
<template #default="scope">
<el-button
type="primary"
size="small"
plain
@click="handleEdit(scope.row)"
>编辑</el-button
>
<el-button
type="danger"
size="small"
plain
@click="handleDelete(scope.row.id)"
>删除</el-button
>
</template>
</el-table-column>
</el-table>
</div>
</div>

<div class="card">
<el-pagination
v-model:current-page="data.pageNum"
v-model:page-size="data.pageSize"
@current-change="handleCurrentChange"
background
layout="prev, pager, next"
:total="data.total"
/>
</div>

<el-dialog width="35%" v-model="data.formVisible" title="课程信息">
<el-form
:model="data.form"
label-width="100px"
label-position="right"
style="padding-right: 45px"
>
<el-form-item label="课程名称">
<el-input v-model="data.form.name" autocomplete="off" />
</el-form-item>
<el-form-item label="课程编号">
<el-input v-model="data.form.number" autocomplete="off" />
</el-form-item>
<el-form-item label="课程描述">
<el-input v-model="data.form.description" autocomplete="off" />
</el-form-item>
<el-form-item label="课时">
<el-input v-model="data.form.periods" autocomplete="off" />
</el-form-item>
<el-form-item label="任课教师">
<el-input v-model="data.form.teacher" autocomplete="off" />
</el-form-item>
</el-form>
<template #footer>
<span class="dialog-footer">
<el-button @click="data.formVisible = false" plain>取消</el-button>
<el-button type="primary" @click="save" plain>保存</el-button>
</span>
</template>
</el-dialog>
</div>
</template>

<script setup>
import { reactive } from "vue";
import { Search } from "@element-plus/icons-vue";
import request from "@/utils/request";
import { ElMessage, ElMessageBox } from "element-plus";
import router from "@/router";

const data = reactive({
name: "",
number: "",
teacher: "",
tableData: [],
total: 0,
pageNum: 1,
pageSize: 5,
formVisible: false,
form: {},
});

const load = () => {
request
.get("/course/selectPage", {
params: {
pageNum: data.pageNum,
pageSize: data.pageSize,
name: data.name,
number: data.number,
teacher: data.teacher,
},
})
.then((res) => {
if (res.code === "200") {
data.tableData = res.data?.list || [];
data.total = res.data?.total || 0;
} else {
ElMessage.error(res.msg);
}
})
.catch((err) => {
if (err.response?.data?.code === "401") {
localStorage.removeItem("student-user");
}
ElMessage.error(err.response?.data?.msg || err.message);
});
};

// 加载一次 获取课程数据
load();

const handleCurrentChange = () => {
// 当翻页的时候重新加载数据
load();
};

const reset = () => {
data.name = "";
data.number = "";
data.teacher = "";
load();
};

const handleAdd = () => {
data.formVisible = true;
data.form = {};
};

const save = () => {
request
.request({
url: data.form.id ? "/course/update" : "/course/add",
method: data.form.id ? "put" : "post",
data: data.form,
})
.then((res) => {
if (res.code === "200") {
ElMessage.success("操作成功");
data.formVisible = false;
load();
} else {
ElMessage.error(res.msg);
}
})
.catch((err) => {
if (err.response?.data?.code === "401") {
localStorage.removeItem("student-user");
}
ElMessage.error(err.response?.data?.msg || err.message);
});
};

const handleEdit = (row) => {
data.form = JSON.parse(JSON.stringify(row));
data.formVisible = true;
};

const handleDelete = (id) => {
ElMessageBox.confirm("删除后内容将无法恢复,您确认删除嘛?", "删除确认", {
type: "warning",
})
.then((res) => {
request.delete("/course/delete/" + id).then((res) => {
if (res.code === "200") {
ElMessage.success("删除成功");
load();
} else {
ElMessage.error(res.msg);
}
});
})
.catch((err) => {
ElMessage.error(err.response?.data?.msg || err.message);
})
.catch((err) => {
if (err.response?.data?.code === "401") {
localStorage.removeItem("student-user");
}
ElMessage.error(err.response?.data?.msg || err.message);
});
};
</script>

token 超时配置

为了更便捷地配置 token 的超时时间,设置一下超时时间常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# -*- coding:utf-8 -*-
# Author: Zachary

from common.config import config

HOST = config.env.get("HOST")
PORT = config.env.get("PORT")

MYSQL_DIALECT = config.env.get("MYSQL_DIALECT")
MYSQL_HOST = config.env.get("MYSQL_HOST")
MYSQL_PORT = config.env.get("MYSQL_PORT")
MYSQL_USER = config.env.get("MYSQL_USER")
MYSQL_PASSWORD = config.env.get("MYSQL_PASSWORD")
MYSQL_DATABASE = config.env.get("MYSQL_DATABASE")

TOKEN_EXPIRE_DAYS = 7
TOKEN_EXPIRE_MINUTES = 0
TOKEN_EXPIRE_SECONDS = 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -*- coding:utf-8 -*-
# Author: Zachary
from datetime import datetime, timedelta

import jwt
from fastapi import Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext

from common.constant import TOKEN_EXPIRE_DAYS, TOKEN_EXPIRE_MINUTES, TOKEN_EXPIRE_SECONDS
from exception.customException import TokenException


class AuthHandler:
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
secret = "SECRET"

def get_password_hash(self, password):
return self.pwd_context.hash(password)

def verify_password(self, plain_password, hashed_password):
return self.pwd_context.verify(plain_password, hashed_password)

def encode_token(self, user_id):
payload = {
'exp': datetime.utcnow() + timedelta(days=TOKEN_EXPIRE_DAYS, minutes=TOKEN_EXPIRE_MINUTES,
seconds=TOKEN_EXPIRE_SECONDS),
'iat': datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
self.secret,
algorithm='HS256'
)

def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
raise TokenException("token过期")
except jwt.InvalidTokenError:
raise TokenException("无效token")

def auth_required(self, auth: HTTPAuthorizationCredentials = Security(security)):
return self.decode_token(auth.credentials)


auth_handler = AuthHandler()

更新: 2024-05-03 21:07:51
原文: https://www.yuque.com/zacharyblock/iacda/po1gqzqal77c94sl