第19天-电子邮件
哪吒 2023/6/15
# 第19天-电子邮件
# 1. 电子邮件基础
# 1.1 电子邮件协议概述
def email_protocols_overview():
"""电子邮件协议概述"""
print("=== 电子邮件协议概述 ===")
protocols = {
"SMTP (Simple Mail Transfer Protocol)": {
"用途": "发送邮件",
"端口": "25 (非加密), 587 (STARTTLS), 465 (SSL/TLS)",
"特点": "用于邮件传输,从客户端到服务器或服务器间",
"Python模块": "smtplib"
},
"POP3 (Post Office Protocol 3)": {
"用途": "接收邮件",
"端口": "110 (非加密), 995 (SSL/TLS)",
"特点": "下载邮件到本地,服务器上的邮件通常被删除",
"Python模块": "poplib"
},
"IMAP (Internet Message Access Protocol)": {
"用途": "接收邮件",
"端口": "143 (非加密), 993 (SSL/TLS)",
"特点": "邮件保存在服务器上,支持多设备同步",
"Python模块": "imaplib"
}
}
for protocol, details in protocols.items():
print(f"\n{protocol}:")
for key, value in details.items():
print(f" {key}: {value}")
print("\n邮件处理流程:")
flow_steps = [
"1. 编写邮件内容(文本、HTML、附件)",
"2. 使用SMTP协议发送邮件",
"3. 邮件服务器接收并转发邮件",
"4. 收件人使用POP3或IMAP接收邮件",
"5. 邮件客户端显示邮件内容"
]
for step in flow_steps:
print(f" {step}")
# 运行协议概述
email_protocols_overview()
# 1.2 Python邮件模块介绍
import smtplib
import poplib
import imaplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from email.header import decode_header
from email.utils import parseaddr, formataddr
import os
from datetime import datetime
def python_email_modules():
"""Python邮件模块介绍"""
print("=== Python邮件模块介绍 ===")
modules = {
"smtplib": {
"功能": "SMTP客户端,用于发送邮件",
"主要类": "SMTP, SMTP_SSL",
"常用方法": "connect(), login(), sendmail(), quit()"
},
"poplib": {
"功能": "POP3客户端,用于接收邮件",
"主要类": "POP3, POP3_SSL",
"常用方法": "user(), pass_(), list(), retr(), dele()"
},
"imaplib": {
"功能": "IMAP客户端,用于接收邮件",
"主要类": "IMAP4, IMAP4_SSL",
"常用方法": "login(), select(), search(), fetch()"
},
"email.mime": {
"功能": "构造邮件内容",
"主要类": "MIMEText, MIMEMultipart, MIMEBase",
"常用方法": "attach(), set_payload(), as_string()"
},
"email.header": {
"功能": "处理邮件头部编码",
"主要函数": "decode_header(), make_header()",
"用途": "处理中文等非ASCII字符"
}
}
for module, details in modules.items():
print(f"\n{module}:")
for key, value in details.items():
print(f" {key}: {value}")
print("\n邮件格式标准:")
standards = [
"RFC 5322: Internet Message Format (邮件格式)",
"RFC 2045-2049: MIME (多媒体邮件扩展)",
"RFC 3501: IMAP4rev1 (IMAP协议)",
"RFC 1939: POP3 (POP3协议)",
"RFC 5321: SMTP (SMTP协议)"
]
for standard in standards:
print(f" • {standard}")
# 运行模块介绍
python_email_modules()
# 2. 发送邮件
# 2.1 SMTP基础发送
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr
def smtp_basic_demo():
"""SMTP基础发送演示"""
print("=== SMTP基础发送演示 ===")
# 1. 发送纯文本邮件
print("\n1. 发送纯文本邮件:")
def send_text_email():
"""发送纯文本邮件"""
# 邮件配置
smtp_server = "smtp.gmail.com" # Gmail SMTP服务器
smtp_port = 587 # STARTTLS端口
sender_email = "your_email@gmail.com"
sender_password = "your_app_password" # 应用专用密码
# 收件人信息
receiver_email = "recipient@example.com"
# 创建邮件内容
subject = "Python发送的测试邮件"
body = """
这是一封使用Python发送的测试邮件。
邮件内容:
- 发送时间:{}
- 发送方式:SMTP
- 编程语言:Python
祝好!
""".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 创建MIMEText对象
message = MIMEText(body, 'plain', 'utf-8')
message['From'] = formataddr(("Python发送者", sender_email))
message['To'] = receiver_email
message['Subject'] = Header(subject, 'utf-8')
try:
# 连接SMTP服务器
print(" 正在连接SMTP服务器...")
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls() # 启用TLS加密
# 登录
print(" 正在登录...")
server.login(sender_email, sender_password)
# 发送邮件
print(" 正在发送邮件...")
text = message.as_string()
server.sendmail(sender_email, receiver_email, text)
print(" ✓ 邮件发送成功!")
except Exception as e:
print(f" ✗ 邮件发送失败: {e}")
finally:
server.quit()
# 2. 发送HTML邮件
print("\n2. 发送HTML邮件:")
def send_html_email():
"""发送HTML邮件"""
# 邮件配置(示例配置)
smtp_config = {
'server': 'smtp.example.com',
'port': 587,
'username': 'your_email@example.com',
'password': 'your_password'
}
# HTML邮件内容
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Python HTML邮件</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #4CAF50; color: white; padding: 10px; text-align: center; }
.content { padding: 20px; border: 1px solid #ddd; }
.footer { background-color: #f1f1f1; padding: 10px; text-align: center; font-size: 12px; }
.highlight { background-color: #ffeb3b; padding: 5px; }
</style>
</head>
<body>
<div class="header">
<h1>Python邮件发送测试</h1>
</div>
<div class="content">
<h2>邮件内容</h2>
<p>这是一封使用Python发送的<span class="highlight">HTML格式</span>邮件。</p>
<h3>功能特点:</h3>
<ul>
<li>支持HTML格式</li>
<li>支持CSS样式</li>
<li>支持图片和链接</li>
<li>支持表格和列表</li>
</ul>
<h3>技术信息:</h3>
<table border="1" style="border-collapse: collapse; width: 100%;">
<tr>
<th style="padding: 8px; background-color: #f2f2f2;">项目</th>
<th style="padding: 8px; background-color: #f2f2f2;">值</th>
</tr>
<tr>
<td style="padding: 8px;">发送时间</td>
<td style="padding: 8px;">{}</td>
</tr>
<tr>
<td style="padding: 8px;">编程语言</td>
<td style="padding: 8px;">Python</td>
</tr>
<tr>
<td style="padding: 8px;">协议</td>
<td style="padding: 8px;">SMTP</td>
</tr>
</table>
<p>访问我们的网站:<a href="https://www.python.org">Python官网</a></p>
</div>
<div class="footer">
<p>此邮件由Python自动发送 | © 2023 Python邮件系统</p>
</div>
</body>
</html>
""".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 创建HTML邮件
message = MIMEMultipart('alternative')
message['From'] = formataddr(("Python HTML发送者", smtp_config['username']))
message['To'] = "recipient@example.com"
message['Subject'] = Header("Python HTML邮件测试", 'utf-8')
# 添加HTML内容
html_part = MIMEText(html_content, 'html', 'utf-8')
message.attach(html_part)
print(" HTML邮件内容已准备完成")
print(" 邮件大小:", len(message.as_string()), "字节")
# 注意:这里只是演示,实际发送需要真实的SMTP配置
print(" 注意:需要配置真实的SMTP服务器信息才能发送")
# 3. 批量发送邮件
print("\n3. 批量发送邮件:")
def send_bulk_emails():
"""批量发送邮件"""
# 收件人列表
recipients = [
{"email": "user1@example.com", "name": "用户一"},
{"email": "user2@example.com", "name": "用户二"},
{"email": "user3@example.com", "name": "用户三"}
]
# 邮件模板
def create_personalized_email(recipient):
"""创建个性化邮件"""
subject = f"欢迎 {recipient['name']} 加入我们!"
body = f"""
亲爱的 {recipient['name']},
欢迎您加入我们的Python学习社区!
您的邮箱:{recipient['email']}
注册时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
我们为您准备了丰富的学习资源:
• Python基础教程
• 实战项目案例
• 在线编程练习
• 技术交流社区
期待与您一起学习进步!
Python学习团队
"""
message = MIMEText(body, 'plain', 'utf-8')
message['From'] = formataddr(("Python学习团队", "team@pythonlearn.com"))
message['To'] = recipient['email']
message['Subject'] = Header(subject, 'utf-8')
return message
# 模拟批量发送
print(" 开始批量发送邮件...")
success_count = 0
failed_count = 0
for i, recipient in enumerate(recipients, 1):
try:
message = create_personalized_email(recipient)
# 这里模拟发送过程
print(f" [{i}/{len(recipients)}] 正在发送给 {recipient['name']} ({recipient['email']})")
# 实际发送代码会在这里
# server.sendmail(sender, recipient['email'], message.as_string())
success_count += 1
print(f" ✓ 发送成功")
except Exception as e:
failed_count += 1
print(f" ✗ 发送失败: {e}")
print(f"\n 批量发送完成:")
print(f" 成功: {success_count} 封")
print(f" 失败: {failed_count} 封")
print(f" 总计: {len(recipients)} 封")
# 运行示例(注意:需要真实SMTP配置才能实际发送)
print("\n注意:以下示例需要配置真实的SMTP服务器信息")
print("建议使用Gmail、QQ邮箱等提供的SMTP服务")
# send_text_email() # 取消注释并配置SMTP信息后可运行
send_html_email()
send_bulk_emails()
# 运行SMTP基础演示
smtp_basic_demo()
# 2.2 邮件附件处理
import os
import mimetypes
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.audio import MIMEAudio
from email.mime.application import MIMEApplication
from email import encoders
from pathlib import Path
def email_attachments_demo():
"""邮件附件处理演示"""
print("=== 邮件附件处理演示 ===")
# 1. 添加文件附件
print("\n1. 添加文件附件:")
def add_file_attachment(message, file_path):
"""添加文件附件"""
if not os.path.exists(file_path):
print(f" 文件不存在: {file_path}")
return False
# 获取文件信息
filename = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
print(f" 添加附件: {filename} ({file_size} 字节)")
# 猜测文件类型
ctype, encoding = mimetypes.guess_type(file_path)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
try:
with open(file_path, 'rb') as fp:
if maintype == 'text':
# 文本文件
attachment = MIMEText(fp.read().decode('utf-8'), _subtype=subtype)
elif maintype == 'image':
# 图片文件
attachment = MIMEImage(fp.read(), _subtype=subtype)
elif maintype == 'audio':
# 音频文件
attachment = MIMEAudio(fp.read(), _subtype=subtype)
else:
# 其他文件类型
attachment = MIMEBase(maintype, subtype)
attachment.set_payload(fp.read())
encoders.encode_base64(attachment)
# 设置附件头部
attachment.add_header(
'Content-Disposition',
'attachment',
filename=('utf-8', '', filename)
)
# 添加到邮件
message.attach(attachment)
print(f" ✓ 附件添加成功: {filename}")
return True
except Exception as e:
print(f" ✗ 附件添加失败: {e}")
return False
# 2. 创建带附件的邮件
print("\n2. 创建带附件的邮件:")
def create_email_with_attachments():
"""创建带附件的邮件"""
# 创建多部分邮件
message = MIMEMultipart()
message['From'] = formataddr(("Python发送者", "sender@example.com"))
message['To'] = "recipient@example.com"
message['Subject'] = Header("带附件的邮件测试", 'utf-8')
# 邮件正文
body = """
这是一封带有附件的测试邮件。
附件说明:
• 文档文件:包含项目说明
• 图片文件:项目截图
• 数据文件:分析结果
请查收附件内容。
谢谢!
"""
message.attach(MIMEText(body, 'plain', 'utf-8'))
# 模拟添加不同类型的附件
attachments_info = [
{"name": "document.txt", "type": "文本文档", "size": "2.5 KB"},
{"name": "screenshot.png", "type": "PNG图片", "size": "156 KB"},
{"name": "data.csv", "type": "CSV数据", "size": "45 KB"},
{"name": "report.pdf", "type": "PDF文档", "size": "1.2 MB"}
]
print(" 准备添加的附件:")
total_size = 0
for i, att in enumerate(attachments_info, 1):
print(f" {i}. {att['name']} ({att['type']}, {att['size']})")
# 这里模拟文件大小计算
size_num = float(att['size'].split()[0])
if 'KB' in att['size']:
total_size += size_num * 1024
elif 'MB' in att['size']:
total_size += size_num * 1024 * 1024
print(f" 总附件大小: {total_size/1024/1024:.2f} MB")
# 检查邮件大小限制
if total_size > 25 * 1024 * 1024: # 25MB限制
print(" ⚠️ 警告:附件总大小超过25MB,可能被邮件服务器拒绝")
return message
# 3. 内嵌图片处理
print("\n3. 内嵌图片处理:")
def create_email_with_embedded_images():
"""创建带内嵌图片的邮件"""
# 创建相关多部分邮件
message = MIMEMultipart('related')
message['From'] = formataddr(("Python发送者", "sender@example.com"))
message['To'] = "recipient@example.com"
message['Subject'] = Header("带内嵌图片的HTML邮件", 'utf-8')
# HTML内容,引用内嵌图片
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>内嵌图片邮件</title>
</head>
<body>
<h1>Python邮件系统</h1>
<p>这是一封包含内嵌图片的HTML邮件。</p>
<h2>项目截图:</h2>
<img src="cid:screenshot" alt="项目截图" style="max-width: 600px;">
<h2>Logo:</h2>
<img src="cid:logo" alt="公司Logo" style="width: 200px;">
<p>图片已内嵌在邮件中,无需额外下载。</p>
</body>
</html>
"""
# 添加HTML内容
html_part = MIMEText(html_content, 'html', 'utf-8')
message.attach(html_part)
# 模拟添加内嵌图片
embedded_images = [
{"cid": "screenshot", "file": "screenshot.png", "desc": "项目截图"},
{"cid": "logo", "file": "logo.png", "desc": "公司Logo"}
]
print(" 内嵌图片信息:")
for img in embedded_images:
print(f" CID: {img['cid']} -> {img['file']} ({img['desc']})")
# 实际使用时的代码示例
print(f" # 添加内嵌图片的代码:")
print(f" # with open('{img['file']}', 'rb') as f:")
print(f" # img_data = f.read()")
print(f" # image = MIMEImage(img_data)")
print(f" # image.add_header('Content-ID', '<{img['cid']}>')")
print(f" # message.attach(image)")
return message
# 4. 附件安全检查
print("\n4. 附件安全检查:")
def check_attachment_security(file_path):
"""检查附件安全性"""
if not os.path.exists(file_path):
return False, "文件不存在"
filename = os.path.basename(file_path)
file_ext = os.path.splitext(filename)[1].lower()
file_size = os.path.getsize(file_path)
# 危险文件扩展名
dangerous_extensions = {
'.exe', '.bat', '.cmd', '.com', '.pif', '.scr', '.vbs', '.js',
'.jar', '.msi', '.dll', '.sys', '.drv', '.ocx'
}
# 检查文件扩展名
if file_ext in dangerous_extensions:
return False, f"危险的文件类型: {file_ext}"
# 检查文件大小(25MB限制)
max_size = 25 * 1024 * 1024
if file_size > max_size:
return False, f"文件过大: {file_size/1024/1024:.2f}MB > 25MB"
# 检查文件名
if len(filename) > 255:
return False, "文件名过长"
# 检查特殊字符
invalid_chars = '<>:"/\\|?*'
if any(char in filename for char in invalid_chars):
return False, "文件名包含非法字符"
return True, "文件安全"
# 测试安全检查
test_files = [
"document.txt",
"image.png",
"script.exe", # 危险文件
"very_long_filename_" + "x" * 250 + ".txt", # 文件名过长
"file<with>invalid:chars.txt" # 非法字符
]
print(" 附件安全检查结果:")
for file_path in test_files:
is_safe, message = check_attachment_security(file_path)
status = "✓ 安全" if is_safe else "✗ 危险"
print(f" {file_path}: {status} - {message}")
# 运行示例
create_email_with_attachments()
create_email_with_embedded_images()
# 运行附件处理演示
email_attachments_demo()
# 2.3 高级发送功能
import time
import threading
from queue import Queue
from datetime import datetime, timedelta
import json
import logging
def advanced_email_features():
"""高级邮件发送功能演示"""
print("=== 高级邮件发送功能演示 ===")
# 1. 邮件模板系统
print("\n1. 邮件模板系统:")
class EmailTemplate:
"""邮件模板类"""
def __init__(self, name, subject_template, body_template, template_type='text'):
self.name = name
self.subject_template = subject_template
self.body_template = body_template
self.template_type = template_type
self.created_at = datetime.now()
def render(self, **kwargs):
"""渲染模板"""
try:
subject = self.subject_template.format(**kwargs)
body = self.body_template.format(**kwargs)
return subject, body
except KeyError as e:
raise ValueError(f"模板变量缺失: {e}")
def validate_variables(self, **kwargs):
"""验证模板变量"""
import re
# 提取模板中的变量
subject_vars = set(re.findall(r'\{(\w+)\}', self.subject_template))
body_vars = set(re.findall(r'\{(\w+)\}', self.body_template))
required_vars = subject_vars | body_vars
provided_vars = set(kwargs.keys())
missing_vars = required_vars - provided_vars
if missing_vars:
return False, f"缺少变量: {', '.join(missing_vars)}"
return True, "变量完整"
# 创建邮件模板
templates = {
'welcome': EmailTemplate(
name='欢迎邮件',
subject_template='欢迎 {username} 加入 {platform}!',
body_template="""
亲爱的 {username},
欢迎您加入 {platform}!
您的账户信息:
• 用户名:{username}
• 邮箱:{email}
• 注册时间:{register_time}
• 会员等级:{membership_level}
接下来您可以:
1. 完善个人资料
2. 浏览学习资源
3. 参与社区讨论
4. 开始您的学习之旅
如有任何问题,请随时联系我们。
祝您学习愉快!
{platform} 团队
"""
),
'notification': EmailTemplate(
name='通知邮件',
subject_template='{notification_type}:{title}',
body_template="""
{username},您好!
您有一条新的{notification_type}:
标题:{title}
内容:{content}
时间:{timestamp}
优先级:{priority}
{action_required}
详情请登录系统查看。
系统自动发送,请勿回复。
"""
)
}
# 测试模板渲染
print(" 测试邮件模板:")
# 欢迎邮件测试
welcome_data = {
'username': '张三',
'platform': 'Python学习平台',
'email': 'zhangsan@example.com',
'register_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'membership_level': '普通会员'
}
try:
subject, body = templates['welcome'].render(**welcome_data)
print(f" 欢迎邮件主题: {subject}")
print(f" 邮件长度: {len(body)} 字符")
except Exception as e:
print(f" 模板渲染失败: {e}")
# 通知邮件测试
notification_data = {
'username': '李四',
'notification_type': '系统通知',
'title': '课程更新提醒',
'content': 'Python高级编程课程已更新第10章内容',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'priority': '中等',
'action_required': '请及时查看新增内容。'
}
try:
subject, body = templates['notification'].render(**notification_data)
print(f" 通知邮件主题: {subject}")
print(f" 邮件长度: {len(body)} 字符")
except Exception as e:
print(f" 模板渲染失败: {e}")
# 2. 邮件队列系统
print("\n2. 邮件队列系统:")
class EmailQueue:
"""邮件队列管理器"""
def __init__(self, max_workers=3, retry_times=3):
self.queue = Queue()
self.max_workers = max_workers
self.retry_times = retry_times
self.workers = []
self.is_running = False
self.sent_count = 0
self.failed_count = 0
# 配置日志
self.logger = logging.getLogger('EmailQueue')
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def add_email(self, email_data, priority=1):
"""添加邮件到队列"""
email_item = {
'data': email_data,
'priority': priority,
'retry_count': 0,
'created_at': datetime.now(),
'id': f"email_{int(time.time() * 1000)}"
}
self.queue.put(email_item)
self.logger.info(f"邮件已加入队列: {email_item['id']}")
def worker(self, worker_id):
"""工作线程"""
self.logger.info(f"工作线程 {worker_id} 启动")
while self.is_running:
try:
# 获取邮件任务
email_item = self.queue.get(timeout=1)
self.logger.info(
f"[Worker-{worker_id}] 处理邮件: {email_item['id']}"
)
# 模拟发送邮件
success = self._send_email(email_item['data'])
if success:
self.sent_count += 1
self.logger.info(
f"[Worker-{worker_id}] 邮件发送成功: {email_item['id']}"
)
else:
# 重试机制
email_item['retry_count'] += 1
if email_item['retry_count'] < self.retry_times:
self.logger.warning(
f"[Worker-{worker_id}] 邮件发送失败,重试 {email_item['retry_count']}/{self.retry_times}: {email_item['id']}"
)
# 重新加入队列
time.sleep(2 ** email_item['retry_count']) # 指数退避
self.queue.put(email_item)
else:
self.failed_count += 1
self.logger.error(
f"[Worker-{worker_id}] 邮件发送最终失败: {email_item['id']}"
)
self.queue.task_done()
except Exception as e:
if "timed out" not in str(e):
self.logger.error(f"[Worker-{worker_id}] 处理错误: {e}")
self.logger.info(f"工作线程 {worker_id} 停止")
def _send_email(self, email_data):
"""模拟发送邮件"""
# 模拟网络延迟
time.sleep(0.5)
# 模拟90%成功率
import random
return random.random() > 0.1
def start(self):
"""启动队列处理"""
if self.is_running:
return
self.is_running = True
self.sent_count = 0
self.failed_count = 0
# 启动工作线程
for i in range(self.max_workers):
worker = threading.Thread(
target=self.worker,
args=(i + 1,),
daemon=True
)
worker.start()
self.workers.append(worker)
self.logger.info(f"邮件队列启动,工作线程数: {self.max_workers}")
def stop(self):
"""停止队列处理"""
self.is_running = False
# 等待队列清空
self.queue.join()
self.logger.info("邮件队列已停止")
self.logger.info(f"发送统计 - 成功: {self.sent_count}, 失败: {self.failed_count}")
def get_status(self):
"""获取队列状态"""
return {
'queue_size': self.queue.qsize(),
'is_running': self.is_running,
'workers': len(self.workers),
'sent_count': self.sent_count,
'failed_count': self.failed_count
}
# 测试邮件队列
print(" 测试邮件队列系统:")
email_queue = EmailQueue(max_workers=2)
email_queue.start()
# 添加测试邮件
test_emails = [
{'to': 'user1@example.com', 'subject': '测试邮件1', 'body': '内容1'},
{'to': 'user2@example.com', 'subject': '测试邮件2', 'body': '内容2'},
{'to': 'user3@example.com', 'subject': '测试邮件3', 'body': '内容3'},
{'to': 'user4@example.com', 'subject': '测试邮件4', 'body': '内容4'},
{'to': 'user5@example.com', 'subject': '测试邮件5', 'body': '内容5'}
]
for email_data in test_emails:
email_queue.add_email(email_data)
print(f" 已添加 {len(test_emails)} 封邮件到队列")
# 等待处理完成
time.sleep(3)
status = email_queue.get_status()
print(f" 队列状态: {status}")
email_queue.stop()
# 3. 邮件发送统计
print("\n3. 邮件发送统计:")
class EmailStatistics:
"""邮件发送统计"""
def __init__(self):
self.stats = {
'total_sent': 0,
'total_failed': 0,
'daily_stats': {},
'hourly_stats': {},
'recipient_stats': {},
'template_stats': {}
}
def record_sent(self, recipient, template_name=None):
"""记录发送成功"""
now = datetime.now()
date_key = now.strftime('%Y-%m-%d')
hour_key = now.strftime('%Y-%m-%d %H:00')
self.stats['total_sent'] += 1
# 按日统计
if date_key not in self.stats['daily_stats']:
self.stats['daily_stats'][date_key] = {'sent': 0, 'failed': 0}
self.stats['daily_stats'][date_key]['sent'] += 1
# 按小时统计
if hour_key not in self.stats['hourly_stats']:
self.stats['hourly_stats'][hour_key] = {'sent': 0, 'failed': 0}
self.stats['hourly_stats'][hour_key]['sent'] += 1
# 按收件人统计
if recipient not in self.stats['recipient_stats']:
self.stats['recipient_stats'][recipient] = {'sent': 0, 'failed': 0}
self.stats['recipient_stats'][recipient]['sent'] += 1
# 按模板统计
if template_name:
if template_name not in self.stats['template_stats']:
self.stats['template_stats'][template_name] = {'sent': 0, 'failed': 0}
self.stats['template_stats'][template_name]['sent'] += 1
def record_failed(self, recipient, template_name=None):
"""记录发送失败"""
now = datetime.now()
date_key = now.strftime('%Y-%m-%d')
hour_key = now.strftime('%Y-%m-%d %H:00')
self.stats['total_failed'] += 1
# 按日统计
if date_key not in self.stats['daily_stats']:
self.stats['daily_stats'][date_key] = {'sent': 0, 'failed': 0}
self.stats['daily_stats'][date_key]['failed'] += 1
# 按小时统计
if hour_key not in self.stats['hourly_stats']:
self.stats['hourly_stats'][hour_key] = {'sent': 0, 'failed': 0}
self.stats['hourly_stats'][hour_key]['failed'] += 1
# 按收件人统计
if recipient not in self.stats['recipient_stats']:
self.stats['recipient_stats'][recipient] = {'sent': 0, 'failed': 0}
self.stats['recipient_stats'][recipient]['failed'] += 1
# 按模板统计
if template_name:
if template_name not in self.stats['template_stats']:
self.stats['template_stats'][template_name] = {'sent': 0, 'failed': 0}
self.stats['template_stats'][template_name]['failed'] += 1
def get_summary(self):
"""获取统计摘要"""
total = self.stats['total_sent'] + self.stats['total_failed']
success_rate = (self.stats['total_sent'] / total * 100) if total > 0 else 0
return {
'total_emails': total,
'successful': self.stats['total_sent'],
'failed': self.stats['total_failed'],
'success_rate': f"{success_rate:.2f}%",
'unique_recipients': len(self.stats['recipient_stats']),
'templates_used': len(self.stats['template_stats'])
}
def export_stats(self, filename=None):
"""导出统计数据"""
if not filename:
filename = f"email_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.stats, f, ensure_ascii=False, indent=2, default=str)
return filename
except Exception as e:
print(f"导出统计数据失败: {e}")
return None
# 测试统计功能
stats = EmailStatistics()
# 模拟一些发送记录
test_records = [
('user1@example.com', 'welcome', True),
('user2@example.com', 'welcome', True),
('user3@example.com', 'notification', False),
('user1@example.com', 'notification', True),
('user4@example.com', 'welcome', True)
]
for recipient, template, success in test_records:
if success:
stats.record_sent(recipient, template)
else:
stats.record_failed(recipient, template)
summary = stats.get_summary()
print(" 发送统计摘要:")
for key, value in summary.items():
print(f" {key}: {value}")
# 导出统计数据
# export_file = stats.export_stats()
# if export_file:
# print(f" 统计数据已导出到: {export_file}")
# 运行高级功能演示
advanced_email_features()
# 3. 接收邮件
# 3.1 POP3接收邮件
import poplib
from email.parser import Parser
from email.header import decode_header
from email.utils import parseaddr
import base64
import quopri
def pop3_receive_demo():
"""POP3接收邮件演示"""
print("=== POP3接收邮件演示 ===")
# 1. 连接POP3服务器
print("\n1. 连接POP3服务器:")
def connect_pop3_server():
"""连接POP3服务器"""
# POP3服务器配置
pop3_config = {
'server': 'pop.gmail.com',
'port': 995, # SSL端口
'username': 'your_email@gmail.com',
'password': 'your_app_password'
}
try:
print(" 正在连接POP3服务器...")
# 连接SSL POP3服务器
server = poplib.POP3_SSL(pop3_config['server'], pop3_config['port'])
# 登录
print(" 正在登录...")
server.user(pop3_config['username'])
server.pass_(pop3_config['password'])
# 获取邮箱统计信息
num_messages, total_size = server.stat()
print(f" ✓ 连接成功!")
print(f" 邮件数量: {num_messages}")
print(f" 总大小: {total_size} 字节")
return server
except Exception as e:
print(f" ✗ 连接失败: {e}")
return None
# 2. 获取邮件列表
print("\n2. 获取邮件列表:")
def get_email_list(server):
"""获取邮件列表"""
if not server:
print(" 服务器连接无效")
return []
try:
# 获取邮件列表
messages = server.list()
print(f" 邮件列表响应: {messages[0]}")
email_list = []
for msg_info in messages[1]:
# 解析邮件信息:序号 大小
parts = msg_info.decode('utf-8').split()
msg_num = int(parts[0])
msg_size = int(parts[1])
email_list.append({
'number': msg_num,
'size': msg_size
})
print(f" 获取到 {len(email_list)} 封邮件:")
for i, email_info in enumerate(email_list[:5], 1): # 只显示前5封
print(f" {i}. 邮件{email_info['number']}: {email_info['size']} 字节")
if len(email_list) > 5:
print(f" ... 还有 {len(email_list) - 5} 封邮件")
return email_list
except Exception as e:
print(f" 获取邮件列表失败: {e}")
return []
# 3. 下载和解析邮件
print("\n3. 下载和解析邮件:")
def download_and_parse_email(server, msg_num):
"""下载和解析邮件"""
if not server:
print(" 服务器连接无效")
return None
try:
print(f" 正在下载邮件 {msg_num}...")
# 获取邮件内容
response, lines, octets = server.retr(msg_num)
# 合并邮件内容
raw_email = b'\n'.join(lines)
# 解析邮件
parser = Parser()
email_message = parser.parsestr(raw_email.decode('utf-8', errors='ignore'))
# 提取邮件信息
email_info = {
'number': msg_num,
'subject': decode_email_header(email_message.get('Subject', '')),
'from': decode_email_header(email_message.get('From', '')),
'to': decode_email_header(email_message.get('To', '')),
'date': email_message.get('Date', ''),
'size': octets
}
print(f" ✓ 邮件下载成功")
print(f" 主题: {email_info['subject']}")
print(f" 发件人: {email_info['from']}")
print(f" 收件人: {email_info['to']}")
print(f" 日期: {email_info['date']}")
print(f" 大小: {email_info['size']} 字节")
# 提取邮件正文
body = extract_email_body(email_message)
if body:
print(f" 正文预览: {body[:100]}...")
return email_info, email_message
except Exception as e:
print(f" 下载邮件失败: {e}")
return None, None
def decode_email_header(header):
"""解码邮件头部"""
if not header:
return ''
try:
decoded_parts = decode_header(header)
decoded_str = ''
for part, encoding in decoded_parts:
if isinstance(part, bytes):
if encoding:
decoded_str += part.decode(encoding)
else:
decoded_str += part.decode('utf-8', errors='ignore')
else:
decoded_str += part
return decoded_str
except Exception as e:
return header
def extract_email_body(email_message):
"""提取邮件正文"""
body = ''
try:
if email_message.is_multipart():
# 多部分邮件
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition', ''))
# 跳过附件
if 'attachment' in content_disposition:
continue
if content_type == 'text/plain':
charset = part.get_content_charset() or 'utf-8'
body = part.get_payload(decode=True).decode(charset, errors='ignore')
break
elif content_type == 'text/html' and not body:
charset = part.get_content_charset() or 'utf-8'
body = part.get_payload(decode=True).decode(charset, errors='ignore')
else:
# 单部分邮件
charset = email_message.get_content_charset() or 'utf-8'
body = email_message.get_payload(decode=True).decode(charset, errors='ignore')
return body
except Exception as e:
print(f" 提取邮件正文失败: {e}")
return ''
# 4. 删除邮件
print("\n4. 删除邮件:")
def delete_email(server, msg_num):
"""删除邮件"""
if not server:
print(" 服务器连接无效")
return False
try:
print(f" 正在删除邮件 {msg_num}...")
server.dele(msg_num)
print(f" ✓ 邮件 {msg_num} 已标记为删除")
print(" 注意:邮件将在quit()后真正删除")
return True
except Exception as e:
print(f" 删除邮件失败: {e}")
return False
# 模拟POP3操作
print(" 模拟POP3邮件接收流程:")
print(" 注意:需要配置真实的POP3服务器信息")
# 模拟邮件信息
mock_emails = [
{
'number': 1,
'subject': 'Python学习资料',
'from': 'teacher@pythonlearn.com',
'date': '2023-06-15 10:30:00',
'size': 2048
},
{
'number': 2,
'subject': '系统通知',
'from': 'system@example.com',
'date': '2023-06-15 14:20:00',
'size': 1024
}
]
print(f" 模拟邮件列表 ({len(mock_emails)} 封):")
for email in mock_emails:
print(f" 邮件{email['number']}: {email['subject']} - {email['from']}")
# server = connect_pop3_server() # 需要真实配置
# if server:
# email_list = get_email_list(server)
# if email_list:
# # 下载第一封邮件
# download_and_parse_email(server, 1)
# server.quit()
# 运行POP3演示
pop3_receive_demo()
# 3.2 IMAP接收邮件
import imaplib
import email
from email.header import decode_header
import re
def imap_receive_demo():
"""IMAP接收邮件演示"""
print("=== IMAP接收邮件演示 ===")
# 1. 连接IMAP服务器
print("\n1. 连接IMAP服务器:")
def connect_imap_server():
"""连接IMAP服务器"""
# IMAP服务器配置
imap_config = {
'server': 'imap.gmail.com',
'port': 993, # SSL端口
'username': 'your_email@gmail.com',
'password': 'your_app_password'
}
try:
print(" 正在连接IMAP服务器...")
# 连接SSL IMAP服务器
server = imaplib.IMAP4_SSL(imap_config['server'], imap_config['port'])
# 登录
print(" 正在登录...")
server.login(imap_config['username'], imap_config['password'])
print(f" ✓ 连接成功!")
return server
except Exception as e:
print(f" ✗ 连接失败: {e}")
return None
# 2. 选择邮箱文件夹
print("\n2. 选择邮箱文件夹:")
def list_mailboxes(server):
"""列出邮箱文件夹"""
if not server:
print(" 服务器连接无效")
return []
try:
# 获取文件夹列表
status, folders = server.list()
if status == 'OK':
print(" 可用文件夹:")
folder_list = []
for folder in folders:
# 解析文件夹信息
folder_str = folder.decode('utf-8')
# 提取文件夹名称
match = re.search(r'"([^"]+)"$', folder_str)
if match:
folder_name = match.group(1)
else:
# 如果没有引号,取最后一个空格后的内容
folder_name = folder_str.split()[-1]
folder_list.append(folder_name)
print(f" • {folder_name}")
return folder_list
else:
print(" 获取文件夹列表失败")
return []
except Exception as e:
print(f" 获取文件夹列表失败: {e}")
return []
def select_mailbox(server, mailbox='INBOX'):
"""选择邮箱文件夹"""
if not server:
print(" 服务器连接无效")
return False
try:
print(f" 正在选择文件夹: {mailbox}")
status, messages = server.select(mailbox)
if status == 'OK':
num_messages = int(messages[0])
print(f" ✓ 文件夹选择成功")
print(f" 邮件数量: {num_messages}")
return True
else:
print(f" 文件夹选择失败: {status}")
return False
except Exception as e:
print(f" 选择文件夹失败: {e}")
return False
# 3. 搜索邮件
print("\n3. 搜索邮件:")
def search_emails(server, criteria='ALL'):
"""搜索邮件"""
if not server:
print(" 服务器连接无效")
return []
try:
print(f" 搜索条件: {criteria}")
status, messages = server.search(None, criteria)
if status == 'OK':
# 获取邮件ID列表
email_ids = messages[0].split()
print(f" 找到 {len(email_ids)} 封邮件")
# 显示邮件ID
if email_ids:
print(" 邮件ID列表:")
for i, email_id in enumerate(email_ids[:10], 1): # 只显示前10个
print(f" {i}. {email_id.decode('utf-8')}")
if len(email_ids) > 10:
print(f" ... 还有 {len(email_ids) - 10} 封邮件")
return [id.decode('utf-8') for id in email_ids]
else:
print(f" 搜索失败: {status}")
return []
except Exception as e:
print(f" 搜索邮件失败: {e}")
return []
def advanced_search_examples(server):
"""高级搜索示例"""
print(" 高级搜索示例:")
search_examples = [
('ALL', '所有邮件'),
('UNSEEN', '未读邮件'),
('FROM "example@gmail.com"', '来自特定发件人'),
('SUBJECT "Python"', '主题包含Python'),
('SINCE "01-Jun-2023"', '2023年6月1日之后的邮件'),
('BEFORE "01-Jul-2023"', '2023年7月1日之前的邮件'),
('LARGER 1000000', '大于1MB的邮件'),
('SMALLER 1000', '小于1KB的邮件')
]
for criteria, description in search_examples:
print(f" {criteria} - {description}")
# 实际搜索时取消注释
# email_ids = search_emails(server, criteria)
# 4. 获取邮件内容
print("\n4. 获取邮件内容:")
def fetch_email(server, email_id):
"""获取邮件内容"""
if not server:
print(" 服务器连接无效")
return None
try:
print(f" 正在获取邮件 {email_id}...")
# 获取邮件
status, msg_data = server.fetch(email_id, '(RFC822)')
if status == 'OK':
# 解析邮件
raw_email = msg_data[0][1]
email_message = email.message_from_bytes(raw_email)
# 提取邮件信息
email_info = {
'id': email_id,
'subject': decode_email_header(email_message.get('Subject', '')),
'from': decode_email_header(email_message.get('From', '')),
'to': decode_email_header(email_message.get('To', '')),
'date': email_message.get('Date', ''),
'message_id': email_message.get('Message-ID', '')
}
print(f" ✓ 邮件获取成功")
print(f" 主题: {email_info['subject']}")
print(f" 发件人: {email_info['from']}")
print(f" 日期: {email_info['date']}")
# 提取邮件正文和附件
body, attachments = extract_email_content(email_message)
if body:
print(f" 正文预览: {body[:100]}...")
if attachments:
print(f" 附件数量: {len(attachments)}")
for i, att in enumerate(attachments, 1):
print(f" {i}. {att['filename']} ({att['size']} 字节)")
return email_info, email_message
else:
print(f" 获取邮件失败: {status}")
return None, None
except Exception as e:
print(f" 获取邮件失败: {e}")
return None, None
def extract_email_content(email_message):
"""提取邮件内容和附件"""
body = ''
attachments = []
try:
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition', ''))
if 'attachment' in content_disposition:
# 处理附件
filename = part.get_filename()
if filename:
filename = decode_email_header(filename)
payload = part.get_payload(decode=True)
attachments.append({
'filename': filename,
'content_type': content_type,
'size': len(payload) if payload else 0,
'data': payload
})
elif content_type == 'text/plain' and not body:
# 提取纯文本正文
charset = part.get_content_charset() or 'utf-8'
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors='ignore')
elif content_type == 'text/html' and not body:
# 提取HTML正文
charset = part.get_content_charset() or 'utf-8'
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors='ignore')
else:
# 单部分邮件
charset = email_message.get_content_charset() or 'utf-8'
payload = email_message.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors='ignore')
return body, attachments
except Exception as e:
print(f" 提取邮件内容失败: {e}")
return '', []
# 5. 邮件标记操作
print("\n5. 邮件标记操作:")
def mark_email_as_read(server, email_id):
"""标记邮件为已读"""
try:
server.store(email_id, '+FLAGS', '\\Seen')
print(f" ✓ 邮件 {email_id} 已标记为已读")
except Exception as e:
print(f" 标记邮件失败: {e}")
def mark_email_as_unread(server, email_id):
"""标记邮件为未读"""
try:
server.store(email_id, '-FLAGS', '\\Seen')
print(f" ✓ 邮件 {email_id} 已标记为未读")
except Exception as e:
print(f" 标记邮件失败: {e}")
def delete_email_imap(server, email_id):
"""删除邮件(IMAP)"""
try:
# 标记为删除
server.store(email_id, '+FLAGS', '\\Deleted')
# 执行删除
server.expunge()
print(f" ✓ 邮件 {email_id} 已删除")
except Exception as e:
print(f" 删除邮件失败: {e}")
# 模拟IMAP操作
print(" 模拟IMAP邮件接收流程:")
print(" 注意:需要配置真实的IMAP服务器信息")
# 模拟操作流程
operations = [
"1. 连接IMAP服务器",
"2. 登录账户",
"3. 列出邮箱文件夹",
"4. 选择INBOX文件夹",
"5. 搜索未读邮件",
"6. 获取邮件内容",
"7. 处理附件",
"8. 标记邮件状态",
"9. 关闭连接"
]
print(" IMAP操作流程:")
for op in operations:
print(f" {op}")
# server = connect_imap_server() # 需要真实配置
# if server:
# list_mailboxes(server)
# if select_mailbox(server, 'INBOX'):
# email_ids = search_emails(server, 'UNSEEN')
# if email_ids:
# fetch_email(server, email_ids[0])
# server.close()
# server.logout()
advanced_search_examples(None)
# 运行IMAP演示
imap_receive_demo()
# 3.3 邮件内容解析
import re
import html2text
from bs4 import BeautifulSoup
import chardet
def email_parsing_demo():
"""邮件内容解析演示"""
print("=== 邮件内容解析演示 ===")
# 1. HTML邮件解析
print("\n1. HTML邮件解析:")
def parse_html_email(html_content):
"""解析HTML邮件"""
print(" 解析HTML邮件内容...")
try:
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 提取文本内容
text_content = soup.get_text()
# 清理文本
text_content = re.sub(r'\n\s*\n', '\n\n', text_content)
text_content = text_content.strip()
# 提取链接
links = []
for link in soup.find_all('a', href=True):
links.append({
'text': link.get_text().strip(),
'url': link['href']
})
# 提取图片
images = []
for img in soup.find_all('img', src=True):
images.append({
'alt': img.get('alt', ''),
'src': img['src']
})
print(f" ✓ HTML解析完成")
print(f" 文本长度: {len(text_content)} 字符")
print(f" 链接数量: {len(links)}")
print(f" 图片数量: {len(images)}")
if links:
print(" 链接列表:")
for i, link in enumerate(links[:3], 1):
print(f" {i}. {link['text']} -> {link['url']}")
return {
'text': text_content,
'links': links,
'images': images
}
except Exception as e:
print(f" HTML解析失败: {e}")
return None
def html_to_text(html_content):
"""HTML转纯文本"""
try:
# 使用html2text转换
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = False
h.body_width = 0 # 不限制行宽
text = h.handle(html_content)
return text
except Exception as e:
print(f" HTML转文本失败: {e}")
return html_content
# 2. 邮件编码处理
print("\n2. 邮件编码处理:")
def detect_encoding(data):
"""检测文本编码"""
if isinstance(data, str):
return 'utf-8'
try:
result = chardet.detect(data)
encoding = result['encoding']
confidence = result['confidence']
print(f" 检测到编码: {encoding} (置信度: {confidence:.2f})")
return encoding
except Exception as e:
print(f" 编码检测失败: {e}")
return 'utf-8'
def decode_content(data, encoding=None):
"""解码内容"""
if isinstance(data, str):
return data
if not encoding:
encoding = detect_encoding(data)
try:
return data.decode(encoding, errors='ignore')
except Exception as e:
print(f" 解码失败: {e}")
return data.decode('utf-8', errors='ignore')
# 3. 邮件地址解析
print("\n3. 邮件地址解析:")
def parse_email_addresses(address_string):
"""解析邮件地址"""
print(f" 解析地址字符串: {address_string}")
addresses = []
try:
# 分割多个地址
addr_list = re.split(r'[,;]', address_string)
for addr in addr_list:
addr = addr.strip()
if not addr:
continue
# 解析地址格式:"Name" <email@domain.com> 或 email@domain.com
match = re.match(r'^"?([^"<>]+?)"?\s*<([^<>]+)>$', addr)
if match:
name = match.group(1).strip()
email = match.group(2).strip()
else:
# 简单邮件地址
email_match = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', addr)
if email_match:
email = email_match.group(0)
name = addr.replace(email, '').strip('<> "')
else:
continue
addresses.append({
'name': name if name else '',
'email': email
})
print(f" 解析出 {len(addresses)} 个地址:")
for i, addr in enumerate(addresses, 1):
if addr['name']:
print(f" {i}. {addr['name']} <{addr['email']}>")
else:
print(f" {i}. {addr['email']}")
return addresses
except Exception as e:
print(f" 地址解析失败: {e}")
return []
# 4. 邮件内容提取
print("\n4. 邮件内容提取:")
def extract_email_metadata(email_message):
"""提取邮件元数据"""
metadata = {}
try:
# 基本信息
metadata['subject'] = decode_email_header(email_message.get('Subject', ''))
metadata['from'] = decode_email_header(email_message.get('From', ''))
metadata['to'] = decode_email_header(email_message.get('To', ''))
metadata['cc'] = decode_email_header(email_message.get('Cc', ''))
metadata['bcc'] = decode_email_header(email_message.get('Bcc', ''))
metadata['date'] = email_message.get('Date', '')
metadata['message_id'] = email_message.get('Message-ID', '')
# 技术信息
metadata['content_type'] = email_message.get_content_type()
metadata['charset'] = email_message.get_content_charset()
metadata['transfer_encoding'] = email_message.get('Content-Transfer-Encoding', '')
# 邮件客户端信息
metadata['user_agent'] = email_message.get('User-Agent', '')
metadata['x_mailer'] = email_message.get('X-Mailer', '')
# 路由信息
received_headers = email_message.get_all('Received', [])
metadata['received_count'] = len(received_headers)
print(" 邮件元数据:")
for key, value in metadata.items():
if value:
print(f" {key}: {value}")
return metadata
except Exception as e:
print(f" 元数据提取失败: {e}")
return {}
def extract_urls_from_text(text):
"""从文本中提取URL"""
url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
urls = re.findall(url_pattern, text, re.IGNORECASE)
print(f" 从文本中提取到 {len(urls)} 个URL:")
for i, url in enumerate(urls[:5], 1):
print(f" {i}. {url}")
return urls
def extract_phone_numbers(text):
"""从文本中提取电话号码"""
phone_patterns = [
r'\b\d{3}-\d{3}-\d{4}\b', # 123-456-7890
r'\b\d{3}\.\d{3}\.\d{4}\b', # 123.456.7890
r'\b\(\d{3}\)\s*\d{3}-\d{4}\b', # (123) 456-7890
r'\b\d{11}\b', # 12345678901
r'\b\d{3}\s+\d{4}\s+\d{4}\b' # 123 4567 8901
]
phones = []
for pattern in phone_patterns:
phones.extend(re.findall(pattern, text))
print(f" 从文本中提取到 {len(phones)} 个电话号码:")
for i, phone in enumerate(phones[:3], 1):
print(f" {i}. {phone}")
return phones
# 测试解析功能
print(" 测试邮件内容解析:")
# 测试HTML内容
sample_html = """
<html>
<body>
<h1>Python学习通知</h1>
<p>亲爱的学员,</p>
<p>我们的<a href="https://python.org">Python课程</a>已经更新。</p>
<p>请访问 <a href="https://example.com/course">课程页面</a> 查看详情。</p>
<img src="https://example.com/logo.png" alt="Logo">
<p>联系电话:123-456-7890</p>
<p>网站:www.example.com</p>
</body>
</html>
"""
parsed_html = parse_html_email(sample_html)
if parsed_html:
extract_urls_from_text(parsed_html['text'])
extract_phone_numbers(parsed_html['text'])
# 测试地址解析
test_addresses = [
'"张三" <zhangsan@example.com>',
'lisi@example.com',
'"王五" <wangwu@test.com>, "赵六" <zhaoliu@demo.com>'
]
for addr in test_addresses:
parse_email_addresses(addr)
# 运行邮件解析演示
email_parsing_demo()
# 4. 邮件处理和管理
# 4.1 邮件过滤和分类
import re
from datetime import datetime, timedelta
import json
def email_filtering_demo():
"""邮件过滤和分类演示"""
print("=== 邮件过滤和分类演示 ===")
# 1. 邮件过滤器
print("\n1. 邮件过滤器:")
class EmailFilter:
"""邮件过滤器类"""
def __init__(self):
self.rules = []
def add_rule(self, name, condition, action):
"""添加过滤规则"""
rule = {
'name': name,
'condition': condition,
'action': action,
'created': datetime.now().isoformat()
}
self.rules.append(rule)
print(f" ✓ 添加规则: {name}")
def apply_filters(self, email_info):
"""应用过滤规则"""
applied_rules = []
for rule in self.rules:
if self._check_condition(email_info, rule['condition']):
result = self._execute_action(email_info, rule['action'])
applied_rules.append({
'rule_name': rule['name'],
'action_result': result
})
return applied_rules
def _check_condition(self, email_info, condition):
"""检查条件"""
try:
condition_type = condition['type']
if condition_type == 'sender':
pattern = condition['pattern']
return re.search(pattern, email_info.get('from', ''), re.IGNORECASE)
elif condition_type == 'subject':
pattern = condition['pattern']
return re.search(pattern, email_info.get('subject', ''), re.IGNORECASE)
elif condition_type == 'body':
pattern = condition['pattern']
return re.search(pattern, email_info.get('body', ''), re.IGNORECASE)
elif condition_type == 'size':
operator = condition['operator'] # '>', '<', '=='
threshold = condition['value']
size = email_info.get('size', 0)
if operator == '>':
return size > threshold
elif operator == '<':
return size < threshold
elif operator == '==':
return size == threshold
elif condition_type == 'date':
operator = condition['operator']
days_ago = condition['days']
threshold_date = datetime.now() - timedelta(days=days_ago)
email_date = datetime.fromisoformat(email_info.get('date', datetime.now().isoformat()))
if operator == 'older':
return email_date < threshold_date
elif operator == 'newer':
return email_date > threshold_date
return False
except Exception as e:
print(f" 条件检查失败: {e}")
return False
def _execute_action(self, email_info, action):
"""执行动作"""
try:
action_type = action['type']
if action_type == 'move_to_folder':
folder = action['folder']
print(f" → 移动到文件夹: {folder}")
return f"moved_to_{folder}"
elif action_type == 'mark_as_read':
print(f" → 标记为已读")
return "marked_as_read"
elif action_type == 'mark_as_important':
print(f" → 标记为重要")
return "marked_as_important"
elif action_type == 'delete':
print(f" → 删除邮件")
return "deleted"
elif action_type == 'forward':
to_address = action['to']
print(f" → 转发到: {to_address}")
return f"forwarded_to_{to_address}"
elif action_type == 'add_label':
label = action['label']
print(f" → 添加标签: {label}")
return f"labeled_{label}"
return "action_executed"
except Exception as e:
print(f" 动作执行失败: {e}")
return "action_failed"
# 2. 创建过滤规则
print("\n2. 创建过滤规则:")
def create_filter_rules():
"""创建过滤规则"""
email_filter = EmailFilter()
# 垃圾邮件过滤
email_filter.add_rule(
"垃圾邮件过滤",
{
'type': 'subject',
'pattern': r'(广告|促销|优惠|免费|中奖)'
},
{
'type': 'move_to_folder',
'folder': 'spam'
}
)
# 工作邮件分类
email_filter.add_rule(
"工作邮件",
{
'type': 'sender',
'pattern': r'@company\.com$'
},
{
'type': 'move_to_folder',
'folder': 'work'
}
)
# 重要邮件标记
email_filter.add_rule(
"重要邮件",
{
'type': 'subject',
'pattern': r'(紧急|重要|urgent|important)'
},
{
'type': 'mark_as_important'
}
)
# 大附件邮件
email_filter.add_rule(
"大附件邮件",
{
'type': 'size',
'operator': '>',
'value': 5 * 1024 * 1024 # 5MB
},
{
'type': 'add_label',
'label': 'large_attachment'
}
)
# 旧邮件清理
email_filter.add_rule(
"旧邮件清理",
{
'type': 'date',
'operator': 'older',
'days': 365
},
{
'type': 'move_to_folder',
'folder': 'archive'
}
)
return email_filter
# 3. 应用过滤规则
print("\n3. 应用过滤规则:")
def test_email_filtering():
"""测试邮件过滤"""
email_filter = create_filter_rules()
# 测试邮件数据
test_emails = [
{
'subject': '紧急:系统维护通知',
'from': 'admin@company.com',
'body': '系统将于今晚进行维护',
'size': 1024,
'date': datetime.now().isoformat()
},
{
'subject': '免费获得iPhone!点击领取',
'from': 'promo@spam.com',
'body': '恭喜您中奖了!',
'size': 512,
'date': datetime.now().isoformat()
},
{
'subject': '会议资料',
'from': 'colleague@company.com',
'body': '附件是明天会议的资料',
'size': 8 * 1024 * 1024, # 8MB
'date': datetime.now().isoformat()
},
{
'subject': '旧项目文档',
'from': 'archive@oldcompany.com',
'body': '这是去年的项目文档',
'size': 2048,
'date': (datetime.now() - timedelta(days=400)).isoformat()
}
]
print(" 测试邮件过滤:")
for i, email in enumerate(test_emails, 1):
print(f"\n 邮件 {i}: {email['subject']}")
print(f" 发件人: {email['from']}")
applied_rules = email_filter.apply_filters(email)
if applied_rules:
print(f" 应用的规则:")
for rule in applied_rules:
print(f" • {rule['rule_name']}: {rule['action_result']}")
else:
print(f" → 无匹配规则,保持在收件箱")
test_email_filtering()
# 运行邮件过滤演示
email_filtering_demo()
# 4.2 邮件自动回复
import json
from datetime import datetime
import re
def auto_reply_demo():
"""邮件自动回复演示"""
print("=== 邮件自动回复演示 ===")
# 1. 自动回复系统
print("\n1. 自动回复系统:")
class AutoReplySystem:
"""自动回复系统"""
def __init__(self):
self.templates = {}
self.rules = []
self.enabled = True
def add_template(self, name, subject, body, variables=None):
"""添加回复模板"""
template = {
'name': name,
'subject': subject,
'body': body,
'variables': variables or [],
'created': datetime.now().isoformat()
}
self.templates[name] = template
print(f" ✓ 添加模板: {name}")
def add_rule(self, name, condition, template_name, delay_hours=0):
"""添加自动回复规则"""
rule = {
'name': name,
'condition': condition,
'template': template_name,
'delay_hours': delay_hours,
'enabled': True
}
self.rules.append(rule)
print(f" ✓ 添加规则: {name}")
def process_email(self, email_info):
"""处理邮件并生成自动回复"""
if not self.enabled:
return None
for rule in self.rules:
if not rule['enabled']:
continue
if self._check_condition(email_info, rule['condition']):
template_name = rule['template']
if template_name in self.templates:
reply = self._generate_reply(email_info, template_name)
reply['delay_hours'] = rule['delay_hours']
reply['rule_name'] = rule['name']
return reply
return None
def _check_condition(self, email_info, condition):
"""检查触发条件"""
try:
condition_type = condition['type']
if condition_type == 'keyword':
keywords = condition['keywords']
text = f"{email_info.get('subject', '')} {email_info.get('body', '')}"
return any(keyword.lower() in text.lower() for keyword in keywords)
elif condition_type == 'sender_domain':
domain = condition['domain']
sender = email_info.get('from', '')
return domain in sender
elif condition_type == 'time_range':
start_hour = condition['start_hour']
end_hour = condition['end_hour']
current_hour = datetime.now().hour
if start_hour <= end_hour:
return start_hour <= current_hour <= end_hour
else: # 跨午夜
return current_hour >= start_hour or current_hour <= end_hour
elif condition_type == 'first_contact':
# 简化实现:假设检查是否为首次联系
return email_info.get('is_first_contact', False)
return False
except Exception as e:
print(f" 条件检查失败: {e}")
return False
def _generate_reply(self, email_info, template_name):
"""生成回复邮件"""
template = self.templates[template_name]
# 提取变量
variables = {
'sender_name': self._extract_sender_name(email_info.get('from', '')),
'original_subject': email_info.get('subject', ''),
'current_date': datetime.now().strftime('%Y年%m月%d日'),
'current_time': datetime.now().strftime('%H:%M')
}
# 替换模板变量
subject = self._replace_variables(template['subject'], variables)
body = self._replace_variables(template['body'], variables)
reply = {
'to': email_info.get('from', ''),
'subject': subject,
'body': body,
'template_used': template_name,
'generated_at': datetime.now().isoformat()
}
return reply
def _extract_sender_name(self, from_address):
"""提取发件人姓名"""
# 简单提取:"Name" <email@domain.com> -> Name
match = re.match(r'^"?([^"<>]+?)"?\s*<', from_address)
if match:
return match.group(1).strip()
else:
# 从邮箱地址提取用户名
email_match = re.search(r'([^@<>\s]+)@', from_address)
if email_match:
return email_match.group(1)
return '朋友'
def _replace_variables(self, text, variables):
"""替换模板变量"""
for var_name, var_value in variables.items():
text = text.replace(f'{{{var_name}}}', str(var_value))
return text
# 2. 创建回复模板
print("\n2. 创建回复模板:")
def create_reply_templates():
"""创建回复模板"""
auto_reply = AutoReplySystem()
# 外出自动回复
auto_reply.add_template(
"外出回复",
"自动回复:Re: {original_subject}",
"""
亲爱的 {sender_name},
感谢您的邮件。我目前外出办公,将于下周一返回。
如有紧急事务,请联系我的同事:
- 张三:zhangsan@company.com
- 李四:lisi@company.com
我会在返回后尽快回复您的邮件。
此邮件为自动回复,请勿直接回复。
谢谢!
{current_date}
"""
)
# 客服自动回复
auto_reply.add_template(
"客服回复",
"收到您的咨询:{original_subject}",
"""
尊敬的 {sender_name},
感谢您联系我们的客服团队!
我们已收到您于 {current_date} {current_time} 发送的邮件,我们的客服代表将在24小时内回复您。
如需紧急帮助,请拨打客服热线:400-123-4567
常见问题解答:https://example.com/faq
谢谢您的耐心等待!
客服团队
"""
)
# 招聘自动回复
auto_reply.add_template(
"招聘回复",
"感谢您的求职申请",
"""
亲爱的 {sender_name},
感谢您对我们公司的关注和求职申请!
我们已收到您的简历,人力资源部门将在5个工作日内审核您的申请。
如果您的背景符合我们的要求,我们会尽快与您联系安排面试。
在此期间,您可以访问我们的官网了解更多公司信息:
https://company.com/about
再次感谢您的申请!
人力资源部
{current_date}
"""
)
# 技术支持回复
auto_reply.add_template(
"技术支持回复",
"技术支持工单已创建:{original_subject}",
"""
您好 {sender_name},
感谢您联系技术支持!
我们已为您创建了支持工单,工单编号:TS-{current_date}-001
我们的技术工程师将在2个工作日内分析您的问题并提供解决方案。
您可以通过以下方式跟踪工单状态:
- 访问:https://support.company.com
- 电话:400-888-9999
常见问题可能的解决方案:
1. 重启应用程序
2. 清除浏览器缓存
3. 检查网络连接
技术支持团队
"""
)
return auto_reply
# 3. 设置自动回复规则
print("\n3. 设置自动回复规则:")
def setup_reply_rules():
"""设置自动回复规则"""
auto_reply = create_reply_templates()
# 外出时间自动回复
auto_reply.add_rule(
"外出自动回复",
{
'type': 'time_range',
'start_hour': 18, # 下午6点后
'end_hour': 8 # 上午8点前
},
"外出回复"
)
# 客服邮箱自动回复
auto_reply.add_rule(
"客服自动回复",
{
'type': 'sender_domain',
'domain': 'customer'
},
"客服回复",
delay_hours=0
)
# 招聘相关自动回复
auto_reply.add_rule(
"招聘自动回复",
{
'type': 'keyword',
'keywords': ['求职', '应聘', '简历', '招聘', 'job', 'resume']
},
"招聘回复",
delay_hours=1
)
# 技术支持自动回复
auto_reply.add_rule(
"技术支持自动回复",
{
'type': 'keyword',
'keywords': ['bug', '错误', '问题', '故障', 'error', 'help']
},
"技术支持回复"
)
return auto_reply
# 4. 测试自动回复
print("\n4. 测试自动回复:")
def test_auto_reply():
"""测试自动回复功能"""
auto_reply = setup_reply_rules()
# 测试邮件
test_emails = [
{
'from': '"张三" <zhangsan@customer.com>',
'subject': '产品咨询',
'body': '我想了解你们的产品功能',
'is_first_contact': True
},
{
'from': 'lisi@jobseeker.com',
'subject': '求职申请 - Python开发工程师',
'body': '附件是我的简历,希望能加入贵公司',
'is_first_contact': True
},
{
'from': 'user@example.com',
'subject': '系统bug报告',
'body': '登录时出现error 500错误',
'is_first_contact': False
},
{
'from': 'friend@personal.com',
'subject': '周末聚会',
'body': '这周末有空吗?一起吃饭',
'is_first_contact': False
}
]
print(" 测试自动回复:")
for i, email in enumerate(test_emails, 1):
print(f"\n 邮件 {i}: {email['subject']}")
print(f" 发件人: {email['from']}")
reply = auto_reply.process_email(email)
if reply:
print(f" ✓ 生成自动回复:")
print(f" 规则: {reply['rule_name']}")
print(f" 模板: {reply['template_used']}")
print(f" 延迟: {reply['delay_hours']} 小时")
print(f" 主题: {reply['subject']}")
print(f" 正文预览: {reply['body'][:100]}...")
else:
print(f" → 无匹配规则,不发送自动回复")
test_auto_reply()
# 运行自动回复演示
auto_reply_demo()
# 4.3 邮件统计和分析
import json
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
def email_analytics_demo():
"""邮件统计和分析演示"""
print("=== 邮件统计和分析演示 ===")
# 1. 邮件统计类
print("\n1. 邮件统计分析:")
class EmailAnalytics:
"""邮件分析类"""
def __init__(self):
self.emails = []
self.stats = {}
def add_email(self, email_info):
"""添加邮件数据"""
self.emails.append(email_info)
def analyze_volume(self):
"""分析邮件量"""
print(" 邮件量分析:")
total_emails = len(self.emails)
print(f" 总邮件数: {total_emails}")
if total_emails == 0:
return
# 按日期统计
daily_count = defaultdict(int)
for email in self.emails:
date = email.get('date', '').split('T')[0] # 提取日期部分
daily_count[date] += 1
print(f" 活跃天数: {len(daily_count)}")
if daily_count:
avg_daily = total_emails / len(daily_count)
print(f" 日均邮件: {avg_daily:.1f}")
max_day = max(daily_count.items(), key=lambda x: x[1])
print(f" 最忙的一天: {max_day[0]} ({max_day[1]} 封)")
# 按小时统计
hourly_count = defaultdict(int)
for email in self.emails:
try:
dt = datetime.fromisoformat(email.get('date', ''))
hour = dt.hour
hourly_count[hour] += 1
except:
continue
if hourly_count:
peak_hour = max(hourly_count.items(), key=lambda x: x[1])
print(f" 邮件高峰时段: {peak_hour[0]}:00 ({peak_hour[1]} 封)")
def analyze_senders(self):
"""分析发件人"""
print("\n 发件人分析:")
sender_count = Counter()
domain_count = Counter()
for email in self.emails:
sender = email.get('from', '')
sender_count[sender] += 1
# 提取域名
if '@' in sender:
domain = sender.split('@')[-1].split('>')[0]
domain_count[domain] += 1
print(f" 独立发件人数: {len(sender_count)}")
# 最活跃发件人
if sender_count:
top_senders = sender_count.most_common(5)
print(" 最活跃发件人:")
for i, (sender, count) in enumerate(top_senders, 1):
print(f" {i}. {sender}: {count} 封")
# 最常见域名
if domain_count:
top_domains = domain_count.most_common(5)
print(" 最常见域名:")
for i, (domain, count) in enumerate(top_domains, 1):
print(f" {i}. {domain}: {count} 封")
def analyze_subjects(self):
"""分析主题"""
print("\n 主题分析:")
# 主题关键词统计
keyword_count = Counter()
subject_lengths = []
for email in self.emails:
subject = email.get('subject', '')
subject_lengths.append(len(subject))
# 提取关键词(简单分词)
words = re.findall(r'\b\w{2,}\b', subject.lower())
for word in words:
if len(word) > 2: # 过滤短词
keyword_count[word] += 1
if subject_lengths:
avg_length = sum(subject_lengths) / len(subject_lengths)
print(f" 平均主题长度: {avg_length:.1f} 字符")
print(f" 最长主题: {max(subject_lengths)} 字符")
print(f" 最短主题: {min(subject_lengths)} 字符")
# 热门关键词
if keyword_count:
top_keywords = keyword_count.most_common(10)
print(" 热门关键词:")
for i, (keyword, count) in enumerate(top_keywords, 1):
print(f" {i}. {keyword}: {count} 次")
def analyze_sizes(self):
"""分析邮件大小"""
print("\n 邮件大小分析:")
sizes = [email.get('size', 0) for email in self.emails]
sizes = [s for s in sizes if s > 0] # 过滤无效大小
if not sizes:
print(" 无大小数据")
return
total_size = sum(sizes)
avg_size = total_size / len(sizes)
print(f" 总大小: {self._format_size(total_size)}")
print(f" 平均大小: {self._format_size(avg_size)}")
print(f" 最大邮件: {self._format_size(max(sizes))}")
print(f" 最小邮件: {self._format_size(min(sizes))}")
# 大小分布
size_ranges = {
'< 1KB': 0,
'1KB - 10KB': 0,
'10KB - 100KB': 0,
'100KB - 1MB': 0,
'> 1MB': 0
}
for size in sizes:
if size < 1024:
size_ranges['< 1KB'] += 1
elif size < 10 * 1024:
size_ranges['1KB - 10KB'] += 1
elif size < 100 * 1024:
size_ranges['10KB - 100KB'] += 1
elif size < 1024 * 1024:
size_ranges['100KB - 1MB'] += 1
else:
size_ranges['> 1MB'] += 1
print(" 大小分布:")
for range_name, count in size_ranges.items():
percentage = (count / len(sizes)) * 100
print(f" {range_name}: {count} 封 ({percentage:.1f}%)")
def _format_size(self, size_bytes):
"""格式化文件大小"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def generate_report(self):
"""生成分析报告"""
print("\n 生成邮件分析报告:")
report = {
'generated_at': datetime.now().isoformat(),
'total_emails': len(self.emails),
'analysis_period': self._get_date_range(),
'summary': self._generate_summary()
}
print(f" 报告生成时间: {report['generated_at']}")
print(f" 分析期间: {report['analysis_period']}")
print(f" 邮件总数: {report['total_emails']}")
return report
def _get_date_range(self):
"""获取日期范围"""
if not self.emails:
return "无数据"
dates = []
for email in self.emails:
try:
dt = datetime.fromisoformat(email.get('date', ''))
dates.append(dt)
except:
continue
if dates:
start_date = min(dates).strftime('%Y-%m-%d')
end_date = max(dates).strftime('%Y-%m-%d')
return f"{start_date} 至 {end_date}"
return "日期解析失败"
def _generate_summary(self):
"""生成摘要"""
if not self.emails:
return "无邮件数据"
# 计算基本统计
total = len(self.emails)
# 发件人统计
senders = set(email.get('from', '') for email in self.emails)
unique_senders = len(senders)
# 大小统计
sizes = [email.get('size', 0) for email in self.emails if email.get('size', 0) > 0]
avg_size = sum(sizes) / len(sizes) if sizes else 0
summary = f"共分析 {total} 封邮件,来自 {unique_senders} 个不同发件人,平均大小 {self._format_size(avg_size)}"
return summary
# 2. 生成测试数据
print("\n2. 生成测试数据:")
def generate_test_emails():
"""生成测试邮件数据"""
import random
senders = [
'boss@company.com',
'colleague@company.com',
'client@customer.com',
'support@vendor.com',
'newsletter@news.com',
'spam@promo.com'
]
subjects = [
'会议通知',
'项目进度更新',
'客户反馈',
'系统维护通知',
'新闻简报',
'促销活动',
'技术支持',
'发票确认',
'培训邀请',
'重要通知'
]
emails = []
base_date = datetime.now() - timedelta(days=30)
for i in range(100):
# 随机日期(最近30天)
random_days = random.randint(0, 30)
random_hours = random.randint(0, 23)
email_date = base_date + timedelta(days=random_days, hours=random_hours)
email = {
'from': random.choice(senders),
'subject': random.choice(subjects),
'date': email_date.isoformat(),
'size': random.randint(500, 50000), # 500B - 50KB
'body': f"这是第{i+1}封测试邮件的内容"
}
emails.append(email)
print(f" 生成了 {len(emails)} 封测试邮件")
return emails
# 3. 执行分析
print("\n3. 执行邮件分析:")
def run_analysis():
"""运行邮件分析"""
# 创建分析器
analytics = EmailAnalytics()
# 添加测试数据
test_emails = generate_test_emails()
for email in test_emails:
analytics.add_email(email)
# 执行各项分析
analytics.analyze_volume()
analytics.analyze_senders()
analytics.analyze_subjects()
analytics.analyze_sizes()
# 生成报告
report = analytics.generate_report()
return analytics, report
analytics, report = run_analysis()
print(f"\n 分析完成!")
print(f" 报告摘要: {report['summary']}")
# 运行邮件分析演示
email_analytics_demo()
# 5. 实际应用案例
# 5.1 邮件备份系统
import os
import json
import sqlite3
from datetime import datetime
import hashlib
def email_backup_demo():
"""邮件备份系统演示"""
print("=== 邮件备份系统演示 ===")
# 1. 邮件备份类
print("\n1. 邮件备份系统:")
class EmailBackupSystem:
"""邮件备份系统"""
def __init__(self, backup_dir="email_backup"):
self.backup_dir = backup_dir
self.db_path = os.path.join(backup_dir, "emails.db")
self._init_backup_dir()
self._init_database()
def _init_backup_dir(self):
"""初始化备份目录"""
if not os.path.exists(self.backup_dir):
os.makedirs(self.backup_dir)
print(f" ✓ 创建备份目录: {self.backup_dir}")
# 创建子目录
subdirs = ['attachments', 'exports', 'logs']
for subdir in subdirs:
subdir_path = os.path.join(self.backup_dir, subdir)
if not os.path.exists(subdir_path):
os.makedirs(subdir_path)
def _init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建邮件表
cursor.execute('''
CREATE TABLE IF NOT EXISTS emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT UNIQUE,
sender TEXT,
recipients TEXT,
subject TEXT,
date_sent TEXT,
date_received TEXT,
body_text TEXT,
body_html TEXT,
attachments TEXT,
size INTEGER,
hash TEXT,
backup_date TEXT
)
''')
# 创建附件表
cursor.execute('''
CREATE TABLE IF NOT EXISTS attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email_id INTEGER,
filename TEXT,
content_type TEXT,
size INTEGER,
file_path TEXT,
hash TEXT,
FOREIGN KEY (email_id) REFERENCES emails (id)
)
''')
conn.commit()
conn.close()
print(f" ✓ 初始化数据库: {self.db_path}")
def backup_email(self, email_data):
"""备份单封邮件"""
try:
# 计算邮件哈希
email_hash = self._calculate_email_hash(email_data)
# 检查是否已备份
if self._is_email_backed_up(email_hash):
print(f" 邮件已存在,跳过: {email_data.get('subject', 'No Subject')[:50]}")
return False
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 插入邮件记录
cursor.execute('''
INSERT INTO emails (
message_id, sender, recipients, subject, date_sent,
date_received, body_text, body_html, attachments,
size, hash, backup_date
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
email_data.get('message_id', ''),
email_data.get('from', ''),
json.dumps(email_data.get('to', [])),
email_data.get('subject', ''),
email_data.get('date', ''),
datetime.now().isoformat(),
email_data.get('body_text', ''),
email_data.get('body_html', ''),
json.dumps(email_data.get('attachments', [])),
email_data.get('size', 0),
email_hash,
datetime.now().isoformat()
))
email_id = cursor.lastrowid
# 备份附件
if 'attachments' in email_data:
for attachment in email_data['attachments']:
self._backup_attachment(cursor, email_id, attachment)
conn.commit()
conn.close()
print(f" ✓ 备份邮件: {email_data.get('subject', 'No Subject')[:50]}")
return True
except Exception as e:
print(f" ✗ 备份失败: {e}")
return False
def _calculate_email_hash(self, email_data):
"""计算邮件哈希值"""
# 使用关键字段计算哈希
key_fields = [
email_data.get('message_id', ''),
email_data.get('from', ''),
email_data.get('subject', ''),
email_data.get('date', ''),
email_data.get('body_text', '')
]
content = '|'.join(key_fields)
return hashlib.md5(content.encode('utf-8')).hexdigest()
def _is_email_backed_up(self, email_hash):
"""检查邮件是否已备份"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT id FROM emails WHERE hash = ?', (email_hash,))
result = cursor.fetchone()
conn.close()
return result is not None
def _backup_attachment(self, cursor, email_id, attachment):
"""备份附件"""
try:
filename = attachment.get('filename', 'unknown')
content = attachment.get('content', b'')
# 生成安全的文件名
safe_filename = self._generate_safe_filename(filename)
file_path = os.path.join(self.backup_dir, 'attachments', safe_filename)
# 保存附件文件
with open(file_path, 'wb') as f:
f.write(content)
# 计算文件哈希
file_hash = hashlib.md5(content).hexdigest()
# 插入附件记录
cursor.execute('''
INSERT INTO attachments (
email_id, filename, content_type, size, file_path, hash
) VALUES (?, ?, ?, ?, ?, ?)
''', (
email_id,
filename,
attachment.get('content_type', ''),
len(content),
file_path,
file_hash
))
print(f" ✓ 备份附件: {filename}")
except Exception as e:
print(f" ✗ 附件备份失败: {e}")
def _generate_safe_filename(self, filename):
"""生成安全的文件名"""
# 移除危险字符
safe_chars = "-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
safe_filename = ''.join(c for c in filename if c in safe_chars)
# 添加时间戳避免冲突
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
name, ext = os.path.splitext(safe_filename)
return f"{name}_{timestamp}{ext}"
def search_emails(self, query, search_type='subject'):
"""搜索邮件"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if search_type == 'subject':
cursor.execute(
'SELECT * FROM emails WHERE subject LIKE ?',
(f'%{query}%',)
)
elif search_type == 'sender':
cursor.execute(
'SELECT * FROM emails WHERE sender LIKE ?',
(f'%{query}%',)
)
elif search_type == 'content':
cursor.execute(
'SELECT * FROM emails WHERE body_text LIKE ? OR body_html LIKE ?',
(f'%{query}%', f'%{query}%')
)
else:
cursor.execute('SELECT * FROM emails')
results = cursor.fetchall()
conn.close()
return results
def export_emails(self, output_format='json', date_range=None):
"""导出邮件"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 构建查询
query = 'SELECT * FROM emails'
params = []
if date_range:
query += ' WHERE date_sent BETWEEN ? AND ?'
params.extend(date_range)
cursor.execute(query, params)
emails = cursor.fetchall()
# 获取列名
columns = [description[0] for description in cursor.description]
conn.close()
# 转换为字典列表
email_dicts = []
for email in emails:
email_dict = dict(zip(columns, email))
email_dicts.append(email_dict)
# 导出文件
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if output_format == 'json':
filename = f"emails_export_{timestamp}.json"
filepath = os.path.join(self.backup_dir, 'exports', filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(email_dicts, f, ensure_ascii=False, indent=2)
elif output_format == 'csv':
import csv
filename = f"emails_export_{timestamp}.csv"
filepath = os.path.join(self.backup_dir, 'exports', filename)
with open(filepath, 'w', newline='', encoding='utf-8') as f:
if email_dicts:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
writer.writerows(email_dicts)
print(f" ✓ 导出完成: {filepath}")
return filepath
def get_backup_stats(self):
"""获取备份统计"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 邮件统计
cursor.execute('SELECT COUNT(*) FROM emails')
total_emails = cursor.fetchone()[0]
cursor.execute('SELECT SUM(size) FROM emails')
total_size = cursor.fetchone()[0] or 0
# 附件统计
cursor.execute('SELECT COUNT(*) FROM attachments')
total_attachments = cursor.fetchone()[0]
cursor.execute('SELECT SUM(size) FROM attachments')
attachments_size = cursor.fetchone()[0] or 0
# 日期范围
cursor.execute('SELECT MIN(date_sent), MAX(date_sent) FROM emails')
date_range = cursor.fetchone()
conn.close()
stats = {
'total_emails': total_emails,
'total_size': total_size,
'total_attachments': total_attachments,
'attachments_size': attachments_size,
'date_range': date_range,
'backup_dir': self.backup_dir
}
return stats
# 2. 测试备份系统
print("\n2. 测试备份系统:")
def test_backup_system():
"""测试备份系统"""
# 创建备份系统
backup_system = EmailBackupSystem("test_backup")
# 生成测试邮件
test_emails = [
{
'message_id': 'msg001@example.com',
'from': 'sender1@example.com',
'to': ['recipient@example.com'],
'subject': '重要会议通知',
'date': '2024-01-15T10:30:00',
'body_text': '明天下午2点开会,请准时参加。',
'body_html': '<p>明天下午2点开会,请准时参加。</p>',
'size': 1024,
'attachments': [
{
'filename': 'agenda.pdf',
'content_type': 'application/pdf',
'content': b'PDF content here'
}
]
},
{
'message_id': 'msg002@example.com',
'from': 'sender2@example.com',
'to': ['recipient@example.com'],
'subject': '项目进度报告',
'date': '2024-01-16T14:20:00',
'body_text': '本周项目进度如下...',
'body_html': '<p>本周项目进度如下...</p>',
'size': 2048
}
]
# 备份邮件
print(" 备份邮件:")
for email in test_emails:
backup_system.backup_email(email)
# 重复备份测试(应该跳过)
print("\n 重复备份测试:")
backup_system.backup_email(test_emails[0])
# 搜索测试
print("\n 搜索测试:")
results = backup_system.search_emails('会议', 'subject')
print(f" 找到 {len(results)} 封包含'会议'的邮件")
# 导出测试
print("\n 导出测试:")
json_file = backup_system.export_emails('json')
# 统计信息
print("\n 备份统计:")
stats = backup_system.get_backup_stats()
print(f" 总邮件数: {stats['total_emails']}")
print(f" 总大小: {stats['total_size']} 字节")
print(f" 附件数: {stats['total_attachments']}")
print(f" 日期范围: {stats['date_range'][0]} 至 {stats['date_range'][1]}")
test_backup_system()
# 运行邮件备份演示
email_backup_demo()
# 5.2 邮件监控系统
import time
import threading
from datetime import datetime, timedelta
import logging
def email_monitoring_demo():
"""邮件监控系统演示"""
print("=== 邮件监控系统演示 ===")
# 1. 邮件监控类
print("\n1. 邮件监控系统:")
class EmailMonitor:
"""邮件监控系统"""
def __init__(self, check_interval=60):
self.check_interval = check_interval # 检查间隔(秒)
self.is_running = False
self.monitor_thread = None
self.alerts = []
self.rules = []
self.stats = {
'total_checked': 0,
'alerts_triggered': 0,
'last_check': None
}
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def add_rule(self, name, condition, alert_type, threshold=None):
"""添加监控规则"""
rule = {
'name': name,
'condition': condition,
'alert_type': alert_type,
'threshold': threshold,
'enabled': True,
'last_triggered': None,
'trigger_count': 0
}
self.rules.append(rule)
print(f" ✓ 添加监控规则: {name}")
def start_monitoring(self):
"""开始监控"""
if self.is_running:
print(" 监控已在运行中")
return
self.is_running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print(f" ✓ 开始邮件监控,检查间隔: {self.check_interval} 秒")
def stop_monitoring(self):
"""停止监控"""
self.is_running = False
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
print(" ✓ 停止邮件监控")
def _monitor_loop(self):
"""监控循环"""
while self.is_running:
try:
self._check_emails()
self.stats['last_check'] = datetime.now().isoformat()
time.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"监控检查失败: {e}")
time.sleep(self.check_interval)
def _check_emails(self):
"""检查邮件"""
# 模拟获取新邮件
new_emails = self._fetch_new_emails()
self.stats['total_checked'] += len(new_emails)
for email in new_emails:
self._evaluate_rules(email)
def _fetch_new_emails(self):
"""获取新邮件(模拟)"""
# 这里应该连接到实际的邮件服务器
# 为演示目的,返回模拟数据
import random
if random.random() < 0.3: # 30% 概率有新邮件
return [
{
'from': random.choice([
'important@company.com',
'spam@suspicious.com',
'client@customer.com'
]),
'subject': random.choice([
'紧急:系统故障',
'免费赠品!',
'会议邀请',
'发票确认'
]),
'size': random.randint(1000, 100000),
'date': datetime.now().isoformat(),
'body': '邮件内容...'
}
]
return []
def _evaluate_rules(self, email):
"""评估监控规则"""
for rule in self.rules:
if not rule['enabled']:
continue
if self._check_rule_condition(email, rule):
self._trigger_alert(email, rule)
def _check_rule_condition(self, email, rule):
"""检查规则条件"""
condition = rule['condition']
condition_type = condition['type']
if condition_type == 'sender_suspicious':
suspicious_domains = condition.get('domains', [])
sender = email.get('from', '')
return any(domain in sender for domain in suspicious_domains)
elif condition_type == 'large_email':
threshold = rule.get('threshold', 10 * 1024 * 1024) # 10MB
return email.get('size', 0) > threshold
elif condition_type == 'keyword_alert':
keywords = condition.get('keywords', [])
text = f"{email.get('subject', '')} {email.get('body', '')}"
return any(keyword.lower() in text.lower() for keyword in keywords)
elif condition_type == 'high_volume':
# 检查短时间内的邮件量
threshold = rule.get('threshold', 10)
time_window = condition.get('time_window', 300) # 5分钟
# 这里应该检查实际的邮件量
# 为演示目的,随机返回
import random
return random.random() < 0.1 # 10% 概率触发
return False
def _trigger_alert(self, email, rule):
"""触发警报"""
alert = {
'id': len(self.alerts) + 1,
'rule_name': rule['name'],
'alert_type': rule['alert_type'],
'email_info': {
'from': email.get('from', ''),
'subject': email.get('subject', ''),
'date': email.get('date', '')
},
'triggered_at': datetime.now().isoformat(),
'severity': self._get_alert_severity(rule['alert_type'])
}
self.alerts.append(alert)
rule['last_triggered'] = alert['triggered_at']
rule['trigger_count'] += 1
self.stats['alerts_triggered'] += 1
# 记录日志
self.logger.warning(
f"警报触发: {rule['name']} - {email.get('subject', 'No Subject')}"
)
# 发送通知
self._send_notification(alert)
def _get_alert_severity(self, alert_type):
"""获取警报严重级别"""
severity_map = {
'security': 'HIGH',
'spam': 'MEDIUM',
'volume': 'MEDIUM',
'size': 'LOW',
'keyword': 'MEDIUM'
}
return severity_map.get(alert_type, 'LOW')
def _send_notification(self, alert):
"""发送通知"""
# 这里可以实现各种通知方式:
# - 邮件通知
# - 短信通知
# - 即时消息
# - 系统通知
print(f" 🚨 {alert['severity']} 警报: {alert['rule_name']}")
print(f" 邮件: {alert['email_info']['subject']}")
print(f" 发件人: {alert['email_info']['from']}")
def get_alerts(self, severity=None, limit=None):
"""获取警报列表"""
alerts = self.alerts
if severity:
alerts = [a for a in alerts if a['severity'] == severity]
if limit:
alerts = alerts[-limit:]
return alerts
def clear_alerts(self, older_than_days=None):
"""清理警报"""
if older_than_days:
cutoff_date = datetime.now() - timedelta(days=older_than_days)
cutoff_str = cutoff_date.isoformat()
original_count = len(self.alerts)
self.alerts = [
alert for alert in self.alerts
if alert['triggered_at'] > cutoff_str
]
cleared_count = original_count - len(self.alerts)
print(f" ✓ 清理了 {cleared_count} 个旧警报")
else:
cleared_count = len(self.alerts)
self.alerts = []
print(f" ✓ 清理了所有 {cleared_count} 个警报")
def get_stats(self):
"""获取监控统计"""
stats = self.stats.copy()
stats.update({
'active_rules': len([r for r in self.rules if r['enabled']]),
'total_rules': len(self.rules),
'pending_alerts': len(self.alerts),
'is_running': self.is_running
})
return stats
# 2. 配置监控规则
print("\n2. 配置监控规则:")
def setup_monitoring_rules():
"""设置监控规则"""
monitor = EmailMonitor(check_interval=5) # 5秒检查一次(演示用)
# 可疑发件人监控
monitor.add_rule(
"可疑发件人检测",
{
'type': 'sender_suspicious',
'domains': ['suspicious.com', 'spam.net', 'phishing.org']
},
'security'
)
# 大邮件监控
monitor.add_rule(
"大邮件检测",
{
'type': 'large_email'
},
'size',
threshold=5 * 1024 * 1024 # 5MB
)
# 关键词警报
monitor.add_rule(
"敏感关键词检测",
{
'type': 'keyword_alert',
'keywords': ['紧急', '故障', '病毒', '攻击', 'urgent', 'critical']
},
'keyword'
)
# 邮件量异常监控
monitor.add_rule(
"邮件量异常检测",
{
'type': 'high_volume',
'time_window': 300 # 5分钟
},
'volume',
threshold=20 # 5分钟内超过20封
)
return monitor
# 3. 测试监控系统
print("\n3. 测试监控系统:")
def test_monitoring_system():
"""测试监控系统"""
monitor = setup_monitoring_rules()
# 开始监控
monitor.start_monitoring()
print(" 监控运行中,等待警报...")
# 运行一段时间
time.sleep(15)
# 检查统计
stats = monitor.get_stats()
print(f"\n 监控统计:")
print(f" 检查的邮件数: {stats['total_checked']}")
print(f" 触发的警报数: {stats['alerts_triggered']}")
print(f" 活跃规则数: {stats['active_rules']}")
print(f" 最后检查时间: {stats['last_check']}")
# 显示警报
alerts = monitor.get_alerts()
if alerts:
print(f"\n 最近的警报:")
for alert in alerts[-5:]: # 显示最近5个
print(f" {alert['severity']} - {alert['rule_name']}")
print(f" {alert['email_info']['subject']}")
# 停止监控
monitor.stop_monitoring()
test_monitoring_system()
# 运行邮件监控演示
email_monitoring_demo()
# 6. 最佳实践和安全
# 6.1 邮件安全最佳实践
import ssl
import base64
from cryptography.fernet import Fernet
import keyring
def email_security_demo():
"""邮件安全最佳实践演示"""
print("=== 邮件安全最佳实践演示 ===")
# 1. 安全连接配置
print("\n1. 安全连接配置:")
def create_secure_smtp_connection():
"""创建安全的SMTP连接"""
import smtplib
# 创建SSL上下文
context = ssl.create_default_context()
# 配置SSL选项
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
# 禁用不安全的协议
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
print(" ✓ 创建安全SSL上下文")
try:
# 使用SMTP_SSL直接建立加密连接
server = smtplib.SMTP_SSL('smtp.gmail.com', 465, context=context)
print(" ✓ 建立SSL加密连接")
# 或者使用STARTTLS
# server = smtplib.SMTP('smtp.gmail.com', 587)
# server.starttls(context=context)
return server
except Exception as e:
print(f" ✗ 连接失败: {e}")
return None
def create_secure_imap_connection():
"""创建安全的IMAP连接"""
import imaplib
# 创建SSL上下文
context = ssl.create_default_context()
try:
# 使用IMAP4_SSL
server = imaplib.IMAP4_SSL('imap.gmail.com', 993, ssl_context=context)
print(" ✓ 建立IMAP SSL连接")
return server
except Exception as e:
print(f" ✗ IMAP连接失败: {e}")
return None
# 测试安全连接
smtp_server = create_secure_smtp_connection()
if smtp_server:
smtp_server.quit()
imap_server = create_secure_imap_connection()
if imap_server:
imap_server.logout()
# 2. 密码安全管理
print("\n2. 密码安全管理:")
class SecureCredentialManager:
"""安全凭据管理器"""
def __init__(self, service_name="email_client"):
self.service_name = service_name
def store_credentials(self, username, password):
"""安全存储凭据"""
try:
# 使用系统密钥环存储密码
keyring.set_password(self.service_name, username, password)
print(f" ✓ 安全存储凭据: {username}")
return True
except Exception as e:
print(f" ✗ 存储失败: {e}")
return False
def get_credentials(self, username):
"""获取存储的凭据"""
try:
password = keyring.get_password(self.service_name, username)
if password:
print(f" ✓ 获取凭据: {username}")
return password
else:
print(f" ✗ 未找到凭据: {username}")
return None
except Exception as e:
print(f" ✗ 获取失败: {e}")
return None
def delete_credentials(self, username):
"""删除存储的凭据"""
try:
keyring.delete_password(self.service_name, username)
print(f" ✓ 删除凭据: {username}")
return True
except Exception as e:
print(f" ✗ 删除失败: {e}")
return False
def encrypt_data(self, data):
"""加密敏感数据"""
# 生成密钥
key = Fernet.generate_key()
cipher = Fernet(key)
# 加密数据
encrypted_data = cipher.encrypt(data.encode())
print(" ✓ 数据已加密")
return key, encrypted_data
def decrypt_data(self, key, encrypted_data):
"""解密数据"""
try:
cipher = Fernet(key)
decrypted_data = cipher.decrypt(encrypted_data)
print(" ✓ 数据已解密")
return decrypted_data.decode()
except Exception as e:
print(f" ✗ 解密失败: {e}")
return None
# 测试凭据管理
cred_manager = SecureCredentialManager()
# 注意:实际使用时不要在代码中硬编码密码
test_username = "test@example.com"
test_password = "secure_password_123"
# 存储和获取凭据(仅演示,实际环境中谨慎使用)
# cred_manager.store_credentials(test_username, test_password)
# retrieved_password = cred_manager.get_credentials(test_username)
# 测试数据加密
sensitive_data = "这是敏感的邮件内容"
key, encrypted = cred_manager.encrypt_data(sensitive_data)
decrypted = cred_manager.decrypt_data(key, encrypted)
# 3. 邮件内容安全
print("\n3. 邮件内容安全:")
class EmailSecurityValidator:
"""邮件安全验证器"""
def __init__(self):
self.suspicious_patterns = [
r'(?i)password.*reset',
r'(?i)click.*here.*urgent',
r'(?i)verify.*account.*immediately',
r'(?i)suspended.*account',
r'(?i)winner.*lottery',
r'(?i)free.*money',
r'(?i)nigerian.*prince'
]
self.safe_domains = [
'gmail.com', 'outlook.com', 'yahoo.com',
'company.com' # 添加你信任的域名
]
def validate_sender(self, sender_email):
"""验证发件人"""
import re
# 检查邮箱格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, sender_email):
return False, "邮箱格式无效"
# 检查域名
domain = sender_email.split('@')[1]
if domain in self.safe_domains:
return True, "可信域名"
# 检查可疑域名特征
suspicious_domain_patterns = [
r'.*\d{4,}.*', # 包含4位以上数字
r'.*-.*-.*', # 多个连字符
r'.{20,}', # 域名过长
]
for pattern in suspicious_domain_patterns:
if re.match(pattern, domain):
return False, f"可疑域名模式: {pattern}"
return True, "域名检查通过"
def scan_content(self, subject, body):
"""扫描邮件内容"""
import re
content = f"{subject} {body}"
threats = []
# 检查可疑模式
for pattern in self.suspicious_patterns:
if re.search(pattern, content):
threats.append(f"可疑模式: {pattern}")
# 检查URL
url_pattern = r'https?://[^\s]+'
urls = re.findall(url_pattern, content)
for url in urls:
if self._is_suspicious_url(url):
threats.append(f"可疑链接: {url}")
# 检查附件提及
attachment_patterns = [
r'(?i)attachment.*exe',
r'(?i)download.*file',
r'(?i)open.*document'
]
for pattern in attachment_patterns:
if re.search(pattern, content):
threats.append(f"可疑附件提及: {pattern}")
return threats
def _is_suspicious_url(self, url):
"""检查URL是否可疑"""
import re
suspicious_url_patterns = [
r'.*bit\.ly.*',
r'.*tinyurl.*',
r'.*[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}.*', # IP地址
r'.*[a-z]{20,}\.[a-z]{2,3}.*', # 随机长域名
]
for pattern in suspicious_url_patterns:
if re.match(pattern, url):
return True
return False
def validate_email(self, email_data):
"""综合验证邮件"""
results = {
'is_safe': True,
'warnings': [],
'threats': []
}
# 验证发件人
sender_valid, sender_msg = self.validate_sender(email_data.get('from', ''))
if not sender_valid:
results['threats'].append(f"发件人验证失败: {sender_msg}")
results['is_safe'] = False
# 扫描内容
content_threats = self.scan_content(
email_data.get('subject', ''),
email_data.get('body', '')
)
if content_threats:
results['threats'].extend(content_threats)
results['is_safe'] = False
# 检查附件
attachments = email_data.get('attachments', [])
for attachment in attachments:
filename = attachment.get('filename', '')
if self._is_dangerous_attachment(filename):
results['threats'].append(f"危险附件: {filename}")
results['is_safe'] = False
return results
def _is_dangerous_attachment(self, filename):
"""检查附件是否危险"""
dangerous_extensions = [
'.exe', '.scr', '.bat', '.cmd', '.com', '.pif',
'.vbs', '.js', '.jar', '.zip', '.rar'
]
filename_lower = filename.lower()
return any(filename_lower.endswith(ext) for ext in dangerous_extensions)
# 测试安全验证
validator = EmailSecurityValidator()
# 测试邮件
test_emails = [
{
'from': 'legitimate@company.com',
'subject': '会议通知',
'body': '明天下午2点开会',
'attachments': []
},
{
'from': 'suspicious123456@random-domain.com',
'subject': 'URGENT: Verify your account immediately!',
'body': 'Click here to reset your password: http://bit.ly/suspicious',
'attachments': [{'filename': 'document.exe'}]
}
]
print(" 邮件安全验证:")
for i, email in enumerate(test_emails, 1):
print(f"\n 邮件 {i}: {email['subject']}")
result = validator.validate_email(email)
if result['is_safe']:
print(" ✓ 邮件安全")
else:
print(" ⚠️ 发现威胁:")
for threat in result['threats']:
print(f" • {threat}")
# 运行邮件安全演示
email_security_demo()
# 7. 学习建议和总结
# 7.1 学习路径
基础知识
- 理解邮件协议(SMTP、POP3、IMAP)
- 掌握Python邮件模块的基本用法
- 学习邮件格式和编码
进阶应用
- 邮件内容解析和处理
- 附件处理和文件操作
- 邮件过滤和自动化
实际项目
- 构建邮件客户端
- 实现邮件监控系统
- 开发邮件备份工具
# 7.2 最佳实践
安全性
- 使用SSL/TLS加密连接
- 安全存储邮箱密码
- 验证邮件来源和内容
性能优化
- 合理使用连接池
- 批量处理邮件
- 异步处理大量邮件
错误处理
- 完善的异常处理机制
- 重试机制和超时设置
- 详细的日志记录
# 7.3 常见陷阱和解决方案
编码问题
# 问题:邮件内容乱码 # 解决:正确处理字符编码 import chardet def decode_email_content(content): if isinstance(content, bytes): encoding = chardet.detect(content)['encoding'] return content.decode(encoding or 'utf-8') return content
连接超时
# 问题:邮件服务器连接超时 # 解决:设置合理的超时时间 import socket # 设置全局超时 socket.setdefaulttimeout(30) # 或在连接时设置 server = smtplib.SMTP('smtp.gmail.com', 587, timeout=30)
内存占用
# 问题:处理大量邮件时内存占用过高 # 解决:使用生成器和流式处理 def process_emails_efficiently(email_list): for email in email_list: # 处理单封邮件 process_single_email(email) # 及时释放内存 del email
# 7.4 本章总结
本章详细介绍了Python电子邮件编程的各个方面:
- 邮件基础:学习了SMTP、POP3、IMAP协议的基本概念和Python实现
- 发送邮件:掌握了文本邮件、HTML邮件、附件邮件的发送方法
- 接收邮件:学会了连接邮件服务器、下载和解析邮件内容
- 邮件处理:实现了邮件过滤、自动回复、统计分析等高级功能
- 实际应用:构建了邮件备份系统和监控系统
- 安全实践:了解了邮件安全的重要性和实现方法
通过本章的学习,你应该能够:
- 使用Python发送和接收各种类型的邮件
- 解析和处理邮件内容和附件
- 实现邮件自动化处理功能
- 构建实用的邮件应用系统
- 确保邮件操作的安全性
邮件编程是Python的重要应用领域,掌握这些技能将为你的项目开发提供强大的通信能力。