一、增强的数据库设计
1. 客户端账号表 (smpp_clients)
-- SMPP client accounts: one row per system_id that is allowed to bind.
CREATE TABLE `smpp_clients` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `system_id` VARCHAR(32) NOT NULL COMMENT 'SMPP账号',
  `password` VARCHAR(64) NOT NULL COMMENT 'BCrypt加密密码',
  -- NULL or empty whitelist is treated by the session handler as "no IP restriction".
  `ip_whitelist` VARCHAR(512) COMMENT 'IP白名单(逗号分隔)',
  -- NOT NULL: the session handler in this document compares max_connections and
  -- is_active numerically, so a NULL value would fail the bind instead of applying
  -- the default.
  `max_connections` INT NOT NULL DEFAULT 3 COMMENT '最大并发连接数',
  `is_active` TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_system_id` (`system_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2. 活跃会话表 (smpp_sessions)
-- Bound SMPP sessions: one row is inserted per accepted bind.
CREATE TABLE `smpp_sessions` (
  `session_id` VARCHAR(36) NOT NULL COMMENT '会话ID',
  `client_id` BIGINT NOT NULL COMMENT '关联客户端ID',
  `remote_ip` VARCHAR(45) NOT NULL COMMENT '客户端IP',
  `bind_time` DATETIME NOT NULL COMMENT '绑定时间',
  `last_activity` DATETIME NOT NULL COMMENT '最后活跃时间',
  PRIMARY KEY (`session_id`),
  -- Serves the per-client connection count used by the connection-limit check.
  KEY `idx_client` (`client_id`),
  -- Serves the "active in the last N minutes" range query of the monitoring endpoint,
  -- which would otherwise scan the whole table.
  KEY `idx_last_activity` (`last_activity`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
二、核心Java实现
1. 数据库验证的会话处理器
@Service
@RequiredArgsConstructor
public class DatabaseAuthSessionHandler {private final SmppClientRepository clientRepo;private final SmppSessionRepository sessionRepo;private final Map<String, SMPPServerSession> activeSessions = new ConcurrentHashMap<>();/*** 处理新会话连接(含数据库验证)*/public void handleNewSession(SMPPServerSession session) {try {// 1. 等待客户端发送Bind请求BindRequest bindRequest = session.waitForBind(10000);// 2. 从数据库验证客户端SmppClient client = clientRepo.findBySystemId(bindRequest.getSystemId()).filter(c -> c.getIsActive() == 1).filter(c -> BCrypt.checkpw(bindRequest.getPassword(), c.getPassword())).orElseThrow(() -> new SmppAuthException("Invalid credentials"));// 3. IP白名单检查String clientIp = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();if (!isIpAllowed(clientIp, client.getIpWhitelist())) {throw new SmppAuthException("IP not allowed");}// 4. 连接数限制检查validateConnectionLimit(client);// 5. 注册会话activeSessions.put(client.getSystemId(), session);registerSession(session, client);// 6. 设置消息处理器session.setMessageReceiverListener(new ClientMessageHandler(client));// 7. 
设置心跳检测session.setEnquireLinkTimer(30000);} catch (Exception e) {session.unbindAndClose();log.error("Session setup failed: {}", e.getMessage());}}private boolean isIpAllowed(String ip, String whitelist) {if (whitelist == null || whitelist.isEmpty()) return true;return Arrays.asList(whitelist.split(",")).contains(ip);}private void validateConnectionLimit(SmppClient client) {long activeCount = sessionRepo.countByClientId(client.getId());if (activeCount >= client.getMaxConnections()) {throw new SmppAuthException("Connection limit reached");}}private void registerSession(SMPPServerSession session, SmppClient client) {String sessionId = session.getSessionId();String clientIp = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();SmppSession record = new SmppSession();record.setSessionId(sessionId);record.setClientId(client.getId());record.setRemoteIp(clientIp);record.setBindTime(new Date());record.setLastActivity(new Date());sessionRepo.save(record);}
}
2. 多客户端消息处理器
@RequiredArgsConstructor
class ClientMessageHandler implements MessageReceiverListener {private final SmppClient client;private final SmppSessionRepository sessionRepo;@Overridepublic void onAcceptSubmitSm(SubmitSm submitSm, SMPPServerSession source) {try {// 1. 生成唯一消息IDString messageId = "MSG_" + UUID.randomUUID().toString().substring(0, 8);// 2. 模拟处理消息(替换为您的业务逻辑)boolean processSuccess = processBusinessMessage(submitSm.getSourceAddr(),submitSm.getDestAddress().getAddress(),new String(submitSm.getShortMessage(), "ISO-8859-1"));// 3. 立即返回ACCEPTED响应source.processSubmitSmResp(submitSm.getSequenceNumber(),new SubmitSmResp(messageId, MessageState.ACCEPTED));// 4. 异步发送状态报告CompletableFuture.runAsync(() -> {try {Thread.sleep(5000); // 模拟处理延迟sendDeliveryReport(client.getSystemId(),messageId,processSuccess ? "DELIVRD" : "UNDELIV");} catch (Exception e) {log.error("Async report failed", e);}});} catch (Exception e) {log.error("Process message error", e);}}private boolean processBusinessMessage(String from, String to, String text) {// 在这里实现您的实际业务逻辑log.info("Processing message for client {}: {} -> {}: {}", client.getSystemId(), from, to, text);return true; // 模拟处理成功}private void sendDeliveryReport(String clientId, String messageId, String status) {SMPPServerSession session = activeSessions.get(clientId);if (session != null && session.isBound()) {try {DeliverSm deliverSm = new DeliverSm();deliverSm.setSourceAddr("SMPP_SVR");deliverSm.setDestAddress(clientId);deliverSm.setShortMessage(("id:" + messageId + " stat:" + status).getBytes());session.deliverShortMessage(deliverSm);} catch (Exception e) {log.error("Send report failed", e);}}}
}
3. 主动消息发送服务
@Service
@RequiredArgsConstructor
public class SmppMessageSender {private final Map<String, SMPPServerSession> activeSessions;private final SmppClientRepository clientRepo;/*** 主动向指定客户端发送消息*/public void sendToClient(String systemId, String message) throws Exception {SMPPServerSession session = activeSessions.get(systemId);if (session == null || !session.isBound()) {throw new IllegalStateException("Client session not active");}DeliverSm deliverSm = new DeliverSm();deliverSm.setSourceAddr("ADMIN");deliverSm.setDestAddress(systemId);deliverSm.setShortMessage(message.getBytes());session.deliverShortMessage(deliverSm);}/*** 向所有在线客户端广播消息*/public void broadcast(String message) {activeSessions.forEach((systemId, session) -> {if (session.isBound()) {try {DeliverSm deliverSm = new DeliverSm();deliverSm.setSourceAddr("BROADCAST");deliverSm.setDestAddress(systemId);deliverSm.setShortMessage(message.getBytes());session.deliverShortMessage(deliverSm);} catch (Exception e) {log.error("Broadcast to {} failed", systemId, e);}}});}
}
三、管理接口
1. 客户端管理API
/**
 * Admin REST endpoints under {@code /api/admin/smpp} for pushing messages and listing clients.
 */
@RestController
@RequestMapping("/api/admin/smpp")
@RequiredArgsConstructor
public class SmppAdminController {

    private final SmppMessageSender messageSender;
    private final SmppClientRepository clientRepo;

    /**
     * Pushes a message to one client.
     *
     * @param systemId system_id of the target client
     * @param message  text to deliver
     * @return 200 with an empty body on success; 400 carrying the exception message when the
     *         sender throws
     */
    @PostMapping("/send-to-client")
    public ResponseEntity<?> sendToClient(@RequestParam String systemId,
                                          @RequestParam String message) {
        try {
            messageSender.sendToClient(systemId, message);
        } catch (Exception failure) {
            String reason = failure.getMessage();
            return ResponseEntity.badRequest().body(reason);
        }
        return ResponseEntity.ok().build();
    }

    /**
     * Lists the clients whose is_active flag equals 1.
     *
     * <p>NOTE(review): this returns {@code SmppClient} entities directly; unless the entity
     * hides its password field from serialization, the stored hashes are exposed — confirm.
     */
    @GetMapping("/active-clients")
    public List<SmppClient> listActiveClients() {
        List<SmppClient> enabledClients = clientRepo.findByIsActive(1);
        return enabledClients;
    }
}
2. 会话监控API
/**
 * Lists the session records whose last activity is later than five minutes before now.
 *
 * <p>NOTE(review): in the code shown in this document, last activity is only written when a
 * session is registered — confirm it is refreshed elsewhere, otherwise long-lived sessions
 * drop out of this list after five minutes.
 */
@GetMapping("/active-sessions")
public List<SmppSession> listActiveSessions() {
    Instant cutoff = Instant.now().minus(5, ChronoUnit.MINUTES);
    Date since = Date.from(cutoff);
    return sessionRepo.findByLastActivityAfter(since);
}
四、配置增强
application.yml
# NOTE(review): the nesting below was reconstructed from a single collapsed line —
# confirm that max-retry belongs under smpp.session.
smpp:
  port: 2775
  session:
    timeout: 1800 # session timeout (seconds)
    max-retry: 3 # message retry count
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/smpp_gateway
    username: smpp
    password: your_db_password
  jpa:
    hibernate.ddl-auto: update
五、关键流程说明
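1. 客户端连接流程:客户端发起 Bind 请求 → 按 system_id 查询 smpp_clients 并校验启用状态与 BCrypt 密码 → IP 白名单检查 → 并发连接数检查 → 登记会话(内存映射 + smpp_sessions 表)→ 设置消息处理器与心跳检测;任一步骤失败则记录日志并关闭会话。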
2. 消息处理流程:收到 submit_sm → 生成消息ID → 执行业务处理 → 立即返回 ACCEPTED 响应 → 约 5 秒后异步回送状态报告(DELIVRD 或 UNDELIV)。
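3. 主动消息推送:调用管理接口 POST /api/admin/smpp/send-to-client → SmppMessageSender 按 system_id 查找已绑定的会话 → 通过 deliver_sm 下发消息;会话不存在或未绑定时接口返回 400。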
六、生产环境建议
- 安全增强
- 为管理接口添加JWT认证
- 记录所有消息的完整日志
- 性能优化
- 使用连接池管理数据库连接
- 对消息处理采用线程池异步化
- 高可用
- 部署多个SMPP网关实例 + 负载均衡
- 会话信息共享到Redis
该方案完整实现了:
- 多客户网站并发连接管理
- 数据库驱动的账号验证
- 双向消息通信能力
- 主动消息推送接口
- 完善的生产级监控能力