在之前的开发中开发了一个发邮件的工具类,在客户实际的操作下出现了smtp服务器返回错误消息限制发送速度的问题,所以新的需求出现了,让用户自己去控制发送邮件的速度。这篇文章来写一个邮件限流器。
邮件限流器
import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** 邮件发送限流器* @author yangguanglei* @date 2025/6/11**/
public class EmailRateLimiter {/*** 存储配置ID到速率限制器包装器的映射* 使用ConcurrentHashMap确保线程安全,支持并发访问*/private static final ConcurrentHashMap<Integer, RateLimiterWrapper> RATE_LIMITERS = new ConcurrentHashMap<>();/*** 定时任务执行器*/private static final ScheduledExecutorService CLEANUP_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {Thread thread = new Thread(r, "email-rate-limiter-cleanup");thread.setDaemon(true); // 设置为守护线程,随应用程序退出return thread;});/*** 清理任务是否已启动的标志*/private static final AtomicBoolean CLEANUP_SCHEDULED = new AtomicBoolean(false);/*** 清理间隔时间(默认30分钟)*/private static final long CLEANUP_INTERVAL = 30;private static final TimeUnit CLEANUP_UNIT = TimeUnit.MINUTES;/*** 不活跃阈值(默认30分钟)*/private static final long INACTIVE_THRESHOLD = 30 * 60 * 1000; // 30分钟,单位毫秒/*** 关闭钩子已注册的标志*/private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false);/*** 获取指定配置的令牌,必要时等待** @param configId 配置ID,用于标识不同的速率限制策略* @param permitsPerSecond 每秒允许的请求数*/public static void acquire(Integer configId, double permitsPerSecond) {// 确保定时清理任务已启动ensureCleanupScheduled();// 计算或更新速率限制器包装器RateLimiterWrapper wrapper = RATE_LIMITERS.compute(configId, (k, v) -> {if (v == null) {// 如果不存在,则创建新的速率限制器return new RateLimiterWrapper(permitsPerSecond);} else {// 如果存在但速率不同,则更新速率if (v.getPermitsPerSecond() != permitsPerSecond) {v.getRateLimiter().setRate(permitsPerSecond);return new RateLimiterWrapper(v.getRateLimiter(), permitsPerSecond);}// 更新最后使用时间v.updateLastUsedTime();return v;}});// 获取令牌,可能会阻塞直到有可用令牌wrapper.getRateLimiter().acquire();}/*** 移除指定配置的速率限制器** @param configId 配置ID*/public static void removeLimiter(Integer configId) {RATE_LIMITERS.remove(configId);}/*** 清除所有速率限制器*/public static void clearAllLimiters() {RATE_LIMITERS.clear();}/*** 设置定期清理不活跃的速率限制器* 如果需要自定义清理间隔,可以调用此方法** @param interval 清理间隔* @param unit 时间单位*/public static void setupPeriodicCleanup(long interval, TimeUnit unit) {// 确保关闭钩子已注册ensureShutdownHookRegistered();// 取消已有的清理任务cancelCleanup();// 设置新的清理任务CLEANUP_EXECUTOR.scheduleAtFixedRate(() -> cleanupInactiveLimiters(), interval, interval, unit);CLEANUP_SCHEDULED.set(true);}/*** 取消定时清理任务*/public static void cancelCleanup() {if (CLEANUP_SCHEDULED.getAndSet(false)) {CLEANUP_EXECUTOR.shutdownNow();}}/*** 清理超过30分钟未使用的速率限制器*/private static void cleanupInactiveLimiters() {// 设置30分钟的阈值long threshold = System.currentTimeMillis() - INACTIVE_THRESHOLD;// 移除所有最后使用时间早于阈值的限制器RATE_LIMITERS.entrySet().removeIf(entry ->entry.getValue().getLastUsedTime() < threshold);}/*** 确保定时清理任务已启动*/private static void ensureCleanupScheduled() {// 确保关闭钩子已注册ensureShutdownHookRegistered();if (CLEANUP_SCHEDULED.compareAndSet(false, true)) {// 首次调用时启动定时任务CLEANUP_EXECUTOR.scheduleAtFixedRate(() -> cleanupInactiveLimiters(),CLEANUP_INTERVAL,CLEANUP_INTERVAL,CLEANUP_UNIT);}}/*** 确保关闭钩子已注册*/private static void ensureShutdownHookRegistered() {if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) {Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {// 关闭执行器CLEANUP_EXECUTOR.shutdown();// 等待最多5秒完成正在执行的任务if (!CLEANUP_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {// 强制关闭CLEANUP_EXECUTOR.shutdownNow();}} catch (InterruptedException e) {// 中断关闭过程CLEANUP_EXECUTOR.shutdownNow();Thread.currentThread().interrupt();}}, "email-rate-limiter-shutdown"));}}/*** 速率限制器包装类,用于跟踪速率限制器及其使用情况*/private static class RateLimiterWrapper {private final RateLimiter rateLimiter;private final double permitsPerSecond;private long lastUsedTime; // 记录最后使用时间/*** 创建新的速率限制器包装器** @param permitsPerSecond 每秒允许的请求数*/public RateLimiterWrapper(double permitsPerSecond) {this.rateLimiter = RateLimiter.create(permitsPerSecond);this.permitsPerSecond = permitsPerSecond;this.lastUsedTime = System.currentTimeMillis();}/*** 使用现有速率限制器创建包装器** @param rateLimiter 现有速率限制器* @param permitsPerSecond 每秒允许的请求数*/public RateLimiterWrapper(RateLimiter rateLimiter, double permitsPerSecond) {this.rateLimiter = rateLimiter;this.permitsPerSecond = permitsPerSecond;this.lastUsedTime = System.currentTimeMillis();}/*** 获取速率限制器,同时更新最后使用时间** @return 速率限制器实例*/public RateLimiter getRateLimiter() {updateLastUsedTime();return rateLimiter;}/*** 获取当前速率** @return 每秒允许的请求数*/public double getPermitsPerSecond() {return permitsPerSecond;}/*** 获取最后使用时间** @return 最后使用时间的毫秒表示*/public long getLastUsedTime() {return lastUsedTime;}/*** 更新最后使用时间为当前时间*/public void updateLastUsedTime() {this.lastUsedTime = System.currentTimeMillis();}}
}
实际使用
@Async("emailTaskExecutor")public void asyncSendEmail(EmailSmtpConfig emailSmtpConfig,EmailTemplate emailTemplate,String subject,RecipientWithTemplate recipient,Integer companyId,String serviceName,String errorReason,String logCode,SmtpRateConfig rateConfig) {log.info("当前执行线程: {}", Thread.currentThread().getName());String replacedContent = emailTemplate.getContent();List<EmailPlaceholder> placeholders = emailPlaceholderManager.list();if (placeholders != null && !placeholders.isEmpty()) {String placeholdersJson = JSON.toJSONString(placeholders);emailTemplate.setPlaceholders(placeholdersJson);}// 处理邮件模板中的占位符if (StringUtils.isNotBlank(emailTemplate.getPlaceholders())){replacedContent = EmailUtil.replacePlaceholder(emailTemplate,recipient.getTemplateParams());}log.info("当前执行线程: {},处理完模板占位符任务", Thread.currentThread().getName());EmailLog emailLog = new EmailLog();//将邮件发送基础信息保存到数据库中,状态为待发送emailLog.setLogCode(logCode);emailLog.setSendTime(LocalDateTime.now(ZoneId.of("Asia/Shanghai")));emailLog.setCompanyId(companyId);emailLog.setReceiver(recipient.getReceiver());emailLog.setStatus(2);emailLogManager.save(emailLog);if (errorReason==null){//发送邮件的基础信息无误if (rateConfig!= null){//执行限流逻辑// 执行限流逻辑log.info("当前执行线程: {},开始执行限流逻辑 - 配置ID={}, 速率={}封/秒",Thread.currentThread().getName(),emailSmtpConfig.getId(),rateConfig.getSendCount()/(rateConfig.getSendInterval()*1.0));EmailRateLimiter.acquire(emailSmtpConfig.getId(),rateConfig.getSendCount()/(rateConfig.getSendInterval()*1.0));}try {errorReason = EmailUtil.sendEmail(emailSmtpConfig,subject,replacedContent,recipient.getReceiver());}catch (Exception e){log.error("发送邮件失败",e);}emailLog.setLogCode(logCode);emailLog.setSendTime(LocalDateTime.now(ZoneId.of("Asia/Shanghai")));emailLog.setTemplateId(emailTemplate.getId());emailLog.setTemplateName(emailTemplate.getName());emailLog.setConfigId(emailSmtpConfig.getId());emailLog.setCompanyId(companyId);emailLog.setSubject(subject);emailLog.setContent(replacedContent);emailLog.setReceiver(recipient.getReceiver());emailLog.setSender(emailSmtpConfig.getUsername());}else {//发送邮件的基础信息有误emailLog.setLogCode(logCode);emailLog.setSendTime(LocalDateTime.now(ZoneId.of("Asia/Shanghai")));emailLog.setCompanyId(companyId);emailLog.setReceiver(recipient.getReceiver());}log.info("当前执行线程: {},处理完发邮件任务", Thread.currentThread().getName());Boolean status = false;log.info("当前执行线程:{},执行邮件日志存在: {}", Thread.currentThread().getName());if (StringUtils.isBlank(errorReason)){// 发送成功status = true;emailLog.setStatus(IsEnum.YES.getCode());} else {// 发送失败status = false;emailLog.setStatus(IsEnum.NO.getCode());emailLog.setErrorReason(errorReason);}// 记录发送邮件日志boolean save = emailLogManager.updateByLogCode(emailLog);log.info("当前执行线程:{},执行邮件日志存在: {}", Thread.currentThread().getName(),save);if (StringUtils.isNotBlank(serviceName)){// 使用策略模式执行回调try {SendEmailCallbackStrategy strategy = sendEmailCallbackStrategyContext.getStrategy(serviceName);strategy.executeCallback(logCode, status);} catch (Exception e) {log.error("回调时出现异常,日志编码:<<{}>>,异常消息:{}", logCode, e.getMessage(), e);}}}
@Data
@TableName("ech_nms_email_smtp_rate_config")
public class SmtpRateConfig implements Serializable {/*** 主键*/@TableId(value = "id", type = IdType.AUTO)private Integer id;/*** 公司ID*/private Integer companyId;/*** 服务商名称*/private String name;/*** SMTP服务器地址*/private String smtpHost;/*** 发送间隔(秒)*/private Integer sendInterval;/*** 邮箱每次发送邮件数*/private Integer sendCount;/*** 配置描述*/private String description;/*** 逻辑删除标志(1:已删除,0:未删除)*/@TableLogic(value = "0", delval = "1")private Integer isDeleted;/*** 创建人ID*/@TableField(fill = FieldFill.INSERT)private Integer createUser;/*** 更新人ID*/@TableField(fill = FieldFill.UPDATE)private Integer updateUser;/*** 创建时间*/@TableField(fill = FieldFill.INSERT)private LocalDateTime createTime;/*** 更新时间*/@TableField(fill = FieldFill.UPDATE)private LocalDateTime updateTime;
}