Python后端工程师必会的指令
掌握这些后端开发指令,让你构建强大的Web应用和API服务
欢迎大家关注此公众号,后台点击按钮【免费资料】可免费获取【Python入门30节课】电子书
此外,小庄推荐一本适合新手/小白入门的 Python 基础书籍,欢迎大家订阅。感谢大家的支持,这是我持续更新的动力。
前言
Python后端工程师需要掌握Web框架、数据库、API设计、认证授权等核心技能。本文将系统性地介绍后端工程师必须掌握的指令和库,帮助你构建高性能、可扩展的后端服务。
一、环境准备与基础指令
1.1 安装必要的库
# Web frameworks and the ASGI server used to run FastAPI.
pip install flask
pip install django
pip install fastapi
pip install uvicorn
# Database access: ORM, migrations, the Flask integration imported as
# flask_sqlalchemy, and drivers for the MySQL / PostgreSQL connection URLs.
pip install sqlalchemy
pip install alembic
pip install flask-sqlalchemy
pip install pymysql
pip install psycopg2-binary
# Caching and background tasks.
pip install redis
pip install celery
# Validation and authentication. PyJWT provides the `jwt` module used by the
# Flask example; python-multipart is needed for FastAPI form parsing
# (OAuth2PasswordRequestForm); the extras pull in the crypto/bcrypt backends.
pip install pydantic
pip install "python-jose[cryptography]"
pip install "passlib[bcrypt]"
pip install PyJWT
pip install python-multipart
1.2 虚拟环境管理
# Create a virtual environment in ./venv
python -m venv venv
# Activate the virtual environment (Windows)
venv\Scripts\activate
# Activate the virtual environment (Linux/Mac)
source venv/bin/activate
# Leave the virtual environment
deactivate
# Install the dependencies listed in requirements.txt
pip install -r requirements.txt
# Export the currently installed packages to requirements.txt
pip freeze > requirements.txt
二、Flask框架指令
2.1 创建Flask应用
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/')
def index():
    """Serve the root URL with a plain-text greeting."""
    return 'Hello, World!'
@app.route('/api/greeting', methods=['GET'])
def greeting():
    """Return a JSON greeting for the ``name`` query parameter.

    Falls back to ``'World'`` when the parameter is absent.
    """
    name = request.args.get('name', 'World')
    return jsonify({'message': f'Hello, {name}!'})
if __name__ == '__main__':
    # WARNING: debug=True enables the interactive debugger, and host='0.0.0.0'
    # listens on every interface. Use this combination only on a trusted
    # development machine, never in production.
    app.run(debug=True, host='0.0.0.0', port=5000)
2.2 路由和视图
from flask import Flask, request, jsonify
app = Flask(__name__)
# Basic route
@app.route('/')
def index():
    """Serve the home page as plain text."""
    return '首页'
# Route with a typed URL parameter
@app.route('/user/<int:user_id>')
def get_user(user_id):
    """Echo the integer ``user_id`` captured from the URL."""
    return f'用户ID: {user_id}'
# One view handling several HTTP methods
@app.route('/api/users', methods=['GET', 'POST'])
def users():
    """List users on GET, or acknowledge a created user on POST.

    Returns:
        On POST, a 201 response echoing the submitted JSON body; otherwise a
        JSON object with an empty ``users`` list.
    """
    if request.method == 'POST':
        data = request.get_json()
        return jsonify({'message': '用户已创建', 'data': data}), 201
    # Flask also routes HEAD here whenever GET is allowed, so every non-POST
    # request must produce a response instead of falling through to None.
    return jsonify({'users': []})
# Versioned prefix written out in the route itself
@app.route('/api/v1/products')
def get_products():
    """Return an empty product list as JSON."""
    return jsonify({'products': []})
# 蓝图
from flask import Blueprint
# Blueprint whose routes are all mounted under /api.
api_bp = Blueprint('api', __name__, url_prefix='/api')


@api_bp.route('/users')
def get_users():
    """Return an empty user list as JSON."""
    # NOTE(review): with the /api prefix this resolves to /api/users, the same
    # URL as the `users` view registered directly on the app — confirm which
    # handler is meant to serve it.
    return jsonify({'users': []})


app.register_blueprint(api_bp)
2.3 请求和响应
from flask import Flask, request, jsonify, make_response
app = Flask(__name__)
@app.route('/api/data', methods=['POST'])
def handle_data():
    """Demonstrate the request accessors and echo the JSON body back.

    Returns:
        A JSON object with ``status`` set to ``'success'`` and ``data`` set to
        the parsed request body.
    """
    # Parsed JSON body
    data = request.get_json()
    # The reads below only illustrate the other accessors; their values are
    # not used in the response.
    form_data = request.form                        # form fields
    query_param = request.args.get('key')           # query-string parameter
    user_agent = request.headers.get('User-Agent')  # request header
    file = request.files.get('file')                # uploaded file
    return jsonify({
        'status': 'success',
        'data': data,
    })
@app.route('/api/custom-response')
def custom_response():
    """Build a JSON response with a custom header and an explicit status."""
    response = make_response(jsonify({'message': '自定义响应'}))
    response.headers['X-Custom-Header'] = 'Custom Value'
    response.status_code = 200
    return response
@app.route('/api/redirect')
def redirect_example():
    """Redirect the client to ``/api/other-endpoint``."""
    # `redirect` is not among the names imported at the top of this snippet,
    # so it is imported here to keep the view self-contained.
    from flask import redirect

    return redirect('/api/other-endpoint')
@app.route('/api/error')
def error_example():
    """Return a JSON error payload with HTTP status 404."""
    return jsonify({'error': '未找到资源'}), 404
2.4 Flask模板
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/')
def index():
    """Render ``index.html`` with a title and a list of items."""
    return render_template('index.html', title='首页', items=['Item 1', 'Item 2'])
# Template file templates/index.html, shown here as a string for reference.
# It prints `title` in the page title and heading, then one <li> per entry
# of `items`.
"""
<!DOCTYPE html>
<html>
<head>
<title>{{ title }}</title>
</head>
<body>
<h1>{{ title }}</h1>
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
</body>
</html>
"""
2.5 Flask数据库集成
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
db = SQLAlchemy(app)
class User(db.Model):
    """User record with a unique username and a unique e-mail address."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def __repr__(self):
        return f'<User {self.username}>'
# Create the database tables; create_all() needs an active application context.
with app.app_context():
    db.create_all()
# CRUD operations
@app.route('/api/users', methods=['GET'])
def get_users():
    """Return every user's id and username as a JSON array."""
    # This snippet imports only `Flask` at the top, so pull in jsonify here.
    from flask import jsonify

    users = User.query.all()
    return jsonify([{'id': u.id, 'username': u.username} for u in users])
@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a user from the JSON body's ``username`` and ``email``.

    Returns:
        201 with a confirmation message on success, or 400 when either
        required field is missing from the body.
    """
    # This snippet imports only `Flask` at the top, so pull in the helpers here.
    from flask import jsonify, request

    data = request.get_json(silent=True) or {}
    missing = [field for field in ('username', 'email') if field not in data]
    if missing:
        # Answer with a client error instead of letting a KeyError become a 500.
        return jsonify({'error': f'missing fields: {", ".join(missing)}'}), 400

    user = User(username=data['username'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify({'message': '用户已创建'}), 201
2.6 Flask认证授权
from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
def token_required(f):
    """Decorator that requires a valid HS256 JWT in the Authorization header.

    The wrapped view receives the token's ``user_id`` claim as its first
    positional argument. Requests with a missing or invalid token get a 401
    JSON response and the view is not called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'message': '缺少认证token'}), 401
        # Accept the conventional "Bearer <token>" form as well as a bare token.
        if token.startswith('Bearer '):
            token = token[len('Bearer '):]
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']
        except (jwt.InvalidTokenError, KeyError):
            # Catch only token problems (bad signature, expiry, malformed
            # token, missing claim) so unrelated errors still surface.
            return jsonify({'message': 'token无效'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
@app.route('/api/login', methods=['POST'])
def login():
    """Issue a 24-hour JWT when the posted credentials match.

    Returns:
        A JSON object with ``token`` on success, otherwise a 401 JSON error.
    """
    # A missing or non-JSON body becomes an empty dict, so bad input yields
    # the 401 below rather than an unhandled KeyError/TypeError.
    data = request.get_json(silent=True) or {}
    # SECURITY: hard-coded demo credentials — replace with a real user lookup
    # and password-hash check before using this anywhere real.
    if data.get('username') == 'admin' and data.get('password') == 'password':
        token = jwt.encode({
            'user_id': 1,
            'exp': datetime.utcnow() + timedelta(hours=24),
        }, app.config['SECRET_KEY'], algorithm='HS256')
        return jsonify({'token': token})
    return jsonify({'message': '用户名或密码错误'}), 401
@app.route('/api/protected')
@token_required
def protected(current_user):
    """Confirm access for the user id supplied by ``token_required``."""
    return jsonify({'message': f'用户{current_user}访问成功'})
三、FastAPI框架指令
3.1 创建FastAPI应用
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()
# Request body schema for an item; `description` and `tax` are optional.
class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
# Root endpoint: returns a fixed greeting.
@app.get('/')
async def root():
    return {'message': 'Hello, World!'}
# Accepts an Item in the request body and echoes it back with a message.
@app.post('/items/')
async def create_item(item: Item):
    return {'item': item, 'message': 'Item created'}
# `item_id` comes from the path; `q` is an optional query parameter.
@app.get('/items/{item_id}')
async def read_item(item_id: int, q: Optional[str] = None):
    return {'item_id': item_id, 'q': q}
3.2 FastAPI请求验证
from fastapi import FastAPI, Query, Path, Body
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum

app = FastAPI()
# Closed set of model names accepted as a path parameter. Inheriting from
# `str` makes each member compare equal to its string value.
class ModelName(str, Enum):
    alexnet = 'alexnet'
    resnet = 'resnet'
    lenet = 'lenet'
# Item schema with field-level validation constraints.
class Item(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)         # required, 1-100 chars
    description: Optional[str] = Field(None, max_length=1000)    # optional, <= 1000 chars
    price: float = Field(..., gt=0)                              # required, > 0
    tax: Optional[float] = Field(None, ge=0)                     # optional, >= 0
    tags: List[str] = []
# The path parameter is restricted to the members of ModelName.
@app.get('/models/{model_name}')
async def get_model(model_name: ModelName):
    return {'model_name': model_name}
# Query parameters with validation: `q` is optional (3-50 chars), `skip` >= 0,
# and `limit` must be between 1 and 100.
@app.get('/items/')
async def read_items(
    q: Optional[str] = Query(None, min_length=3, max_length=50),
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
):
    return {'q': q, 'skip': skip, 'limit': limit}
# `item_id` must be >= 1. With embed=True the body is expected as
# {"item": {...}} rather than the bare item object.
@app.put('/items/{item_id}')
async def update_item(
    item_id: int = Path(..., title='Item ID', ge=1),
    item: Item = Body(..., embed=True),
):
    return {'item_id': item_id, 'item': item}
3.3 FastAPI数据库集成
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
SQLALCHEMY_DATABASE_URL = 'sqlite:///./sql_app.db'
# SQLite connections refuse use from a thread other than the one that created
# them unless check_same_thread is disabled. FastAPI runs non-async endpoints
# and dependencies in a thread pool, so a session can cross threads.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={'check_same_thread': False}
)
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class for the ORM models.
Base = declarative_base()
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)


# Create the tables for every model registered on Base.
Base.metadata.create_all(bind=engine)
def get_db():
    """Yield a database session and close it once the caller is done.

    Written as a generator so it can serve as a FastAPI dependency; the
    ``finally`` clause closes the session even if the request handler raises.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
app = FastAPI()
# Creates a user from the `username` and `email` parameters and returns the
# stored row.
@app.post('/users/')
def create_user(username: str, email: str, db: Session = Depends(get_db)):
    user = User(username=username, email=email)
    db.add(user)
    db.commit()
    # Reload the instance so database-generated values (the id) are populated.
    db.refresh(user)
    return user
# Returns one page of users: skip `skip` rows, return at most `limit` rows.
@app.get('/users/')
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(User).offset(skip).limit(limit).all()
    return users
# Returns the user with the given id, or a 404 error when no row matches.
@app.get('/users/{user_id}')
def read_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail='User not found')
    return user
3.4 FastAPI认证授权
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
# Response schema of the /token endpoint.
class Token(BaseModel):
    access_token: str
    token_type: str
from typing import Optional


# Data extracted from a decoded token. The default is None, so the field is
# annotated as Optional rather than plain `str`.
class TokenData(BaseModel):
    username: Optional[str] = None
def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    """Hash ``password`` with the configured password context."""
    return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or zero) the token
            expires after 15 minutes.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({'exp': expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return its subject as ``TokenData``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('sub')
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    return token_data
# Exchanges form-encoded username/password credentials for a bearer token
# valid for ACCESS_TOKEN_EXPIRE_MINUTES minutes; responds 401 on failure.
@app.post('/token', response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # NOTE(review): authenticate_user is not defined in this snippet. It is
    # presumably expected to return a user object with a `.username`
    # attribute, or a falsy value on failure — confirm and supply it.
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Incorrect username or password',
            headers={'WWW-Authenticate': 'Bearer'},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={'sub': user.username}, expires_delta=access_token_expires
    )
    return {'access_token': access_token, 'token_type': 'bearer'}
# Returns the token data of the authenticated caller.
@app.get('/users/me')
async def read_users_me(current_user: TokenData = Depends(get_current_user)):
    return current_user
四、Django框架指令
4.1 创建Django项目
# Create a project
django-admin startproject myproject
# Create an app
python manage.py startapp myapp
# Run the development server
python manage.py runserver
# Database migrations: generate them, then apply them
python manage.py makemigrations
python manage.py migrate
# Create a superuser
python manage.py createsuperuser
# Collect static files
python manage.py collectstatic
4.2 Django模型
from django.db import models
class User(models.Model):
    """User stored in the ``users`` table, newest first by default."""

    username = models.CharField(max_length=100, unique=True)
    email = models.EmailField(unique=True)
    # Added so queries filtering on `is_active` (as the REST Framework example
    # in this article does) have a real column; existing rows default to active.
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)  # set once on insert
    updated_at = models.DateTimeField(auto_now=True)      # refreshed on every save

    class Meta:
        db_table = 'users'
        ordering = ['-created_at']

    def __str__(self):
        return self.username
class Post(models.Model):
    """Post written by a user and optionally labelled with tags."""

    title = models.CharField(max_length=200)
    content = models.TextField()
    # Deleting the author deletes their posts; reverse accessor is `user.posts`.
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='posts')
    created_at = models.DateTimeField(auto_now_add=True)
    # 'Tag' is referenced by name because the class is defined further down.
    tags = models.ManyToManyField('Tag', blank=True)

    def __str__(self):
        return self.title
class Tag(models.Model):
    """Uniquely named label that can be attached to posts."""

    name = models.CharField(max_length=50, unique=True)

    def __str__(self):
        return self.name
4.3 Django视图
from django.http import JsonResponse
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
import json
# Function-based view
@csrf_exempt
def user_list(request):
    """List all users on GET, or create one from the JSON body on POST.

    Returns:
        200 with a JSON array on GET; 201 with the new id on POST; 400 when
        the POST body is not valid JSON; 405 for any other method.
    """
    # The snippet's imports do not include the model.
    # NOTE(review): assumes this view lives next to models.py — confirm.
    from .models import User

    if request.method == 'GET':
        users = User.objects.all().values()
        return JsonResponse(list(users), safe=False)
    if request.method == 'POST':
        try:
            data = json.loads(request.body)
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON body'}, status=400)
        # SECURITY: the body is passed straight through as model fields;
        # restrict it to an allow-list of fields before real use.
        user = User.objects.create(**data)
        return JsonResponse({'id': user.id, 'message': '用户已创建'}, status=201)
    # A Django view must always return a response; without this branch other
    # methods would return None and fail with a server error.
    return JsonResponse({'error': 'Method not allowed'}, status=405)
# Class-based view
# The snippet's imports do not include the model.
# NOTE(review): assumes this view lives next to models.py — confirm.
from .models import User


@method_decorator(csrf_exempt, name='dispatch')
class UserView(View):
    """CRUD endpoints for users, one method per HTTP verb."""

    def get(self, request, user_id=None):
        """Return one user when ``user_id`` is given, otherwise all users."""
        if user_id:
            try:
                user = User.objects.get(id=user_id)
            except User.DoesNotExist:
                # An unknown id is a client error, not a server failure.
                return JsonResponse({'error': 'User not found'}, status=404)
            return JsonResponse({'id': user.id, 'username': user.username})
        users = User.objects.all().values()
        return JsonResponse(list(users), safe=False)

    def post(self, request):
        """Create a user from the JSON body and return its id with 201."""
        data = json.loads(request.body)
        user = User.objects.create(**data)
        return JsonResponse({'id': user.id}, status=201)

    def put(self, request, user_id):
        """Update the fields named in the JSON body for user ``user_id``."""
        data = json.loads(request.body)
        User.objects.filter(id=user_id).update(**data)
        return JsonResponse({'message': '用户已更新'})

    def delete(self, request, user_id):
        """Delete user ``user_id``; no error if it does not exist."""
        User.objects.filter(id=user_id).delete()
        return JsonResponse({'message': '用户已删除'})
4.4 Django REST Framework
pip install djangorestframework
# serializers.py
from rest_framework import serializers
from .models import User
# Serializes User instances, exposing only the fields listed in Meta.fields.
class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'created_at']
# views.py
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import User
from .serializers import UserSerializer
# Standard ModelViewSet for User plus a custom collection-level action.
class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    # detail=False makes this a collection route: GET <prefix>/active/
    @action(detail=False, methods=['get'])
    def active(self, request):
        # NOTE(review): assumes the User model has an `is_active` field —
        # confirm against the model definition.
        active_users = User.objects.filter(is_active=True)
        serializer = self.get_serializer(active_users, many=True)
        return Response(serializer.data)
# urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import UserViewSet
# Register the viewset on a router, then mount the generated routes under api/.
router = DefaultRouter()
router.register(r'users', UserViewSet)

urlpatterns = [path('api/', include(router.urls))]
五、SQLAlchemy ORM指令
5.1 数据库连接
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# Connection URL formats for the supported databases. Only ONE engine is
# created: calling create_engine() for each URL in turn would overwrite
# `engine` every time and require every driver to be installed.
DATABASE_URLS = {
    'sqlite': 'sqlite:///database.db',
    'mysql': 'mysql+pymysql://user:password@localhost/dbname',
    'postgresql': 'postgresql://user:password@localhost/dbname',
}
# Pass echo=True to create_engine() to log the SQL statements it runs.
engine = create_engine(DATABASE_URLS['postgresql'])
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class for the ORM models.
Base = declarative_base()
5.2 模型定义
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
class User(Base):
    """ORM model for the ``users`` table; one user has many posts."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True)
    email = Column(String(100), unique=True)
    # The callable is passed uncalled so it runs at insert time.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Paired with Post.author through back_populates.
    posts = relationship('Post', back_populates='author')
class Post(Base):
    """ORM model for the ``posts`` table; each post belongs to one user."""

    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200))
    content = Column(String)
    user_id = Column(Integer, ForeignKey('users.id'))
    # The callable is passed uncalled so it runs at insert time.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Paired with User.posts through back_populates.
    author = relationship('User', back_populates='posts')


# Create the tables for every model registered on Base.
Base.metadata.create_all(bind=engine)
5.3 CRUD操作
from sqlalchemy.orm import Session
def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs even when the consumer raises, so the session is always closed.
        db.close()
# Create
def create_user(db: Session, username: str, email: str):
    """Insert a user, commit, and return the refreshed instance."""
    user = User(username=username, email=email)
    db.add(user)
    db.commit()
    # Reload so database-generated values (such as the id) are populated.
    db.refresh(user)
    return user
# Read
def get_user(db: Session, user_id: int):
    """Return the user with ``user_id``, or ``None`` when no row matches."""
    return db.query(User).filter(User.id == user_id).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users after skipping the first ``skip`` rows."""
    return db.query(User).offset(skip).limit(limit).all()
# Update
def update_user(db: Session, user_id: int, **kwargs):
    """Set the given column values on user ``user_id`` and return the user.

    Args:
        db: Active session.
        user_id: Primary key of the user to update.
        **kwargs: Column names mapped to their new values.

    Returns:
        The user as re-read after the update, or ``None`` if it does not exist.
    """
    if kwargs:
        # Skip the statement entirely when there is nothing to set.
        db.query(User).filter(User.id == user_id).update(kwargs)
        db.commit()
    return get_user(db, user_id)
# Delete
def delete_user(db: Session, user_id: int):
    """Delete the user with ``user_id`` and commit; returns nothing."""
    db.query(User).filter(User.id == user_id).delete()
    db.commit()
# Compound query
def search_users(db: Session, keyword: str):
    """Return users whose username or email contains ``keyword``."""
    return db.query(User).filter(
        User.username.contains(keyword) | User.email.contains(keyword)
    ).all()
六、Redis缓存指令
6.1 Redis基础操作
import redis
# Connect to Redis; decode_responses=True returns str instead of bytes.
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# String operations
r.set('key', 'value')
r.set('key', 'value', ex=3600)  # set with an expiry time (seconds)
value = r.get('key')
r.exists('key')
r.expire('key', 3600)
r.ttl('key')
# Delete last: expire()/ttl() have nothing to act on once the key is gone.
r.delete('key')

# Hash operations
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
r.hget('user:1', 'name')
r.hgetall('user:1')
r.hdel('user:1', 'age')

# List operations
r.lpush('mylist', 'item1', 'item2')
r.rpush('mylist', 'item3')
r.lrange('mylist', 0, -1)  # 0..-1 selects the whole list
r.lpop('mylist')
r.rpop('mylist')

# Set operations
r.sadd('myset', 'member1', 'member2')
r.smembers('myset')
r.sismember('myset', 'member1')
r.srem('myset', 'member1')

# Sorted-set operations; zadd takes a {member: score} mapping
r.zadd('myzset', {'member1': 1, 'member2': 2})
r.zrange('myzset', 0, -1)
r.zscore('myzset', 'member1')
6.2 Redis缓存装饰器
import redis
import json
from functools import wraps
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def cache(expire_time=3600):
    """Decorator factory that caches a function's JSON-serializable result in Redis.

    Args:
        expire_time: Lifetime of each cache entry in seconds.

    Returns:
        A decorator. The wrapped function returns the cached value on a hit
        (decoded with ``json.loads``); on a miss it runs the function, stores
        ``json.dumps(result)`` under the key, and returns the result.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the key from the function name and its arguments. Keyword
            # arguments are sorted so f(a=1, b=2) and f(b=2, a=1) share an entry.
            cache_key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"
            cached_result = r.get(cache_key)
            # A miss is reported as None; test for that explicitly rather
            # than relying on truthiness.
            if cached_result is not None:
                return json.loads(cached_result)
            result = func(*args, **kwargs)
            r.set(cache_key, json.dumps(result), ex=expire_time)
            return result
        return wrapper
    return decorator
@cache(expire_time=300)
def get_user(user_id):
    """Return a stub user record; results are cached for 300 seconds."""
    # Stands in for a database query.
    return {'id': user_id, 'name': 'Alice'}
七、Celery异步任务
7.1 Celery配置
from celery import Celery
# Celery application named 'tasks', using Redis database 0 as the broker.
app = Celery('tasks', broker='redis://localhost:6379/0')

# JSON-only serialization, results stored in the same Redis database, and
# schedules interpreted in the Asia/Shanghai timezone.
_celery_settings = {
    'result_backend': 'redis://localhost:6379/0',
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'Asia/Shanghai',
    'enable_utc': True,
}
app.conf.update(**_celery_settings)
7.2 定义任务
from celery import shared_task
import time
@shared_task
def send_email(to, subject, body):
    """Simulate sending an e-mail and report the recipient."""
    time.sleep(5)  # stands in for the slow send operation
    print(f'邮件已发送到 {to}')
    return {'status': 'sent', 'to': to}
@shared_task(bind=True, max_retries=3)
def process_data(self, data_id):
    """Process ``data_id``, retrying on any failure (max_retries=3).

    ``bind=True`` passes the task instance as ``self`` so the task can
    schedule its own retry.
    """
    try:
        # NOTE(review): heavy_processing is not defined in this snippet —
        # presumably supplied by the application; confirm.
        result = heavy_processing(data_id)
        return {'status': 'success', 'result': result}
    except Exception as exc:
        # Re-queue the task to run again in 60 seconds, keeping the cause.
        raise self.retry(exc=exc, countdown=60)
@shared_task
def generate_report(report_type, params):
    """Placeholder report task; returns a fixed report URL."""
    # Report generation logic goes here.
    return {'status': 'completed', 'report_url': '/reports/123'}
7.3 调用任务
from tasks import send_email, process_data, generate_report
# Asynchronous call
result = send_email.delay('user@example.com', '测试', '测试内容')
print(f'任务ID: {result.id}')

# Call with execution options
result = send_email.apply_async(
    args=['user@example.com', '测试', '测试内容'],
    countdown=10,  # start 10 seconds from now
)

# Fetch the result. The wait must cover the 10-second countdown plus the
# task's own running time; a 10-second timeout would always expire first.
print(result.get(timeout=30))
# 定时任务
from celery.schedules import crontab
# Periodic task: run tasks.generate_report('daily', {}) every day at 09:00.
# NOTE(review): `app` is presumably the Celery instance from the configuration
# snippet; it is not imported in this snippet — confirm.
_daily_report = {
    'task': 'tasks.generate_report',
    'schedule': crontab(hour=9, minute=0),
    'args': ('daily', {}),
}
app.conf.beat_schedule = {'send-daily-report': _daily_report}
八、API文档生成
8.1 FastAPI自动文档
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List

# Application metadata shown in the generated documentation pages.
app = FastAPI(
    title='My API',
    description='这是一个示例API',
    version='1.0.0',
    docs_url='/docs',    # Swagger UI
    redoc_url='/redoc',  # ReDoc
)
# Product schema. The Chinese docstring is left untouched because it is
# published as the schema description in the generated API docs.
class Item(BaseModel):
    """商品模型"""

    name: str
    description: Optional[str] = None
    price: float
    tags: List[str] = []

    class Config:
        # Example payload displayed in the interactive docs.
        # NOTE(review): pydantic v2 renamed this key to `json_schema_extra` —
        # confirm which pydantic version is targeted.
        schema_extra = {
            'example': {
                'name': 'Foo',
                'description': 'A very nice Item',
                'price': 35.4,
                'tags': ['electronics', 'gadget'],
            }
        }
# Echoes the posted item back. The docstring is rendered as the endpoint
# description in the API docs, so its text is kept exactly as written.
@app.post('/items/', response_model=Item, summary='创建商品', tags=['商品'])
async def create_item(item: Item):
    """
    创建商品:
    - **name**: 商品名称
    - **description**: 商品描述(可选)
    - **price**: 商品价格
    - **tags**: 商品标签(可选)
    """
    return item
# Placeholder listing endpoint: always returns an empty list. The docstring
# is rendered in the API docs, so its text is kept exactly as written.
@app.get('/items/', response_model=List[Item], summary='获取商品列表', tags=['商品'])
async def read_items(skip: int = 0, limit: int = 10):
    """获取商品列表"""
    return []
九、部署与运维
9.1 Gunicorn部署
# Install Gunicorn
pip install gunicorn
# Run a Flask app: 4 workers, bound to port 8000 on all interfaces
gunicorn -w 4 -b 0.0.0.0:8000 app:app
# Run a FastAPI app using the Uvicorn worker class
gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 main:app
9.2 Uvicorn部署
# Install Uvicorn
pip install uvicorn
# Run a FastAPI app
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# Production configuration (adds an explicit log level)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --log-level info
9.3 Docker部署
FROM python:3.9-slim
WORKDIR /app
# Copy and install the dependencies first so this layer stays cached until
# requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Port the application listens on (matches the --port passed to uvicorn below).
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Build the image
docker build -t myapp .
# Run the container in the background, publishing port 8000
docker run -d -p 8000:8000 myapp
总结
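作为Python后端工程师,掌握这些指令是核心技能:
- 1. Flask - 轻量级Web框架
- 2. FastAPI - 高性能API框架
- 3. Django - 全功能Web框架与Django REST Framework
- 4. SQLAlchemy ORM - 数据库操作
- 5. Redis - 缓存
- 6. Celery - 异步任务
- 7. API文档 - FastAPI自动文档
- 8. 部署运维 - Gunicorn/Uvicorn/Docker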
掌握这些工具,你就能构建强大的后端服务。
关注我,获取更多Python技术干货!