SpringBootShardingJDBC分库分表(草稿)
- 电脑硬件
- 2025-08-23 11:54:02

## ShardingJDBC分库分表

### 1. Maven 引用

```xml
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
```

### 2. 数据库和表格

数据库

```text
*****_ch
*****_hk
*****_us
*****_olap
```

表格

```text
kline
kline_D_0
kline_D_1
.......
kline_D_15
kline
kline_M_0
kline_M_1
.......
kline_M_15
kline_m1
kline_m1_250121
.......
kline_m1_2501221
kline_M5_0
.......
kline_M5_15
kline_M30_0
.......
kline_M30_15
kline_M60_0
.......
kline_M60_15
kline_W_0
.......
kline_W_15
kline_Y_0
.......
kline_Y_15
trade_record_240101
trade_record_250213_0
........
trade_record_250221_249
```

```sql
-- Creates kline_<period>_<0..15> for every period in the list, each cloned
-- from the template table `kline`.
CREATE DEFINER=`admin`@`%` PROCEDURE `CreateKlineTables`()
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE j INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);
    DECLARE date_parts TEXT;
    DECLARE date_part VARCHAR(10);

    -- Comma-separated list of K-line periods (used as an array)
    SET date_parts = 'M5,M30,M60,D,W,M,Y';

    -- Iterate over the periods; element count = number of commas + 1
    WHILE j < LENGTH(date_parts) - LENGTH(REPLACE(date_parts, ',', '')) + 1 DO
        -- Pick the (j + 1)-th element of the list
        SET date_part = SUBSTRING_INDEX(SUBSTRING_INDEX(date_parts, ',', j + 1), ',', -1);

        -- Create the 16 shard tables of this period
        SET i = 0;
        WHILE i < 16 DO
            SET table_name = CONCAT('kline_', date_part, '_', i);
            SET @sql = CONCAT(' CREATE TABLE IF NOT EXISTS ', table_name, ' LIKE kline');
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            SET i = i + 1;
        END WHILE;

        SET j = j + 1;
    END WHILE;
END

-- Creates trade_record_<date_part>_<0..249>, each cloned from the template
-- table trade_record_240101.
CREATE DEFINER=`admin`@`%` PROCEDURE `CreateTradeRecordTables`(IN date_part VARCHAR(10))
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);

    -- Create the 250 shard tables of the given date
    WHILE i < 250 DO
        SET table_name = CONCAT('trade_record_', date_part, '_', i);
        SET @sql = CONCAT(' CREATE TABLE IF NOT EXISTS ', table_name, ' like trade_record_240101');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET i = i + 1;
    END WHILE;
END

-- Drops trade_record_<date_part>_<0..249>.
CREATE DEFINER=`admin`@`%` PROCEDURE `DropTradeRecordTables`(IN date_part VARCHAR(10))
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);

    -- Drop the 250 shard tables of the given date
    WHILE i < 250 DO
        SET table_name = CONCAT('trade_record_', date_part, '_', i);
        SET @sql = CONCAT('DROP TABLE IF EXISTS ', table_name);
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET i = i + 1;
    END WHILE;
END
```

### 3. application.yaml配置

配置文件

```yaml
# NOTE(review): the indentation below was reconstructed from a flattened copy of
# the file; confirm the nesting against the real application.yaml.
spring:
  # NOTE(review): port/tomcat normally live under `server:`; kept under `spring:`
  # because that is where the original text places them - confirm.
  port: 8888
  tomcat:
    uri-encoding: UTF-8
    max-http-post-size: 20MB
    max-http-header-size: 20MB
  http:
    encoding:
      force: true
      charset: UTF-8
      enabled: true
  aop:
    auto: true
  main:
    allow-bean-definition-overriding: true
  jpa:
    database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
    show-sql: false
    hibernate:
      ddl-auto: none
  dsx:
    olap:
      type: com.zaxxer.hikari.HikariDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      jdbcUrl:
      username:
      password:
      hikari:
        maximum-pool-size: 20
        minimum-idle: 20
  shardingsphere:
    datasource:
      names: center, ds0, ds1, ds2
      center:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
    props:
      sql:
        show: false
    sharding:
      default-data-source-name: center
      tables:
        trade_record:
          actual-data-nodes: ds$->{0..2}.trade_record_$->{0..10}
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: trade_date,symbol_id
              algorithm-class-name: com.zzc.sharding.TableShardingByDateAndSymbolAlgorithm
        kline_m1:
          actual-data-nodes: ds$->{0..2}.kline_m1
          # actual-data-nodes: ds$->{0..1}
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: trade_date
              algorithm-class-name: com.zzc.sharding.TableShardingByDateAlg
        kline:
          # Use $->{...} rather than ${...}: in a Spring environment ${...} is
          # parsed as a property placeholder before ShardingSphere sees it.
          actual-data-nodes: ds$->{0..2}.kline_$->{['M5', 'M30','M60','D','W','M','Y']}_$->{0..15}
          # actual-data-nodes: ds$->{0..1}
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: kline_type,symbol_id
              algorithm-class-name: com.zzc.sharding.TableShardingByKlineTypeAndSymbolIdAlg
```

### 创建路由规则

DbShardingByMarketTypeAlgorithm

```java
package com.zzc.sharding;

import java.util.Collection;

/**
 * Database-level precise sharding: maps the sharding value (a market type) to the
 * data source name configured in {@link DatabaseShardingConfig}.
 */
@Slf4j
public class DbShardingByMarketTypeAlgorithm implements PreciseShardingAlgorithm<String> {

    /** Looked up lazily from the Spring context on first use. */
    private DatabaseShardingConfig config;

    /**
     * @param collection           data source names offered by ShardingSphere
     * @param preciseShardingValue sharding value extracted from the SQL
     * @return the data source name configured for the market type
     * @throws IllegalArgumentException if the configured name is not one of {@code collection}
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<String> preciseShardingValue) {
        // Market type taken from the SQL
        String marketType = preciseShardingValue.getValue();
        if (config == null) {
            config = SpringContextUtil.getBean(DatabaseShardingConfig.class);
        }
        // Resolve the configured database name for this market type
        String dbName = config.getDbName(marketType);
        if (!collection.contains(dbName)) {
            log.error("Database sharding error. column-value : [{}], DatabaseShardingConfig dbName : [{}], shardingsphere configs : [{}]", marketType, dbName, collection);
            throw new IllegalArgumentException("Database sharding error.");
        }
        return dbName;
    }
}
```

TableShardingByDateAndSymbolAlgorithm

```java
package com.zzc.sharding;

/**
 * Table-level complex sharding on {@code trade_date} and {@code symbol_id}:
 * routes to {@code <logicTable>_<yyMMdd>_<symbolId % tableShardingNum>}.
 */
@Slf4j
public class TableShardingByDateAndSymbolAlgorithm implements ComplexKeysShardingAlgorithm {

    private static final String FIELD_NAME_DATE = "trade_date";
    private static final String FIELD_NAME_SYMBOL = "symbol_id";

    /** Looked up lazily from the Spring context on first use. */
    private DatabaseShardingConfig config;

    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        if (config == null) {
            config = SpringContextUtil.getBean(DatabaseShardingConfig.class);
        }
        // trade_date value from the SQL (only the first value is used)
        String date = ((List<String>) complexKeysShardingValue.getColumnNameAndShardingValuesMap().get(FIELD_NAME_DATE)).get(0);
        // symbol_id value from the SQL (only the first value is used)
        Long symbolId = ((List<Long>) complexKeysShardingValue.getColumnNameAndShardingValuesMap().get(FIELD_NAME_SYMBOL)).get(0);
        // Actual table = logic table + "_" + date without its first two characters and
        // without dashes + "_" + shard index, e.g. a suffix like "241118_1"
        String logicTable = complexKeysShardingValue.getLogicTableName();
        DatabaseShardingConfig.TableShardingConfig shardingConfig = config.getTableShardingConfig(logicTable);
        return Collections.singletonList(logicTable + "_" + date.substring(2).replaceAll("-", "") + "_" + symbolId % shardingConfig.getTableShardingNum());
    }
}
```

TableShardingByDateAlg

```java
package com.zzc.sharding;

/**
 * Table-level sharding on {@code trade_date} only: routes to
 * {@code <logicTable>_<yyMMdd>}.
 */
@Slf4j
public class TableShardingByDateAlg implements ComplexKeysShardingAlgorithm {

    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        // trade_date value from the SQL (only the first value is used)
        String date = ((List<String>) complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("trade_date")).get(0);
        // Actual table = logic table + "_" + date without its first two characters and
        // without dashes, e.g. a suffix like "_241118"
        String logicTable = complexKeysShardingValue.getLogicTableName();
        return Collections.singletonList(logicTable + "_" + date.substring(2).replaceAll("-", ""));
    }
}
```

TableShardingByKlineTypeAndSymbolIdAlg

```java
package com.zzc.sharding;

/**
 * Table-level complex sharding on {@code kline_type} and {@code symbol_id}:
 * routes to {@code <logicTable>_<klineType>_<symbolId % tableShardingNum>}.
 */
@Slf4j
public class TableShardingByKlineTypeAndSymbolIdAlg implements ComplexKeysShardingAlgorithm {

    /** Looked up lazily from the Spring context on first use. */
    private DatabaseShardingConfig config;

    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        if (config == null) {
            config = SpringContextUtil.getBean(DatabaseShardingConfig.class);
        }
        // Only the first value of each sharding column is used
        String klineType = ((List<String>) complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("kline_type")).get(0);
        Long symbolId = ((List<Long>) complexKeysShardingValue.getColumnNameAndShardingValuesMap().get("symbol_id")).get(0);
        String logicTable = complexKeysShardingValue.getLogicTableName();
        DatabaseShardingConfig.TableShardingConfig shardingConfig = config.getTableShardingConfig(logicTable);
        log.warn("symbolId:{}",symbolId);
        log.warn("klineType:{}",klineType);
        log.warn("shardingConfig:{}",shardingConfig);
        return Collections.singletonList(logicTable + "_" + klineType + "_" + symbolId % shardingConfig.getTableShardingNum());
    }
}
```

### 定时任务创建表格

```java
package com.zzc.service.schedule;

// Other imports are omitted, as elsewhere in this article. These two are listed
// because the try-with-resources blocks below name both types.
import java.sql.Connection;
import java.sql.Statement;

/**
 * Scheduled jobs that create next week's dated shard tables and drop expired ones,
 * driven by {@link DatabaseShardingConfig}. Each job body runs under a Redisson lock.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QuotationDataManagementJob {

    /** Seconds to wait when acquiring the lock. */
    private final static int LOCK_WAIT_SECONDS = 10;
    /** Seconds after which an acquired lock is released automatically. */
    private final static int LOCK_LEASE_SECONDS = 30 * 60;
    /** DDL that creates a shard table from a template table. */
    private final static String SHARDING_TABLE_CREATE_SQL = "CREATE TABLE IF NOT EXISTS %s LIKE %s;";
    /** DDL that drops a shard table (data clean-up, keeps MySQL disk usage bounded). */
    private final static String SHARDING_TABLE_CLEAR_SQL = "DROP TABLE IF EXISTS %s;";

    private final static String DS_SHARDING = "shardingDataSource";
    private final static String DS_OLAP = "olapDataSource";

    private final DatabaseShardingConfig dbShardingConfig;
    private final RedissonClient redissonClient;
    private final DataSource shardingDataSource;
    private final DataSource olapDataSource;

    /**
     * Every Friday at 12:30: create next week's tables for every configured logical
     * table whose {@code runCreateJob} flag is set.
     */
    @Scheduled(cron = "0 30 12 ? * FRI")
    public void createShardingTableJob() {
        RLock lock = redissonClient.getLock(LOCK_CREATE_SHARDING_TABLE);
        RedisLockUtils.lockExecute(lock, LOCK_WAIT_SECONDS, LOCK_LEASE_SECONDS, TimeUnit.SECONDS, () -> {
            dbShardingConfig.getTables().forEach((tableName, config) -> {
                if (config.getRunCreateJob()) {
                    createShardingTable(tableName, config);
                }
            });
            return null;
        });
        log.info("createShardingTable job done");
    }

    /**
     * Every day at 10:00: drop the expired shard tables of every configured logical table.
     */
    @Scheduled(cron = "0 0 10 * * ?")
    public void clearShardingTableJob() {
        RLock lock = redissonClient.getLock(LOCK_CLEAR_SHARDING_TABLE);
        RedisLockUtils.lockExecute(lock, LOCK_WAIT_SECONDS, LOCK_LEASE_SECONDS, TimeUnit.SECONDS, () -> {
            dbShardingConfig.getTables().forEach((tableName, config) -> {
                clearShardingTable(tableName, config);
            });
            return null;
        });
        log.info("clearShardingTable job done");
    }

    /**
     * Creates next week's tables of one logical table, either in the OLAP data source
     * or in every sharded data source except the center one.
     *
     * @param tableName logical table name
     * @param config    sharding settings of that table
     */
    private void createShardingTable(String tableName, DatabaseShardingConfig.TableShardingConfig config) {
        if (DS_OLAP.equals(config.getDs())) {
            // try-with-resources returns the connection to the pool when done
            try (Connection connection = olapDataSource.getConnection()) {
                List<String> nextWeekWorkDays = getNextWeekWorkDays();
                nextWeekWorkDays.forEach(day -> {
                    createShardingTable("olap", connection, tableName, day, config);
                });
            } catch (Throwable t) {
                log.error("createShardingTable error. db : [olap] tableName : [{}]", tableName, t);
            }
        } else {
            ((ShardingDataSource) shardingDataSource).getDataSourceMap().forEach((dbName, myDataSource) -> {
                if (dbName.equals(dbShardingConfig.getCenterDs())) {
                    // No shard tables are created in the center database
                    return;
                }
                try (Connection connection = myDataSource.getConnection()) {
                    List<String> nextWeekWorkDays = getNextWeekWorkDays();
                    nextWeekWorkDays.forEach(day -> {
                        createShardingTable(dbName, connection, tableName, day, config);
                    });
                } catch (Throwable t) {
                    log.error("createShardingTable error. db : [{}] tableName : [{}]", dbName, tableName, t);
                }
            });
        }
    }

    /**
     * Creates the shard table(s) of one logical table for one day. A failure on one
     * table is logged and does not stop the remaining tables.
     *
     * @param dbName     database name (used for logging only)
     * @param connection open connection to that database; not closed here
     * @param tableName  logical table name
     * @param day        day string whose first two characters are stripped to build the
     *                   table suffix; kept as a parameter so it can be reused for manual back-fill
     * @param config     sharding settings of the logical table
     */
    private void createShardingTable(String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig.TableShardingConfig config) {
        DatabaseShardingConfig.TableShardingConfig tableShardingConfig = dbShardingConfig.getTableShardingConfig(tableName);
        if (config.getTableShardingNum() > 1) {
            for (int i = 0; i < tableShardingConfig.getTableShardingNum(); i++) {
                String realTableName = tableName + "_" + day.substring(2) + "_" + i;
                try (Statement statement = connection.createStatement()) {
                    String sql = String.format(SHARDING_TABLE_CREATE_SQL, realTableName, tableShardingConfig.getTemplateTable());
                    statement.execute(sql);
                    log.info("createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]", dbName, tableName, realTableName, sql);
                } catch (Throwable t) {
                    log.error("createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]", dbName, tableName, realTableName, t);
                }
            }
        } else {
            String realTableName = tableName + "_" + day.substring(2);
            try (Statement statement = connection.createStatement()) {
                String sql = String.format(SHARDING_TABLE_CREATE_SQL, realTableName, tableShardingConfig.getTemplateTable());
                statement.execute(sql);
                log.info("createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]", dbName, tableName, realTableName, sql);
            } catch (Throwable t) {
                log.error("createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]", dbName, tableName, realTableName, t);
            }
        }
    }

    /**
     * Returns Monday to Friday of next week, formatted with {@code DateUtils.YYYYMMDD}.
     *
     * @return the five work days of next week
     */
    private List<String> getNextWeekWorkDays() {
        LocalDate today = LocalDate.now();
        // Next Monday (strictly after today)
        LocalDate nextMonday = today.with(TemporalAdjusters.next(DayOfWeek.MONDAY));
        List<String> workDays = new ArrayList<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DateUtils.YYYYMMDD);
        for (int i = 0; i < 5; i++) {
            // Monday through Friday of next week
            LocalDate date = nextMonday.plusDays(i);
            workDays.add(date.format(formatter));
        }
        return workDays;
    }

    /**
     * Drops the expired tables of one logical table, either in the OLAP data source
     * or in every sharded data source except the center one.
     *
     * @param tableName logical table name
     * @param config    sharding settings of that table
     */
    private void clearShardingTable(String tableName, DatabaseShardingConfig.TableShardingConfig config) {
        if (DS_OLAP.equals(config.getDs())) {
            // try-with-resources returns the connection to the pool when done
            try (Connection connection = olapDataSource.getConnection()) {
                List<String> daysToClear = getToBeClearDays(tableName);
                daysToClear.forEach(day -> {
                    clearShardingTable("olap", connection, tableName, day, config);
                });
            } catch (Throwable t) {
                log.error("clearShardingTable error. db : [olap] tableName : [{}]", tableName, t);
            }
        } else {
            ((ShardingDataSource) shardingDataSource).getDataSourceMap().forEach((dbName, myDataSource) -> {
                if (dbName.equals(dbShardingConfig.getCenterDs())) {
                    // No shard tables are dropped in the center database
                    return;
                }
                try (Connection connection = myDataSource.getConnection()) {
                    List<String> daysToClear = getToBeClearDays(tableName);
                    daysToClear.forEach(day -> {
                        clearShardingTable(dbName, connection, tableName, day, config);
                    });
                } catch (Throwable t) {
                    log.error("clearShardingTable error. db : [{}] tableName : [{}]", dbName, tableName, t);
                }
            });
        }
    }

    /**
     * Drops the shard table(s) of one logical table for one day. A failure on one
     * table is logged and does not stop the remaining tables.
     *
     * @param dbName     database name (used for logging only)
     * @param connection open connection to that database; not closed here
     * @param tableName  logical table name
     * @param day        day string whose first two characters are stripped to build the
     *                   table suffix; kept as a parameter so it can be reused for manual back-fill
     * @param config     sharding settings of the logical table
     */
    private void clearShardingTable(String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig.TableShardingConfig config) {
        if (config.getTableShardingNum() > 1) {
            for (int i = 0; i < config.getTableShardingNum(); i++) {
                String realTableName = tableName + "_" + day.substring(2) + "_" + i;
                try (Statement statement = connection.createStatement()) {
                    String sql = String.format(SHARDING_TABLE_CLEAR_SQL, realTableName);
                    statement.execute(sql);
                    log.info("clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]", dbName, tableName, realTableName, sql);
                } catch (Throwable t) {
                    log.error("clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]", dbName, tableName, realTableName, t);
                }
            }
        } else {
            String realTableName = tableName + "_" + day.substring(2);
            try (Statement statement = connection.createStatement()) {
                String sql = String.format(SHARDING_TABLE_CLEAR_SQL, realTableName);
                statement.execute(sql);
                log.info("clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]", dbName, tableName, realTableName, sql);
            } catch (Throwable t) {
                log.error("clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]", dbName, tableName, realTableName, t);
            }
        }
    }

    /**
     * Returns the days whose tables are to be dropped: from
     * {@code today - clearOffset} (inclusive) up to {@code today - keepDays} (exclusive).
     *
     * @param tableName logical table name
     * @return the days to clear, formatted with {@code DateUtils.YYYYMMDD}
     */
    private List<String> getToBeClearDays(String tableName) {
        List<String> days = new ArrayList<>();
        DatabaseShardingConfig.TableShardingConfig tableShardingConfig = dbShardingConfig.getTableShardingConfig(tableName);
        LocalDate today = LocalDate.now();
        LocalDate startDay = today.minusDays(tableShardingConfig.getClearOffset());
        LocalDate endDay = today.minusDays(tableShardingConfig.getKeepDays());
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DateUtils.YYYYMMDD);
        for (LocalDate date = startDay; date.isBefore(endDay); date = date.plusDays(1)) {
            days.add(date.format(formatter));
        }
        return days;
    }
}
```

### 配置

```java
package com.zzc.service.config;

/**
 * Sharding settings bound from {@code guda-refinitiv-api-db-sharding.yaml} under the
 * prefix {@code refinitiv.api-service.db-sharding}.
 */
@Data
@Slf4j
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "refinitiv.api-service.db-sharding")
@PropertySource(value = "classpath:guda-refinitiv-api-db-sharding.yaml", factory = YamlPropertySourceFactory.class)
public class DatabaseShardingConfig {

    /** Name of the center data source. */
    private String centerDs;
    /** Per-table sharding settings, keyed by logical table name. */
    private Map<String, TableShardingConfig> tables;
    /** Data source name -> comma-separated list of markets stored in it. */
    private Map<String, String> marketConfigs;

    /** Reverse index built in {@link #init()}: market -> data source name. */
    @Setter(AccessLevel.PRIVATE)
    private Map<String, String> dbMap;

    /**
     * Builds the market -> data source index from {@code marketConfigs}.
     *
     * @throws RuntimeException if {@code marketConfigs} is null or empty
     */
    @PostConstruct
    public void init() {
        if (marketConfigs == null || marketConfigs.isEmpty()) {
            throw new RuntimeException("DatabaseShardingConfig error. configs is empty");
        }
        Map<String, String> tmp = new HashMap<>();
        marketConfigs.forEach((dbName, markets) -> {
            for (String market : markets.split(",")) {
                tmp.put(market.trim(), dbName);
            }
        });
        dbMap = tmp;
        log.info("DatabaseShardingConfig init success. config: [{}]", this);
    }

    /**
     * Returns the data source name configured for a market.
     *
     * @param market market type (the name of a MarketCodeType enum constant)
     * @return data source name, or {@code null} if the market is not configured
     */
    public String getDbName(String market) {
        return dbMap.get(market);
    }

    /**
     * Returns the sharding settings of a logical table.
     *
     * @param tableName logical table name
     * @return its sharding settings, or {@code null} if the table is not configured
     */
    public TableShardingConfig getTableShardingConfig(String tableName) {
        return tables.get(tableName);
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TableShardingConfig {
        /** Template table the shard tables are cloned from. */
        private String templateTable;
        /** Number of shard tables. */
        private int tableShardingNum;
        /** Number of days of data to keep. */
        private int keepDays;
        /** How many days back the clean-up starts. */
        private int clearOffset;
        /** Name of the data source holding this table. */
        private String ds;
        /** Whether the scheduled create job generates tables for this logical table. */
        private Boolean runCreateJob = true;
    }
}
```

```yaml
refinitiv.api-service:
  db-sharding:
    centerDs: 'center'
    tables:
      trade_record:
        # Template table
        templateTable: 'trade_record_240101'
        # Number of shard tables
        tableShardingNum: 250
        # Days of data to keep
        keepDays: 7
        # How many days back the clean-up starts (it runs up to keepDays)
        clearOffset: 15
        ds: 'shardingDataSource'
      olap_quotation_snapshot:
        # Template table
        templateTable: 'olap_quotation_snapshot_240101'
        # Number of shard tables
        tableShardingNum: 1
        # Days of data to keep
        keepDays: 30
        # How many days back the clean-up starts (it runs up to keepDays)
        clearOffset: 40
        ds: 'olapDataSource'
      kline_m1:
        # Template table
        templateTable: 'kline_m1'
        # Number of shard tables
        tableShardingNum: 1
        # Days of data to keep
        keepDays: 30
        # How many days back the clean-up starts (it runs up to keepDays)
        clearOffset: 40
        ds: 'shardingDataSource'
      kline:
        # Template table
        templateTable: 'kline'
        # Number of shard tables
        tableShardingNum: 16
        # Days of data to keep
        keepDays: 30
        # How many days back the clean-up starts (it runs up to keepDays)
        clearOffset: 40
        ds: 'shardingDataSource'
        runCreateJob: false
    marketConfigs:
      # db0 stores US, US_PINK, US_OPTION data
      # ds2: 'US, US_PINK, US_OPTION'
      ds1: 'HK, HK_WRNT, HK_BONDA, HK_TRUST'
      ds0: 'US, US_PINK, US_OPTION, SH, SZ, SZ_INDEX, SZ_FUND, SZ_GEM, US_ETF'
```
SpringBootShardingJDBC分库分表(草稿)由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品和文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请注明文章出处“SpringBootShardingJDBC分库分表(草稿)”。