赞
踩
项目结构

config.py
hostname = "127.0.0.1"
port = 3306
username = "root"
password = "root"
database = "flask_qa"
# 在 app.config 中设置连接数据库的信息
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{username}:{password}@{hostname}:{port}/{database}?charset=utf8mb4"
exts.py
扩展文件,先预定义db
from flask_sqlalchemy import SQLAlchemy
# 扩展文件,先预定义db
# 避免出现循环引用, 比如将db=db = SQLAlchemy(app)放入app.py中, modules 实体类中需要引用 db, 而app.py中需要导入实体类, 出现了循环引用
# 如果在此处定义 db=db = SQLAlchemy(app) , 则会引用 app.py文件, 而 modules 实体类中需要引用 db, 也相当于引用了 app.py , app.py中需要导入实体类, 又会出现循环引用
# 因此此处预定义, 从而 实体类中引用该文件, app.py 中引用实体类 , 并在app.py中初始化db的配置
db = SQLAlchemy()
实体类定义(modules包下:):
User.py
from exts import db
from datetime import datetime
class UserModel(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(100), nullable=False)
password = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), nullable=False, unique=True)
join_time = db.Column(db.DateTime, default=datetime.now)
视图定义(blueprints包下):
autho.py
from flask import Blueprint
# 初始化, 定义该视图名称和访问前缀
bp = Blueprint('auth', __name__, url_prefix='/auth')
# 相当于访问 /auth/login
@bp.route('/login')
def login():
pass
qa.py
from flask import Blueprint
# 初始化, 定义该视图名称和访问前缀
bp = Blueprint('qa', __name__, url_prefix='/qa')
最后在app.py中进行整合:
from flask import Flask import config from exts import db # 导入自定义的视图 from buleprints.autho import bp as auth_bp from buleprints.qa import bp as qa_bp # 导入定义的实体类 from modules.User import UserModel from flask_migrate import Migrate app = Flask(__name__) # 绑定数据库的配置文件,自动读取配置信息 app.config.from_object(config) # 初始化db的配置 db.init_app(app) # 绑定自定义的视图 app.register_blueprint(auth_bp) app.register_blueprint(qa_bp) # 将定义的实体类模型映射到数据库中 migrate = Migrate(app, db) # 然后在控制台 flask db init, flask db migrate, flask db upgrade @app.route('/') def hello_world(): # put application's code here return 'Hello World!' if __name__ == '__main__': app.run()
注册登录页面模板渲染
qq邮箱:
设置->账户->POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务
开启 POP3/SMTP服务
第一步,下载 flask_mail 包
pip install flask-mail
config.py
# 发送邮件的相关配置
MAIL_SERVER = "smtp.qq.com"
MAIL_USE_SSL = True
MAIL_PORT = 465
MAIL_USERNAME = "3148524372@qq.com"
MAIL_PASSWORD = "vbjpvdzrzzlhdfca"
MAIL_DEFAULT_SENDER = "3148524372@qq.com"
exts.py 中 预定义(作用和db的预定义一样)
from flask_sqlalchemy import SQLAlchemy
# 扩展文件,先预定义db
# 避免出现循环引用, 比如将db=db = SQLAlchemy(app)放入app.py中, modules 实体类中需要引用 db, 而app.py中需要导入实体类, 出现了循环引用
# 如果在此处定义 db=db = SQLAlchemy(app) , 则会引用 app.py文件, 而 modules 实体类中需要引用 db, 也相当于引用了 app.py , app.py中需要导入实体类, 又会出现循环引用
# 因此此处预定义, 从而 实体类中引用该文件, app.py 中引用实体类 , 并在app.py中初始化db的配置
db = SQLAlchemy()
from flask_mail import Mail
mail = Mail()
app.py 中 初始化
# 绑定数据库的配置文件,自动读取配置信息
app.config.from_object(config)
# 初始化db的配置
db.init_app(app)
# 初始化邮箱相关配置
mail.init_app(app)
autho.py 中测试
from flask import Blueprint from flask_mail import Message from exts import mail # 初始化, 定义该视图名称和访问前缀 bp = Blueprint('auth', __name__, url_prefix='/auth') # 相当于访问 /auth/login @bp.route('/login') def login(): pass @bp.route('/mail/test') def mail_test(): message = Message(subject='发送邮件测试', recipients=["yangzhaohui4132@163.com"], body="测试邮件") mail.send(message) return "success"
因为需要将验证码存储到数据库中,因此需要先创建实体类 EmailCaptch.py
from exts import db
class EmailCaptchaModel(db.Model):
__tablename__ = 'email_captcha'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(100), nullable=False)
captcha = db.Column(db.String(100), nullable=False)
在 autho.py 中 请求 /autho/captch/email 会执行下面的方法,生成验证码并发送到对应的邮件中,并将生成的验证码存储到数据库中
# 获取验证码 @bp.route('/captch/email') def get_email_captch(): # 请求方式 captch/email/<email> # captch/email?email=xx@qq.com # 通过request获取请求的参数 email = request.args.get('email') # 生成验证码 # 验证码为4个随机数字的组合 source = string.digits*4 captch = random.sample(source, 4) # 此时captch为列表 captch = "".join(captch) # 发送验证码 message = Message(subject='发送邮件测试', recipients=[email], body=f"验证码是:{captch}") mail.send(message) # 将验证码存储到数据库中 email_captch = EmailCaptchaModel(email=email, captcha=captch) db.session.add(email_captch) db.session.commit() # 统一对前端请求的返回信息,采用 RESTFUL 的形式 # {"code":200, "message":"", "data":None} return jsonify({"code":200, "message": "", "data":None})
进行测试:访问 127.0.0.1:5000/auth/captch/email?email=yangzhaohui4132@163.com

register.html页面
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="{{ url_for('static', filename='js/jquery-3.6.3.js') }}"></script> <script src="{{ url_for('static', filename='js/register.js') }}"></script> </head> <body> <form> <div><input type="text" name="email"></div> <button name="captcha-btn" id="captcha-btn">获取验证码</button> </form> </body> </html>
register.js 页面
// 定义绑定 发送验证码按钮 的函数 function bindEmailCaptchaClick(){ $("#captcha-btn").click(function (event) { // $this, 代表的是当前按钮的 jquery 对象 var $this = $(this) // 阻止默认事件 event.preventDefault() console.log("1111") // 获取输入的邮箱 let email = $("input[name='email']").val() // 点击发送验证码按钮后,发送ajax请求 $.ajax({ // 根目录 : http:127.0.0.1:5000 // 请求路径为 : /auth/captcha/email?email=XX@qq.com url: "/auth/captch/email?email="+email, method: "GET", // 如果请求成功, 服务器会返回结果 success: function(result){ var code = result['code'] if (code == 200){ // 说明请求成功 // 开始倒计时 // 进行倒计时之前, 需要先取消按钮的点击事件 $this.off("click") var countdown = 60 // 倒计时60s // 使用 setInterval 定义计时器 var timer = setInterval(function () { $this.text(countdown) countdown -= 1 // 倒计时结束 if (countdown <= 0){ // 清除计时器 clearInterval(timer) // 修改按钮的文字 $this.text("获取验证码") // 重新绑定点击事件 bindEmailCaptchaClick() } }, 1000) alert("验证码发送成功") }else { // 未能成功发送成功 alert(result["message"]) } }, fail: function(error){ console.log(error) } }) }) } //先加载完整个页面文档后, 再加载js函数 $(function () { bindEmailCaptchaClick() })
首先下载 flask-wtf 包, 该包包含对表单 form 的相关处理
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple flask-wtf
其次在配置文件中配置 密钥
config.py
# flask_wtf 进行对前端传递到服务器的数据验证
# 需要在配置文件中设定 关键字
SECRET_KEY = "qwoehavnOEJVIN "
然后定义对应的表单验证类
注册表单对应 RegisterForm 类
form.py 中
import wtforms from wtforms.validators import Email, Length, EqualTo from modules.User import UserModel from modules.EmailCaptch import EmailCaptchaModel # 用来验证前端提交的表单数据是否符合要求 class RegisterForm(wtforms.Form): email = wtforms.StringField(validators=[Email(message="邮箱格式错误")]) captcha = wtforms.StringField(Length(min=4, max=4, message="验证码格式错误")) username = wtforms.StringField(Length(min=3, max=20, message="用户名格式错误")) password = wtforms.StringField(Length(min=3, max=20, message="密码格式错误")) password_confirm = wtforms.StringField(validators=[EqualTo("password")]) # 自定义验证 # 判断邮箱是否被注册 def validate_email(self, field): email = field.data user = UserModel.query.filter_by(email=email).first() if user: raise wtforms.ValidationError(message="该邮箱已被注册") # 验证验证码是否正确 def validate_captcha(self, field): captcha = field.data email = self.email.data captcha_model = EmailCaptchaModel.query.filter_by(email=email, captcha=captcha).first() if not captcha_model: raise wtforms.ValidationError(message="邮箱或验证码错误")
最后服务器对前端提交的数据进行处理, 并保存到数据库中
autho.py
@bp.route('/register', methods=['GET', 'POST']) def register(): # 默认是get请求 if request.method == 'GET': return render_template('register.html') else: # 验证用户提交的邮箱和验证码是否正确 # 使用flask-wtf: wtforms 进行表单验证 form = RegisterForm(request.form) if form.validate(): # 验证成功, 将提交的数据存储到数据库中 email = form.email.data username = form.username.data password = form.password.data user = UserModel(email=email, username=username, password=generate_password_hash(password)) db.session.add(user) db.session.commit() return redirect('/auth/login') else: # 验证失败 print(form.errors) return "fail"
首先定义一个和登录表单对应的验证类 loginForm
form.py 中
class loginForm(wtforms.Form):
email = wtforms.StringField(validators=[Email(message="邮箱格式错误")])
password = wtforms.StringField(Length(min=3, max=20, message="密码格式错误"))
在 autho.py 中对登录请求进行处理
# 相当于访问 /auth/login @bp.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': return render_template("login.html") else: form = LoginForm(request.form) if form.validate(): # 验证成功, 将提交的数据存储到数据库中 email = form.email.data password = form.password.data # 数据库查找 user = UserModel.query.filter_by(email=email).first() if not user: print("该用户不存在") return redirect(url_for('auth.login')) # 如果用户存在, 判断密码是否正确 # check_password_hash(arg1, arg2) 第一个参数是加密后的密码, 第二个参数是用户通过前端传递过来的密码 if check_password_hash(user.password, password): # 如果密码正确, 通过cookie存储用户的id, 下一次用户访问服务器时会顺带着携带cookie, 从而可以通过cookie识别用户的身份 # flask 中的session, 是经过加密后存储在cookie中的 session['user_id'] = user.id return redirect("/") # 跳转到首页 else: print("密码错误") return redirect(url_for("auth.login")) else: # 验证失败 print(form.errors) return redirect(url_for("auth.login"))
app.py
# 钩子函数 # before_request/before_first_request/after_request # 在调用视图函数处理请求前先执行该方法 # 在视图函数之前,将一些后面需要用到的数据先处理好,方便视图函数使用 @app.before_request def my_before_request(): user_id = session.get("user_id") if user_id: # 判断是否授权 user = UserModel.query.filter_by(user_id) # 保存到全局变量中 setattr(g, "user", user) else: setattr(g, "user", None) # 上下文处理器 # 使用这个钩子函数,必须返回一个字典。 # 这个字典中的值在所有模版中都可以使用。 # 如果一些在很多模版中都要用到的变量,那么就可以使用这个钩子函数来返回,而不用在每个视图函数中的render_template中去写,从而让代码更加简洁和好维护。 @app.context_processor def my_context_processor(): return {"user", g.user}
钩子函数可以用来在执行视图函数之前, 判断用户是否授权
首先钩子函数判断用户是否授权, 并且在上下文处理器中返回了用户信息
然后在 base.html 中
<ul class="navbar-nav"> {% if user %} <li class="nav-item"> <a class="nav-link" href="#">{{user.username}}</a> </li> <li class="nav-item"> <a class="nav-link" href="{{ url_for('auth.logout') }}">退出登录</a> </li> {% else %} <li class="nav-item"> <a class="nav-link" href="{{url_for('auth.login')}}">登录</a> </li> <li class="nav-item"> <a class="nav-link" href="{{url_for('auth.register')}}">注册</a> </li> {% endif %} </ul>
在 autho.py 中,定义退出登录的视图函数
@bp.route('logout')
def logout():
# 清楚cookie
session.clear()
return redirect("/")
QuestionModel.py
from exts import db from datetime import datetime from User import UserModel class QuestionModel(db.Model): __tablename__ = 'question' id = db.Column(db.Integer, primary_key=True, autoincrement=True) title = db.Column(db.String(100), nullable=False) content = db.Column(db.Text, nullable=False) create_time = db.Column(db.DateTime, default=datetime.now) # 每一个问题都对应一个用户, 一个用户可以发布多个问题, 因此需要外键 author_id = db.Column(db.Integer, db.ForeignKey("user.id")) # 关联 question 和 user 表, 从而可以通过 QuestionModel 的对象 questionModel的 questionModel.author的形式访问该问题属于的用户 author = db.relationship(UserModel, backref="questions")
对应的表单验证类 QuestionForm
form.py
class QuestionForm(wtforms.Form):
title = wtforms.StringField(validators=[Length(min=3, max=20, message="标题格式错误")])
content = wtforms.StringField(validators=[Length(min=3, message="内容格式错误")])
qa.py
@bp.route('/qa/publish_question', methods=['GET', 'POST']) def publish_question(): if request.method == 'GET': return render_template("publish_question.html") else: form = QuestionForm(request.form) # 进行表单验证 if form.validate(): title = form.title.data content = form.content.data # 保存到数据库中 question = QuestionModel(title=title, content=content, author = g.user) db.session.add(question) db.session.commit() # 应该跳转到详情页面 return redirect("/") else: print(form.errors) return redirect(url_for("qa.publish_question"))
登录装饰器的作用:
创建一个装饰器类 decorators.py
登录装饰器函数如下:
# 需要进行登录验证的装饰器函数
def login_require(func):
# 保存func信息
@wraps(func)
# 内函数的参数是函数 func的参数
def inner(*args, **kwargs):
if g.user:
return func(*args, **kwargs)
else:
return redirect(url_for("auth.login"))
return inner
在 /qa/publish_question 请求中应用登录装饰器函数
@bp.route('/qa/publish_question', methods=['GET', 'POST']) @login_require def publish_question(): if request.method == 'GET': return render_template("publish_question.html") else: form = QuestionForm(request.form) # 进行表单验证 if form.validate(): title = form.title.data content = form.content.data # 保存到数据库中 question = QuestionModel(title=title, content=content, author = g.user) db.session.add(question) db.session.commit() # 应该跳转到详情页面 return redirect("/") else: print(form.errors) return redirect(url_for("qa.publish_question"))
在 qa.py 对首页请求返回 定向到首页的同时携带所有问题
@bp.route('/')
def index():
questions = QuestionModel.query.order_by(QuestionModel.create_time.desc()).all()
return render_template("index.html", questions=questions)
前端页面 index.html
<ul class="question-ul"> {% for question in questions %} <li> <div class="side-question"> <img class="side-question-avatar" src="{{url_for('static', filename='/images/11.jpg')}}" alt=""> </div> <div class="question-main"> <div class="question-title"><a href="#">{{ question.title }}</a></div> <div class="question-content">{{question.content}}</div> <div class="question-detail"> <span class="question-author">{{question.author.username}}</span> <span class="question-time">{{ question.create_time}}</span> </div> </div> </li> {% endfor %} </ul>
首先定义一个视图函数处理访问 问题详情页的请求
qa.py中
@bp.route('/qa/question_detail/<question_id>')
@login_require
def question_detail(question_id):
question = QuestionModel.query.get(question_id)
return render_template("detail.html", question = question)
index.html 中, 点击问题标题进入详情页
<div class="question-title"><a href="{{url_for('qa.question_detail', question_id = question.id)}}">{{ question.title }}</a></div>
detail.html 中
<h3 class="page-title">{{question.title}}</h3>
<p class="question-info">
<span>{{ question.author.username }}</span>
<span>{{question.create_time}}</span>
</p>
<hr>
<p class="question-content">{{ question.content }}</p>
Comment.py
class CommentModel(db.Model):
__tablename__ = 'comment'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
content = db.Column(db.Text, nullable=False)
create_time = db.Column(db.DateTime, default=datetime.now)
# 一个评论对应一个用户, 一个用户可以有多个评论
author_id = db.Column(db.Integer, db.ForeignKey("user.id"))
author = db.relationship(UserModel, backref="comments")
# 一个评论对应一个问题, 一个问题可以有多个评论
question_id = db.Column(db.Integer, db.ForeignKey("question.id"))
question = db.relationship(QuestionModel, backref=db.backref("comments", order_by=create_time.desc()))
form.py 中
class CommentForm(wtforms.Form):
content = wtforms.StringField(validators=[Length(min=3, message="内容格式错误")])
# 验证前端传递的评论是否带有该评论属于的问题id
question_id = wtforms.IntegerField(validators=[InputRequired(message="必须传入对应问题")])
qa.py 中
# 添加评论请求 # @bp.route('/qa/add_comment', methods=["POST"]) @bp.post('/qa/add_comment') @login_require def add_comment(): form = CommentForm(request.form) if form.validate(): # 验证成功 content = form.content.data question_id = form.question_id.data # 保存到数据库中 comment = CommentModel(content=content, question_id=question_id, author_id=g.user.id) db.session.add(comment) db.session.commit() return redirect(url_for("qa.question_detail", question_id=question_id)) else: # 未验证成功 print(form.errors) # 通过 request 获取 question_id return redirect(url_for("qa.question_detail", question_id=request.form.get("question_id")))
detail.html
<form action="{{url_for("qa.add_comment")}}" method="post"> <div class="form-group"> <input type="text" placeholder="请填写评论" name="content" class="form-control"> </div> <input type="hidden" name="question_id" value="{{ question.id }}"> <div class="form-group" style="text-align: right;"> <button class="btn btn-primary">评论</button> </div> </form> <ul class="comment-group"> {% for comment in question.comments %} <li> <div class="user-info"> <img class="side-question-avatar" src="{{url_for('static', filename='images/11.jpg')}}" alt=""> <span class="username">{{comment.author.username}}</span> <span class="create-time">{{ comment.create_time}}</span> </div> <p class="comment-content">{{comment.content}}</p> </li> {% endfor %} </ul>
处理搜索请求的视图函数:
qa.py
@bp.route("/search")
def search_qa():
# search?q=XXX
q = request.args.get("q")
questions = QuestionModel.query.filter(QuestionModel.title.contains(q)).all()
return render_template("index.html", questions=questions)
前端页面
base.html
<li class="nav-item ml-2">
<form class="form-inline my-2 my-lg-0" method="GET" action="{{url_for("qa.search_qa")}}">
<input class="form-control mr-sm-2" type="search" placeholder="关键字" aria-label="Search" name="q">
<button class="btn btn-outline-success my-2 my-sm-0" type="submit">搜索</button>
</form>
</li>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。