基于Dubbo实现对远程调用接口的封装
- 服务调用者
- 调用
- 统一包装工具 RemoteCallWrapper
- 使用举例
- 服务提供者
- 提供
- Facade注解实现统一RPC结果包装
- 定义Facade注解
- 定义注解的切面处理类
服务调用者
调用
在服务调用方中,为需要远程调用的服务字段添加 @DubboReference(version = "1.0.0") 注解:
/**
 * REST controller mapped under {@code auth} that delegates to the remote user facade.
 */
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("auth")
public class AuthController {

    /** Remote user facade, referenced through Dubbo with service version 1.0.0. */
    @DubboReference(version = "1.0.0")
    private UserFacadeService userFacadeService;

    /**
     * Sends an empty {@code UserQueryRequest} to the user facade.
     *
     * @return the response message carried by the facade's reply
     */
    @GetMapping("/get")
    public String get() {
        UserQueryRequest queryRequest = new UserQueryRequest();
        UserQueryResponse queryResult = userFacadeService.query(queryRequest);
        return queryResult.getResponseMessage();
    }
}
统一包装工具 RemoteCallWrapper
public class RemoteCallWrapper {private static Logger logger = LoggerFactory.getLogger(RemoteCallWrapper.class);private static ImmutableSet<String> SUCCESS_CHECK_METHOD = ImmutableSet.of("isSuccess", "isSucceeded","getSuccess");private static ImmutableSet<String> SUCCESS_CODE_METHOD = ImmutableSet.of("getResponseCode");private static ImmutableSet<String> SUCCESS_CODE = ImmutableSet.of("SUCCESS", "DUPLICATE","DUPLICATED_REQUEST");public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse) {return call(function, request, request.getClass().getSimpleName(), checkResponse, false);}public static <T, R> R call(Function<T, R> function, T request) {return call(function, request, request.getClass().getSimpleName(), true, false);}public static <T, R> R call(Function<T, R> function, T request, String requestName) {return call(function, request, requestName, true, false);}public static <T, R> R call(Function<T, R> function, T request, String requestName,boolean checkResponse) {return call(function, request, requestName, checkResponse, false);}public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse, boolean checkResponseCode) {return call(function, request, request.getClass().getSimpleName(), checkResponse, checkResponseCode);}public static <T, R> R call(Function<T, R> function, T request, String requestName, boolean checkResponse,boolean checkResponseCode) {StopWatch stopWatch = new StopWatch();R response = null;try {//计时器stopWatch.start();//实际的远程调用方法response = function.apply(request);stopWatch.stop();//响应有效性检验if (checkResponse) {//fail-fast 非空校验Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());if (!isResponseValid(response)) {logger.error("Response Invalid on Remote Call request {} , response {}",JSON.toJSONString(request),JSON.toJSONString(response));throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);}}//响应状态码有效性检验if (checkResponseCode) {Assert.notNull(response, 
REMOTE_CALL_RESPONSE_IS_NULL.name());if (!isResponseCodeValid(response)) {logger.error("Response code Invalid on Remote Call request {} , response {}",JSON.toJSONString(request),JSON.toJSONString(response));throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);}}} catch (IllegalAccessException | InvocationTargetException e) {logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);throw new IllegalArgumentException("Catch Exception on Remote Call " + e.getMessage(), e);} catch (Throwable e) {logger.error("request exception {}", JSON.toJSONString(request));logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);throw e;} finally {if (logger.isInfoEnabled()) {logger.info("## Method={} ,## 耗时={}ms ,## [请求报文]:{},## [响应报文]:{}", requestName,stopWatch.getTotalTimeMillis(),JSON.toJSONString(request), JSON.toJSONString(response));}}return response;}private static <R> boolean isResponseValid(R response)throws IllegalAccessException, InvocationTargetException {Method successMethod = null;Method[] methods = response.getClass().getMethods();for (Method method : methods) {String methodName = method.getName();if (SUCCESS_CHECK_METHOD.contains(methodName)) {successMethod = method;break;}}if (successMethod == null) {return true;}return (Boolean) successMethod.invoke(response);}private static <R> boolean isResponseCodeValid(R response)throws IllegalAccessException, InvocationTargetException {Method successMethod = null;Method[] methods = response.getClass().getMethods();for (Method method : methods) {String methodName = method.getName();if (SUCCESS_CODE_METHOD.contains(methodName)) {successMethod = method;break;}}if (successMethod == null) {return true;}return SUCCESS_CODE.contains(successMethod.invoke(response));}
}
使用举例
// The payment service calls the goods service through RemoteCallWrapper.
// NOTE(review): the lambda invokes paySuccess, but the request name passed for logging is
// "goodsFacadeService.confirmSale" — confirm which of the two is intended.
GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.paySuccess(req), goodsSaleRequest, "goodsFacadeService.confirmSale");
服务提供者
提供
在服务实现类上添加 @DubboService(version = "1.0.0") 注解,以对外提供 RPC 服务:
@DubboService(version = "1.0.0")
public class UserFacadeServiceImpl implements UserFacadeService {public UserQueryResponse<UserInfo> query(UserQueryRequest userLoginRequest) {UserQueryResponse response = new UserQueryResponse();response.setResponseMessage("hehaha");return response;}
}
Facade注解实现统一RPC结果包装
定义Facade注解
/**
 * Marker annotation for facade methods whose invocations should be wrapped by an aspect.
 *
 * <p>Retention must be {@code RUNTIME}: an annotation without an explicit retention defaults to
 * {@code CLASS}, which is not visible through reflection, so a pointcut that matches on this
 * annotation at runtime would never find it.
 *
 * <p>The annotation types are written fully qualified so this declaration needs no extra
 * imports. No {@code @Target} is declared, so the annotation stays applicable wherever it
 * was before.
 */
@java.lang.annotation.Documented
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
public @interface Facade {
}
定义注解的切面处理类
功能:
- 方法参数校验
- 捕获处理业务异常
- 封装返回结果
- 记录完整的调用日志
package cn.hollis.nft.turbo.rpc.facade;import cn.hollis.nft.turbo.base.exception.BizException;
import cn.hollis.nft.turbo.base.exception.SystemException;
import cn.hollis.nft.turbo.base.response.BaseResponse;
import cn.hollis.nft.turbo.base.response.ResponseCode;
import cn.hollis.nft.turbo.base.utils.BeanValidator;
import com.alibaba.fastjson2.JSON;
import jakarta.validation.ValidationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class FacadeAspect {private static final Logger LOGGER = LoggerFactory.getLogger(FacadeAspect.class);//核心方法 拦截所有添加Facade注解的方法@Around("@annotation(cn.hollis.nft.turbo.rpc.facade.Facade)")public Object facade(ProceedingJoinPoint pjp) throws Exception {//计时器StopWatch stopWatch = new StopWatch();stopWatch.start();//通过反射拿到方法名和参数Method method = ((MethodSignature) pjp.getSignature()).getMethod();Object[] args = pjp.getArgs();LOGGER.info("start to execute , method = " + method.getName() + " , args = " + JSON.toJSONString(args));//获取返回值类型Class returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType();//循环遍历所有参数,进行参数校验//如果参数上添加了限制性注解 通过这个过程进行参数校验/*** 实体类中的类似这种注解的校验* @NotNull(message = "userId不能为空")* private Long userId;*/for (Object parameter : args) {try {BeanValidator.validateObject(parameter);} catch (ValidationException e) {printLog(stopWatch, method, args, "failed to validate", null, e);return getFailedResponse(returnType, e);}}try {// 目标方法执行Object response = pjp.proceed();//参数补全 补全响应状态信息code和messageenrichObject(response);printLog(stopWatch, method, args, "end to execute", response, null);return response;} catch (Throwable throwable) {// 如果执行异常,则返回一个失败的responseprintLog(stopWatch, method, args, "failed to execute", null, throwable);return getFailedResponse(returnType, throwable);}}/*** 日志打印** @param stopWatch* @param method* @param args* @param action* @param response*/private void printLog(StopWatch stopWatch, Method method, Object[] args, String action, Object response,Throwable throwable) {try {//因为此处有JSON.toJSONString,可能会有异常,需要进行捕获,避免影响主干流程LOGGER.info(getInfoMessage(action, stopWatch, method, args, response, throwable), throwable);// 如果校验失败,则返回一个失败的response} catch (Exception e1) {LOGGER.error("log failed", e1);}}/*** 统一格式输出,方便做日志统计* <p>* *** 如果调整此处的格式,需要同步调整日志监控 ***** @param action 行为* @param stopWatch 耗时* @param method 方法* @param args 参数* @param response 响应* @return 拼接后的字符串*/private String getInfoMessage(String action, StopWatch 
stopWatch, Method method, Object[] args, Object response,Throwable exception) {StringBuilder stringBuilder = new StringBuilder(action);stringBuilder.append(" ,method = ");stringBuilder.append(method.getName());stringBuilder.append(" ,cost = ");stringBuilder.append(stopWatch.getTime()).append(" ms");if (response instanceof BaseResponse) {stringBuilder.append(" ,success = ");stringBuilder.append(((BaseResponse) response).getSuccess());}if (exception != null) {stringBuilder.append(" ,success = ");stringBuilder.append(false);}stringBuilder.append(" ,args = ");stringBuilder.append(JSON.toJSONString(Arrays.toString(args)));if (response != null) {stringBuilder.append(" ,resp = ");stringBuilder.append(JSON.toJSONString(response));}if (exception != null) {stringBuilder.append(" ,exception = ");stringBuilder.append(exception.getMessage());}if (response instanceof BaseResponse) {BaseResponse baseResponse = (BaseResponse) response;if (!baseResponse.getSuccess()) {stringBuilder.append(" , execute_failed");}}return stringBuilder.toString();}/*** 将response的信息补全,主要是code和message** @param response*/private void enrichObject(Object response) {if (response instanceof BaseResponse) {if (((BaseResponse) response).getSuccess()) {//如果状态是成功的,需要将未设置的responseCode设置成SUCCESSif (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {((BaseResponse) response).setResponseCode(ResponseCode.SUCCESS.name());}} else {//如果状态是成功的,需要将未设置的responseCode设置成BIZ_ERRORif (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {((BaseResponse) response).setResponseCode(ResponseCode.BIZ_ERROR.name());}}}}/*** 定义并返回一个通用的失败响应*/private Object getFailedResponse(Class returnType, Throwable throwable)throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {//如果返回值的类型为BaseResponse 的子类,则创建一个通用的失败响应if (returnType.getDeclaredConstructor().newInstance() instanceof BaseResponse) {BaseResponse response = (BaseResponse) 
returnType.getDeclaredConstructor().newInstance();response.setSuccess(false);if (throwable instanceof BizException bizException) {response.setResponseMessage(bizException.getErrorCode().getMessage());response.setResponseCode(bizException.getErrorCode().getCode());} else if (throwable instanceof SystemException systemException) {response.setResponseMessage(systemException.getErrorCode().getMessage());response.setResponseCode(systemException.getErrorCode().getCode());} else {response.setResponseMessage(throwable.toString());response.setResponseCode(ResponseCode.BIZ_ERROR.name());}return response;}LOGGER.error("failed to getFailedResponse , returnType (" + returnType + ") is not instanceof BaseResponse");return null;}
}