邮件服务集成Skill email-service-integration

构建全面的电子邮件系统,集成SMTP、第三方电子邮件提供商、HTML模板、电子邮件验证、重试机制和适当的错误处理,用于发送事务性电子邮件、实施欢迎/确认电子邮件、创建密码重置流程、发送通知电子邮件、构建电子邮件模板和管理批量电子邮件活动。

后端开发 2 次安装 22 次浏览 更新于 3/3/2026

邮件服务集成

概览

构建全面的电子邮件系统,集成SMTP、第三方电子邮件提供商(SendGrid、Mailgun、AWS SES)、HTML模板、电子邮件验证、重试机制和适当的错误处理。

使用场景

  • 发送事务性电子邮件
  • 实施欢迎/确认电子邮件
  • 创建密码重置流程
  • 发送通知电子邮件
  • 构建电子邮件模板
  • 管理批量电子邮件活动

指南

1. Python/Flask与SMTP

# config.py
import os

class EmailConfig:
    """SMTP settings for Flask-Mail, read from environment variables.

    Values are resolved once, when the class body executes. Each setting
    falls back to the default shown when its variable is unset; the
    credentials have no default and are ``None`` when unset.
    """

    # SMTP host to connect to.
    MAIL_SERVER = os.getenv('MAIL_SERVER', 'smtp.gmail.com')
    # SMTP port, converted from the environment string to an int.
    MAIL_PORT = int(os.getenv('MAIL_PORT', '587'))
    # os.getenv returns a string whenever the variable is set, and every
    # non-empty string (including "false" and "0") is truthy, so the flag
    # is parsed explicitly instead of being used as-is.
    MAIL_USE_TLS = os.getenv('MAIL_USE_TLS', 'true').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    # SMTP credentials; None when the variables are not set.
    MAIL_USERNAME = os.getenv('MAIL_USERNAME')
    MAIL_PASSWORD = os.getenv('MAIL_PASSWORD')
    # Address used as the sender when a message does not set one.
    MAIL_DEFAULT_SENDER = os.getenv('MAIL_DEFAULT_SENDER', 'noreply@example.com')

# email_service.py
from flask_mail import Mail, Message
from flask import render_template_string
import logging
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared Flask-Mail extension object; EmailService binds it to an app
# through mail.init_app() when it is given one.
mail = Mail()

class EmailService:
    """Sends the application's transactional emails through Flask-Mail."""

    def __init__(self, app=None):
        """Remember ``app`` and, when one is given, bind Flask-Mail to it."""
        self.app = app
        if not app:
            return
        mail.init_app(app)

    def send_email(self, recipient, subject, text_body=None, html_body=None):
        """Send one message via Flask-Mail.

        ``recipient`` is either a single address string (wrapped in a list)
        or a value passed through unchanged as the recipient list. Returns
        True when sending completes and False when any exception is raised;
        the failure is logged rather than re-raised.
        """
        try:
            if isinstance(recipient, str):
                to_list = [recipient]
            else:
                to_list = recipient

            message = Message(subject=subject, recipients=to_list)

            # Falsy bodies (None or "") are left unset on the message.
            if text_body:
                message.body = text_body
            if html_body:
                message.html = html_body

            mail.send(message)
            logger.info(f"Email sent to {recipient}: {subject}")
            return True
        except Exception as exc:
            logger.error(f"Failed to send email to {recipient}: {exc!s}")
            return False

    def send_welcome_email(self, user_email, user_name):
        """Send the welcome email greeting ``user_name`` to ``user_email``."""
        template = '''
            <h1>Welcome, {{ name }}!</h1>
            <p>Thank you for joining us. Start exploring now!</p>
            <a href="https://example.com/dashboard">Go to Dashboard</a>
            '''
        rendered = render_template_string(template, name=user_name)
        return self.send_email(
            user_email, "Welcome to Our Platform!", html_body=rendered
        )

    def send_password_reset_email(self, user_email, reset_token):
        """Send a password-reset email whose link carries ``reset_token``."""
        template = '''
            <h1>Reset Your Password</h1>
            <p>Click the link below to reset your password:</p>
            <a href="{{ reset_url }}">Reset Password</a>
            <p>This link expires in 24 hours.</p>
            '''
        link = f"https://example.com/reset-password?token={reset_token}"
        rendered = render_template_string(template, reset_url=link)
        return self.send_email(
            user_email, "Reset Your Password", html_body=rendered
        )

    def send_verification_email(self, user_email, verification_token):
        """Send an address-verification email carrying ``verification_token``."""
        template = '''
            <h1>Verify Your Email Address</h1>
            <p>Click the link below to verify your email:</p>
            <a href="{{ verify_url }}">Verify Email</a>
            '''
        link = f"https://example.com/verify-email?token={verification_token}"
        rendered = render_template_string(template, verify_url=link)
        return self.send_email(
            user_email, "Verify Your Email", html_body=rendered
        )

    def send_notification_email(self, user_email, notification_data):
        """Send a notification built from the ``notification_data`` mapping.

        Reads the keys ``subject`` (default ``'Notification'``), ``title``,
        ``message`` and ``html_content`` (default ``''``).
        """
        # `content` is rendered with the |safe filter, i.e. inserted without
        # escaping, so html_content must already be trusted markup.
        template = '''
            <h1>{{ title }}</h1>
            <p>{{ message }}</p>
            {{ content|safe }}
            '''
        subject_line = notification_data.get('subject', 'Notification')
        heading = notification_data.get('title')
        text = notification_data.get('message')
        extra_html = notification_data.get('html_content', '')
        rendered = render_template_string(
            template, title=heading, message=text, content=extra_html
        )
        return self.send_email(user_email, subject_line, html_body=rendered)

# routes.py
from flask import Blueprint, request, jsonify
from email_service import EmailService

# Blueprint that the email-related auth routes are registered on.
email_bp = Blueprint('email', __name__)
# NOTE(review): constructed without an app, so EmailService.__init__ does not
# call mail.init_app() here — presumably the application sets up Flask-Mail
# elsewhere; confirm.
email_service = EmailService()

@email_bp.route('/api/auth/send-verification', methods=['POST'])
def send_verification():
    """Send a verification email to the address given in the JSON body.

    Expects a JSON object with an ``email`` key. Responds with 400 when the
    body is missing, malformed, not an object, or has no email; 500 when the
    email could not be sent; 200 otherwise.
    """
    # silent=True returns None instead of raising on a missing or malformed
    # JSON body, so those cases reach the 400 below rather than crashing.
    data = request.get_json(silent=True)
    user_email = data.get('email') if isinstance(data, dict) else None
    if not user_email:
        return jsonify({'error': 'Email required'}), 400

    verification_token = generate_token()
    if not email_service.send_verification_email(user_email, verification_token):
        return jsonify({'error': 'Failed to send email'}), 500

    # The token is stored only once the email has actually gone out.
    VerificationToken.create(email=user_email, token=verification_token)
    return jsonify({'message': 'Verification email sent'}), 200

@email_bp.route('/api/auth/send-reset', methods=['POST'])
def send_reset():
    """Start a password reset for the address given in the JSON body.

    Expects a JSON object with an ``email`` key; responds with 400 when it
    is absent. In every other case the response is the same 200 message, so
    the endpoint does not reveal whether an account exists for the address.
    """
    data = request.get_json(silent=True)
    email = data.get('email') if isinstance(data, dict) else None
    if not email:
        return jsonify({'error': 'Email required'}), 400

    # One reply for unknown address, successful send and failed send alike:
    # distinct messages or status codes would let a caller enumerate accounts.
    generic_body = {'message': 'If email exists, reset link sent'}

    user = User.query.filter_by(email=email).first()
    if not user:
        return jsonify(generic_body), 200

    reset_token = generate_token()
    if email_service.send_password_reset_email(user.email, reset_token):
        # The token is stored only once the email has actually gone out.
        ResetToken.create(user_id=user.id, token=reset_token)
    # A failed send is already logged by EmailService.send_email.
    return jsonify(generic_body), 200

2. Node.js与SendGrid

// email-service.js
const sgMail = require('@sendgrid/mail');
const logger = require('./logger');

// Configure the SendGrid client at module load with the key from the environment.
sgMail.setApiKey(process.env.SENDGRID_API_KEY);

/**
 * Escape the HTML-significant characters of a value so it can be placed
 * inside markup as plain text.
 *
 * @param {*} value - Value to embed; converted with String().
 * @returns {string} The escaped text.
 */
function escapeHtml(value) {
    return String(value)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;')
        .replace(/'/g, '&#39;');
}

/** Sends the application's transactional and bulk emails through SendGrid. */
class EmailService {
    /**
     * Send one HTML email, optionally with a plain-text alternative.
     *
     * @param {string|string[]} to - One address or an array of addresses.
     * @param {string} subject - Subject line.
     * @param {string} htmlContent - HTML body.
     * @param {?string} textContent - Plain-text body; omitted when falsy.
     * @returns {Promise<{success: boolean, messageId?: string, error?: string}>}
     *   Never rejects: failures are logged and reported via `success: false`.
     */
    async sendEmail(to, subject, htmlContent, textContent = null) {
        try {
            // NOTE(review): an array of addresses is sent as one message, so
            // recipients presumably see each other — confirm this is intended.
            const msg = {
                to: Array.isArray(to) ? to : [to],
                from: process.env.MAIL_FROM || 'noreply@example.com',
                subject: subject,
                html: htmlContent,
                ...(textContent && { text: textContent })
            };

            const result = await sgMail.send(msg);
            logger.info(`Email sent to ${to}: ${subject}`);
            return { success: true, messageId: result[0].headers['x-message-id'] };
        } catch (error) {
            logger.error(`Failed to send email: ${error.message}`);
            return { success: false, error: error.message };
        }
    }

    /**
     * Send the welcome email.
     *
     * @param {string|string[]} to - Recipient address(es).
     * @param {string} userName - Display name shown in the greeting.
     */
    async sendWelcomeEmail(to, userName) {
        // The name ends up inside markup, so it is escaped to keep any HTML
        // it contains from being interpreted.
        const htmlContent = `
            <h1>Welcome, ${escapeHtml(userName)}!</h1>
            <p>Thank you for joining us.</p>
            <a href="https://example.com/dashboard">Start Exploring</a>
        `;

        return this.sendEmail(to, 'Welcome to Our Platform!', htmlContent);
    }

    /**
     * Send a password-reset email whose link carries `resetToken`.
     *
     * @param {string|string[]} to - Recipient address(es).
     * @param {string} resetToken - Token placed in the link's query string.
     */
    async sendPasswordResetEmail(to, resetToken) {
        // Percent-encode the token so characters such as "+", "&" or "="
        // cannot corrupt the query string.
        const resetUrl = `https://example.com/reset-password?token=${encodeURIComponent(resetToken)}`;
        const htmlContent = `
            <h1>Reset Your Password</h1>
            <p>Click the link below to reset your password:</p>
            <a href="${resetUrl}">Reset Password</a>
            <p>This link expires in 24 hours.</p>
        `;

        return this.sendEmail(to, 'Reset Your Password', htmlContent);
    }

    /**
     * Send an address-verification email whose link carries `verificationToken`.
     *
     * @param {string|string[]} to - Recipient address(es).
     * @param {string} verificationToken - Token placed in the link's query string.
     */
    async sendVerificationEmail(to, verificationToken) {
        const verifyUrl = `https://example.com/verify-email?token=${encodeURIComponent(verificationToken)}`;
        const htmlContent = `
            <h1>Verify Your Email</h1>
            <p>Click the link below to verify your email:</p>
            <a href="${verifyUrl}">Verify Email</a>
        `;

        return this.sendEmail(to, 'Verify Your Email', htmlContent);
    }

    /**
     * Send the same email to many recipients in one request, one
     * personalization per recipient.
     *
     * @param {{email: string, name: string}[]} recipients - Recipients; each
     *   name is supplied as the `-name-` substitution value.
     * @param {string} subject - Subject line.
     * @param {string} htmlContent - HTML body.
     * @returns {Promise<{success: boolean, sent?: number, error?: string}>}
     *   Never rejects: failures are logged and reported via `success: false`.
     */
    async sendBulkEmails(recipients, subject, htmlContent) {
        try {
            const personalizations = recipients.map(recipient => ({
                to: [{ email: recipient.email }],
                substitutions: {
                    '-name-': recipient.name
                }
            }));

            const msg = {
                personalizations: personalizations,
                from: process.env.MAIL_FROM || 'noreply@example.com',
                subject: subject,
                html: htmlContent
            };

            await sgMail.send(msg);
            logger.info(`Bulk email sent to ${recipients.length} recipients`);
            return { success: true, sent: recipients.length };
        } catch (error) {
            logger.error(`Bulk email failed: ${error.message}`);
            return { success: false, error: error.message };
        }
    }
}

// Export one shared EmailService instance rather than the class itself.
module.exports = new EmailService();

// routes.js
const express = require('express');
const emailService = require('../services/email-service');
const logger = require('../services/logger');
const { generateToken } = require('../utils/token');

// Router that the email endpoints in this file are attached to.
const router = express.Router();

// POST /send-verification — email a verification link to `req.body.email`.
// Replies 400 without an email, 500 when sending fails or an error is thrown,
// and 200 once the email is sent and its token stored.
router.post('/send-verification', async (req, res) => {
    try {
        const email = req.body.email;
        if (!email) {
            return res.status(400).json({ error: 'Email required' });
        }

        const token = generateToken();
        const { success } = await emailService.sendVerificationEmail(email, token);
        if (!success) {
            return res.status(500).json({ error: 'Failed to send email' });
        }

        // Store the token only after the email has gone out.
        await VerificationToken.create({ email, token });
        return res.json({ message: 'Verification email sent' });
    } catch (err) {
        logger.error(err);
        res.status(500).json({ error: 'Internal server error' });
    }
});

// POST /send-reset — start a password reset for `req.body.email`.
// Replies 400 without an email and 500 on an unexpected error. Otherwise the
// reply is always the same message, so the endpoint does not reveal whether
// an account exists for the address.
router.post('/send-reset', async (req, res) => {
    // One reply for unknown address, successful send and failed send alike:
    // distinct messages or status codes would let a caller enumerate accounts.
    const genericReply = { message: 'If email exists, reset link sent' };

    try {
        const { email } = req.body;
        if (!email) {
            return res.status(400).json({ error: 'Email required' });
        }

        const user = await User.findOne({ where: { email } });
        if (!user) {
            return res.json(genericReply);
        }

        const resetToken = generateToken();
        const result = await emailService.sendPasswordResetEmail(email, resetToken);
        if (result.success) {
            // Store the token only after the email has gone out.
            await ResetToken.create({ userId: user.id, token: resetToken });
        }
        // A failed send is already logged inside emailService.sendEmail.
        return res.json(genericReply);
    } catch (error) {
        logger.error(error);
        res.status(500).json({ error: 'Internal server error' });
    }
});

// Export the router with both email routes attached.
module.exports = router;

3. MJML电子邮件模板

<!-- templates/welcome.mjml -->
<!-- Welcome email layout: logo, greeting, call-to-action button, footer.
     The literal text "{{ userName }}" is a placeholder that must be replaced
     before the template is compiled to HTML.
     Sections sit directly under mj-body: the mj-container wrapper is MJML 3
     syntax and is not part of MJML 4. -->
<mjml>
  <mj-body>
    <mj-section>
      <mj-column>
        <mj-image width="100px" src="https://example.com/logo.png"></mj-image>
      </mj-column>
    </mj-section>

    <mj-section background-color="#f4f4f4">
      <mj-column>
        <mj-text font-size="24px" align="center" color="#333">
          Welcome, {{ userName }}!
        </mj-text>
        <mj-text align="center" color="#666">
          Thank you for joining us. Let's get started!
        </mj-text>
      </mj-column>
    </mj-section>

    <mj-section>
      <mj-column>
        <mj-button href="https://example.com/dashboard" background-color="#007bff">
          Go to Dashboard
        </mj-button>
      </mj-column>
    </mj-section>

    <mj-section>
      <mj-column>
        <mj-text font-size="12px" align="center" color="#999">
          © 2024 Example Inc. All rights reserved.
        </mj-text>
      </mj-column>
    </mj-section>
  </mj-body>
</mjml>

<!-- Python模板编译 -->
# email_templates.py
from mjml import mjml_to_html

def get_welcome_template(user_name):
    """Render ``templates/welcome.mjml`` to HTML with ``user_name`` filled in.

    Args:
        user_name: Text substituted for the ``{{ userName }}`` placeholder.

    Returns:
        The compiled HTML markup.
    """
    from html import escape  # local import keeps this function self-contained

    # Read as UTF-8 explicitly: the template contains non-ASCII characters
    # and the platform default encoding is not guaranteed to be UTF-8.
    with open('templates/welcome.mjml', 'r', encoding='utf-8') as f:
        mjml_content = f.read()

    # The name is placed inside markup, so escape it to keep any HTML or MJML
    # it contains from being interpreted.
    mjml_content = mjml_content.replace('{{ userName }}', escape(user_name))
    result = mjml_to_html(mjml_content)
    # mjml_to_html returns a result object carrying the markup in `.html`;
    # fall back to the value itself if a plain string comes back.
    return getattr(result, 'html', result)

4. FastAPI电子邮件与后台任务

# email_service.py
from fastapi import BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig

import os

# fastapi-mail connection settings, read from the environment.
# ConnectionConfig fields are upper-case; lower-case keyword arguments do not
# map onto them.
conf = ConnectionConfig(
    MAIL_USERNAME=os.getenv("MAIL_USERNAME"),
    MAIL_PASSWORD=os.getenv("MAIL_PASSWORD"),
    MAIL_FROM=os.getenv("MAIL_FROM"),
    # Default to 587 so an unset MAIL_PORT does not crash with int(None).
    MAIL_PORT=int(os.getenv("MAIL_PORT", "587")),
    MAIL_SERVER=os.getenv("MAIL_SERVER"),
    MAIL_FROM_NAME=os.getenv("MAIL_FROM_NAME", "Example App"),
    # NOTE(review): STARTTLS on / implicit TLS off matches the default port
    # 587 — adjust both if the server expects implicit TLS (typically 465).
    MAIL_STARTTLS=True,
    MAIL_SSL_TLS=False,
    USE_CREDENTIALS=True,
    VALIDATE_CERTS=True,
)

# Shared mail client built from the configuration above.
fm = FastMail(conf)

class EmailService:
    """Static helpers that send HTML emails through the shared FastMail client."""

    @staticmethod
    async def send_email(
        recipients: list,
        subject: str,
        body: str,
        background_tasks: BackgroundTasks = None
    ):
        """Send an HTML email, immediately or as a background task.

        Args:
            recipients: Addresses to send to.
            subject: Subject line.
            body: HTML body.
            background_tasks: When given, sending is scheduled on it and this
                coroutine returns without waiting; otherwise the send is
                awaited here.
        """
        message = MessageSchema(
            subject=subject,
            recipients=recipients,
            body=body,
            subtype="html"
        )

        if background_tasks:
            background_tasks.add_task(fm.send_message, message)
        else:
            await fm.send_message(message)

    @staticmethod
    async def send_welcome_email(
        email: str,
        name: str,
        background_tasks: BackgroundTasks
    ):
        """Send the welcome email to ``email``, greeting ``name``.

        Args:
            email: Recipient address.
            name: Display name shown in the greeting.
            background_tasks: Forwarded to ``send_email``.
        """
        from html import escape  # local import keeps this block self-contained

        # The name is placed inside markup, so escape it to keep any HTML it
        # contains from being interpreted.
        html_body = f"""
        <h1>Welcome, {escape(name)}!</h1>
        <p>Thank you for joining us.</p>
        <a href="https://example.com/dashboard">Start Exploring</a>
        """

        await EmailService.send_email(
            recipients=[email],
            subject="Welcome to Our Platform!",
            body=html_body,
            background_tasks=background_tasks
        )

# routes.py
from fastapi import BackgroundTasks
from email_service import EmailService

@app.post("/api/send-email")
async def send_email(
    email: str,
    background_tasks: BackgroundTasks
):
    """Queue a welcome email to ``email`` using the fixed display name "User"."""
    await EmailService.send_welcome_email(
        email=email,
        name="User",
        background_tasks=background_tasks,
    )
    response = {"message": "Email queued for sending"}
    return response

5. 电子邮件校验与验证

# email_validation.py  (not email_validator.py: that name would shadow the email_validator package imported below)
import re
from email_validator import validate_email, EmailNotValidError
import dns.resolver

class EmailValidator:
    """Helpers that check an address's syntax and its domain's MX records."""

    @staticmethod
    def validate_format(email: str) -> tuple:
        """Check the syntax of ``email``.

        Returns:
            ``(True, normalized_address)`` when the address is accepted, or
            ``(False, error_message)`` when it is rejected.
        """
        try:
            # Syntax only: DNS is checked separately by check_mx_records, and
            # a failed lookup should not be reported as a format error.
            result = validate_email(email, check_deliverability=False)
        except EmailNotValidError as exc:
            return False, str(exc)
        # Prefer `.normalized` and fall back to `.email` for library versions
        # that only provide the latter.
        normalized = getattr(result, 'normalized', None) or result.email
        return True, normalized

    @staticmethod
    def check_mx_records(email: str) -> bool:
        """Return True when the domain of ``email`` has at least one MX record.

        Returns False when the address has no ``@``, has an empty domain, or
        when the lookup fails for any reason.
        """
        # Split on the last "@" so the text after it is always the domain.
        _, at_sign, domain = email.rpartition('@')
        if not at_sign or not domain:
            return False
        try:
            return len(dns.resolver.resolve(domain, 'MX')) > 0
        except Exception:
            # Best-effort check: any resolver failure counts as "no MX".
            return False

    @staticmethod
    def validate_email_comprehensive(email: str) -> dict:
        """Run the format check followed by the MX check.

        Returns:
            ``{'valid': True, 'email': email, 'normalized': ...}`` on success;
            otherwise ``{'valid': False, 'reason': ...}``, with a ``'detail'``
            key carrying the validator's message for format failures.
        """
        # Format validation
        is_valid, detail = EmailValidator.validate_format(email)
        if not is_valid:
            return {'valid': False, 'reason': 'Invalid format', 'detail': detail}

        # On success the second element is the normalized address.
        normalized = detail

        # MX record check
        if not EmailValidator.check_mx_records(normalized):
            return {'valid': False, 'reason': 'Domain has no MX records'}

        return {'valid': True, 'email': email, 'normalized': normalized}

最佳实践

✅ DO

  • 使用事务性电子邮件提供商以确保可靠性
  • 实施电子邮件模板以保持一致性
  • 添加退订链接(法律要求)
  • 使用后台任务发送电子邮件
  • 实施适当的错误处理和重试
  • 发送前验证电子邮件地址
  • 添加速率限制以防止滥用
  • 监控电子邮件投递和退信
  • 使用SMTP认证
  • 在开发环境中测试电子邮件

❌ DON’T

  • 在请求处理程序中同步发送电子邮件
  • 将密码存储在代码中
  • 在电子邮件中发送敏感信息
  • 对敏感操作使用通用电子邮件地址
  • 跳过电子邮件验证
  • 忽略退信和投诉通知
  • 过度使用内联样式的HTML电子邮件
  • 忘记处理失败的电子邮件投递
  • 没有适当模板的情况下发送电子邮件
  • 未经同意存储电子邮件地址

完整示例

@app.post("/register")
async def register(
    email: str,
    password: str,
    background_tasks: BackgroundTasks
):
    """Create a user and queue a verification email for the new account.

    The password is hashed before the user is stored. The verification email
    is scheduled as a background task after the commit.
    """
    # NOTE(review): `password` is declared as a plain `str` parameter —
    # confirm it is not being accepted through the URL query string.
    hashed = hash_password(password)
    user = User(email=email, password=hashed)
    db.add(user)
    db.commit()

    # NOTE(review): the token is generated here and handed to the task only;
    # nothing in this function stores it — presumably
    # send_verification_email persists it; confirm.
    verification_token = generate_token()
    background_tasks.add_task(
        send_verification_email,
        email=user.email,
        token=verification_token,
    )

    return {"message": "User registered. Check email to verify."}