Flask 是一个以微核心设计为基础的轻量级 Web 应用框架,它提供了 Web 开发的基础功能,如路由、请求处理和响应构建。这种设计哲学使得 Flask 既简单直观,易于新手上手,又足够灵活,能够满足经验丰富的开发者的需求。开发者可以根据自己的项目需求,选择性地添加各种 Flask 扩展,实现如数据库集成、表单验证等额外功能。
传参
GET
通常用于请求服务器发送资源。
# get query参数@app.route('/get')def testGet(): data = request.args.get('data') print('获取的data值:', data) return "获取的data值:" + data# get path参数@app.route("/get/<int:id>")def testGetPath(id): print(type(id)) return f"返回的数据 {id}"
路径参数
string:接受任何不包含斜杠的文本
int:接受正整数
float:接受正浮点数
path:接受包含斜杠的文本
POST
通常用于向服务器提交要被处理的数据。
# post form-data参数@app.route("/post/form", methods=["POST"])def testPostForm(): username = request.form.get("username") password = request.form.get("password") print(username, password) data = { "username": username, "password": password } return data
# post json参数# 学生信息类class Student(): def __init__(self, id, name, age): self.id = id self.name = name self.age = age def __repr__(self): return f"Student[id={self.id},name={self.name},age={self.age}]" def to_dict(self): return { 'id': self.id, 'name': self.name, 'age': self.age }@app.route("/post/json", methods=["POST"])def testPostJson(): id = request.json.get("id") name = request.json.get("name") age = request.json.get("age") data = Student(id, name, age) print(data) # 返回参数需要格式化 return jsonify(data.to_dict())
PUT
通常用于更新服务器上的现有资源。
# put path参数@app.route("/api/put/<int:id>", methods=["PUT"])def testPut(id): print(type(id)) return f"上传参数 {id}"# put query参数@app.route("/api/put", methods=["PUT"])def testPuta(): data = request.args.get('data') print(type(data)) return f"上传参数 {data}"# put json参数@app.route("/api/put/json", methods=["PUT"])def testPutJson(): name = request.json.get("name") return name
DELETE
通常用于删除服务器上的资源。
# delete query参数@app.route('/api/delete', methods=["DELETE"]) # 方式1def testDelete(): data = request.args.get('name') print(data) return data + "删除成功"# delete path参数@app.route("/api/delete/<int:ID>", methods=["DELETE"]) # 方式2def testDeletePath(ID): print(type(ID)) return f"测试值为 {ID}"
request对象
request.method返回请求方法
request.args是获取url中的请求参数数据(key-value),主要是GET请求,请求参数在url中.也即请求链接中?后⾯的所有参数
request.args.get('a') 获取url中参数a的值,数据来源是url地址
request.form是获取form表单中的数据(key-value),原理跟request.args差不多,只是request.args数据来源是url,request.form的数据来源是表单
request.form.get('username')获取表单中key为username的值,也即获取在html页面中定义的元素的name值对应的输入值
request.cookies获取cookies信息
request.headres获取请求头信息
request.data如果处理不了的数据就变成字符串儿存在data里面
request.files获取上传或下载的文件信息
request.path获取请求文件路径:/myapplication/page.html
request.base_url获取域名与请求文件路径:http://www.baidu.com/myapplication/page.html
request.url获取全部url:http://www.baidu.com/myapplication/page.html?id=1&edit=edit
request.url_root获取域名:http://www.baidu.com/
重定向
from flask import Flask, request, redirect, jsonifyapp = Flask(__name__)@app.route('/home')def testGet(): # 重定向到百度首页 return redirect('https://www.baidu.com')if __name__ == '__main__': app.run()
from flask import Flask, redirect, url_forapp = Flask(__name__)@app.route('/home')def testGet(): # 内部testOut函数 return redirect(url_for('testOut'))@app.route('/loginOut')def testOut(): return 'out'if __name__ == '__main__': app.run()
异常处理
abord函数和错误处理
from flask import Flask, request, redirect, jsonifyapp = Flask(__name__)@app.route('/home')def testGet(): # 返回错误类型404 abord(404) return None# 定义错误处理方法@app.errorhandler(404)def handle_404_error(): return redirect('https://www.baidu.com')if __name__ == '__main__': app.run()
原生的数据库游标(CURSER)
python对postgres数据库增删查改操作(采用直接执行拼接的sql,生产有风险,存在sql注入,需注意使用场景)
import psycopg2# 连接数据库db = psycopg2.connect( host="127.0.0.1", port='5432', user="postgres", password="ccucc", database= 'BHQ')# 创建游标cursor = db.cursor()def select_all(): # 直接获取全部数据 sql = 'SELECT * FROM "t_animal_info" ORDER BY "id";' cursor.execute(sql) print(cursor.fetchall()) #打印结果def select_all_one(): # 一次读取一条结果,循环获取所有记录 cursor.execute('SELECT * FROM "t_animal_info";') while True: singleData = cursor.fetchone() if singleData is None: break print(singleData)def select_num(): # 获取前N条记录 cursor.execute('SELECT * FROM "t_animal_info";') print(cursor.fetchmany(2))def add_single(): # 插入单条数据 sql = 'INSERT INTO t_animal_info ("filename") VALUES (\'%s\')' % ("kangkang") cursor.execute(sql) print(f"成功插入{cursor.rowcount}条数据") db.commit()def add_mutil(): # 批量插入数据 sql = 'INSERT INTO "t_animal_info" ("filename") VALUES (%s)' # TypeError: not all arguments converted during string formatting(需要添加,) cursor.executemany(sql, [('Tim',), ('Jane',)]) print(f"成功插入{cursor.rowcount}条数据") db.commit()def update_single(): # 执行单条数据修改 sql='UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s' cursor.execute(sql,('ccucc','Tom')) db.commit()def update_mutil(): # 执行批量数据修改(!!!!注意变量位置,同sql语句占位对应!!!!) sql = 'UPDATE "t_animal_info" SET "filename" = %s WHERE "filename" = %s' cursor.executemany(sql, [('Tim', 'a'), ('Jane', 's')]) db.commit()def delete_single(del_id): # 执行单条数据删除 cursor.execute('DELETE FROM "t_animal_info" WHERE "id" = (\'%s\')' % del_id) db.commit()def delete_mutil(): # 执行批量数据删除 sql = 'DELETE FROM "t_animal_info" WHERE "id" = %s;' cursor.executemany(sql, [('4'), ('5'),('6'), ('7'), ('9')]) db.commit()# update_single()# add_mutil()# add_single()# delete_mutil()# delete_mutil()select_all()cursor.close() #关闭游标db.close() #关闭链接
ORM(SQLAlchemy)
以下演示采用postgresql,需要先安装依赖库
pip install sqlalchemy psycopg2
连接数据库
Base = declarative_base()# 创建数据库连接"""postgresql://[用户名]:[用户密码]@[主机号]/[数据库名]"""url = 'postgresql://cqhy:cqhy123!@120.46.37.218/HY_reserve'engine = sqlalchemy.create_engine(url)Session = orm.sessionmaker(bind=engine)session = Session()
模型定义
# 定义数据模型class TestData(Base): __tablename__ = "a_photo" id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True) uuid = sqlalchemy.Column(sqlalchemy.String(64), unique=True) add_time = sqlalchemy.Column(sqlalchemy.DateTime()) data = sqlalchemy.Column(sqlalchemy.Text(), default='{}') def __init__(self, name, uuid, data='{}', add_time=None): self.name = name self.uuid = uuid self.data = data self.add_time = add_time if None != add_time else datetime.datetime.now()
Column常用参数
| 序号 | 参数名称 | 说明 |
|---|
| 1 | name | 字段的名称,默认为类属性的名称。指定ORM模型的中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。 |
| 2 | type_ | 字段的数据类型,如 Integer、String、Date 等。 |
| 3 | primary_key | 指定某个字段是否为主键,默认为 False。 |
| 4 | unique | 指定某个字段的值是否唯一,默认是False。 |
| 5 | nullable | 指定某个字段是否为空。默认值是True,可以为空。 |
| 6 | default | 默认值,当插入数据时没有提供该字段的值时使用。 |
| 7 | index | 是否创建索引,默认为 False。 |
| 8 | autoincrement | 是否为自增字段,仅适用于整数类型,默认为False。 |
| 9 | onupdate | 更新的时候执行的函数。在数据更新的时候会调用这个参数指定的值或者函数。在第一次插入这条数据的时候,不会用onupdate的值,只会使用default的值。常用于是字段(每次更新数据的时候都要更新该字段值)。 |
常用数据类型
| 序号 | 数据类型 | 说明 |
|---|
| 1 | Integer | 整形 |
| 2 | Float | 浮点类型 |
| 3 | Boolean | 传递True/False |
| 4 | DECIMAL | 定点类型,具有小数点而且数值确定的数值 |
| 5 | enum | 枚举类型 |
| 6 | DateTime | 日期时间类型 |
| 7 | Date | 传递datetime.date()进去 |
| 8 | Time | 传递datatime.time() |
| 9 | String | 字符类型,使用时需要指定长度,区别于Text类型 |
| 10 | Text | 文本类型 |
| 11 | LONGTEXT | 长文本类型 |
基本操作
# 建表def create_table(): with engine.connect() as connection: # 创建表格(如果不存在) Base.metadata.create_all(engine)# 插入一行数据def add_one(): # 创建会话 uuid_str = str(uuid.uuid4()) local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) new_user = TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time) # 添加到session: session.add(new_user) # 提交即保存到数据库: session.commit() # 关闭session: session.close()# 插入多行数据def add_multi(): local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) data_list = [] for i in range(10): uuid_str = str(uuid.uuid4()) data_list.append(TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time)) # 添加到session: session.add_all(data_list) # 提交即保存到数据库: session.commit() # 关闭session: session.close()# 查询操作def select_data(): # 创建Session # 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行: user = session.query(TestData).filter(TestData.id == '1' and TestData.photo_name == 'qwe').one() print('name:', user.name) print('data:', user.data) session.close() # 关闭Session# 更新操作def update_data(): users = session.query(TestData).filter_by(name="zhangsan").first() # 查询条件 users.data = '{"name": "lisi"}' # 更新操作 session.add(users) # 添加到会话 session.commit() # 提交即保存到数据库 session.close() # 关闭会话# 删除操作def delete_one(): delete_data = session.query(TestData).filter(TestData.id == "1").first() if delete_data: session.delete(delete_data) session.commit() session.close() # 关闭会话# 删除表def drop_table(): session.execute(text('drop table tb_test_1')) session.commit() session.close()
完整示例
import timeimport uuidimport datetimeimport sqlalchemyimport sqlalchemy.orm as ormfrom sqlalchemy.orm import declarative_basefrom sqlalchemy import textBase = declarative_base()# 创建数据库连接url = 'postgresql://[用户名]:[用户密码]@[主机号]/[数据库名]'engine = sqlalchemy.create_engine(url)Session = orm.sessionmaker(bind=engine)session = Session()# 定义数据模型class TestData(Base): __tablename__ = "a_photo" id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True) uuid = sqlalchemy.Column(sqlalchemy.String(64), unique=True) add_time = sqlalchemy.Column(sqlalchemy.DateTime()) data = sqlalchemy.Column(sqlalchemy.Text(), default='{}') def __init__(self, name, uuid, data='{}', add_time=None): self.name = name self.uuid = uuid self.data = data self.add_time = add_time if None != add_time else datetime.datetime.now()def create_table(): with engine.connect() as connection: # 创建表格(如果不存在) Base.metadata.create_all(engine)# 插入一行数据def add_one(): # 创建会话 uuid_str = str(uuid.uuid4()) local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) new_user = TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time) # 添加到session: session.add(new_user) # 提交即保存到数据库: session.commit() # 关闭session: session.close()# 插入多行数据def add_multi(): local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) data_list = [] for i in range(10): uuid_str = str(uuid.uuid4()) data_list.append(TestData(name='zhangsan', uuid=uuid_str, data='{"name": "zhangsan"}', add_time=local_time)) # 添加到session: session.add_all(data_list) # 提交即保存到数据库: session.commit() # 关闭session: session.close()# 查询操作def select_data(): # 创建Session # 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行: user = session.query(TestData).filter(TestData.id == '1' and TestData.photo_name == 'qwe').one() print('name:', user.name) print('data:', user.data) session.close() # 关闭Session# 更新操作def update_data(): users = session.query(TestData).filter_by(name="zhangsan").first() # 查询条件 users.data = '{"name": "lisi"}' # 更新操作 session.add(users) # 添加到会话 session.commit() # 提交即保存到数据库 session.close() # 关闭会话# 删除操作def delete_one(): delete_data = session.query(TestData).filter(TestData.id == "1").first() if delete_data: session.delete(delete_data) session.commit() session.close() # 关闭会话# 删除表def drop_table(): session.execute(text('drop table tb_test_1')) session.commit() session.close()def run(): create_table() # add_one() add_multi() # select_data() # update_data() # delete_one() # drop_table() # select_data1()if __name__ == '__main__': run()
打包部署
pip install waitress
from app import appif __name__ == "__main__": from waitress import serve serve(app, host='0.0.0.0', port=8080, threads=4)