提示:
OracleDatabaseSchema
类的核心功能是管理与 Oracle 数据库相关的模式信息,并处理数据库模式变更事件。
文章目录
- 前言
- 一、核心功能
- 二、代码分析
- 总结
前言
提示:OracleDatabaseSchema
类主要用于数据库同步工具中,帮助捕捉和处理 Oracle 数据库中的模式变更,并特别关注 LOB 类型的数据处理。它通过维护表结构信息和 LOB 列的映射,确保在模式变更时能够正确地更新内部状态,并支持 LOB 数据类型的特殊处理。
提示:以下是本篇文章正文内容
一、核心功能
核心功能详细说明
-
模式管理:
- 存储和管理数据库表的信息,包括 LOB 列的映射。
- 提供方法来获取数据库表的信息 (
getTables
)。
-
LOB 列处理:
- 使用
ConcurrentMap
(lobColumnsByTableId
) 来存储每个表的 LOB 列信息。 - 提供方法来获取特定表的 LOB 列列表 (
getLobColumnsForTable
)。 - 支持判断给定值是否为 LOB 列的不可用值占位符 (
isColumnUnavailableValuePlaceholder
)。 - 提供静态方法来识别 LOB 列类型(如 CLOB、NCLOB、BLOB 和 SQLXML)。
- 使用
-
模式变更处理:
- 实现
applySchemaChange
方法来响应数据库模式的变更事件,例如表的创建、修改或删除。 - 根据事件类型更新内部状态,包括构建和注册新的表模式,或者移除已删除的表模式。
- 记录 DDL 语句,以便追踪模式的变化。
- 实现
-
值转换:
- 使用
OracleValueConverters
对象来处理数据库值的转换。
- 使用
-
初始化和配置:
- 通过构造函数接收配置参数,如连接配置、值转换器等。
- 初始化成员变量,并调用父类构造函数来设置一些通用的配置。
-
DDL 解析:
- 提供
getDdlParser
方法来获取 DDL 解析器。
- 提供
二、代码分析
// 定义一个线程安全的映射,用于存储每个表的 LOB 列信息
private final ConcurrentMap<TableId, List<Column>> lobColumnsByTableId = new ConcurrentHashMap<>();// 用于转换数据库值的转换器实例
private final OracleValueConverters valueConverters;// 标记是否已经执行了存储初始化
private boolean storageInitializationExecuted = false;// 构造函数,用于初始化 OracleDatabaseSchema 实例
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster,TopicNamingStrategy<TableId> topicNamingStrategy, TableNameCaseSensitivity tableNameCaseSensitivity) {// 调用父类构造函数进行初始化super(connectorConfig, topicNamingStrategy, connectorConfig.getTableFilters().dataCollectionFilter(),connectorConfig.getColumnFilter(),new TableSchemaBuilder(// 创建 TableSchemaBuilder 实例valueConverters,defaultValueConverter,schemaNameAdjuster,connectorConfig.customConverterRegistry(),connectorConfig.getSourceInfoStructMaker().schema(),connectorConfig.getFieldNamer(),false),// 设置是否忽略表名大小写敏感性TableNameCaseSensitivity.INSENSITIVE.equals(tableNameCaseSensitivity),// 设置键映射器connectorConfig.getKeyMapper());// 保存值转换器实例this.valueConverters = valueConverters;// 创建并初始化 DDL 解析器实例this.ddlParser = new OracleDdlParser(true,false,connectorConfig.isSchemaCommentsHistoryEnabled(),valueConverters,connectorConfig.getTableFilters().dataCollectionFilter());
}// 返回所有表的信息
public Tables getTables() {return tables();
}// 返回值转换器实例
public OracleValueConverters getValueConverters() {return valueConverters;
}// 返回 DDL 解析器实例
@Override
public OracleDdlParser getDdlParser() {return ddlParser;
}// 应用模式变更事件
@Override
public void applySchemaChange(SchemaChangeEvent schemaChange) {LOGGER.debug("Applying schema change event {}", schemaChange);// 根据变更事件的类型进行不同的处理switch (schemaChange.getType()) {case CREATE:case ALTER:// 遍历变更事件中的表变更schemaChange.getTableChanges().forEach(x -> {// 构建并注册新的表模式buildAndRegisterSchema(x.getTable());// 更新表信息tables().overwriteTable(x.getTable());});break;case DROP:// 遍历变更事件中的表变更schemaChange.getTableChanges().forEach(x -> {// 移除已删除的表模式removeSchema(x.getId());});break;default:// 处理其他未知类型的变更事件}// 检查是否只存储捕获的表if (!storeOnlyCapturedTables() ||// 检查变更事件中的表是否包含在过滤器中schemaChange.getTables().stream().map(Table::id).anyMatch(getTableFilter()::isIncluded)) {LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());// 记录 DDL 语句record(schemaChange, schemaChange.getTableChanges());}
}// 移除指定表的模式,并从 LOB 映射中移除
@Override
protected void removeSchema(TableId id) {super.removeSchema(id);lobColumnsByTableId.remove(id);
}// 构建并注册表的模式,同时缓存 LOB 列以提高性能
@Override
protected void buildAndRegisterSchema(Table table) {if (getTableFilter().isIncluded(table.id())) {super.buildAndRegisterSchema(table);// 构建并注册表的 LOB 列buildAndRegisterTableLobColumns(table);}
}// 获取指定表的 LOB 列列表
public List<Column> getLobColumnsForTable(TableId id) {return lobColumnsByTableId.getOrDefault(id, Collections.emptyList());
}// 判断给定值是否为 LOB 列的不可用值占位符
public boolean isColumnUnavailableValuePlaceholder(Column column, Object value) {if (isClobColumn(column) || isXmlColumn(column)) {// 对于 CLOB 或 XML 列,检查字符串占位符return valueConverters.getUnavailableValuePlaceholderString().equals(value);}else if (isBlobColumn(column)) {// 对于 BLOB 列,检查二进制占位符return ByteBuffer.wrap(valueConverters.getUnavailableValuePlaceholderBinary()).equals(value);}return false;
}// 判断提供的列模型是否为 LOB 数据类型
public static boolean isLobColumn(Column column) {return isClobColumn(column) || isBlobColumn(column);
}// 判断提供的列模型是否为 XML 数据类型
public static boolean isXmlColumn(Column column) {return column.jdbcType() == OracleTypes.SQLXML;
}// 判断提供的列模型是否为 CLOB 或 NCLOB 数据类型
private static boolean isClobColumn(Column column) {return column.jdbcType() == OracleTypes.CLOB || column.jdbcType() == OracleTypes.NCLOB;
}// 判断提供的列模型是否为 BLOB 数据类型
private static boolean isBlobColumn(Column column) {return column.jdbcType() == OracleTypes.BLOB;
}// 构建并注册表的 LOB 列
private void buildAndRegisterTableLobColumns(Table table) {final List<Column> lobColumns = new ArrayList<>();for (Column column : table.columns()) {// 遍历表的所有列,查找 LOB 列switch (column.jdbcType()) {case OracleTypes.CLOB:case OracleTypes.NCLOB:case OracleTypes.BLOB:case OracleTypes.SQLXML:// 如果是 LOB 列,则添加到列表中lobColumns.add(column);break;}}if (!lobColumns.isEmpty()) {// 如果有 LOB 列,则将其添加到映射中lobColumnsByTableId.put(table.id(), lobColumns);}else {// 如果没有 LOB 列,则从映射中移除lobColumnsByTableId.remove(table.id());}
}
封装
OracleDatabaseSchema
类通过私有字段和方法实现了良好的封装。例如:
ConcurrentMap<TableId, List<Column>> lobColumnsByTableId
作为私有字段存储每个表的 LOB 列信息,保证了数据的安全性和一致性。OracleValueConverters valueConverters
作为私有字段存储值转换器实例,用于处理数据库值的转换。- 多个私有方法(如
buildAndRegisterTableLobColumns
)用于内部逻辑处理,不对外暴露,保证了类的内部实现细节不会被外部直接访问。
继承
该类通过继承父类(未展示)来复用通用的行为和属性,例如 tables()
方法可能是在父类中定义的,这有助于减少代码重复,并且使得 OracleDatabaseSchema
可以专注于处理 Oracle 数据库特有的逻辑。
多态
通过重写父类的方法(如 applySchemaChange
和 removeSchema
),OracleDatabaseSchema
类可以根据 Oracle 数据库的特点提供特定的实现。这种多态性使得类可以灵活地扩展和定制行为。
模块化
OracleDatabaseSchema
类通过分离关注点来实现模块化设计。例如:
applySchemaChange
方法处理模式变更事件,根据事件类型进行相应的操作。buildAndRegisterTableLobColumns
方法专门处理 LOB 列的构建和注册。isColumnUnavailableValuePlaceholder
等方法提供了特定的逻辑判断。
启发
- 清晰的职责划分:将不同的功能分解到不同的方法中,使得每个方法都有明确的责任,易于理解和维护。
- 利用多态和继承:通过继承和多态来扩展和定制行为,可以有效地避免代码重复,并使类更加灵活。
- 封装和隐藏实现细节:通过私有字段和方法来保护内部状态,只暴露必要的接口给外部使用,增强了代码的安全性和可维护性。
- 利用设计模式:例如使用工厂模式创建
TableSchemaBuilder
实例,可以提高代码的灵活性和可扩展性。
代码亮点
- 使用
ConcurrentMap
:通过使用ConcurrentMap
(ConcurrentHashMap
) 来存储 LOB 列信息,提高了并发处理的能力,适合多线程环境下的使用。 - 静态方法的使用:通过提供静态方法(如
isLobColumn
、isXmlColumn
等)来判断列类型,简化了外部调用者对列类型的判断逻辑。 - 明确的方法命名:方法命名清晰直观,如
getLobColumnsForTable
、isColumnUnavailableValuePlaceholder
等,有助于快速理解方法的功能。
总结
提示:OracleDatabaseSchema
类负责管理 Oracle 数据库的模式信息,并处理模式变更事件,特别是针对 LOB (Large Object) 数据类型的处理。它通过维护表结构信息和 LOB 列的映射,确保在模式变更时能够正确地更新内部状态,并支持 LOB 数据类型的特殊处理,从而帮助捕捉和处理 Oracle 数据库中的模式变更。