文章目录
- 项目地址
- 一、KeyCloak
- 二、OutBox Pattern
- 2.1 配置Common模块的OutBox
- 1. OutboxMessage
- 2. 数据库配置OutboxMessageConfiguration
- 3. 创建Save前的EF拦截器
- 4. 创建Quartz后台任务
- 5. 配置后台任务
- 6. 注册服务
- 2.2 创建OutBox的消费者
- 1. 自定义IDomainEventHandler
- 2. 定义抽象类DomainEventHandler
- 3. 更改之前的EventHandler
- 4. 注册DomainEventHandler
- 5. 创建DomainEventHandlersFactory
- 6. 使用DomainEventHandlersFactory
- 7. 幂等性包装器事件处理器
- 2.3 Inbox Pattern
项目地址
- 教程作者:
- 教程地址:
- 代码仓库地址:
- 所用到的框架和插件:
dbt
airflow
一、KeyCloak
王教员 029-033
二、OutBox Pattern
王 38-40
2.1 配置Common模块的OutBox
1. OutboxMessage
- 定义Outbox message
namespace Evently.Common.Infrastructure.Outbox;public sealed class OutboxMessage
{//消息idpublic Guid Id { get; init; }//消息类型 例如UserRegisteredEventpublic string Type { get; init; }//消息内容 Json格式public string Content { get; init; }//消息发生时间public DateTime OccurredOnUtc { get; init; }//消息处理时间,可为空,表示没有被消费public DateTime? ProcessedOnUtc { get; init; }//消息错误信息public string? Error { get; init; }
}
2. 数据库配置OutboxMessageConfiguration
- 创建OutBoxMessage表需要用到的数据库配置
namespace Evently.Common.Infrastructure.Outbox;
public sealed class OutboxMessageConfiguration : IEntityTypeConfiguration<OutboxMessage>
{public void Configure(EntityTypeBuilder<OutboxMessage> builder){builder.ToTable("outbox_messages");builder.HasKey(o => o.Id);builder.Property(o => o.Content).HasMaxLength(2000).HasColumnType("jsonb");}
}
- 在User模块里需要用到消息服务,所以需要在User模块的schema里添加表的migration
namespace Evently.Modules.Users.Infrastructure.Database;
public sealed class UsersDbContext(DbContextOptions<UsersDbContext> options) : DbContext(options), IUnitOfWork
{internal DbSet<User> Users { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){modelBuilder.HasDefaultSchema(Schemas.Users);modelBuilder.ApplyConfiguration(new OutboxMessageConfiguration());modelBuilder.ApplyConfiguration(new UserConfiguration());modelBuilder.ApplyConfiguration(new RoleConfiguration());modelBuilder.ApplyConfiguration(new PermissionConfiguration());}
}
- 在User模块下,执行迁移,生成outbox_messages表
3. 创建Save前的EF拦截器
- 之前我们使用的拦截器是在数据库save之后在进行事件的发布,现在我们直接在save之前,自动将领域事件转换为OutBoxMessage并存入数据库
public sealed class InsertOutboxMessagesInterceptor : SaveCha