Python编写自动化邮件发送程序:从零开始详解
在数字化时代,自动化邮件发送功能已成为企业和个人提升工作效率的重要工具。据统计,全球每天发送的商业邮件超过30亿封,其中约40%是通过自动化系统发送的。这种功能被广泛应用于多种场景:在日常办公中用于自动发送会议通知和日程提醒;在电商领域用于订单确认和物流跟踪;在营销推广中用于EDM邮件营销和客户关怀;在IT系统中用于发送告警通知和验证码等。
Python凭借其简洁优雅的语法和强大的生态系统,成为实现邮件自动化的首选编程语言。其标准库中的smtplib和email模块提供了完整的邮件处理功能,而第三方库如yagmail更是将发送过程简化到极致。Python的跨平台特性使其可以在Windows、Linux和macOS等不同操作系统上稳定运行,且能轻松与其他系统集成。
本教程将详细介绍如何用Python编写一个完整的自动化邮件发送程序,包括:
我们将通过实际案例演示,从基础的单封邮件发送到高级的定时批量发送功能,帮助读者掌握完整的邮件自动化解决方案。
理解SMTP协议基础
SMTP(Simple Mail Transfer Protocol)是用于发送电子邮件的标准协议。Python通过smtplib库提供SMTP协议支持。发送邮件的基本流程涉及三个关键环节:建立SMTP连接、进行身份验证、构造并发送邮件内容。
电子邮件通常由头部(From/To/Subject等)和正文组成,可能包含纯文本、HTML内容或附件。MIME(Multipurpose Internet Mail Extensions)标准用于扩展邮件格式支持。
配置开发环境
需要安装Python 3.6及以上版本。通过以下命令安装必要依赖库:
pip install secure-smtplib email-validator
建议使用虚拟环境隔离项目依赖:
python -m venv email_env
source email_env/bin/activate # Linux/macOS
email_env\\Scripts\\activate # Windows
构建邮件发送函数核心逻辑
创建send_email.py文件,导入基础模块:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate
import getpass
初始化SMTP连接函数:
def connect_smtp_server(smtp_server, port, username, password):
try:
server = smtplib.SMTP(smtp_server, port)
server.starttls()
server.login(username, password)
return server
except Exception as e:
print(f"连接SMTP服务器失败: {str(e)}")
raise
邮件内容构造器:
def build_email(sender, receiver, subject, body, body_type='plain'):
message = MIMEMultipart()
message['From'] = sender
message['To'] = receiver
message['Date'] = formatdate(localtime=True)
message['Subject'] = subject
if body_type == 'html':
content = MIMEText(body, 'html')
else:
content = MIMEText(body, 'plain')
message.attach(content)
return message
实现完整发送流程
主发送函数整合所有组件:
def send_email(smtp_config, email_content):
try:
server = connect_smtp_server(
smtp_config['server'],
smtp_config['port'],
smtp_config['username'],
smtp_config['password']
)
message = build_email(
email_content['sender'],
email_content['receiver'],
email_content['subject'],
email_content['body'],
email_content.get('body_type', 'plain')
)
server.sendmail(
email_content['sender'],
email_content['receiver'],
message.as_string()
)
server.quit()
print("邮件发送成功")
return True
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return False
添加附件支持功能
扩展邮件构造器支持附件:
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email import encoders
import os
def add_attachment(message, file_path):
with open(file_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename={os.path.basename(file_path)}'
)
message.attach(part)
更新后的邮件构造器:
def build_email(sender, receiver, subject, body, attachments=None, body_type='plain'):
message = MIMEMultipart()
# … 原有头部设置代码 …
if attachments:
for file_path in attachments:
if os.path.exists(file_path):
add_attachment(message, file_path)
else:
print(f"警告:附件文件不存在 {file_path}")
return message
实现HTML邮件模板
创建HTML模板文件template.html:
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; }
.header { color: #2c3e50; }
.content { margin: 20px 0; }
.footer { color: #7f8c8d; font-size: 0.8em; }
</style>
</head>
<body>
<h1 class="header">${HEADER}</h1>
<div class="content">${CONTENT}</div>
<div class="footer">自动发送于 ${DATE}</div>
</body>
</html>
添加模板处理函数:
from string import Template
import datetime
def render_template(template_file, **kwargs):
with open(template_file, 'r') as f:
template = Template(f.read())
if 'DATE' not in kwargs:
kwargs['DATE'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
return template.substitute(**kwargs)
安全增强措施
实现密码加密存储:
from cryptography.fernet import Fernet
import base64
import json
class CredentialManager:
def __init__(self, key_file='.key'):
self.key = self._load_or_generate_key(key_file)
def _load_or_generate_key(self, key_file):
if os.path.exists(key_file):
with open(key_file, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(key)
return key
def save_credentials(self, config_file, credentials):
cipher = Fernet(self.key)
encrypted = cipher.encrypt(json.dumps(credentials).encode())
with open(config_file, 'wb') as f:
f.write(encrypted)
def load_credentials(self, config_file):
cipher = Fernet(self.key)
with open(config_file, 'rb') as f:
encrypted = f.read()
return json.loads(cipher.decrypt(encrypted).decode())
完整实现代码
# email_sender.py
import smtplib
import os
import json
import getpass
import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formatdate
from string import Template
from cryptography.fernet import Fernet
import base64
class EmailSender:
def __init__(self, config_file='config.json', key_file='.key'):
self.config_file = config_file
self.cred_manager = CredentialManager(key_file)
if not os.path.exists(config_file):
self._setup_config()
self.config = self.cred_manager.load_credentials(config_file)
def _setup_config(self):
print("首次使用需要进行配置")
smtp_server = input("SMTP服务器地址 (如smtp.example.com): ")
port = int(input("端口号 (587 for TLS): "))
username = input("邮箱地址: ")
password = getpass.getpass("邮箱密码/授权码: ")
credentials = {
'smtp_server': smtp_server,
'port': port,
'username': username,
'password': password
}
self.cred_manager.save_credentials(self.config_file, credentials)
def connect_smtp(self):
try:
server = smtplib.SMTP(self.config['smtp_server'], self.config['port'])
server.starttls()
server.login(self.config['username'], self.config['password'])
return server
except Exception as e:
raise Exception(f"SMTP连接失败: {str(e)}")
def build_message(self, to, subject, body, body_type='plain', attachments=None, cc=None, bcc=None):
msg = MIMEMultipart()
msg['From'] = self.config['username']
msg['To'] = to
msg['Subject'] = subject
msg['Date'] = formatdate(localtime=True)
if cc:
msg['Cc'] = cc
if bcc:
msg['Bcc'] = bcc
if body_type == 'html':
msg.attach(MIMEText(body, 'html'))
else:
msg.attach(MIMEText(body, 'plain'))
if attachments:
for file_path in attachments:
if os.path.exists(file_path):
self._add_attachment(msg, file_path)
else:
print(f"附件不存在: {file_path}")
return msg
def _add_attachment(self, message, file_path):
with open(file_path, "rb") as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename="{os.path.basename(file_path)}"'
)
message.attach(part)
def send(self, to, subject, body, body_type='plain', attachments=None, cc=None, bcc=None):
try:
server = self.connect_smtp()
recipients = [to]
if cc:
recipients += cc.split(',')
if bcc:
recipients += bcc.split(',')
msg = self.build_message(to, subject, body, body_type, attachments, cc, bcc)
server.sendmail(self.config['username'], recipients, msg.as_string())
server.quit()
return True
except Exception as e:
print(f"发送失败: {str(e)}")
return False
class CredentialManager:
def __init__(self, key_file='.key'):
self.key = self._load_or_generate_key(key_file)
def _load_or_generate_key(self, key_file):
if os.path.exists(key_file):
with open(key_file, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(key)
return key
def save_credentials(self, config_file, credentials):
cipher = Fernet(self.key)
encrypted = cipher.encrypt(json.dumps(credentials).encode())
with open(config_file, 'wb') as f:
f.write(encrypted)
def load_credentials(self, config_file):
cipher = Fernet(self.key)
with open(config_file, 'rb') as f:
encrypted = f.read()
return json.loads(cipher.decrypt(encrypted).decode())
def render_template(template_file, **kwargs):
with open(template_file, 'r') as f:
template = Template(f.read())
if 'DATE' not in kwargs:
kwargs['DATE'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
return template.substitute(**kwargs)
if __name__ == "__main__":
sender = EmailSender()
# 示例1:发送纯文本邮件
sender.send(
to="recipient@example.com",
subject="测试纯文本邮件",
body="这是一封测试邮件的正文内容。"
)
# 示例2:发送带附件的HTML邮件
html_content = """
<h1>HTML邮件测试</h1>
<p>这是一封<strong>HTML格式</strong>的测试邮件。</p>
<p>当前时间:{}</p>
""".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sender.send(
to="recipient@example.com",
subject="测试HTML邮件",
body=html_content,
body_type='html',
attachments=['report.pdf', 'data.xlsx']
)
# 示例3:使用模板发送邮件
template_vars = {
'HEADER': '季度报告',
'CONTENT': '请查收附件中的季度财务报告。'
}
rendered = render_template('template.html', **template_vars)
sender.send(
to="manager@example.com",
subject="季度财务报告",
body=rendered,
body_type='html',
attachments=['quarterly_report.pdf']
)
项目结构说明
完整项目应包含以下文件:
/email_project
│── email_sender.py # 主程序
│── config.json # 加密的配置文件
│── .key # 加密密钥
│── template.html # HTML模板
├── attachments/ # 附件目录
│ ├── report.pdf
│ └── data.xlsx
└── requirements.txt # 依赖列表
requirements.txt内容:
secure-smtplib==1.0.0
cryptography==39.0.1
部署与使用指南
- 纯文本邮件
- HTML格式邮件
- 带附件邮件
安全注意事项
通过本教程,可以构建一个功能完备的企业级邮件自动发送系统。根据实际需求,可进一步扩展邮件队列、发送状态跟踪等功能。
评论前必须登录!
注册