赞
踩
最近一个项目上使用了shardingsphere
来做分库。常规使用是先在数据库创建表,然后配置到shardingsphere
实现分表;然而本项目是根据业务操作后生成的表,需要在运行期进行动态分表。网上找的动态分表可能是版本问题还是有很多要改的地方,为方便后续使用特此记录一下。
shardingsphere
只配置数据源,不配置分表规则shardingsphere
,后续查询才能分表查询shardingsphere
文章主要针对第三步
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>
spring.shardingsphere.datasource.names=m1
spring.shardingsphere.datasource.m1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.m1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.m1.url=jdbc:mysql://xxx:3306/xxx?useSSL=false&characterEncoding=utf8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&&rewriteBatchedStatements=true&allowMultiQueries=true
spring.shardingsphere.datasource.m1.username=xxx
spring.shardingsphere.datasource.m1.password=xxx
spring.shardingsphere.props.sql.show=true
spring.main.allow-bean-definition-overriding=true
我的分表是根据外键id进行取模0和1,表名为:xxx_xxx_0 和 xxx_xxx_1,不同规则要做修改
import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration; import org.apache.shardingsphere.api.config.sharding.strategy.InlineShardingStrategyConfiguration; import org.apache.shardingsphere.core.rule.ShardingRule; import org.apache.shardingsphere.core.rule.TableRule; import org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator; import org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy; import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource; import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData; import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaDataLoader; import org.apache.shardingsphere.underlying.common.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.underlying.common.rule.DataNode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sql.DataSource; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Component public class ShardingTableRuleActualDataNodesRefresh { @Autowired private DataSource dataSource; public void refreshActualDataNodes(String dynamicTableName) { log.info("Job 动态刷新 actualDataNodes START"); ShardingDataSource shardingDataSource = (ShardingDataSource) dataSource; TableRule tableRule = null; try { ShardingSphereMetaData metaData = shardingDataSource.getRuntimeContext().getMetaData(); TableMetaData tableMetaData = TableMetaDataLoader.load(dataSource, dynamicTableName + "_0", shardingDataSource.getDatabaseType().getName()); metaData.getSchema().put(dynamicTableName, tableMetaData); ShardingRule shardingRule = shardingDataSource.getRuntimeContext().getRule(); tableRule = shardingRule.getTableRule(dynamicTableName); } catch (Exception e) { log.error(String.format("逻辑表:%s 动态分表配置错误!", dynamicTableName)); } String dataSourceName = tableRule.getActualDataNodes().get(0).getDataSourceName(); String logicTableName = tableRule.getLogicTable(); assert tableRule != null; List<DataNode> newDataNodes = getDataNodes(dataSourceName, logicTableName); if (newDataNodes.isEmpty()) { throw new UnsupportedOperationException(); } try { dynamicRefreshDatasource(dataSourceName, dynamicTableName, tableRule, newDataNodes); } catch (Exception e) { e.printStackTrace(); } TableRuleConfiguration configuration = new TableRuleConfiguration(dynamicTableName, dataSourceName + "." + dynamicTableName + "_${0..1}"); InlineShardingStrategyConfiguration a = new InlineShardingStrategyConfiguration("monitor_point_id", dynamicTableName + DBUtil.shardingStrategy()); configuration.setTableShardingStrategyConfig(a); Collection<TableRuleConfiguration> ruleConfigs = shardingDataSource.getRuntimeContext().getRule().getRuleConfiguration().getTableRuleConfigs(); ruleConfigs.add(configuration); log.info("Job 动态刷新 actualDataNodes END"); } /** * 获取数据节点 */ private List<DataNode> getDataNodes(String dataSourceName, String logicTableName) { Set<DataNode> newDataNodes = Sets.newHashSet(); newDataNodes.add(new DataNode(dataSourceName + "." + logicTableName + "_0")); newDataNodes.add(new DataNode(dataSourceName + "." + logicTableName + "_1")); // 扩展点 return Lists.newLinkedList(newDataNodes); } /** * 动态刷新数据源 */ private void dynamicRefreshDatasource(String dataSourceName, String dynamicTableName, TableRule tableRule, List<DataNode> newDataNodes) throws NoSuchFieldException, IllegalAccessException { Set<String> actualTables = Sets.newHashSet(); Map<DataNode, Integer> dataNodeIndexMap = Maps.newHashMap(); AtomicInteger index = new AtomicInteger(0); newDataNodes.forEach(dataNode -> { actualTables.add(dataNode.getTableName()); if (index.intValue() == 0) { dataNodeIndexMap.put(dataNode, 0); } else { dataNodeIndexMap.put(dataNode, index.intValue()); } index.incrementAndGet(); }); Field generateKeyColumn = TableRule.class.getDeclaredField("generateKeyColumn"); generateKeyColumn.setAccessible(true); generateKeyColumn.set(tableRule, "id"); Field shardingKeyGenerator = TableRule.class.getDeclaredField("shardingKeyGenerator"); shardingKeyGenerator.setAccessible(true); shardingKeyGenerator.set(tableRule, new SnowflakeShardingKeyGenerator()); // 动态刷新:actualDataNodesField Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes"); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL); actualDataNodesField.setAccessible(true); actualDataNodesField.set(tableRule, newDataNodes); // 动态刷新:actualTablesField Field actualTablesField = TableRule.class.getDeclaredField("actualTables"); actualTablesField.setAccessible(true); actualTablesField.set(tableRule, actualTables); // 动态刷新:dataNodeIndexMapField Field dataNodeIndexMapField = TableRule.class.getDeclaredField("dataNodeIndexMap"); dataNodeIndexMapField.setAccessible(true); dataNodeIndexMapField.set(tableRule, dataNodeIndexMap); // 动态刷新:datasourceToTablesMapField Map<String, Collection<String>> datasourceToTablesMap = Maps.newHashMap(); datasourceToTablesMap.put(dataSourceName, actualTables); Field datasourceToTablesMapField = TableRule.class.getDeclaredField("datasourceToTablesMap"); datasourceToTablesMapField.setAccessible(true); datasourceToTablesMapField.set(tableRule, datasourceToTablesMap); Field tableShardingStrategy = TableRule.class.getDeclaredField("tableShardingStrategy"); tableShardingStrategy.setAccessible(true); tableShardingStrategy.set(tableRule, getsStrategy(dynamicTableName)); ShardingDataSource shardingDataSource = (ShardingDataSource) dataSource; ShardingRule shardingRule = shardingDataSource.getRuntimeContext().getRule(); Collection<TableRule> tableRules = shardingRule.getTableRules(); tableRules.add(tableRule); } public InlineShardingStrategy getsStrategy(String tableName) { return new InlineShardingStrategy(new InlineShardingStrategyConfiguration("外键ID", tableName + "_${外键ID % 2}")); } }
表是动态创建的,在项目重启后分表规则失效,需要重新装载到
shardingsphere
ShardingSphereMetaData metaData = shardingDataSource.getRuntimeContext().getMetaData();
SchemaMetaData schema = metaData.getSchema();
Collection<String> tableNames = schema.getAllTableNames();
Set<String> tables = new HashSet<>();
for (String tableName : tableNames) {
if (tableName.startsWith("xxx_")) {
tables.add(tableName.substring(0, tableName.lastIndexOf("_")));
}
}
tables.forEach(name -> {
shardingTableRuleActualDataNodesRefresh.refreshActualDataNodes(name);
});
shardingTableRuleActualDataNodesRefresh.refreshActualDataNodes(name)
根据表名分表。
当时项目较急,没做什么优化。欢迎大佬指正!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。