欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > ShardingSphere源码解析:跨库查询案例分析

ShardingSphere源码解析:跨库查询案例分析

2025/12/16 14:32:04 来源:https://blog.csdn.net/qq_74156239/article/details/148570867  浏览:    关键词:ShardingSphere源码解析:跨库查询案例分析

引言

在分布式系统中,数据分片是解决数据规模增长的重要手段。ShardingSphere作为分布式数据库中间件,提供了强大的分片查询能力。本文将深入ShardingSphere源码,通过一个完整的查询示例,详细解析其如何从不同库中查询数据,特别关注广播表这一关键特性。

一、ShardingSphere查询处理流程概览

ShardingSphere的查询处理遵循经典的"解析→路由→执行→归并"四阶段模型:

  1. SQL解析:将SQL语句解析为抽象语法树(AST)
  2. SQL路由:根据分片规则确定SQL要访问的数据源和表
  3. SQL执行:并行执行路由后的SQL
  4. 结果归并:合并多数据源返回的结果集

下面通过一个完整的查询示例,深入源码分析这个过程。

二、完整查询示例:跨分片查询订单数据

假设我们有一个订单系统,按用户ID分片,配置如下:

# 分片规则配置
rules:- !SHARDINGtables:t_order:actualDataNodes: ds${0..1}.t_order${0..1}databaseStrategy:standard:shardingColumn: user_idshardingAlgorithmName: database-inlinetableStrategy:standard:shardingColumn: order_idshardingAlgorithmName: table-inlinet_dict:broadcast: true  # 配置为广播表shardingAlgorithms:database-inline:type: INLINEprops:algorithm-expression: ds${user_id % 2}table-inline:type: INLINEprops:algorithm-expression: t_order${order_id % 2}

现在我们执行一个查询:

SELECT o.*, d.dict_name 
FROM t_order o 
JOIN t_dict d ON o.status = d.dict_code 
WHERE user_id = 1001 AND order_id > 2000 
LIMIT 10;

这个查询涉及分片表t_order和广播表t_dict的关联,让我们深入源码,看看ShardingSphere如何处理。

三、源码深度解析

1. SQL解析阶段

SQL解析是查询处理的第一步,ShardingSphere使用Antlr4构建的解析引擎将SQL转换为抽象语法树。

核心类与流程

// ShardingSphere-JDBC 5.3.2版本核心解析入口
public final class SQLParserEngine {private final DatabaseType databaseType;private final Map<String, SQLParser> parsers;public SQLStatement parse(final String sql, final boolean useCache) {// 根据数据库类型选择解析器SQLParser sqlParser = SQLParserFactory.newInstance(databaseType, sql);// 解析SQL语句,生成ASTSQLVisitorRule sqlVisitorRule = SQLVisitorRuleEngine.getSQLVisitorRule(databaseType, sqlParser.getSQLStatementType());SQLVisitor<Object> visitor = SQLVisitorFactory.newInstance(databaseType, sqlVisitorRule, sqlParser.getParseTree(), parsers);// 返回解析后的SQL语句对象return (SQLStatement) sqlParser.getParseTree().accept(visitor);}
}

对于我们的查询,解析器会生成一个包含以下信息的SelectStatement对象:

  • 查询表名:t_order, t_dict
  • 查询条件:user_id = 1001 AND order_id > 2000
  • 连接条件:o.status = d.dict_code
  • 分页信息:LIMIT 10

2. SQL路由阶段

路由阶段是ShardingSphere的核心,它决定SQL将访问哪些数据源和表。特别关注广播表的处理逻辑。

核心类与流程

// 标准分片路由引擎
public final class StandardRoutingEngine implements RoutingEngine {private final ShardingRule shardingRule;private final TableRule tableRule;private final SQLStatementContext sqlStatementContext;@Overridepublic RouteResult route(final List<Object> parameters) {// 1. 提取分片条件Collection<ShardingCondition> shardingConditions = createShardingConditions(parameters);// 2. 计算数据源分片Collection<String> routedDataSources = routeDataSources(shardingConditions);// 3. 计算表分片Collection<RouteUnit> routeUnits = new LinkedList<>();for (String each : routedDataSources) {routeUnits.addAll(routeTables(each, shardingConditions));}return new RouteResult(routeUnits);}// 广播表路由特殊处理private Collection<RouteUnit> routeBroadcastTable(final String logicTable, final String dataSource) {Collection<RouteUnit> result = new LinkedList<>();// 广播表在每个数据源都有完整副本// 直接使用逻辑表名作为物理表名result.add(new RouteUnit(new DataSourceName(dataSource), new TableUnit(logicTable, logicTable)));return result;}// 处理多表关联查询private Collection<RouteUnit> routeTables(final String routedDataSource, final Collection<ShardingCondition> shardingConditions) {Collection<RouteUnit> result = new LinkedList<>();// 遍历SQL中涉及的所有表for (String logicTable : sqlStatementContext.getTablesContext().getTableNames()) {if (isBroadcastTable(logicTable)) {// 广播表处理:直接路由到当前数据源result.addAll(routeBroadcastTable(logicTable, routedDataSource));} else {// 分片表处理TableRule tableRule = shardingRule.getTableRule(logicTable);ShardingAlgorithm tableShardingAlgorithm = tableRule.getTableShardingStrategy().getShardingAlgorithm();Collection<String> availableTargetNames = tableRule.getActualTables(routedDataSource);// 执行分片算法Collection<String> routedTables = tableShardingAlgorithm.doSharding(availableTargetNames, shardingConditions);// 创建路由单元for (String each : routedTables) {result.add(new RouteUnit(new DataSourceName(routedDataSource), new TableUnit(logicTable, each)));}}}return result;}// 判断是否为广播表private boolean isBroadcastTable(final String tableName) {return shardingRule.getBroadcastTables().contains(tableName);}
}

对于我们的查询,路由过程如下:

分片表t_order的路由

  1. 提取分片键值:user_id=1001, order_id>2000
  2. 计算数据源:1001 % 2 = 1,路由到ds1
  3. 由于order_id是范围条件,无法精确计算,需要扫描ds1中的所有表:t_order0和t_order1

广播表t_dict的路由

  1. 识别t_dict为广播表
  2. 直接路由到当前数据源ds1,物理表名为t_dict

最终路由单元

  • ds1.t_order0, ds1.t_dict
  • ds1.t_order1, ds1.t_dict

3. SQL执行阶段

执行阶段负责将路由后的SQL发送到对应的数据源执行。

核心类与流程

// SQL执行引擎
public final class StatementExecutor {private final ConnectionMode connectionMode;private final ShardingRuntimeContext runtimeContext;private final List<Object> parameters;public <T> List<T> executeQuery(final ExecuteCallback<T> executeCallback) throws SQLException {// 1. 创建执行单元Collection<ExecutionUnit> executionUnits = createExecutionUnits();// 2. 执行查询return executeGroup(executionUnits, executeCallback);}private Collection<ExecutionUnit> createExecutionUnits() {Collection<ExecutionUnit> result = new LinkedList<>();// 遍历路由单元for (RouteUnit each : routeContext.getRouteUnits()) {// 创建物理SQLString sql = sqlRewriteContext.getSql();Map<String, Object> attributes = sqlRewriteContext.getAttributes();// 创建执行单元result.add(new ExecutionUnit(each.getDataSourceMapper().getActualName(), new SQLUnit(sql, parameters, attributes)));}return result;}private <T> List<T> executeGroup(final Collection<ExecutionUnit> executionUnits, final ExecuteCallback<T> executeCallback) throws SQLException {// 分组执行(按数据源分组)Map<String, List<SQLUnit>> sqlUnitGroups = groupSQLUnitsByDataSource(executionUnits);List<T> result = new ArrayList<>(sqlUnitGroups.size());// 并行执行各组SQLfor (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {result.addAll(executeGroupInternal(entry.getKey(), entry.getValue(), executeCallback));}return result;}private <T> List<T> executeGroupInternal(final String dataSourceName, final List<SQLUnit> sqlUnits, final ExecuteCallback<T> executeCallback) throws SQLException {// 获取数据源DataSource dataSource = runtimeContext.getDataSourceMap().get(dataSourceName);// 创建JDBC连接try (Connection connection = dataSource.getConnection()) {// 执行SQL并获取结果List<T> result = new ArrayList<>(sqlUnits.size());for (SQLUnit each : sqlUnits) {try (PreparedStatement preparedStatement = connection.prepareStatement(each.getSql())) {// 设置参数setParameters(preparedStatement, each.getParameters());// 执行查询result.add(executeCallback.execute(preparedStatement));}}return result;}}
}

对于我们的查询,执行过程如下:

  1. 生成两个物理SQL:
    • SELECT o.*, d.dict_name FROM ds1.t_order0 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
    • SELECT o.*, d.dict_name FROM ds1.t_order1 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
  2. 从连接池获取ds1的连接
  3. 并行执行这两个SQL
  4. 获取两个结果集

4. 结果归并阶段

归并阶段负责将多个数据源返回的结果合并为一个统一的结果集。

核心类与流程

// 归并引擎
public final class MergeEngine {private final DatabaseType databaseType;private final SelectStatementContext selectStatementContext;private final List<QueryResult> queryResults;public QueryResult merge() throws SQLException {// 1. 创建归并器链List<ResultMerger> resultMergers = createResultMergers();// 2. 执行归并QueryResult result = queryResults.isEmpty() ? EmptyQueryResult.INSTANCE : queryResults.get(0);for (ResultMerger each : resultMergers) {result = each.merge(result, queryResults, selectStatementContext);}return result;}private List<ResultMerger> createResultMergers() {List<ResultMerger> result = new LinkedList<>();// 处理聚合函数if (!selectStatementContext.getProjectionsContext().getAggregationProjections().isEmpty()) {result.add(new AggregationDistinctMerger());result.add(new AggregationMerger());}// 处理分组if (!selectStatementContext.getGroupByContext().getItems().isEmpty()) {result.add(new GroupByMerger());}// 处理排序if (!selectStatementContext.getOrderByContext().getItems().isEmpty()) {result.add(new OrderByMerger());}// 处理分页if (null != selectStatementContext.getLimit()) {result.add(new LimitMerger());}return result;}
}

对于我们的查询,归并过程如下:

  1. 由于查询没有聚合函数和分组,直接进入排序阶段
  2. 假设查询没有指定ORDER BY,ShardingSphere会按主键排序
  3. 使用优先级队列对两个结果集进行归并排序
  4. 应用LIMIT 10,返回前10条记录

四、广播表的核心机制详解

1. 广播表的定义与用途

广播表是ShardingSphere中的一种特殊表,它在每个数据源中都有完整的副本。广播表适用于以下场景:

  • 数据量较小的字典表(如地区表、状态码表)
  • 经常与分片表进行关联查询的表
  • 数据变更不频繁的基础数据表

2. 广播表的源码实现

2.1 配置解析
// 分片规则配置解析
public final class ShardingRuleConfigurationYamlSwapper implements TypeBasedYamlSwapper<YamlShardingRuleConfiguration, ShardingRuleConfiguration> {@Overridepublic ShardingRuleConfiguration swapToObject(final YamlShardingRuleConfiguration yamlConfig) {ShardingRuleConfiguration result = new ShardingRuleConfiguration();// 解析广播表配置if (null != yamlConfig.getBroadcastTables()) {result.getBroadcastTables().addAll(yamlConfig.getBroadcastTables());}// 其他配置解析...return result;}
}
2.2 广播表路由优化
// 广播表路由优化
public final class BroadcastRoutingEngine implements RoutingEngine {private final ShardingRule shardingRule;private final Collection<String> logicTables;@Overridepublic RouteResult route(final List<Object> parameters) {RouteResult result = new RouteResult();// 获取所有数据源Collection<String> dataSources = shardingRule.getDataSourceMap().keySet();// 为每个广播表在每个数据源创建路由单元for (String logicTable : logicTables) {for (String dataSource : dataSources) {result.getRouteUnits().add(new RouteUnit(new DataSourceName(dataSource), new TableUnit(logicTable, logicTable)));}}return result;}
}
2.3 广播表数据一致性保证

ShardingSphere通过以下机制保证广播表的数据一致性:

// 广播表数据一致性保证
public final class BroadcastTableConsistencyChecker implements ConsistencyChecker {@Overridepublic boolean check(final String dataSourceName, final String logicTableName) {// 获取所有数据源Map<String, DataSource> dataSourceMap = shardingRuntimeContext.getDataSourceMap();// 获取广播表的所有物理表Collection<String> actualTables = shardingRuntimeContext.getRuleMetaData().getRules().stream().filter(each -> each instanceof ShardingRule).map(each -> (ShardingRule) each).flatMap(each -> each.getBroadcastTables().stream()).filter(each -> each.equalsIgnoreCase(logicTableName)).findFirst().map(each -> dataSourceMap.keySet().stream().map(dataSource -> each).collect(Collectors.toList())).orElse(Collections.emptyList());// 检查每个数据源中的表结构和数据一致性// 实际实现中会比较表结构、数据行数等return isSchemaConsistent(dataSourceMap, actualTables) && isDataConsistent(dataSourceMap, actualTables);}private boolean isSchemaConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {// 检查表结构一致性// 实际实现中会比较表字段、索引等return true;}private boolean isDataConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {// 检查数据一致性// 实际实现中会比较数据行数、数据摘要等return true;}
}

3. 广播表的优势与注意事项

优势

  • 显著减少跨库连接查询的开销
  • 简化SQL编写,无需考虑分片键关联
  • 提高查询性能,尤其是多表关联场景

注意事项

  • 广播表数据变更需要同步到所有数据源
  • 不适合大数据量表(会导致存储成本增加)
  • 需要定期检查各数据源中广播表的一致性

五、跨分片查询的优化机制

1. 广播表优化

// 广播表与分片表关联查询优化
public final class ShardingMergeEngine {public QueryResult merge(final SelectStatementContext selectStatementContext, final List<QueryResult> queryResults) {// 如果是广播表与分片表的关联查询if (isBroadcastJoin(selectStatementContext)) {// 优化处理:直接在各分片内完成关联return new BroadcastJoinMergedResult(queryResults, selectStatementContext);}// 其他合并逻辑...}
}

2. 分页优化

// 内存分页优化
public int getMemoryPageOffset() {if (isSameLogicSQLWithDifferentParameters()) {// 当SQL相同但参数不同时,需要计算总的offsetreturn calculateTotalOffset();}return limit.getOffsetValue();
}

3. 并行执行优化

// 并行执行框架
public final class ExecuteEngine {private final ExecutorService executorService;public <T> List<T> execute(final Collection<? extends Callable<T>> callables) throws SQLException {try {// 使用线程池并行执行return executorService.invokeAll(callables).stream().map(this::getResult).collect(Collectors.toList());} catch (final InterruptedException ex) {Thread.currentThread().interrupt();throw new SQLException("Interrupted", ex);}}
}

六、总结与最佳实践

1. 核心机制总结

  • 分片键驱动路由:ShardingSphere通过分片键精确计算目标数据源和表
  • 广播表优化:将小表复制到所有数据源,消除跨库连接开销
  • 并行执行框架:基于线程池实现多数据源并行查询
  • 流式结果归并:通过装饰器模式实现结果集的流式处理和归并
  • 查询重写:自动调整SQL以适应分片环境

2. 最佳实践建议

  1. 合理设计分片键:选择查询频率高、分布均匀的字段作为分片键
  2. 利用广播表:对于字典表等小表,配置为广播表减少跨库查询
  3. 避免全表扫描:尽量在SQL中包含分片键条件
  4. 优化分页:避免大偏移量分页,考虑使用游标分页
  5. 监控与调优:利用ShardingSphere的监控功能,分析慢查询并优化
  6. 定期检查广播表一致性:确保各数据源中广播表数据一致

通过深入理解ShardingSphere的源码实现,特别是广播表这一关键特性,我们可以更好地利用其分片能力,避免常见的性能陷阱,构建高效、稳定的分布式数据库系统。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词