【深入理解SpringCloud微服务】手写实现一个微服务分布式事务组件
- 什么是分布式事务
- 实现一个分布式事务组件
- 整体流程
- 架构设计
- 代码解析
- 客户端
- GlobalTransaction
- GlobalTransactionScanner
- GlobalTransactionInterceptor
- GlobalTransactionClient
- GlobalTransactionContext
- DataSourceProxyPostProcessor
- DataSourceProxy
- ConnectionProxy
- ConnectionHolder
- GlobalTransactionXidAddInterceptor
- TransactionPropagationInterceptor
- GlobalTransactionRMController
- 服务端
- 源码地址
什么是分布式事务
分布式事务是指在分布式环境中保证事务一致性的机制。
在传统的单体应用中,事务一般是在一个数据库里面完成的,只要保证对同一个数据库的多次增删改操作要么全部成功要么全部失败即可。
而分布式事务则是在多个服务对应多个数据库库表修改的情况下,保证他们的一致性。
常见分布式事务解决方案有AT模式、TCC模式、SAGA模式、XA模式,其中XA模式又分两阶段和三阶段,还有基于MQ的最终一致性解决方案。
由于我们本篇文章的主题是手写一个分布式事务组件,因此不对这些不同类型的分布式事务展开描述。
实现一个分布式事务组件
整体流程
这是一种实现分布式事务的思路,但不是唯一的。我们基于两阶段提交的分布式事务设计我们的分布式事务组件。
我们需要有服务端和客户端。
首先第一阶段开始时,由最开始的那个client注册全局事务,server会返回一个全局事务id,我们称为XID。
注册成功后,开始执行业务逻辑,期间涉及到跨服务调用,XID会沿着调用链往下传递。
我们的分布式事务组件就是要保证跨服务调用过程中,各服务的事务一致性。因此,调用链上的每一个服务都来一套下面这个操作:
如果服务调用链一路下去都正常,那么在二阶段时调用链起点的client可以向server提交全局事务。然后server就可以通知各client提交本地事务。
假如调用链上某个服务出现异常,异常会沿着调用链往上抛,位于调用链起点的服务会catch住这个异常,进行全局事务回滚。
那么在二阶段时调用链起点的client可以向server回滚全局事务。然后server就可以通知各client回滚本地事务。
架构设计
由于调用链起点的服务需要往服务端注册全局事务,因此我们通过AOP扫描注解切入增强逻辑的方式实现全局事务的注册。
AOP的增强逻辑就是一个try-catch代码块。try中开启全局事务、执行业务逻辑;如果没有报错,就提交全局事务;如果报错了,则在catch块中发起全局事务回滚;最后在finally块中清空事务信息。
开启全局事务,我们就通过http客户端工具发送一个http请求给服务端,服务端返回一个xid,然后客户端接收到xid后缓存到ThreadLocal。
执行业务逻辑时,执行的sql都经过我们的Connection代理对象ConnectionProxy,ConnectionProxy的commit方法判断ThreadLocal中缓存有XID,则发送http请求注册分支事务,然后把真正的Connection对象缓存到ConnectionHolder中,而本地事务则是不提交的。
然后当前服务调用其他的微服务,则请求头携带XID,其他的微服务也是缓存到ThreadLocal,然后其他的微服务也有ConnectionProxy和ConnectionHolder,也是由ConnectionProxy注册分支事务,并缓存Connection到ConnectionHolder。
这样,通过每个参与者都向服务端注册分支事务,服务端就保存了XID与该全局事务所有参与者的信息的影射了。
然后,位于调用链起点的微服务根据是否有报错,决定提交全局事务还是回滚全局事务。无论是提交全局事务还是回滚全局事务,都是向服务端发送一个http请求。而清空事务信息则是clear掉ThreadLocal中缓存的XID。
最后,由服务端根据XID取得所有事务参与者的ip和port,通过http通知它们提交或回滚。事务参与者接收到通知,通过XID从ConnectionHolder取得Connection对象,真正向数据库发起commit或rollback操作。
代码解析
客户端
客户端的META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.huangjunyi1993.simple.microservice.global.transaction.config.GlobalTransactionConfig,\
com.huangjunyi1993.simple.microservice.global.transaction.config.HttpAutoConfiguration
引入两个配置类GlobalTransactionConfig,HttpAutoConfiguration,在配置类中引入的核心类如下:
GlobalTransaction
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface GlobalTransaction {
}
这个是我们定义的注解,如果一个方法要启用分布式事务,只需要添加@GlobalTransaction注解即可。
GlobalTransactionScanner
public class GlobalTransactionScanner extends AbstractAutoProxyCreator {@Autowiredprivate GlobalTransactionInterceptor globalTransactionInterceptor;@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {try {// 从目标类中遍历每一个方法,看是否有被@GlobalTransaction注解修饰// 如果有,则调用父类的wrapIfNecessary方法,生成并返回代理对象Class<?> serviceInterface = findTargetClass(bean);for (Method method : serviceInterface.getMethods()) {GlobalTransaction annotation = method.getAnnotation(GlobalTransaction.class);if (annotation != null) {return super.wrapIfNecessary(bean, beanName, cacheKey);}}} catch (...) {...}return bean;}// 查找目标类的方法private Class<?> findTargetClass(Object bean) throws Exception {...}// 重写父类的getAdvicesAndAdvisorsForBean返回指定的AOP增强拦截器GlobalTransactionInterceptor@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {return new Object[] { globalTransactionInterceptor };}
}
GlobalTransactionScanner继承了spring的AbstractAutoProxyCreator,重写wrapIfNecessary方法,判断如果目标类有被@GlobalTransaction修饰的方法,则返回代理对象,并指定AOP增强处理器是GlobalTransactionInterceptor。
GlobalTransactionInterceptor
public class GlobalTransactionInterceptor implements MethodInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionInterceptor.class);@Autowiredprivate GlobalTransactionClient client;@Autowiredprivate GlobalTransactionContext globalTransactionContext;@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {// 获取当前方法上的GlobalTransaction注解,如果不存在则直接执行原方法。GlobalTransaction annotation = invocation.getMethod().getAnnotation(GlobalTransaction.class);if (annotation == null) {return invocation.proceed();}try {Object rs = null;// 开启事务client.begin();try {// 执行业务方法rs = invocation.proceed();} catch (Exception e) {LOGGER.error(e.toString());// 回滚事务client.rollback();throw e;}// 提交事务client.commit();return rs;} finally {// 清空事务上下文clear();}}private void clear() {globalTransactionContext.remove(XID);}}
GlobalTransactionInterceptor中首先判断当前调用的方法是否有@GlobalTransaction注解,如果没有,则直接执行目标方法。如果有@GlobalTransaction注解,则进入try-catch块中的全局事务模板代码:开启全局事务,执行目标方法,提交/回滚全局事务。最后会清空事务上下文。
GlobalTransactionClient
public class GlobalTransactionClient implements EnvironmentAware {@Autowiredprivate TransactionCoordinatorProperties transactionCoordinatorProperties;@Autowiredprivate GlobalTransactionContext globalTransactionContext;private int serverPort;// 请求服务端回滚全局事务public void rollback() throws IOException {...}// 请求服务端提交全局事务public void commit() throws IOException {...}// 请求服务端注册分支事务public void register() throws IOException {...}...// 请求服务端开启全局事务public void begin() throws IOException {// 使用okhttp工具发送http请求// 请求服务端/tc/begin接口,开启全局事务OkHttpClient okHttpClient=new OkHttpClient();String requestUrl = transactionCoordinatorProperties.getAddress() + "/tc/begin";requestUrl = checkUrlPrefix(requestUrl);okhttp3.RequestBody requestBody = new FormBody.Builder().build();Request.Builder builder = new Request.Builder();Request request = builder.url(requestUrl).post(requestBody).build();Call call = okHttpClient.newCall(request);Response response = call.execute();String xid = null;if (response.isSuccessful() && StringUtils.isNotBlank(xid = Objects.requireNonNull(response.body()).string())) {// 把服务端返回的全局事务id存到GlobalTransactionContext中globalTransactionContext.put(XID, xid);return;}throw new RuntimeException("call tc begin transaction failed");}...@Overridepublic void setEnvironment(Environment environment) {serverPort = Integer.valueOf(environment.getProperty("server.port", "8080"));}
}
GlobalTransactionClient封装了通过http请求服务端开启、提交、回滚全局事务以及注册分支事务等方法。由于代码基本差不多,这里只展示了一个开启全局事务的begin方法。
GlobalTransactionContext
public class GlobalTransactionContext {public static final String XID = "xid";private ThreadLocal<Map<String, String>> threadLocal = ThreadLocal.withInitial(HashMap::new);public String put(String key, String value) {return threadLocal.get().put(key, value);}public String get(String key) {return threadLocal.get().get(key);}public String remove(String key) {return threadLocal.get().remove(key);}}
GlobalTransactionContext是全局事务上下文,用ThreadLocal存储当前线程的XID。
DataSourceProxyPostProcessor
public class DataSourceProxyPostProcessor implements BeanPostProcessor {private GlobalTransactionContext globalTransactionContext;private GlobalTransactionClient globalTransactionClient;public DataSourceProxyPostProcessor(GlobalTransactionContext globalTransactionContext, GlobalTransactionClient globalTransactionClient) {this.globalTransactionContext = globalTransactionContext;this.globalTransactionClient = globalTransactionClient;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 如果是DataSource类型,封装成DataSourceProxy返回return bean instanceof DataSource ? new DataSourceProxy((DataSource) bean, globalTransactionContext, globalTransactionClient) : bean;}
}
DataSourceProxyPostProcessor是一个Spring后置处理器,在postProcessAfterInitialization方法判断如果bean是DataSource类型,封装成DataSourceProxy返回。
DataSourceProxy
public class DataSourceProxy implements DataSource {private DataSource dataSource;private GlobalTransactionContext globalTransactionContext;private GlobalTransactionClient globalTransactionClient;...@Overridepublic Connection getConnection() throws SQLException {// 获取Connection对象并包装成ConnectionProxy返回Connection connection = dataSource.getConnection();return new ConnectionProxy(connection, globalTransactionContext, globalTransactionClient);}...}
DataSourceProxy是数据源代理,它的作用就是当获取connection数据库连接对象时,封装成ConnectionProxy返回。
ConnectionProxy
public class ConnectionProxy implements Connection {private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);private Connection connection;private GlobalTransactionContext globalTransactionContext;private GlobalTransactionClient globalTransactionClient;...@Overridepublic void commit() throws SQLException {if (globalTransactionContext.get(GlobalTransactionContext.XID) == null) {connection.commit();}// 注册分支事务try {globalTransactionClient.register();} catch (IOException e) {LOGGER.error(e.toString());throw new RuntimeException("register branch transaction error");}// 保持连接ConnectionHolder.add(globalTransactionContext.get(GlobalTransactionContext.XID), this);}public void realCommit() throws SQLException {connection.commit();}...@Overridepublic void close() throws SQLException {if (globalTransactionContext.get(GlobalTransactionContext.XID) == null) {connection.commit();}}public void realClose() throws SQLException {connection.commit();}...
}
ConnectionProxy中省略掉的方法都是直接调用的Connection的同名方法。ConnectionProxy真正重写的只有commit方法和close方法。
commit方法判断当前线程在globalTransactionContext中是否保存有XID。如果没有则直接提交;如果有,则请求服务端注册分支事务,然后把当前连接对象保存到ConnectionHolder中。
close方法只有判断当前线程在globalTransactionContext中是没有XID,才会提交,否则啥也不干。
此外,还增加了realCommit方法和realClose方法。用于接收到服务端提交本地事务通知时调用。
ConnectionHolder
public class ConnectionHolder {private static final Map<String, ConnectionProxy> CONNECTION_PROXY_MAP = new ConcurrentHashMap<>();public static void add(String xid, ConnectionProxy connectionProxy) {CONNECTION_PROXY_MAP.put(xid, connectionProxy);}public static ConnectionProxy remove(String xid) {return CONNECTION_PROXY_MAP.remove(xid);}}
ConnectionHolder是暂存连接对象的容器,第一阶段完成以后,会把连接对象暂存到这里,待第二阶段收到服务端的事务提交或回滚的通知时,根据XID从这里取出对应的连接对象进行真正的事务提交和回滚。
GlobalTransactionXidAddInterceptor
GlobalTransactionXidAddInterceptor是一个RestTemplate的拦截器,这里模仿了Ribbon的做法,往RestTemplate的拦截器链中放入自定义的拦截器。
@Beanpublic GlobalTransactionXidAddInterceptor globalTransactionXidAddInterceptor() {// 自定义的RestTemplate拦截器,发送http请求时往请求头中添加XID,把XID传递给下游服务return new GlobalTransactionXidAddInterceptor();}@Beanpublic RestTemplateCustomizer restTemplateCustomizer(GlobalTransactionXidAddInterceptor globalTransactionXidAddInterceptor) {// 往RestTemplate的连接器链中加入我们的拦截器globalTransactionXidAddInterceptorreturn (restTemplate) -> {List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();interceptors.add(globalTransactionXidAddInterceptor);};}@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializer(List<RestTemplateCustomizer> customizers) {// Spring扩展点,在Spring完成非懒加载单例bean的初始化后回触发回调,调用我们的RestTemplateCustomizerreturn () -> restTemplates.forEach(restTemplate -> customizers.forEach(customizer -> customizer.customize(restTemplate)));}
SmartInitializingSingleton是Spring的扩展点,在Spring完成非懒加载单例bean的初始化后回触发回调,我们这里的SmartInitializingSingleton会调用我们注册的RestTemplateCustomizer,往RestTemplate的拦截器链中加入我们的GlobalTransactionXidAddInterceptor。
public class GlobalTransactionXidAddInterceptor implements ClientHttpRequestInterceptor {@Autowiredprivate GlobalTransactionContext globalTransactionContext;@Overridepublic ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {// 检查当前线程在GlobalTransactionContext中是否保存了XID,// 如果有的话要添加到请求头,把它传递给下游服务String xid = globalTransactionContext.get(XID);if (StringUtils.isNotBlank(xid)) {HttpHeaders headers = request.getHeaders();headers.put(XID, Collections.singletonList(xid));}return execution.execute(request, body);}
}
GlobalTransactionXidAddInterceptor的作用就是在发送http请求时,检查当前线程在GlobalTransactionContext中是否保存了XID,如果有的话要添加到请求头,把它传递给下游服务。
TransactionPropagationInterceptor
public class TransactionPropagationInterceptor implements HandlerInterceptor {private GlobalTransactionContext globalTransactionContext;...@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 从请求头中获取xidString xid = request.getHeader(XID);if (StringUtils.isNotBlank(xid)) {// 把xid放入globalTransactionContext中与当前线程绑定globalTransactionContext.put(XID, xid);}return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 从globalTransactionContext移除xidglobalTransactionContext.remove(XID);}
}
TransactionPropagationInterceptor是一个SpringMVC拦截器。前置拦截preHandle方法会检查请求头是否携带了XID,如果有则把它放入globalTransactionContext中与当前线程绑定。afterCompletion方法则从globalTransactionContext中移除XID。
GlobalTransactionRMController
@RestController
@RequestMapping("/rm")
public class GlobalTransactionRMController {@PostMapping("/commit/{xid}")public void commit(@PathVariable String xid) throws SQLException {// 接收到服务端发送的提交事务的回调通知// 从ConnectionHolder中根据XID取出对应的连接对象,提交事务,关闭连接ConnectionProxy connectionProxy;if ((connectionProxy = ConnectionHolder.remove(xid)) != null) {connectionProxy.realCommit();connectionProxy.realClose();}}@PostMapping("/rollback/{xid}")public void rollback(@PathVariable String xid) throws SQLException {// 接收到服务端发送的回滚事务的回调通知// 从ConnectionHolder中根据XID取出对应的连接对象,回滚事务,关闭连接ConnectionProxy connectionProxy;if ((connectionProxy = ConnectionHolder.remove(xid)) != null) {connectionProxy.rollback();connectionProxy.realClose();}}}
GlobalTransactionRMController是客户端的Controller,用于在二阶段接收服务端发送的提交或回滚事务的回调通知。
GlobalTransactionRMController从ConnectionHolder中根据XID取出对应的连接对象。如果是提交事务操作,则调用realCommit方法提交本地事务,然后调用realClose关闭连接。如果是回滚事务操作,则调用rollback方法回滚本地事务,然后调用realClose关闭连接。
服务端
服务端的/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.huangjunyi1993.simple.microservice.global.transaction.coordinator.config.CoordinatorAutoConfig
CoordinatorAutoConfig:
@Configuration
@ComponentScan(basePackages = {"com.huangjunyi1993.simple.microservice.global.transaction.coordinator.controller"})
public class CoordinatorAutoConfig {}
服务端的代码比较简单,在spring.factories中指定自动配置类CoordinatorAutoConfig,通过SpringBoot的自动装配机制进行配置类的自动加载。然后CoordinatorAutoConfig通过@ComponentScan注解配置包扫描路径,扫描到的只有一个TransactionCoordinatorController类。
TransactionCoordinatorController类用于接收客户端发送的全局事务开启、提交、回滚,以及分支事务注册等http请求。
@RestController
@RequestMapping("/tc")
public class TransactionCoordinatorController {private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCoordinatorController.class);// 全局事务id集合private Set<String> xidSet = new CopyOnWriteArraySet<>();// 全局事务id与参与者的host的映射private Map<String, Set<String>> xidHostsMap = new ConcurrentHashMap<>();// 开启全局事务@PostMapping("/begin")public String begin() {// 使用UUID生成一个全局事务id(XID),放入xidSet中,并且返回XID给客户端String xid = UUID.randomUUID().toString();xidSet.add(xid);return xid;}// 提交全局事务@PostMapping("/commit")public void commit(String xid) throws IOException {// 根据XID从xidHostsMap中取出对应全局事务所有参与者的hostSet<String> hosts = xidHostsMap.get(xid);if (CollectionUtils.isEmpty(hosts)) {LOGGER.info("hosts is not exists");return;}// 遍历每一个host,回调通知该参与者提交本地事务for (String host : hosts) {OkHttpClient okHttpClient=new OkHttpClient();// http://{全局事务参与者的host}/rm/commit/{xid}String requestUrl = host + "/rm/commit/" + xid;requestUrl = checkUrlPrefix(requestUrl);okhttp3.RequestBody requestBody = new FormBody.Builder().build();Request.Builder builder = new Request.Builder();Request request = builder.url(requestUrl).post(requestBody).build();Call call = okHttpClient.newCall(request);Response response = call.execute();if (!response.isSuccessful()) {throw new RuntimeException("call rm commit transaction failed");}}hosts.remove(xid);}// 回滚全局事务@PostMapping("/rollback")public void rollback(String xid) throws IOException {// 根据XID从xidHostsMap中取出对应全局事务所有参与者的hostSet<String> hosts = xidHostsMap.get(xid);if (CollectionUtils.isEmpty(hosts)) {return;}// 遍历每一个host,回调通知该参与者回滚本地事务for (String host : hosts) {OkHttpClient okHttpClient=new OkHttpClient();// http://{全局事务参与者的host}/rm/rollback/{xid}String requestUrl = host + "/rm/rollback/" + xid;requestUrl = checkUrlPrefix(requestUrl);okhttp3.RequestBody requestBody = okhttp3.RequestBody.create(MediaType.parse("application/json;charset=utf-8"), "");Request.Builder builder = new Request.Builder();Request request = builder.url(requestUrl).post(requestBody).build();Call call = okHttpClient.newCall(request);Response response = call.execute();if (!response.isSuccessful()) {throw new RuntimeException("call rm rollback transaction failed");}}hosts.remove(xid);}// 注册分支事务@PostMapping("/register")public void register(String xid, String host) {if (!xidSet.contains(xid)) {LOGGER.info("xid is not exists");return;}// 往xidHostsMap中与指定XID对应的Set中添加当前全局事务参与者的hostif (!xidHostsMap.containsKey(xid)) {xidHostsMap.put(xid, new CopyOnWriteArraySet<>());}Set<String> set = xidHostsMap.get(xid);set.add(host);}...}
这里偷了个懒,TransactionCoordinatorController并没有把全局事务id存入数据库,而是放在内存中的xidSet集合,包括全局事务id与参与者的host的映射也是放入一个Map中。
“/tc/begin”接口开启全局事务,其实就是生成一个XID放入xidSet;“/tc/register”接口注册分支事务,其实就是往xidHostsMap中与指定XID对应的Set中添加当前全局事务参与者的host。
而“/tc/commit”接口提交全局事务以及“/tc/rollback”回滚全局事务,其实就是根据xidHostsMap记录的事务参与者的host,使用okhttp工具发送http请求客户端的GlobalTransactionRMController。
以上就是这个微服务分布式事务组件的所有源码介绍,下面是整体流程与类之间的关系图:
源码地址
以上代码已经上传到Gitee,想看的可以自取。
客户端:https://gitee.com/huang_junyi/simple-microservice/tree/master/simple-microservice-global-transaction-client
服务端:https://gitee.com/huang_junyi/simple-microservice/tree/master/simple-microservice-global-transaction-coordinator