ETL 中多源数据库元数据同步的方案设计

1. 背景

在 ETL(Extract-Transform-Load)或数据集成平台中,源端数据库的表结构、字段类型、索引、主键、外键等元数据信息是数据抽取与映射的基础。传统方案普遍通过直接查询各数据库的系统表来获取这些信息,例如:

数据库表信息字段信息
MySQLinformation_schema.TABLESinformation_schema.COLUMNS
Oracledba_tables/all_tablesdba_tab_columns
SQL Serversys.tablessys.columns
PostgreSQLinformation_schema.tablesinformation_schema.columns

这种方式存在一个明显弊端:每接入一种新数据库,就需要编写一套针对该系统表的 SQL 查询逻辑,维护成本随数据源种类呈线性增长。

2. 需求

构建一套统一的元数据同步机制,满足以下要求:

  1. 一次编写,多库通用— 不依赖各数据库的系统表差异

  2. 支持动态数据源切换— 可在运行时按数据源名称获取元数据

  3. 异步非阻塞同步— 元数据同步过程不阻塞主线程

  4. 增量更新— 只处理变更部分,删除已不存在的旧元数据

  5. 多租户隔离— 元数据按租户维度存储

3. 设计思路

3.1 核心原理

JDBC 规范提供了java.sql.DatabaseMetaData接口,该接口定义了一系列获取数据库元数据的标准方法。所有符合 JDBC 标准的数据库驱动都必须实现此接口,因此无需关心底层是 MySQL、Oracle 还是 Hive。

┌──────────────────────────────────────────────────────┐ │ qrcb-meta 服务 │ │ │ │ MetaSchemaService MetaTableService │ │ MetaColumnService MetaIndexService │ │ MetaPrimaryKeyService MetaForeignKeyService │ │ │ │ ↓ 统一入口: java.sql.DatabaseMetaData │ │ │ │ ┌──────────┬──────────┬──────────┬──────────┐ │ │ │ MySQL │ Oracle │ Hive │ 其他JDBC │ │ │ │ Driver │ Driver │ Driver │ Driver │ │ │ └──────────┴──────────┴──────────┴──────────┘ │ └──────────────────────────────────────────────────────┘

3.2 元数据层级

元数据按三层结构组织:

层级对应 API实体说明
1. SchemametaData.getSchemas()MetaSchema数据库 / 模式
2. TablemetaData.getTables()MetaTable表 / 视图
3. Column/Index/PK/FKmetaData.getColumns()MetaColumn字段、索引、约束

3.3 同步流程

触发同步 (asyncSchemaMetadata / asyncTableMetadata) │ ├─ 1. 通过 DynamicDataSourceGenResolver 获取目标 DataSource │ ├─ 2. 获取连接 → connection.getMetaData() │ ├─ 3. 调用 DatabaseMetaData 方法获取实时元数据 │ ├─ getSchemas() → List<MetaSchema> │ ├─ getTables() → List<MetaTable> │ ├─ getColumns() → List<MetaColumn> │ ├─ getIndexInfo() → List<MetaIndex> │ ├─ getPrimaryKeys()→ List<MetaPrimaryKey> │ └─ getImportedKeys()→ List<MetaForeignKey> │ ├─ 4. 对比已存储的元数据,删除不存在的旧记录 │ ├─ 5. 设置关联主键 (schemaId, tableId),批量 upsert │ └─ 6. 异步执行 (ThreadPoolTaskExecutor + TenantBroker)

4. 核心依赖包

依赖用途
java.sql.DatabaseMetaDataJDK 内置,元数据查询核心接口
java.sql.ConnectionJDK 内置,数据库连接
javax.sql.DataSourceJDK 内置,数据源抽象
com.qrcb.common.core.datasource项目内部,动态数据源解析 (DynamicDataSourceGenResolver)
com.qrcb.common.core.assemble.util.JdbcUtils项目内部,JDBC ResultSet 安全读取工具
com.qrcb.common.core.data.tenant.TenantBroker项目内部,租户上下文代理
cn.hutool.core.util.StrUtilHutool,字符串工具
com.baomidou.mybatisplus.extension.service.impl.ServiceImplMyBatis-Plus,持久化基类
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutorSpring,异步线程池

5. 核心代码片段

5.1 动态获取表列表(Schema 级)

// MetaTableServiceImpl.getTablesDynamic() public List<MetaTable> getTablesDynamic(String catalog, String schema, String table, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaTable> tableList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 核心 API:获取所有表 try (ResultSet rs = metaData.getTables(catalog, schema, table, null)) { ResultSetMetaData rsMeta = rs.getMetaData(); while (rs.next()) { int i = 1; MetaTable t = new MetaTable(); t.setCatalogName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setSchemaName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setTableName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setTableType(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); t.setRemarks(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); // ... 其余元数据字段 tableList.add(t); } } } return filterTable(tableList, catalog, schema, table); }

5.2 获取字段信息

// MetaColumnServiceImpl.getColumnsDynamic() public List<MetaColumn> getColumnsDynamic(String catalog, String schema, String table, String column, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaColumn> columnList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 核心 API:获取表字段 try (ResultSet rs = metaData.getColumns(catalog, schema, table, column)) { ResultSetMetaData rsMeta = rs.getMetaData(); while (rs.next()) { int i = 1; MetaColumn col = new MetaColumn(); col.setColumnName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setDataType(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setTypeName(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setColumnSize(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setDecimalDigits(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setNullable(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setRemarks(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setColumnDefault(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); col.setOrdinalPos(JdbcUtils.getIntMetaData(rsMeta, rs, i++)); col.setIsAutoInc(JdbcUtils.getStringMetaData(rsMeta, rs, i++)); // ★ 完整映射 23 个 DatabaseMetaData.getColumns 返回列 columnList.add(col); } } } return columnList; }

5.3 Schema 获取(含 MySQL 兼容处理)

// MetaSchemaServiceImpl.getSchemasDynamic() public List<MetaSchema> getSchemasDynamic(String catalog, String schema, String dsName) { DataSource dataSource = DynamicDataSourceGenResolver.fetch(dsName); List<MetaSchema> schemaList = new ArrayList<>(); try (Connection conn = dataSource.getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); // ★ 标准方式获取 Schema try (ResultSet rs = metaData.getSchemas(catalog, schema)) { while (rs.next()) { MetaSchema s = new MetaSchema(); s.setSchemaName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 1)); s.setCatalogName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 2)); schemaList.add(s); } } // ★ MySQL 兼容:getSchemas 可能返回空,回退到 getCatalogs if (CollUtil.isEmpty(schemaList) && "MySQL".equalsIgnoreCase(metaData.getDatabaseProductName())) { try (ResultSet rs = metaData.getCatalogs()) { while (rs.next()) { MetaSchema s = new MetaSchema(); s.setCatalogName(JdbcUtils.getStringMetaData(rs.getMetaData(), rs, 1)); schemaList.add(s); } } } } return filterSchema(schemaList, catalog, schema); }

5.4 异步同步入口(多租户 + 线程池)

// MetaTableServiceImpl.asyncTableMetadata() public Boolean asyncTableMetadata(String catalog, String schema, String table, String dsName) { Long tenantId = TenantContextHolder.getTenantId(); // ★ 异步提交到线程池,同时保留租户上下文 threadPoolTaskExecutor.submit(() -> TenantBroker.runAs(tenantId, (id) -> this.syncTableMetadata(crrSchema, table, dsName)) ); return true; }

5.5 完整的同步事务

// MetaTableServiceImpl.syncTableMetadata() @Transactional(rollbackFor = Exception.class) public synchronized Boolean syncTableMetadata(MetaSchema schema, String tableName, String dsName) { // 1. 同步表基本信息 List<MetaTable> tables = getTablesDynamic(catalog, schema, tableName, dsName); // 删除不存在的表 deleteTableNotExist(tableIdList); // upsert 表信息 saveOrUpdateBatch(tables); ​ for (MetaTable table : tables) { // 2. 同步字段 List<MetaColumn> columns = metaColumnService.getColumnsDynamic(...); // 删除不存在的字段 → upsert 字段 ​ // 3. 同步索引 List<MetaIndex> indexes = metaIndexService.getIndexInfo(...); ​ // 4. 同步主键 List<MetaPrimaryKey> pks = metaPrimaryKeyService.getPrimaryKeys(...); ​ // 5. 同步外键 List<MetaForeignKey> fks = metaForeignKeyService.getImportedKeys(...); } return true; }

6. DatabaseMetaData 核心 API 对照表

JDBC API返回用途
getSchemas(catalog, schema)ResultSet获取数据库 Schema 列表
getCatalogs()ResultSet获取 Catalog 列表(MySQL 回退)
getTables(catalog, schema, table, types)ResultSet获取表/视图列表(10 列)
getColumns(catalog, schema, table, column)ResultSet获取字段详情(23 列)
getIndexInfo(catalog, schema, table, unique, approximate)ResultSet获取索引信息
getPrimaryKeys(catalog, schema, table)ResultSet获取主键列
getImportedKeys(catalog, schema, table)ResultSet获取外键列
getDatabaseProductName()String获取数据库产品名
getDatabaseProductVersion()String获取数据库版本

7. 方案优势

对比维度系统表方案JDBC DatabaseMetaData 方案
兼容性每种数据库需单独编写 SQL一次编写,所有 JDBC 驱动通用
维护成本随数据库种类线性增长恒定
类型映射需自行维护类型映射表getColumns()返回标准java.sql.Types
新数据库接入需研究其系统表结构只需配置 JDBC 连接
标准合规依赖数据库厂商实现遵循 JDBC 规范
版本升级系统表结构可能变化JDBC 驱动保证向后兼容

8. 注意事项

  1. MySQL 特殊处理:MySQL 的getSchemas()可能返回空,需回退到getCatalogs()

  2. 空值安全ResultSet.getXXX()在值为NULL时会抛异常,应使用JdbcUtils.getXXXMetaData()安全读取

  3. 性能考量:大表量场景建议按 Schema 分批同步,避免一次性加载过多元数据

  4. 连接管理:务必使用 try-with-resources 确保 Connection / ResultSet 正确关闭

  5. 租户隔离:异步同步时,通过TenantBroker.runAs()保证租户上下文在线程池中正确传递

  6. 幂等设计:通过@Idempotent注解防止短时间内重复提交同步任务