第21天-Web开发
哪吒 2023/6/15
# 第21天-Web开发
# 概述
Web开发是Python最重要的应用领域之一。Python拥有丰富的Web开发框架,从轻量级的Flask到功能完整的Django,为不同规模的Web应用提供了优秀的解决方案。今天我们将学习Python Web开发的基础知识和实践技能。
# 1. Web开发基础
# 1.1 Web开发概念
def web_development_concepts():
"""Web开发基础概念演示"""
print("=== Web开发基础概念 ===")
# 1. HTTP协议基础
print("\n1. HTTP协议基础:")
http_concepts = {
"HTTP方法": {
"GET": "获取资源,幂等操作",
"POST": "创建资源,非幂等操作",
"PUT": "更新资源,幂等操作",
"DELETE": "删除资源,幂等操作",
"PATCH": "部分更新资源",
"HEAD": "获取资源头信息",
"OPTIONS": "获取服务器支持的方法"
},
"状态码": {
"2xx": "成功 (200 OK, 201 Created, 204 No Content)",
"3xx": "重定向 (301 Moved, 302 Found, 304 Not Modified)",
"4xx": "客户端错误 (400 Bad Request, 401 Unauthorized, 404 Not Found)",
"5xx": "服务器错误 (500 Internal Error, 502 Bad Gateway, 503 Service Unavailable)"
},
"请求头": {
"Content-Type": "请求体的媒体类型",
"Authorization": "身份验证信息",
"User-Agent": "客户端信息",
"Accept": "客户端可接受的媒体类型",
"Cookie": "客户端存储的会话信息"
},
"响应头": {
"Content-Type": "响应体的媒体类型",
"Set-Cookie": "设置客户端Cookie",
"Location": "重定向地址",
"Cache-Control": "缓存控制",
"Access-Control-Allow-Origin": "CORS跨域控制"
}
}
for category, items in http_concepts.items():
print(f"\n {category}:")
for key, value in items.items():
print(f" {key}: {value}")
# 2. Web架构模式
print("\n2. Web架构模式:")
architecture_patterns = {
"MVC (Model-View-Controller)": {
"Model": "数据模型,处理业务逻辑和数据访问",
"View": "视图层,负责用户界面展示",
"Controller": "控制器,处理用户输入和协调Model、View"
},
"MTV (Model-Template-View)": {
"Model": "数据模型,与MVC中的Model相同",
"Template": "模板,负责页面渲染",
"View": "视图函数,处理请求逻辑"
},
"RESTful API": {
"资源导向": "将数据和功能视为资源",
"统一接口": "使用标准HTTP方法操作资源",
"无状态": "每个请求包含完整信息",
"可缓存": "响应可以被缓存"
}
}
for pattern, details in architecture_patterns.items():
print(f"\n {pattern}:")
for component, description in details.items():
print(f" {component}: {description}")
# 3. 前后端交互
print("\n3. 前后端交互方式:")
interaction_methods = [
"传统表单提交: 页面刷新,服务器渲染",
"AJAX请求: 异步数据交换,局部更新",
"WebSocket: 双向实时通信",
"Server-Sent Events: 服务器主动推送",
"GraphQL: 灵活的数据查询语言",
"gRPC: 高性能RPC框架"
]
for i, method in enumerate(interaction_methods, 1):
print(f" {i}. {method}")
print("\n ✓ Web开发基础概念介绍完成")
# 运行Web开发概念演示
web_development_concepts()
# 1.2 Python Web框架概览
def python_web_frameworks_overview():
"""Python Web框架概览"""
print("=== Python Web框架概览 ===")
frameworks = {
"微框架": {
"Flask": {
"特点": "轻量级、灵活、易学习",
"适用场景": "小型应用、API服务、原型开发",
"核心组件": "路由、模板、请求处理",
"扩展性": "丰富的第三方扩展"
},
"FastAPI": {
"特点": "现代、高性能、自动文档",
"适用场景": "API开发、微服务",
"核心组件": "类型提示、异步支持、自动验证",
"扩展性": "基于Starlette和Pydantic"
}
},
"全栈框架": {
"Django": {
"特点": "功能完整、开箱即用、安全性高",
"适用场景": "大型应用、内容管理、企业级开发",
"核心组件": "ORM、管理后台、认证系统、模板引擎",
"扩展性": "丰富的内置功能和第三方包"
},
"Pyramid": {
"特点": "灵活配置、可扩展、企业级",
"适用场景": "复杂应用、企业开发",
"核心组件": "路由、视图、模板、安全",
"扩展性": "高度可配置和可扩展"
}
},
"异步框架": {
"Tornado": {
"特点": "异步非阻塞、高并发",
"适用场景": "实时应用、长连接服务",
"核心组件": "异步处理、WebSocket支持",
"扩展性": "内置异步库"
},
"Sanic": {
"特点": "类Flask语法、异步支持",
"适用场景": "高性能API、异步应用",
"核心组件": "异步路由、中间件",
"扩展性": "基于asyncio"
}
}
}
for category, framework_list in frameworks.items():
print(f"\n{category}:")
for name, details in framework_list.items():
print(f"\n {name}:")
for key, value in details.items():
print(f" {key}: {value}")
# 框架选择建议
print("\n框架选择建议:")
selection_guide = {
"初学者": "推荐Flask - 简单易学,概念清晰",
"快速原型": "推荐Flask或FastAPI - 开发效率高",
"企业应用": "推荐Django - 功能完整,安全性好",
"API服务": "推荐FastAPI或Flask-RESTful",
"高并发": "推荐FastAPI、Sanic或Tornado",
"内容管理": "推荐Django - 内置管理后台",
"微服务": "推荐FastAPI或Flask"
}
for scenario, recommendation in selection_guide.items():
print(f" {scenario}: {recommendation}")
print("\n ✓ Python Web框架概览完成")
# 运行框架概览
python_web_frameworks_overview()
# 2. Flask基础
# 2.1 Flask入门
def flask_basics_demo():
"""Flask基础演示"""
print("=== Flask基础演示 ===")
# 注意:这里只是演示代码结构,实际运行需要安装Flask
print("\n1. Flask基础应用结构:")
basic_app_code = '''
# app.py - 基础Flask应用
from flask import Flask, request, jsonify, render_template
# 创建Flask应用实例
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
# 基础路由
@app.route('/')
def index():
return '<h1>欢迎使用Flask!</h1>'
# 带参数的路由
@app.route('/user/<username>')
def user_profile(username):
return f'<h1>用户: {username}</h1>'
# 多种HTTP方法
@app.route('/api/data', methods=['GET', 'POST'])
def api_data():
if request.method == 'GET':
return jsonify({'message': '获取数据成功'})
elif request.method == 'POST':
data = request.get_json()
return jsonify({'message': '数据已保存', 'data': data})
# 启动应用
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
'''
print(" 基础应用代码:")
print(basic_app_code)
# 2. Flask配置
print("\n2. Flask配置管理:")
config_code = '''
# config.py - 配置文件
class Config:
SECRET_KEY = 'your-secret-key'
SQLALCHEMY_TRACK_MODIFICATIONS = False
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
class ProductionConfig(Config):
DEBUG = False
SQLALCHEMY_DATABASE_URI = 'postgresql://user:pass@localhost/prod'
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
# 在app.py中使用配置
from config import config
app = Flask(__name__)
app.config.from_object(config['development'])
'''
print(" 配置管理代码:")
print(config_code)
# 3. 请求处理
print("\n3. 请求处理示例:")
request_handling_code = '''
# 请求处理示例
from flask import Flask, request, jsonify, session
@app.route('/form', methods=['GET', 'POST'])
def handle_form():
if request.method == 'GET':
# 显示表单
return render_template('form.html')
elif request.method == 'POST':
# 处理表单数据
username = request.form.get('username')
email = request.form.get('email')
# 验证数据
if not username or not email:
return jsonify({'error': '用户名和邮箱不能为空'}), 400
# 保存到会话
session['username'] = username
session['email'] = email
return jsonify({'message': '数据保存成功'})
@app.route('/api/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': '没有文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '文件名为空'}), 400
if file:
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return jsonify({'message': '文件上传成功', 'filename': filename})
'''
print(" 请求处理代码:")
print(request_handling_code)
# 4. 模板渲染
print("\n4. 模板渲染:")
template_code = '''
<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}Flask应用{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<nav>
<a href="{{ url_for('index') }}">首页</a>
<a href="{{ url_for('about') }}">关于</a>
</nav>
<main>
{% block content %}{% endblock %}
</main>
<script src="{{ url_for('static', filename='js/app.js') }}"></script>
</body>
</html>
<!-- templates/index.html -->
{% extends "base.html" %}
{% block title %}首页 - Flask应用{% endblock %}
{% block content %}
<h1>欢迎来到Flask应用</h1>
<p>当前用户: {{ session.get('username', '未登录') }}</p>
{% if messages %}
{% for message in messages %}
<div class="alert">{{ message }}</div>
{% endfor %}
{% endif %}
<form method="POST" action="{{ url_for('handle_form') }}">
<input type="text" name="username" placeholder="用户名" required>
<input type="email" name="email" placeholder="邮箱" required>
<button type="submit">提交</button>
</form>
{% endblock %}
'''
print(" 模板代码:")
print(template_code)
print("\n ✓ Flask基础演示完成")
# 运行Flask基础演示
flask_basics_demo()
# 2.2 Flask扩展和中间件
def flask_extensions_demo():
"""Flask扩展和中间件演示"""
print("=== Flask扩展和中间件演示 ===")
# 1. 常用Flask扩展
print("\n1. 常用Flask扩展:")
extensions = {
"Flask-SQLAlchemy": {
"功能": "数据库ORM集成",
"安装": "pip install Flask-SQLAlchemy",
"用途": "简化数据库操作"
},
"Flask-Login": {
"功能": "用户会话管理",
"安装": "pip install Flask-Login",
"用途": "处理用户登录、登出、会话"
},
"Flask-WTF": {
"功能": "表单处理和验证",
"安装": "pip install Flask-WTF",
"用途": "表单渲染、验证、CSRF保护"
},
"Flask-Mail": {
"功能": "邮件发送",
"安装": "pip install Flask-Mail",
"用途": "发送邮件通知"
},
"Flask-Migrate": {
"功能": "数据库迁移",
"安装": "pip install Flask-Migrate",
"用途": "管理数据库结构变更"
},
"Flask-RESTful": {
"功能": "REST API开发",
"安装": "pip install Flask-RESTful",
"用途": "快速构建RESTful API"
}
}
for name, details in extensions.items():
print(f"\n {name}:")
for key, value in details.items():
print(f" {key}: {value}")
# 2. Flask-SQLAlchemy使用示例
print("\n2. Flask-SQLAlchemy使用示例:")
sqlalchemy_code = '''
# models.py - 数据模型
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
db = SQLAlchemy()
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# 关系
posts = db.relationship('Post', backref='author', lazy=True)
def __repr__(self):
return f'<User {self.username}>'
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.Text, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
def __repr__(self):
return f'<Post {self.title}>'
# app.py - 应用配置
from flask import Flask
from models import db, User, Post
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
# 创建数据库表
with app.app_context():
db.create_all()
# 数据库操作示例
@app.route('/api/users', methods=['POST'])
def create_user():
data = request.get_json()
user = User(
username=data['username'],
email=data['email']
)
db.session.add(user)
db.session.commit()
return jsonify({'id': user.id, 'username': user.username})
@app.route('/api/users')
def get_users():
users = User.query.all()
return jsonify([
{'id': u.id, 'username': u.username, 'email': u.email}
for u in users
])
'''
print(" SQLAlchemy代码:")
print(sqlalchemy_code)
# 3. 中间件示例
print("\n3. Flask中间件示例:")
middleware_code = '''
# 中间件和钩子函数
from flask import Flask, request, g
import time
import logging
app = Flask(__name__)
# 请求前处理
@app.before_request
def before_request():
g.start_time = time.time()
g.user_id = request.headers.get('X-User-ID')
# 记录请求日志
logging.info(f"Request: {request.method} {request.path}")
# 请求后处理
@app.after_request
def after_request(response):
# 计算请求处理时间
if hasattr(g, 'start_time'):
duration = time.time() - g.start_time
response.headers['X-Response-Time'] = str(duration)
# 添加CORS头
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
# 错误处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': '资源未找到'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': '服务器内部错误'}), 500
# 自定义中间件类
class AuthMiddleware:
def __init__(self, app):
self.app = app
self.app.before_request(self.authenticate)
def authenticate(self):
# 跳过公开路由
if request.endpoint in ['index', 'login']:
return
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': '需要认证'}), 401
# 验证token逻辑
if not self.validate_token(token):
return jsonify({'error': '无效token'}), 401
def validate_token(self, token):
# 实际的token验证逻辑
return token == 'valid-token'
# 应用中间件
auth_middleware = AuthMiddleware(app)
'''
print(" 中间件代码:")
print(middleware_code)
print("\n ✓ Flask扩展和中间件演示完成")
# 运行Flask扩展演示
flask_extensions_demo()
# 3. Django基础
# 3.1 Django项目结构
def django_basics_demo():
"""Django基础演示"""
print("=== Django基础演示 ===")
# 1. Django项目创建和结构
print("\n1. Django项目创建和结构:")
project_structure = '''
# 创建Django项目
$ django-admin startproject myproject
$ cd myproject
$ python manage.py startapp myapp
# 项目结构
myproject/
├── manage.py # 项目管理脚本
├── myproject/ # 项目配置目录
│ ├── __init__.py
│ ├── settings.py # 项目设置
│ ├── urls.py # 主URL配置
│ ├── wsgi.py # WSGI配置
│ └── asgi.py # ASGI配置
└── myapp/ # 应用目录
├── __init__.py
├── admin.py # 管理后台配置
├── apps.py # 应用配置
├── models.py # 数据模型
├── views.py # 视图函数
├── urls.py # 应用URL配置
├── tests.py # 测试文件
└── migrations/ # 数据库迁移文件
└── __init__.py
'''
print(" 项目结构:")
print(project_structure)
# 2. Django设置配置
print("\n2. Django设置配置:")
settings_code = '''
# settings.py - Django设置
import os
from pathlib import Path
# 基础设置
BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = 'your-secret-key-here'
DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']
# 应用配置
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'myapp', # 自定义应用
'rest_framework', # Django REST framework
]
# 中间件配置
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
# URL配置
ROOT_URLCONF = 'myproject.urls'
# 模板配置
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [BASE_DIR / 'templates'],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
# 数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = BASE_DIR / 'staticfiles'
STATICFILES_DIRS = [BASE_DIR / 'static']
# 媒体文件配置
MEDIA_URL = '/media/'
MEDIA_ROOT = BASE_DIR / 'media'
# 国际化配置
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = True
'''
print(" 设置配置:")
print(settings_code)
# 3. Django模型
print("\n3. Django模型示例:")
models_code = '''
# models.py - 数据模型
from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone
class Category(models.Model):
name = models.CharField(max_length=100, unique=True, verbose_name='分类名称')
description = models.TextField(blank=True, verbose_name='描述')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
class Meta:
verbose_name = '分类'
verbose_name_plural = '分类'
ordering = ['name']
def __str__(self):
return self.name
class Post(models.Model):
STATUS_CHOICES = [
('draft', '草稿'),
('published', '已发布'),
('archived', '已归档'),
]
title = models.CharField(max_length=200, verbose_name='标题')
slug = models.SlugField(max_length=200, unique=True, verbose_name='URL别名')
content = models.TextField(verbose_name='内容')
excerpt = models.TextField(max_length=500, blank=True, verbose_name='摘要')
status = models.CharField(max_length=20, choices=STATUS_CHOICES,
default='draft', verbose_name='状态')
# 关系字段
author = models.ForeignKey(User, on_delete=models.CASCADE,
related_name='posts', verbose_name='作者')
category = models.ForeignKey(Category, on_delete=models.SET_NULL,
null=True, blank=True, verbose_name='分类')
tags = models.ManyToManyField('Tag', blank=True, verbose_name='标签')
# 时间字段
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
published_at = models.DateTimeField(null=True, blank=True, verbose_name='发布时间')
class Meta:
verbose_name = '文章'
verbose_name_plural = '文章'
ordering = ['-created_at']
indexes = [
models.Index(fields=['status', 'published_at']),
models.Index(fields=['author', 'created_at']),
]
def __str__(self):
return self.title
def save(self, *args, **kwargs):
if self.status == 'published' and not self.published_at:
self.published_at = timezone.now()
super().save(*args, **kwargs)
@property
def is_published(self):
return self.status == 'published'
class Tag(models.Model):
name = models.CharField(max_length=50, unique=True, verbose_name='标签名')
color = models.CharField(max_length=7, default='#007bff', verbose_name='颜色')
class Meta:
verbose_name = '标签'
verbose_name_plural = '标签'
ordering = ['name']
def __str__(self):
return self.name
class Comment(models.Model):
post = models.ForeignKey(Post, on_delete=models.CASCADE,
related_name='comments', verbose_name='文章')
author_name = models.CharField(max_length=100, verbose_name='作者姓名')
author_email = models.EmailField(verbose_name='作者邮箱')
content = models.TextField(verbose_name='评论内容')
is_approved = models.BooleanField(default=False, verbose_name='已审核')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
class Meta:
verbose_name = '评论'
verbose_name_plural = '评论'
ordering = ['-created_at']
def __str__(self):
return f'{self.author_name} 对 {self.post.title} 的评论'
'''
print(" 模型代码:")
print(models_code)
print("\n ✓ Django基础演示完成")
# 运行Django基础演示
django_basics_demo()
# 3.2 Django视图和URL
def django_views_urls_demo():
"""Django视图和URL演示"""
print("=== Django视图和URL演示 ===")
# 1. 函数视图
print("\n1. Django函数视图:")
function_views_code = '''
# views.py - 函数视图
from django.shortcuts import render, get_object_or_404, redirect
from django.http import JsonResponse, HttpResponse
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_http_methods
from django.core.paginator import Paginator
from .models import Post, Category, Tag
from .forms import PostForm, CommentForm
def index(request):
"""首页视图"""
posts = Post.objects.filter(status='published').order_by('-published_at')
# 分页
paginator = Paginator(posts, 10) # 每页10篇文章
page_number = request.GET.get('page')
page_obj = paginator.get_page(page_number)
context = {
'posts': page_obj,
'categories': Category.objects.all(),
'popular_tags': Tag.objects.all()[:10]
}
return render(request, 'blog/index.html', context)
def post_detail(request, slug):
"""文章详情视图"""
post = get_object_or_404(Post, slug=slug, status='published')
comments = post.comments.filter(is_approved=True)
# 处理评论表单
if request.method == 'POST':
comment_form = CommentForm(request.POST)
if comment_form.is_valid():
comment = comment_form.save(commit=False)
comment.post = post
comment.save()
return redirect('post_detail', slug=slug)
else:
comment_form = CommentForm()
context = {
'post': post,
'comments': comments,
'comment_form': comment_form
}
return render(request, 'blog/post_detail.html', context)
@login_required
def post_create(request):
"""创建文章视图"""
if request.method == 'POST':
form = PostForm(request.POST)
if form.is_valid():
post = form.save(commit=False)
post.author = request.user
post.save()
form.save_m2m() # 保存多对多关系
return redirect('post_detail', slug=post.slug)
else:
form = PostForm()
return render(request, 'blog/post_form.html', {'form': form})
@require_http_methods(["GET", "POST"])
def category_posts(request, category_id):
"""分类文章视图"""
category = get_object_or_404(Category, id=category_id)
posts = Post.objects.filter(category=category, status='published')
context = {
'category': category,
'posts': posts
}
return render(request, 'blog/category_posts.html', context)
def api_posts(request):
"""API视图 - 返回JSON数据"""
posts = Post.objects.filter(status='published').values(
'id', 'title', 'slug', 'excerpt', 'published_at'
)
return JsonResponse({
'posts': list(posts),
'count': posts.count()
})
'''
print(" 函数视图代码:")
print(function_views_code)
# 2. 类视图
print("\n2. Django类视图:")
class_views_code = '''
# views.py - 类视图
from django.views.generic import (
ListView, DetailView, CreateView, UpdateView, DeleteView
)
from django.contrib.auth.mixins import LoginRequiredMixin
from django.urls import reverse_lazy
from django.db.models import Q
class PostListView(ListView):
"""文章列表视图"""
model = Post
template_name = 'blog/post_list.html'
context_object_name = 'posts'
paginate_by = 10
def get_queryset(self):
queryset = Post.objects.filter(status='published')
# 搜索功能
search_query = self.request.GET.get('search')
if search_query:
queryset = queryset.filter(
Q(title__icontains=search_query) |
Q(content__icontains=search_query)
)
# 分类过滤
category_id = self.request.GET.get('category')
if category_id:
queryset = queryset.filter(category_id=category_id)
return queryset.order_by('-published_at')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['categories'] = Category.objects.all()
context['search_query'] = self.request.GET.get('search', '')
return context
class PostDetailView(DetailView):
"""文章详情视图"""
model = Post
template_name = 'blog/post_detail.html'
context_object_name = 'post'
slug_field = 'slug'
slug_url_kwarg = 'slug'
def get_queryset(self):
return Post.objects.filter(status='published')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['comments'] = self.object.comments.filter(is_approved=True)
context['comment_form'] = CommentForm()
return context
class PostCreateView(LoginRequiredMixin, CreateView):
"""创建文章视图"""
model = Post
form_class = PostForm
template_name = 'blog/post_form.html'
def form_valid(self, form):
form.instance.author = self.request.user
return super().form_valid(form)
def get_success_url(self):
return reverse_lazy('post_detail', kwargs={'slug': self.object.slug})
class PostUpdateView(LoginRequiredMixin, UpdateView):
"""更新文章视图"""
model = Post
form_class = PostForm
template_name = 'blog/post_form.html'
slug_field = 'slug'
slug_url_kwarg = 'slug'
def get_queryset(self):
# 只允许作者编辑自己的文章
return Post.objects.filter(author=self.request.user)
def get_success_url(self):
return reverse_lazy('post_detail', kwargs={'slug': self.object.slug})
class PostDeleteView(LoginRequiredMixin, DeleteView):
"""删除文章视图"""
model = Post
template_name = 'blog/post_confirm_delete.html'
success_url = reverse_lazy('post_list')
slug_field = 'slug'
slug_url_kwarg = 'slug'
def get_queryset(self):
return Post.objects.filter(author=self.request.user)
'''
print(" 类视图代码:")
print(class_views_code)
# 3. URL配置
print("\n3. Django URL配置:")
urls_code = '''
# myproject/urls.py - 主URL配置
from django.contrib import admin
from django.urls import path, include
from django.conf import settings
from django.conf.urls.static import static
urlpatterns = [
path('admin/', admin.site.urls),
path('', include('myapp.urls')),
path('api/', include('myapp.api_urls')),
path('accounts/', include('django.contrib.auth.urls')),
]
# 开发环境下提供媒体文件服务
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
# myapp/urls.py - 应用URL配置
from django.urls import path
from . import views
app_name = 'blog'
urlpatterns = [
# 函数视图URL
path('', views.index, name='index'),
path('post/<slug:slug>/', views.post_detail, name='post_detail'),
path('create/', views.post_create, name='post_create'),
path('category/<int:category_id>/', views.category_posts, name='category_posts'),
# 类视图URL
path('posts/', views.PostListView.as_view(), name='post_list'),
path('post/<slug:slug>/edit/', views.PostUpdateView.as_view(), name='post_update'),
path('post/<slug:slug>/delete/', views.PostDeleteView.as_view(), name='post_delete'),
# API URL
path('api/posts/', views.api_posts, name='api_posts'),
]
# myapp/api_urls.py - API URL配置
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .api_views import PostViewSet, CategoryViewSet
router = DefaultRouter()
router.register(r'posts', PostViewSet)
router.register(r'categories', CategoryViewSet)
urlpatterns = [
path('', include(router.urls)),
path('auth/', include('rest_framework.urls')),
]
'''
print(" URL配置代码:")
print(urls_code)
print("\n ✓ Django视图和URL演示完成")
# 运行Django视图和URL演示
django_views_urls_demo()
# 4. Django REST Framework
# 4.1 REST API开发
def django_rest_framework_demo():
"""Django REST Framework演示"""
print("=== Django REST Framework演示 ===")
# 1. 安装和配置
print("\n1. DRF安装和配置:")
installation_code = '''
# 安装Django REST Framework
pip install djangorestframework
pip install django-filter
pip install djangorestframework-simplejwt
# settings.py - 配置DRF
INSTALLED_APPS = [
# ... 其他应用
'rest_framework',
'rest_framework.authtoken',
'django_filters',
]
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticatedOrReadOnly',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20,
'DEFAULT_FILTER_BACKENDS': [
'django_filters.rest_framework.DjangoFilterBackend',
'rest_framework.filters.SearchFilter',
'rest_framework.filters.OrderingFilter',
],
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
'rest_framework.renderers.BrowsableAPIRenderer',
],
}
'''
print(" 安装配置:")
print(installation_code)
# 2. 序列化器
print("\n2. DRF序列化器:")
serializers_code = '''
# serializers.py - 序列化器
from rest_framework import serializers
from .models import Post, Category, Tag, Comment
from django.contrib.auth.models import User
class UserSerializer(serializers.ModelSerializer):
posts_count = serializers.SerializerMethodField()
class Meta:
model = User
fields = ['id', 'username', 'email', 'date_joined', 'posts_count']
read_only_fields = ['date_joined']
def get_posts_count(self, obj):
return obj.posts.filter(status='published').count()
class CategorySerializer(serializers.ModelSerializer):
posts_count = serializers.SerializerMethodField()
class Meta:
model = Category
fields = ['id', 'name', 'description', 'created_at', 'posts_count']
read_only_fields = ['created_at']
def get_posts_count(self, obj):
return obj.post_set.filter(status='published').count()
class TagSerializer(serializers.ModelSerializer):
class Meta:
model = Tag
fields = ['id', 'name', 'color']
class CommentSerializer(serializers.ModelSerializer):
class Meta:
model = Comment
fields = ['id', 'author_name', 'author_email', 'content',
'is_approved', 'created_at']
read_only_fields = ['created_at', 'is_approved']
class PostListSerializer(serializers.ModelSerializer):
author = UserSerializer(read_only=True)
category = CategorySerializer(read_only=True)
tags = TagSerializer(many=True, read_only=True)
comments_count = serializers.SerializerMethodField()
class Meta:
model = Post
fields = ['id', 'title', 'slug', 'excerpt', 'status',
'author', 'category', 'tags', 'created_at',
'published_at', 'comments_count']
def get_comments_count(self, obj):
return obj.comments.filter(is_approved=True).count()
class PostDetailSerializer(serializers.ModelSerializer):
author = UserSerializer(read_only=True)
category = CategorySerializer(read_only=True)
tags = TagSerializer(many=True, read_only=True)
comments = CommentSerializer(many=True, read_only=True)
class Meta:
model = Post
fields = ['id', 'title', 'slug', 'content', 'excerpt',
'status', 'author', 'category', 'tags',
'created_at', 'updated_at', 'published_at', 'comments']
read_only_fields = ['created_at', 'updated_at']
class PostCreateUpdateSerializer(serializers.ModelSerializer):
tags = serializers.PrimaryKeyRelatedField(
many=True, queryset=Tag.objects.all(), required=False
)
class Meta:
model = Post
fields = ['title', 'slug', 'content', 'excerpt', 'status',
'category', 'tags']
def validate_slug(self, value):
# 验证slug唯一性
if self.instance:
if Post.objects.exclude(pk=self.instance.pk).filter(slug=value).exists():
raise serializers.ValidationError("该URL别名已存在")
else:
if Post.objects.filter(slug=value).exists():
raise serializers.ValidationError("该URL别名已存在")
return value
'''
print(" 序列化器代码:")
print(serializers_code)
# 3. API视图
print("\n3. DRF API视图:")
api_views_code = '''
# api_views.py - API视图
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAuthenticated
from django_filters.rest_framework import DjangoFilterBackend
from django.db.models import Q
from .models import Post, Category, Tag, Comment
from .serializers import (
PostListSerializer, PostDetailSerializer, PostCreateUpdateSerializer,
CategorySerializer, TagSerializer, CommentSerializer
)
class PostViewSet(viewsets.ModelViewSet):
"""文章API视图集"""
queryset = Post.objects.all()
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['status', 'category', 'author']
search_fields = ['title', 'content', 'excerpt']
ordering_fields = ['created_at', 'updated_at', 'published_at']
ordering = ['-created_at']
def get_serializer_class(self):
if self.action == 'list':
return PostListSerializer
elif self.action in ['create', 'update', 'partial_update']:
return PostCreateUpdateSerializer
return PostDetailSerializer
def get_queryset(self):
queryset = Post.objects.all()
# 非管理员只能看到已发布的文章
if not self.request.user.is_staff:
queryset = queryset.filter(status='published')
# 作者只能看到自己的文章
if self.action in ['update', 'partial_update', 'destroy']:
if not self.request.user.is_staff:
queryset = queryset.filter(author=self.request.user)
return queryset
def perform_create(self, serializer):
serializer.save(author=self.request.user)
@action(detail=True, methods=['post'], permission_classes=[IsAuthenticated])
def toggle_status(self, request, pk=None):
"""切换文章状态"""
post = self.get_object()
if post.author != request.user and not request.user.is_staff:
return Response(
{'error': '无权限操作'},
status=status.HTTP_403_FORBIDDEN
)
if post.status == 'published':
post.status = 'draft'
else:
post.status = 'published'
post.save()
return Response({
'message': f'文章状态已更改为{post.get_status_display()}',
'status': post.status
})
@action(detail=False, methods=['get'])
def my_posts(self, request):
"""获取当前用户的文章"""
if not request.user.is_authenticated:
return Response(
{'error': '需要登录'},
status=status.HTTP_401_UNAUTHORIZED
)
queryset = self.get_queryset().filter(author=request.user)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = PostListSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = PostListSerializer(queryset, many=True)
return Response(serializer.data)
class CategoryViewSet(viewsets.ModelViewSet):
"""分类API视图集"""
queryset = Category.objects.all()
serializer_class = CategorySerializer
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [filters.SearchFilter, filters.OrderingFilter]
search_fields = ['name', 'description']
ordering_fields = ['name', 'created_at']
ordering = ['name']
@action(detail=True, methods=['get'])
def posts(self, request, pk=None):
"""获取分类下的文章"""
category = self.get_object()
posts = Post.objects.filter(
category=category,
status='published'
).order_by('-published_at')
page = self.paginate_queryset(posts)
if page is not None:
serializer = PostListSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = PostListSerializer(posts, many=True)
return Response(serializer.data)
class TagViewSet(viewsets.ModelViewSet):
"""标签API视图集"""
queryset = Tag.objects.all()
serializer_class = TagSerializer
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [filters.SearchFilter, filters.OrderingFilter]
search_fields = ['name']
ordering_fields = ['name']
ordering = ['name']
class CommentViewSet(viewsets.ModelViewSet):
"""评论API视图集"""
queryset = Comment.objects.all()
serializer_class = CommentSerializer
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
filterset_fields = ['post', 'is_approved']
ordering_fields = ['created_at']
ordering = ['-created_at']
def get_queryset(self):
queryset = Comment.objects.all()
# 非管理员只能看到已审核的评论
if not self.request.user.is_staff:
queryset = queryset.filter(is_approved=True)
return queryset
@action(detail=True, methods=['post'], permission_classes=[IsAuthenticated])
def approve(self, request, pk=None):
"""审核评论"""
if not request.user.is_staff:
return Response(
{'error': '无权限操作'},
status=status.HTTP_403_FORBIDDEN
)
comment = self.get_object()
comment.is_approved = True
comment.save()
return Response({'message': '评论已审核通过'})
'''
print(" API视图代码:")
print(api_views_code)
print("\n ✓ Django REST Framework演示完成")
# 运行DRF演示
django_rest_framework_demo()
# 5. FastAPI基础
# 5.1 FastAPI入门
def fastapi_basics_demo():
"""FastAPI基础演示"""
print("=== FastAPI基础演示 ===")
# 1. FastAPI基础应用
print("\n1. FastAPI基础应用:")
basic_app_code = '''
# main.py - FastAPI基础应用
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime
import uvicorn
# 创建FastAPI应用实例
app = FastAPI(
title="博客API",
description="一个简单的博客API示例",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# 安全认证
security = HTTPBearer()
# Pydantic模型
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
created_at: datetime
class Config:
orm_mode = True
class PostBase(BaseModel):
title: str
content: str
excerpt: Optional[str] = None
class PostCreate(PostBase):
category_id: Optional[int] = None
tags: Optional[List[str]] = []
class Post(PostBase):
id: int
slug: str
status: str
author_id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class PostResponse(Post):
author: User
comments_count: int
# 依赖注入
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""获取当前用户"""
token = credentials.credentials
# 这里应该验证token并返回用户信息
# 简化示例,直接返回模拟用户
if token != "valid-token":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
return {"id": 1, "username": "testuser", "email": "test@example.com"}
# 路由定义
@app.get("/")
async def root():
"""根路径"""
return {"message": "欢迎使用博客API"}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy", "timestamp": datetime.now()}
# 用户相关路由
@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
"""创建用户"""
# 这里应该保存到数据库
return {
"id": 1,
"username": user.username,
"email": user.email,
"created_at": datetime.now()
}
@app.get("/users/me", response_model=User)
async def get_current_user_info(current_user: dict = Depends(get_current_user)):
"""获取当前用户信息"""
return {
"id": current_user["id"],
"username": current_user["username"],
"email": current_user["email"],
"created_at": datetime.now()
}
# 文章相关路由
@app.get("/posts/", response_model=List[PostResponse])
async def get_posts(
skip: int = 0,
limit: int = 10,
search: Optional[str] = None,
category: Optional[int] = None
):
"""获取文章列表"""
# 模拟数据
posts = [
{
"id": 1,
"title": "FastAPI入门",
"content": "FastAPI是一个现代、快速的Web框架...",
"excerpt": "FastAPI简介",
"slug": "fastapi-intro",
"status": "published",
"author_id": 1,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"author": {
"id": 1,
"username": "admin",
"email": "admin@example.com",
"created_at": datetime.now()
},
"comments_count": 5
}
]
return posts[skip:skip + limit]
@app.get("/posts/{post_id}", response_model=PostResponse)
async def get_post(post_id: int):
"""获取单篇文章"""
if post_id != 1:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章未找到"
)
return {
"id": 1,
"title": "FastAPI入门",
"content": "FastAPI是一个现代、快速的Web框架...",
"excerpt": "FastAPI简介",
"slug": "fastapi-intro",
"status": "published",
"author_id": 1,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"author": {
"id": 1,
"username": "admin",
"email": "admin@example.com",
"created_at": datetime.now()
},
"comments_count": 5
}
@app.post("/posts/", response_model=Post, status_code=status.HTTP_201_CREATED)
async def create_post(
post: PostCreate,
current_user: dict = Depends(get_current_user)
):
"""创建文章"""
return {
"id": 2,
"title": post.title,
"content": post.content,
"excerpt": post.excerpt,
"slug": post.title.lower().replace(" ", "-"),
"status": "draft",
"author_id": current_user["id"],
"created_at": datetime.now(),
"updated_at": datetime.now()
}
@app.put("/posts/{post_id}", response_model=Post)
async def update_post(
post_id: int,
post: PostCreate,
current_user: dict = Depends(get_current_user)
):
"""更新文章"""
if post_id != 1:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章未找到"
)
return {
"id": post_id,
"title": post.title,
"content": post.content,
"excerpt": post.excerpt,
"slug": post.title.lower().replace(" ", "-"),
"status": "published",
"author_id": current_user["id"],
"created_at": datetime.now(),
"updated_at": datetime.now()
}
@app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
post_id: int,
current_user: dict = Depends(get_current_user)
):
"""删除文章"""
if post_id != 1:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章未找到"
)
# 删除逻辑
return
# 启动应用
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
'''
print(" 基础应用代码:")
print(basic_app_code)
# 2. 中间件和异常处理
print("\n2. FastAPI中间件和异常处理:")
middleware_code = '''
# middleware.py - 中间件和异常处理
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
app = FastAPI()
# CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应该指定具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 信任主机中间件
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["localhost", "127.0.0.1", "*.example.com"]
)
# 自定义中间件
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# 记录请求信息
logging.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# 计算处理时间
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
# 记录响应信息
logging.info(f"Response: {response.status_code} - {process_time:.4f}s")
return response
app.add_middleware(LoggingMiddleware)
# 全局异常处理
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.status_code,
"message": exc.detail,
"path": str(request.url)
}
}
)
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
return JSONResponse(
status_code=400,
content={
"error": {
"code": 400,
"message": "请求参数错误",
"detail": str(exc),
"path": str(request.url)
}
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
logging.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": {
"code": 500,
"message": "服务器内部错误",
"path": str(request.url)
}
}
)
'''
print(" 中间件代码:")
print(middleware_code)
# 3. 数据库集成
print("\n3. FastAPI数据库集成:")
database_code = '''
# database.py - 数据库配置
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from datetime import datetime
# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./blog.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # SQLite特有配置
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 数据库模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
slug = Column(String, unique=True, index=True)
content = Column(Text)
excerpt = Column(Text)
status = Column(String, default="draft")
author_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
author = relationship("User", back_populates="posts")
# 创建数据库表
Base.metadata.create_all(bind=engine)
# 数据库依赖
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# CRUD操作
class UserCRUD:
@staticmethod
def create_user(db: Session, user_data: dict):
db_user = User(**user_data)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
@staticmethod
def get_user_by_username(db: Session, username: str):
return db.query(User).filter(User.username == username).first()
class PostCRUD:
@staticmethod
def create_post(db: Session, post_data: dict, author_id: int):
db_post = Post(**post_data, author_id=author_id)
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
@staticmethod
def get_posts(db: Session, skip: int = 0, limit: int = 10):
return db.query(Post).offset(skip).limit(limit).all()
@staticmethod
def get_post(db: Session, post_id: int):
return db.query(Post).filter(Post.id == post_id).first()
@staticmethod
def update_post(db: Session, post_id: int, post_data: dict):
db_post = db.query(Post).filter(Post.id == post_id).first()
if db_post:
for key, value in post_data.items():
setattr(db_post, key, value)
db_post.updated_at = datetime.utcnow()
db.commit()
db.refresh(db_post)
return db_post
@staticmethod
def delete_post(db: Session, post_id: int):
db_post = db.query(Post).filter(Post.id == post_id).first()
if db_post:
db.delete(db_post)
db.commit()
return db_post
# 在main.py中使用数据库
from fastapi import Depends
from sqlalchemy.orm import Session
from database import get_db, UserCRUD, PostCRUD
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
# 检查用户是否已存在
existing_user = UserCRUD.get_user_by_username(db, user.username)
if existing_user:
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建用户(实际应用中需要加密密码)
user_data = {
"username": user.username,
"email": user.email,
"hashed_password": user.password # 应该使用bcrypt等加密
}
return UserCRUD.create_user(db, user_data)
@app.get("/posts/", response_model=List[Post])
async def get_posts(
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db)
):
return PostCRUD.get_posts(db, skip=skip, limit=limit)
'''
print(" 数据库集成代码:")
print(database_code)
print("\n ✓ FastAPI基础演示完成")
# 运行FastAPI基础演示
fastapi_basics_demo()