The business data is synchronized from the big data platform at roughly 10 million rows per month. With hash-mod sharding on a column, each table would still keep growing without bound over time, so the tables are split horizontally by month instead.

Without further ado, straight to the code.
```xml
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
    <version>5.0.0</version>
</dependency>
```
```yaml
spring:
  shardingsphere:
    props:
      # whether to print the actual SQL in the log
      sql-show: false
    datasource:
      names: ds0
      ds0:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/db_test?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
        username: root
        password: root
    rules:
      sharding:
        key-generators:
          snowflake:
            type: SNOWFLAKE
        sharding-algorithms:
          ota-strategy-inline:
            type: CLASS_BASED
            props:
              strategy: standard
              # custom standard sharding algorithm
              algorithmClassName: com.test.business.algorithm.OTAStrategyShardingAlgorithm
        tables:
          # logical table; the entries below are the physical nodes. Data written before the split
          # still lives in the original table, so add the original table to the nodes if it must be queried too.
          ota_strategy_info:
            actual-data-nodes: ds0.ota_strategy_info_202$->{201..212}
            key-generate-strategy:
              column: id
              key-generator-name: snowflake
            # table sharding strategy: shard by create_time, computed by the custom algorithm class
            table-strategy:
              standard:
                sharding-column: create_time
                # refers to the sharding algorithm configured above
                sharding-algorithm-name: ota-strategy-inline
```

The actual-data-nodes inline expression expands to the twelve monthly tables ota_strategy_info_202201 through ota_strategy_info_202212.
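The expression only declares these tables; it does not create them, so the monthly tables have to exist before any data is routed to them (and note that actual-data-nodes above is pinned to 2022, so new months also need to be added there). A hedged sketch of one way to pre-create them follows; the class name, the cron schedule and the `CREATE TABLE ... LIKE` approach are assumptions, and the JdbcTemplate is assumed to be built on the plain ds0 DataSource so the DDL bypasses ShardingSphere's SQL rewriting.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Hypothetical helper that pre-creates next month's physical table,
 * copying the structure of the original ota_strategy_info table.
 * Requires @EnableScheduling and a JdbcTemplate wired to the plain ds0 DataSource.
 */
@Component
public class MonthlyTableCreator {

    private static final DateTimeFormatter YYYYMM = DateTimeFormatter.ofPattern("yyyyMM");

    private final JdbcTemplate jdbcTemplate;

    public MonthlyTableCreator(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Runs at midnight on the 25th of each month and creates the table for the following month. */
    @Scheduled(cron = "0 0 0 25 * *")
    public void createNextMonthTable() {
        String suffix = LocalDate.now().plusMonths(1).format(YYYYMM);
        jdbcTemplate.execute(
                "CREATE TABLE IF NOT EXISTS ota_strategy_info_" + suffix + " LIKE ota_strategy_info");
    }
}
```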
OTA upgrade strategy table, horizontal sharding by month ================================================================

```java
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Range;
import org.apache.commons.lang.StringUtils;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;


/**
 * @Description: sharding rule: one table per month
 * @Author: lg
 * @Date: 2022/6/9
 * @Version: V1.0
 */
@Component
public class OTAStrategyShardingAlgorithm implements StandardShardingAlgorithm<String> {

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter yyyyMM = DateTimeFormatter.ofPattern("yyyyMM");

    /**
     * Range query routing
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<String> rangeShardingValue) {
        // logical table name
        String logicTableName = rangeShardingValue.getLogicTableName();

        // range of the sharding column
        Range<String> valueRange = rangeShardingValue.getValueRange();
        Set<String> queryRangeTables = extracted(logicTableName,
                LocalDateTime.parse(valueRange.lowerEndpoint(), formatter),
                LocalDateTime.parse(valueRange.upperEndpoint(), formatter));
        // keep only the physical tables that are both configured and covered by the range
        ArrayList<String> tables = new ArrayList<>(collection);
        tables.retainAll(queryRangeTables);
        System.out.println(JSON.toJSONString(tables));
        return tables;
    }

    /**
     * Compute the physical table names covered by a time range.
     *
     * @param logicTableName logical table name
     * @param lowerEndpoint  start of the range
     * @param upperEndpoint  end of the range
     * @return set of physical table names
     */
    private Set<String> extracted(String logicTableName, LocalDateTime lowerEndpoint, LocalDateTime upperEndpoint) {
        Set<String> rangeTable = new HashSet<>();
        while (lowerEndpoint.isBefore(upperEndpoint)) {
            String str = getTableNameByDate(lowerEndpoint, logicTableName);
            rangeTable.add(str);
            lowerEndpoint = lowerEndpoint.plusMonths(1);
        }
        // include the physical table of the upper endpoint itself
        String tableName = getTableNameByDate(upperEndpoint, logicTableName);
        rangeTable.add(tableName);
        return rangeTable;
    }

    /**
     * Build the physical table name for a given date.
     *
     * @param dateTime       date
     * @param logicTableName logical table name
     * @return physical table name
     */
    private String getTableNameByDate(LocalDateTime dateTime, String logicTableName) {
        String tableSuffix = dateTime.format(yyyyMM);
        return logicTableName.concat("_").concat(tableSuffix);
    }

    /**
     * Precise routing (inserts and equality queries)
     *
     * @param collection           configured physical table names
     * @param preciseShardingValue sharding value of create_time
     * @return the single matching physical table
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<String> preciseShardingValue) {
        String str = preciseShardingValue.getValue();
        if (StringUtils.isEmpty(str)) {
            return collection.stream().findFirst().get();
        }
        LocalDateTime value = LocalDateTime.parse(str, formatter);
        String tableSuffix = value.format(yyyyMM);
        String logicTableName = preciseShardingValue.getLogicTableName();
        String table = logicTableName.concat("_").concat(tableSuffix);
        System.out.println("OrderStrategy.doSharding table name: " + table);
        return collection.stream()
                .filter(s -> s.equals(table))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("matching sharding table does not exist"));
    }

    @Override
    public void init() {

    }

    @Override
    public String getType() {
        // only needed when the algorithm is registered through SPI; CLASS_BASED ignores it
        return null;
    }

}
```
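The empty getType() above only matters if the algorithm is registered through SPI instead of type: CLASS_BASED. A rough sketch of that alternative is below; the type name OTA_MONTH, the subclass name, and the exact SPI file path are assumptions based on the ShardingSphere 5.0.x package layout, so verify them against your version.

```java
package com.test.business.algorithm;

/**
 * Sketch of SPI registration as an alternative to type: CLASS_BASED.
 * "OTA_MONTH" is an arbitrary example type name.
 */
public class OTAMonthShardingAlgorithm extends OTAStrategyShardingAlgorithm {

    @Override
    public String getType() {
        // the YAML would then reference this value via `type: OTA_MONTH`
        return "OTA_MONTH";
    }

    // Registration steps (outside this file), assuming ShardingSphere 5.0.x:
    //
    // 1. Create src/main/resources/META-INF/services/org.apache.shardingsphere.sharding.spi.ShardingAlgorithm
    //    containing one line: com.test.business.algorithm.OTAMonthShardingAlgorithm
    //
    // 2. In the YAML, replace CLASS_BASED with:
    //    sharding-algorithms:
    //      ota-strategy-inline:
    //        type: OTA_MONTH
}
```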

Starting with ShardingSphere 5.0, both insert routing and query routing can be implemented in a single class by implementing the StandardShardingAlgorithm interface:
```java
// route a range query (e.g. BETWEEN on create_time) to the matching physical tables
public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<String> rangeShardingValue) {
}

// compute the physical table names covered by the time range
private Set<String> extracted(String logicTableName, LocalDateTime lowerEndpoint, LocalDateTime upperEndpoint) {
}

// route an insert (or equality query) to a single physical table by the sharding value
public String doSharding(Collection<String> collection, PreciseShardingValue<String> preciseShardingValue) {
}
```
For inserts, just use MyBatis or MyBatis-Plus as usual; in the doSharding method from step 4 you can check whether the row was matched to the expected physical table, and the result can then be verified directly in that table (a rough sketch of such an insert is shown below).
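A minimal MyBatis-Plus sketch of such an insert. The entity, mapper and service names, the Lombok usage and the id strategy are assumptions (only the id and create_time columns come from the sharding configuration); create_time is kept as a String so the value handed to StandardShardingAlgorithm<String> matches the formatter above.

```java
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/** Hypothetical entity mapped to the logical table. */
@Data
@TableName("ota_strategy_info")
class OtaStrategyInfo {
    /** left unset so the SNOWFLAKE key generator configured above can fill it in (assumption) */
    @TableId(type = IdType.AUTO)
    private Long id;
    /** sharding column, formatted as yyyy-MM-dd HH:mm:ss */
    private String createTime;
}

/** Hypothetical mapper; needs @Mapper or @MapperScan in a real project. */
interface OtaStrategyInfoMapper extends BaseMapper<OtaStrategyInfo> {
}

/** Hypothetical service: the create_time value decides the physical table. */
@Service
class OtaStrategyInfoService {

    @Autowired
    private OtaStrategyInfoMapper mapper;

    public void saveDemo() {
        OtaStrategyInfo info = new OtaStrategyInfo();
        info.setCreateTime("2022-04-14 17:21:48");
        // routed by the precise doSharding(...) above to ota_strategy_info_202204
        mapper.insert(info);
    }
}
```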
Run the following SQL:

```sql
select * from ota_strategy_info where create_time = '2022-04-14 17:21:48'
```
Result: the query is routed precisely to the April physical table (ota_strategy_info_202204). If no physical table can be matched, every configured table is queried, which means one SQL per physical table. So always make sure a query can hit a specific physical table; otherwise performance will drop rather than improve!
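To keep time-window queries on the fast path as well, put both ends of the range on create_time so the range doSharding(...) above can narrow the scan. A sketch under the same assumptions as the insert example (it reuses the hypothetical OtaStrategyInfo entity and OtaStrategyInfoMapper):

```java
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

import java.util.List;

/** Hypothetical query helper, reusing OtaStrategyInfo and OtaStrategyInfoMapper from the insert sketch. */
class OtaStrategyInfoQueryDemo {

    private final OtaStrategyInfoMapper mapper;

    OtaStrategyInfoQueryDemo(OtaStrategyInfoMapper mapper) {
        this.mapper = mapper;
    }

    /**
     * A bounded range on the sharding column is resolved by the range doSharding(...) above
     * to ota_strategy_info_202203 and ota_strategy_info_202204 only.
     * A query that omits create_time entirely cannot be routed and is sent to every
     * configured physical table, i.e. one SQL per table.
     */
    List<OtaStrategyInfo> findMarchToApril() {
        QueryWrapper<OtaStrategyInfo> wrapper = new QueryWrapper<>();
        wrapper.between("create_time", "2022-03-01 00:00:00", "2022-04-30 23:59:59");
        return mapper.selectList(wrapper);
    }
}
```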
Official documentation: Overview :: ShardingSphere (https://shardingsphere.apache.org/document/current/cn/overview/). More details are in the docs. Whether a query SQL actually hits the physical tables needs plenty of testing, so choose your query approach based on what the routing log shows!