ETL 中多源数据库元数据同步的方案设计
1. 背景
在 ETL(Extract-Transform-Load)或数据集成平台中,源端数据库的表结构、字段类型、索引、主键、外键等元数据信息是数据抽取与映射的基础。传统方案普遍通过直接查询各数据库的系统表来获取这些信息,例如:
| 数据库 | 表信息 | 字段信息 |
|---|---|---|
| MySQL | information_schema.TABLES | information_schema.COLUMNS |
| Oracle | dba_tables/all_tables | dba_tab_columns |
| SQL Server | sys.tables | sys.columns |
| PostgreSQL | information_schema.tables | information_schema.columns |
这种方式存在一个明显弊端:每接入一种新数据库,就需要编写一套针对该系统表的 SQL 查询逻辑,维护成本随数据源种类呈线性增长。
2. 需求
构建一套统一的元数据同步机制,满足以下要求:
一次编写,多库通用— 不依赖各数据库的系统表差异
支持动态数据源切换— 可在运行时按数据源名称获取元数据
异步非阻塞同步— 元数据同步过程不阻塞主线程
增量更新— 只处理变更部分,删除已不存在的旧元数据
多租户隔离— 元数据按租户维度存储
3. 设计思路
3.1 核心原理
JDBC 规范提供了java.sql.DatabaseMetaData接口,该接口定义了一系列获取数据库元数据的标准方法。所有符合 JDBC 标准的数据库驱动都必须实现此接口,因此无需关心底层是 MySQL、Oracle 还是 Hive。
┌──────────────────────────────────────────────────────┐ │ qrcb-meta 服务 │ │ │ │ MetaSchemaService MetaTableService │ │ MetaColumnService MetaIndexService │ │ MetaPrimaryKeyService MetaForeignKeyService │ │ │ │ ↓ 统一入口: java.sql.DatabaseMetaData │ │ │ │ ┌──────────┬──────────┬──────────┬──────────┐ │ │ │ MySQL │ Oracle │ Hive │ 其他JDBC │ │ │ │ Driver │ Driver │ Driver │ Driver │ │ │ └──────────┴──────────┴──────────┴──────────┘ │ └──────────────────────────────────────────────────────┘
3.2 元数据层级
元数据按三层结构组织:
| 层级 | 对应 API | 实体 | 说明 |
|---|---|---|---|
| 1. Schema | metaData.getSchemas() | MetaSchema | 数据库 / 模式 |
| 2. Table | metaData.getTables() | MetaTable | 表 / 视图 |
| 3. Column/Index/PK/FK | metaData.getColumns()等 | MetaColumn等 | 字段、索引、约束 |
3.3 同步流程
触发同步 (asyncSchemaMetadata / asyncTableMetadata) │ ├─ 1. 通过 DynamicDataSourceGenResolver 获取目标 DataSource │ ├─ 2. 获取连接 → connection.getMetaData() │ ├─ 3. 调用 DatabaseMetaData 方法获取实时元数据 │ ├─ getSchemas() → List<MetaSchema> │ ├─ getTables() → List<MetaTable> │ ├─ getColumns() → List<MetaColumn> │ ├─ getIndexInfo() → List<MetaIndex> │ ├─ getPrimaryKeys()→ List<MetaPrimaryKey> │ └─ getImportedKeys()→ List<MetaForeignKey> │ ├─ 4. 对比已存储的元数据,删除不存在的旧记录 │ ├─ 5. 设置关联主键 (schemaId, tableId),批量 upsert │ └─ 6. 异步执行 (ThreadPoolTaskExecutor + TenantBroker)
4. 核心依赖包
| 依赖 | 用途 |
|---|---|
java.sql.DatabaseMetaData | JDK 内置,元数据查询核心接口 |
java.sql.Connection | JDK 内置,数据库连接 |
javax.sql.DataSource | JDK 内置,数据源抽象 |
com.qrcb.common.core.datasource | 项目内部,动态数据源解析 (DynamicDataSourceGenResolver) |
com.qrcb.common.core.assemble.util.JdbcUtils | 项目内部,JDBC ResultSet 安全读取工具 |
com.qrcb.common.core.data.tenant.TenantBroker | 项目内部,租户上下文代理 |
cn.hutool.core.util.StrUtil | Hutool,字符串工具 |
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl | MyBatis-Plus,持久化基类 |
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor | Spring,异步线程池 |
5. 核心代码片段
5.1 动态获取表列表(Schema 级)
// MetaTableServiceImpl.getTablesDynamic() public List<MetaTable> getTablesDynamic(String catalog, String schema, String table, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaTable> tableList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 核心 API:获取所有表 try (ResultSet rs = metaData.getTables(catalog, schema, table, null)) { ResultSetMetaData rsMeta = rs.getMetaData(); while (rs.next()) { int i = 1; MetaTable t = new MetaTable(); t.setCatalogName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setSchemaName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setTableName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setTableType(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setRemarks(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); // ... 其余元数据字段 tableList.add(t); } } } return filterTable(tableList, catalog, schema, table); }5.2 获取字段信息
// MetaColumnServiceImpl.getColumnsDynamic() public List<MetaColumn> getColumnsDynamic(String catalog, String schema, String table, String column, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaColumn> columnList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 核心 API:获取表字段 try (ResultSet rs = metaData.getColumns(catalog, schema, table, column)) { ResultSetMetaData rsMeta = rs.getMetaData(); while (rs.next()) { int i = 1; MetaColumn col = new MetaColumn(); col.setColumnName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setDataType(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setTypeName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setColumnSize(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setDecimalDigits(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setNullable(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setRemarks(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setColumnDefault(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setOrdinalPos(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setIsAutoInc(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); // ★ 完整映射 23 个 DatabaseMetaData.getColumns 返回列 columnList.add(col); } } } return columnList; }5.3 Schema 获取(含 MySQL 兼容处理)
// MetaSchemaServiceImpl.getSchemasDynamic() public List<MetaSchema> getSchemasDynamic(String catalog, String schema, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaSchema> schemaList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 标准方式获取 Schema try (ResultSet rs = metaData.getSchemas(catalog, schema)) { while (rs.next()) { MetaSchema s = new MetaSchema(); s.setSchemaName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 1)); s.setCatalogName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 2)); schemaList.add(s); } } // ★ MySQL 兼容:getSchemas 可能返回空,回退到 getCatalogs if (CollUtil.isEmpty(schemaList) && "MySQL".equalsIgnoreCase(metaData.getDatabaseProductName())) { try (ResultSet rs = metaData.getCatalogs()) { while (rs.next()) { MetaSchema s = new MetaSchema(); s.setCatalogName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 1)); schemaList.add(s); } } } } return filterSchema(schemaList, catalog, schema); }5.4 异步同步入口(多租户 + 线程池)
// MetaTableServiceImpl.asyncTableMetadata() public Boolean asyncTableMetadata(String catalog, String schema, String table, String dsName) { Long tenantId = TenantContextHolder.getTenantId(); // ★ 异步提交到线程池,同时保留租户上下文 threadPoolTaskExecutor.submit(() -> TenantBroker.runAs(tenantId, (id) -> this.syncTableMetadata(crrSchema, table, dsName)) ); return true; }5.5 完整的同步事务
// MetaTableServiceImpl.syncTableMetadata() @Transactional(rollbackFor = Exception.class) public synchronized Boolean syncTableMetadata(MetaSchema schema, String tableName, String dsName) { // 1. 同步表基本信息 List<MetaTable> tables = getTablesDynamic(catalog, schema, tableName, dsName); // 删除不存在的表 deleteTableNotExist(tableIdList); // upsert 表信息 saveOrUpdateBatch(tables); for (MetaTable table : tables) { // 2. 同步字段 List<MetaColumn> columns = metaColumnService.getColumnsDynamic(...); // 删除不存在的字段 → upsert 字段 // 3. 同步索引 List<MetaIndex> indexes = metaIndexService.getIndexInfo(...); // 4. 同步主键 List<MetaPrimaryKey> pks = metaPrimaryKeyService.getPrimaryKeys(...); // 5. 同步外键 List<MetaForeignKey> fks = metaForeignKeyService.getImportedKeys(...); } return true; }6. DatabaseMetaData 核心 API 对照表
| JDBC API | 返回 | 用途 |
|---|---|---|
getSchemas(catalog, schema) | ResultSet | 获取数据库 Schema 列表 |
getCatalogs() | ResultSet | 获取 Catalog 列表(MySQL 回退) |
getTables(catalog, schema, table, types) | ResultSet | 获取表/视图列表(10 列) |
getColumns(catalog, schema, table, column) | ResultSet | 获取字段详情(23 列) |
getIndexInfo(catalog, schema, table, unique, approximate) | ResultSet | 获取索引信息 |
getPrimaryKeys(catalog, schema, table) | ResultSet | 获取主键列 |
getImportedKeys(catalog, schema, table) | ResultSet | 获取外键列 |
getDatabaseProductName() | String | 获取数据库产品名 |
getDatabaseProductVersion() | String | 获取数据库版本 |
7. 方案优势
| 对比维度 | 系统表方案 | JDBC DatabaseMetaData 方案 |
|---|---|---|
| 兼容性 | 每种数据库需单独编写 SQL | 一次编写,所有 JDBC 驱动通用 |
| 维护成本 | 随数据库种类线性增长 | 恒定 |
| 类型映射 | 需自行维护类型映射表 | getColumns()返回标准java.sql.Types |
| 新数据库接入 | 需研究其系统表结构 | 只需配置 JDBC 连接 |
| 标准合规 | 依赖数据库厂商实现 | 遵循 JDBC 规范 |
| 版本升级 | 系统表结构可能变化 | JDBC 驱动保证向后兼容 |
8. 注意事项
MySQL 特殊处理:MySQL 的
getSchemas()可能返回空,需回退到getCatalogs()空值安全:
ResultSet.getXXX()在值为NULL时会抛异常,应使用JdbcUtils.getXXXMetaData()安全读取性能考量:大表量场景建议按 Schema 分批同步,避免一次性加载过多元数据
连接管理:务必使用 try-with-resources 确保 Connection / ResultSet 正确关闭
租户隔离:异步同步时,通过
TenantBroker.runAs()保证租户上下文在线程池中正确传递幂等设计:通过
@Idempotent注解防止短时间内重复提交同步任务