
One Annotation to Work Around Sharding JDBC's Lack of Support for Complex SQL


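Background

My company recently introduced database and table sharding and integrated Sharding JDBC. After the integration, regression testing turned up several SQL statements that failed at execution, even though none of the tables involved were sharded tables.

That was bad news. Anyone familiar with Sharding JDBC knows that there are many SQL constructs it does not support, and the official documentation lists them.

Rewriting those complex SQL statements could take a lot of time. So the only option was to look for a way around the problem on the Sharding JDBC side. After two days of digging, I arrived at the approach below, which works around Sharding JDBC's lack of support for complex SQL by adding a single annotation.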

Reproducing the problem

I wrote a complex SQL query locally to test with:

    public List<Map<String, Object>> queryOrder() {
        return borderRepository.findOrders();
    }

    public interface BOrderRepository extends JpaRepository<BOrder, Long> {

        // The complex query under test: a derived table built around a CASE expression.
        @Query(value = "SELECT * FROM (SELECT id,CASE WHEN company_id =1 THEN '小' WHEN company_id=4 THEN '中' ELSE '大' END AS com,user_id as userId FROM b_order0) t WHERE t.com ='中'", nativeQuery = true)
        List<Map<String, Object>> findOrders();
    }

I wrote a test controller to call it, and sure enough the call failed.

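Approach

The tables in these complex queries are not sharded tables, so can these particular queries bypass the Sharding JDBC data source?

  1. Intervene at the point where the Sharding JDBC data source is registered as a bean, and register our own custom data source that wraps it.
  2. That way, the wrapper can hand back a connection from a native data source when a connection is requested.
  3. Declare an annotation: code marked with it gets the native data source, and everything else still gets the Sharding data source.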
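
The three steps above can be sketched without Spring or ShardingSphere. This is only an illustration of the routing idea: `NamedSource` and `RoutingSource` are hypothetical stand-ins (a real implementation would work with `javax.sql.DataSource`), and the key is assumed to be set by whatever intercepts the annotated method.

```java
import java.util.Map;

/** Hypothetical stand-in for a data source; a real one would implement javax.sql.DataSource. */
interface NamedSource {
    String name();
}

/** Delegates to a native source when a key is set on the current thread, else to the sharding source. */
final class RoutingSource implements NamedSource {

    // Key chosen by the annotation interceptor; unset means "no annotation seen".
    static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<>();

    private final NamedSource sharding;
    private final Map<String, NamedSource> natives;

    RoutingSource(NamedSource sharding, Map<String, NamedSource> natives) {
        this.sharding = sharding;
        this.natives = natives;
    }

    NamedSource target() {
        String key = CURRENT_KEY.get();
        if (key == null || key.isEmpty()) {
            return sharding; // default path: everything still goes through sharding
        }
        NamedSource chosen = natives.get(key);
        if (chosen == null) {
            throw new IllegalStateException("Unknown data source: " + key);
        }
        return chosen;
    }

    @Override
    public String name() {
        return target().name();
    }
}
```

Callers only ever see the `RoutingSource`, so existing code that asks for a connection does not need to change; only the annotated methods take the native path.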

Implementation

  1. Write an auto-configuration class to replace the ShardingSphereAutoConfiguration class

    /**
     * Core auto-configuration class for the dynamic data source.
     */
    @Configuration
    @ComponentScan("org.apache.shardingsphere.spring.boot.converter")
    @EnableConfigurationProperties(SpringBootPropertiesConfiguration.class)
    @ConditionalOnProperty(prefix = "spring.shardingsphere", name = "enabled", havingValue = "true", matchIfMissing = true)
    @AutoConfigureBefore(DataSourceAutoConfiguration.class)
    public class DynamicDataSourceAutoConfiguration implements EnvironmentAware {

        private String databaseName;

        private final SpringBootPropertiesConfiguration props;

        // Native data sources, keyed by the names configured under spring.shardingsphere.
        private final Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();

        public DynamicDataSourceAutoConfiguration(SpringBootPropertiesConfiguration props) {
            this.props = props;
        }

        /**
         * Get mode configuration.
         *
         * @return mode configuration
         */
        @Bean
        public ModeConfiguration modeConfiguration() {
            return null == props.getMode() ? null : new ModeConfigurationYamlSwapper().swapToObject(props.getMode());
        }

        /**
         * Get ShardingSphere data source bean, wrapped so that it can be bypassed per call.
         *
         * @param rules rules configuration
         * @param modeConfig mode configuration
         * @return data source bean
         * @throws SQLException SQL exception
         */
        @Bean
        @Conditional(LocalRulesCondition.class)
        @Autowired(required = false)
        public DataSource shardingSphereDataSource(final ObjectProvider<List<RuleConfiguration>> rules, final ObjectProvider<ModeConfiguration> modeConfig) throws SQLException {
            Collection<RuleConfiguration> ruleConfigs = Optional.ofNullable(rules.getIfAvailable()).orElseGet(Collections::emptyList);
            DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig.getIfAvailable(), dataSourceMap, ruleConfigs, props.getProps());
            return new WrapShardingDataSource((ShardingSphereDataSource) dataSource, dataSourceMap);
        }

        /**
         * Get data source bean from registry center.
         *
         * @param modeConfig mode configuration
         * @return data source bean
         * @throws SQLException SQL exception
         */
        @Bean
        @ConditionalOnMissingBean(DataSource.class)
        public DataSource dataSource(final ModeConfiguration modeConfig) throws SQLException {
            DataSource dataSource = !dataSourceMap.isEmpty()
                    ? ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig, dataSourceMap, Collections.emptyList(), props.getProps())
                    : ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig);
            return new WrapShardingDataSource((ShardingSphereDataSource) dataSource, dataSourceMap);
        }

        /**
         * Create transaction type scanner.
         *
         * @return transaction type scanner
         */
        @Bean
        public TransactionTypeScanner transactionTypeScanner() {
            return new TransactionTypeScanner();
        }

        @Override
        public final void setEnvironment(final Environment environment) {
            dataSourceMap.putAll(DataSourceMapSetter.getDataSourceMap(environment));
            databaseName = DatabaseNameSetter.getDatabaseName(environment);
        }

        /**
         * Register the advisor that intercepts classes and methods annotated with @DS.
         */
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        @Bean
        @ConditionalOnProperty(prefix = "spring.datasource.dynamic.aop", name = "enabled", havingValue = "true", matchIfMissing = true)
        public Advisor dynamicDatasourceAnnotationAdvisor() {
            DynamicDataSourceAnnotationInterceptor interceptor = new DynamicDataSourceAnnotationInterceptor(true);
            return new DynamicDataSourceAnnotationAdvisor(interceptor, DS.class);
        }
    }

  2. Define the custom data source

    public class WrapShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable {

        private final ShardingSphereDataSource dataSource;

        private final Map<String, DataSource> dataSourceMap;

        public WrapShardingDataSource(ShardingSphereDataSource dataSource, Map<String, DataSource> dataSourceMap) {
            this.dataSource = dataSource;
            this.dataSourceMap = dataSourceMap;
        }

        /**
         * Pick the data source for the current thread: the Sharding data source when no
         * key has been pushed, otherwise the native data source registered under that key.
         */
        public DataSource getTargetDataSource() {
            String peek = DynamicDataSourceContextHolder.peek();
            if (StringUtils.isEmpty(peek)) {
                return dataSource;
            }
            DataSource target = dataSourceMap.get(peek);
            if (target == null) {
                // Fail with a clear message instead of a NullPointerException further down.
                throw new IllegalStateException("No data source named '" + peek + "' is configured");
            }
            return target;
        }

        @Override
        public Connection getConnection() throws SQLException {
            return getTargetDataSource().getConnection();
        }

        @Override
        public Connection getConnection(final String username, final String password) throws SQLException {
            // Credentials come from the configured data sources, so the arguments are ignored.
            return getConnection();
        }

        @Override
        public void close() throws Exception {
            // Always close the wrapped Sharding data source, whatever key the closing thread holds.
            if (dataSource instanceof AutoCloseable) {
                ((AutoCloseable) dataSource).close();
            }
        }

        @Override
        public int getLoginTimeout() throws SQLException {
            return getTargetDataSource().getLoginTimeout();
        }

        @Override
        public void setLoginTimeout(final int seconds) throws SQLException {
            getTargetDataSource().setLoginTimeout(seconds);
        }
    }
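
  3. Declare the annotation that selects a data source

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface DS {

        /**
         * Data source name.
         */
        String value();
    }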
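
  4. Use AOP to intercept the classes and methods that carry the annotation. The data source name declared by the annotation has to be stored when the method is entered and read back when a connection is requested, which is where ThreadLocal comes in.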

AOP interceptor:

    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;

    public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {

        private final DataSourceClassResolver dataSourceClassResolver;

        public DynamicDataSourceAnnotationInterceptor(Boolean allowedPublicOnly) {
            dataSourceClassResolver = new DataSourceClassResolver(allowedPublicOnly);
        }

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // Push the key before the call and always pop it afterwards, even on failure.
            String dsKey = determineDatasourceKey(invocation);
            DynamicDataSourceContextHolder.push(dsKey);
            try {
                return invocation.proceed();
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }

        private String determineDatasourceKey(MethodInvocation invocation) {
            return dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
        }
    }
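
The push, proceed, pop pattern of the interceptor can be tried out without Spring by using a JDK dynamic proxy. The sketch below is not the article's code: `UseSource`, `OrderQueries`, `SourceKeys`, and `SourceSwitchingHandler` are hypothetical names, and the annotation is read directly from the interface method rather than through a resolver.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the @DS annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface UseSource {
    String value();
}

/** Hypothetical repository interface: one annotated method, one plain method. */
interface OrderQueries {
    @UseSource("slave0")
    String complexQuery();

    String simpleQuery();
}

/** Per-thread stack of data source keys; "sharding" is reported when no key is active. */
final class SourceKeys {
    static final ThreadLocal<Deque<String>> STACK = ThreadLocal.withInitial(ArrayDeque::new);

    static String current() {
        String key = STACK.get().peek();
        return key == null || key.isEmpty() ? "sharding" : key;
    }
}

/** Pushes the annotated key before the call and pops it afterwards. */
final class SourceSwitchingHandler implements InvocationHandler {
    private final Object target;

    private SourceSwitchingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        UseSource annotation = method.getAnnotation(UseSource.class);
        SourceKeys.STACK.get().push(annotation == null ? "" : annotation.value());
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the target method actually threw
        } finally {
            SourceKeys.STACK.get().poll(); // always restore the previous key
        }
    }

    static OrderQueries wrap(OrderQueries target) {
        return (OrderQueries) Proxy.newProxyInstance(
                OrderQueries.class.getClassLoader(),
                new Class<?>[] {OrderQueries.class},
                new SourceSwitchingHandler(target));
    }
}
```

An empty string is pushed for methods without the annotation so that every call pushes exactly one entry and the `finally` block can pop unconditionally.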

AOP advisor definition:

    import java.lang.annotation.Annotation;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    import org.aopalliance.aop.Advice;
    import org.aopalliance.intercept.MethodInterceptor;
    import org.springframework.aop.ClassFilter;
    import org.springframework.aop.MethodMatcher;
    import org.springframework.aop.Pointcut;
    import org.springframework.aop.support.AbstractPointcutAdvisor;
    import org.springframework.aop.support.AopUtils;
    import org.springframework.aop.support.ComposablePointcut;
    import org.springframework.aop.support.StaticMethodMatcher;
    import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.core.annotation.AnnotatedElementUtils;
    import org.springframework.util.Assert;

    /**
     * AOP advisor that matches classes and methods carrying the given annotation.
     */
    public class DynamicDataSourceAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

        private final Advice advice;

        private final Pointcut pointcut;

        private final Class<? extends Annotation> annotation;

        public DynamicDataSourceAnnotationAdvisor(MethodInterceptor advice,
                                                  Class<? extends Annotation> annotation) {
            this.advice = advice;
            this.annotation = annotation;
            this.pointcut = buildPointcut();
        }

        @Override
        public Pointcut getPointcut() {
            return this.pointcut;
        }

        @Override
        public Advice getAdvice() {
            return this.advice;
        }

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            if (this.advice instanceof BeanFactoryAware) {
                ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
            }
        }

        private Pointcut buildPointcut() {
            // Class-level match (including inherited annotations) united with method-level match.
            Pointcut cpc = new AnnotationMatchingPointcut(annotation, true);
            Pointcut mpc = new AnnotationMethodPoint(annotation);
            return new ComposablePointcut(cpc).union(mpc);
        }

        /**
         * Hand-written method pointcut, kept for compatibility with Spring versions lower than 5.0.
         */
        private static class AnnotationMethodPoint implements Pointcut {

            private final Class<? extends Annotation> annotationType;

            public AnnotationMethodPoint(Class<? extends Annotation> annotationType) {
                Assert.notNull(annotationType, "Annotation type must not be null");
                this.annotationType = annotationType;
            }

            @Override
            public ClassFilter getClassFilter() {
                return ClassFilter.TRUE;
            }

            @Override
            public MethodMatcher getMethodMatcher() {
                return new AnnotationMethodMatcher(annotationType);
            }

            private static class AnnotationMethodMatcher extends StaticMethodMatcher {

                private final Class<? extends Annotation> annotationType;

                public AnnotationMethodMatcher(Class<? extends Annotation> annotationType) {
                    this.annotationType = annotationType;
                }

                @Override
                public boolean matches(Method method, Class<?> targetClass) {
                    if (matchesMethod(method)) {
                        return true;
                    }
                    // Proxy classes never have annotations on their redeclared methods.
                    if (Proxy.isProxyClass(targetClass)) {
                        return false;
                    }
                    // The method may be on an interface, so let's check on the target class as well.
                    Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
                    return (specificMethod != method && matchesMethod(specificMethod));
                }

                private boolean matchesMethod(Method method) {
                    return AnnotatedElementUtils.hasAnnotation(method, this.annotationType);
                }
            }
        }
    }

Data source resolver:

    import java.lang.reflect.AnnotatedElement;
    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.lang.reflect.Modifier;
    import java.lang.reflect.Proxy;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.springframework.aop.framework.AopProxyUtils;
    import org.springframework.core.BridgeMethodResolver;
    import org.springframework.core.MethodClassKey;
    import org.springframework.core.annotation.AnnotatedElementUtils;
    import org.springframework.core.annotation.AnnotationAttributes;
    import org.springframework.util.ClassUtils;

    /**
     * Data source resolver: works out which data source name applies to a method call.
     */
    public class DataSourceClassResolver {

        private static boolean mpEnabled = false;

        private static Field mapperInterfaceField;

        static {
            // Detect the mapper proxy class of mybatis-plus or mybatis, whichever is on the classpath.
            Class<?> proxyClass = null;
            try {
                proxyClass = Class.forName("com.baomidou.mybatisplus.core.override.MybatisMapperProxy");
            } catch (ClassNotFoundException e1) {
                try {
                    proxyClass = Class.forName("com.baomidou.mybatisplus.core.override.PageMapperProxy");
                } catch (ClassNotFoundException e2) {
                    try {
                        proxyClass = Class.forName("org.apache.ibatis.binding.MapperProxy");
                    } catch (ClassNotFoundException ignored) {
                    }
                }
            }
            if (proxyClass != null) {
                try {
                    mapperInterfaceField = proxyClass.getDeclaredField("mapperInterface");
                    mapperInterfaceField.setAccessible(true);
                    mpEnabled = true;
                } catch (NoSuchFieldException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Cache of the data source resolved for each method.
         */
        private final Map<Object, String> dsCache = new ConcurrentHashMap<>();

        private final boolean allowedPublicOnly;

        /**
         * Extension point that lets callers adjust the AOP condition.
         *
         * @param allowedPublicOnly whether only public methods are allowed; true by default
         */
        public DataSourceClassResolver(boolean allowedPublicOnly) {
            this.allowedPublicOnly = allowedPublicOnly;
        }

        /**
         * Look up the data source key, using the cache when possible.
         *
         * @param method method
         * @param targetObject target object
         * @return ds
         */
        public String findKey(Method method, Object targetObject) {
            if (method.getDeclaringClass() == Object.class) {
                return "";
            }
            Object cacheKey = new MethodClassKey(method, targetObject.getClass());
            String ds = this.dsCache.get(cacheKey);
            if (ds == null) {
                ds = computeDatasource(method, targetObject);
                if (ds == null) {
                    ds = "";
                }
                this.dsCache.put(cacheKey, ds);
            }
            return ds;
        }

        /**
         * Order in which the annotation is searched for:
         * 1. the current method
         * 2. the bridged method
         * 3. the current class and its superclasses, up to Object
         * 4. mybatis-plus and mybatis-spring mapper interfaces
         *
         * @param method method
         * @param targetObject target object
         * @return ds
         */
        private String computeDatasource(Method method, Object targetObject) {
            if (allowedPublicOnly && !Modifier.isPublic(method.getModifiers())) {
                return null;
            }
            // 1. Look on the current method (as declared on the interface).
            String dsAttr = findDataSourceAttribute(method);
            if (dsAttr != null) {
                return dsAttr;
            }
            Class<?> targetClass = targetObject.getClass();
            Class<?> userClass = ClassUtils.getUserClass(targetClass);
            // With a JDK proxy, get the method declared on the implementation class.
            // method: the interface method; specificMethod: the implementation method.
            Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
            specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
            // 2. Look on the bridged method.
            dsAttr = findDataSourceAttribute(specificMethod);
            if (dsAttr != null) {
                return dsAttr;
            }
            // Look on the class that declares the current method.
            dsAttr = findDataSourceAttribute(userClass);
            if (dsAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return dsAttr;
            }
            // since 3.4.1: look on the interfaces and take the first match.
            for (Class<?> interfaceClazz : ClassUtils.getAllInterfacesForClassAsSet(userClass)) {
                dsAttr = findDataSourceAttribute(interfaceClazz);
                if (dsAttr != null) {
                    return dsAttr;
                }
            }
            // If a bridge method exists
            if (specificMethod != method) {
                // Look on the bridge method.
                dsAttr = findDataSourceAttribute(method);
                if (dsAttr != null) {
                    return dsAttr;
                }
                // Look on the class that declares the bridge method.
                dsAttr = findDataSourceAttribute(method.getDeclaringClass());
                if (dsAttr != null && ClassUtils.isUserLevelMethod(method)) {
                    return dsAttr;
                }
            }
            return getDefaultDataSourceAttr(targetObject);
        }

        /**
         * Default way of obtaining the data source name.
         *
         * @param targetObject target object
         * @return ds
         */
        private String getDefaultDataSourceAttr(Object targetObject) {
            Class<?> targetClass = targetObject.getClass();
            // If it is not a proxy class, walk up the superclass chain from the current class.
            if (!Proxy.isProxyClass(targetClass)) {
                Class<?> currentClass = targetClass;
                while (currentClass != Object.class) {
                    String datasourceAttr = findDataSourceAttribute(currentClass);
                    if (datasourceAttr != null) {
                        return datasourceAttr;
                    }
                    currentClass = currentClass.getSuperclass();
                }
            }
            // Lookup for mybatis-plus and mybatis-spring.
            if (mpEnabled) {
                final Class<?> clazz = getMapperInterfaceClass(targetObject);
                if (clazz != null) {
                    String datasourceAttr = findDataSourceAttribute(clazz);
                    if (datasourceAttr != null) {
                        return datasourceAttr;
                    }
                    // Try its superclass; this is null for an interface, which findDataSourceAttribute tolerates.
                    return findDataSourceAttribute(clazz.getSuperclass());
                }
            }
            return null;
        }

        /**
         * Handles nested proxies.
         *
         * @param target JDK proxy object
         * @return the mapper interface class held by the InvocationHandler
         */
        private Class<?> getMapperInterfaceClass(Object target) {
            Object current = target;
            while (Proxy.isProxyClass(current.getClass())) {
                Object currentRefObject = AopProxyUtils.getSingletonTarget(current);
                if (currentRefObject == null) {
                    break;
                }
                current = currentRefObject;
            }
            try {
                if (Proxy.isProxyClass(current.getClass())) {
                    return (Class<?>) mapperInterfaceField.get(Proxy.getInvocationHandler(current));
                }
            } catch (IllegalAccessException ignore) {
            }
            return null;
        }

        /**
         * Find the @DS annotation on an AnnotatedElement and return its value.
         *
         * @param ae AnnotatedElement, may be null
         * @return the data source name, or null if the annotation is absent
         */
        private String findDataSourceAttribute(AnnotatedElement ae) {
            if (ae == null) {
                return null;
            }
            AnnotationAttributes attributes = AnnotatedElementUtils.getMergedAnnotationAttributes(ae, DS.class);
            if (attributes != null) {
                return attributes.getString("value");
            }
            return null;
        }
    }
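
Stripped of proxies and bridge methods, the resolver boils down to "method annotation first, then class annotation, cache the answer". The following is a reduced sketch of that lookup order using only the JDK; `SourceTag`, `SourceKeyResolver`, `ReportQueries`, and `PlainQueries` are hypothetical names, and the Spring merged-annotation handling of the real resolver is deliberately left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the @DS annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface SourceTag {
    String value();
}

/** Resolves a key per method: method-level annotation wins over class-level; "" means none. */
final class SourceKeyResolver {

    private final Map<Method, String> cache = new ConcurrentHashMap<>();

    String findKey(Method method) {
        // compute never returns null, so every method is resolved at most once.
        return cache.computeIfAbsent(method, SourceKeyResolver::compute);
    }

    private static String compute(Method method) {
        SourceTag onMethod = method.getAnnotation(SourceTag.class);
        if (onMethod != null) {
            return onMethod.value();
        }
        SourceTag onClass = method.getDeclaringClass().getAnnotation(SourceTag.class);
        return onClass != null ? onClass.value() : "";
    }

    /** Small helper for the examples: looks up a public no-arg method by name. */
    static Method methodOf(Class<?> type, String name) {
        try {
            return type.getMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}

@SourceTag("slave0")
class ReportQueries {
    public void daily() { }

    @SourceTag("slave1")
    public void monthly() { }
}

class PlainQueries {
    public void list() { }
}
```

Caching the empty string for unannotated methods matters as much as caching hits: most calls carry no annotation, and without the negative entry each of them would repeat the reflective lookup.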

ThreadLocal:

    import java.util.ArrayDeque;
    import java.util.Deque;

    import org.springframework.core.NamedThreadLocal;

    public final class DynamicDataSourceContextHolder {

        /**
         * Why a linked structure (more precisely, a stack) is used:
         * <pre>
         * To support nested switching. Suppose services A, B and C each use a different data source,
         * and a method in A calls B, which in turn calls C. The switches happen level by level and form a chain.
         * Setting a single value for the current thread cannot handle this; a stack (last in, first out) is needed.
         * </pre>
         */
        private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new NamedThreadLocal<Deque<String>>("dynamic-datasource") {
            @Override
            protected Deque<String> initialValue() {
                return new ArrayDeque<>();
            }
        };

        private DynamicDataSourceContextHolder() {
        }

        /**
         * Get the data source of the current thread.
         *
         * @return data source name, or null if none has been pushed
         */
        public static String peek() {
            return LOOKUP_KEY_HOLDER.get().peek();
        }

        /**
         * Set the data source of the current thread.
         * <p>
         * Do not call this manually unless necessary; if you do, make sure it is eventually cleared.
         * </p>
         *
         * @param ds data source name
         * @return the name that was pushed (an empty string if ds was empty)
         */
        public static String push(String ds) {
            String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;
            LOOKUP_KEY_HOLDER.get().push(dataSourceStr);
            return dataSourceStr;
        }

        /**
         * Remove the most recent data source of the current thread.
         * <p>
         * If the thread switched data sources several times in a row, only the most recent name is removed.
         * </p>
         */
        public static void poll() {
            Deque<String> deque = LOOKUP_KEY_HOLDER.get();
            deque.poll();
            if (deque.isEmpty()) {
                LOOKUP_KEY_HOLDER.remove();
            }
        }

        /**
         * Forcibly clear the thread-local state.
         * <p>
         * Prevents memory leaks; if push was called manually, call this to make sure the state is cleared.
         * </p>
         */
        public static void clear() {
            LOOKUP_KEY_HOLDER.remove();
        }
    }
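
The reason for a stack rather than a single value shows up as soon as calls nest. The sketch below reproduces only the push, peek, and poll behaviour with JDK classes; `KeyStack` and `NestedSwitchDemo` are hypothetical names, and the keys "a", "b", and "c" stand for the data sources of three nested services.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Minimal per-thread key stack, modelled on the holder above. */
final class KeyStack {
    private static final ThreadLocal<Deque<String>> HOLDER = ThreadLocal.withInitial(ArrayDeque::new);

    static void push(String key) {
        HOLDER.get().push(key);
    }

    static String peek() {
        return HOLDER.get().peek();
    }

    static void poll() {
        Deque<String> deque = HOLDER.get();
        deque.poll();
        if (deque.isEmpty()) {
            HOLDER.remove(); // drop the thread-local entry once nothing is left
        }
    }
}

final class NestedSwitchDemo {

    /** Records which key is active at each step of an A -> B -> C call chain. */
    static List<String> trace() {
        List<String> seen = new ArrayList<>();
        KeyStack.push("a");
        try {
            seen.add(KeyStack.peek());             // inside A
            KeyStack.push("b");
            try {
                seen.add(KeyStack.peek());         // inside B
                KeyStack.push("c");
                try {
                    seen.add(KeyStack.peek());     // inside C
                } finally {
                    KeyStack.poll();
                }
                seen.add(KeyStack.peek());         // back in B
            } finally {
                KeyStack.poll();
            }
            seen.add(KeyStack.peek());             // back in A
        } finally {
            KeyStack.poll();
        }
        String afterwards = KeyStack.peek();
        seen.add(afterwards == null ? "<none>" : afterwards); // nothing left after the outermost call
        return seen;
    }
}
```

With a single thread-local value instead of a stack, returning from C would leave B with no way to know that "b" had been active before the call.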

  5. Configure the application's main class as follows:

Import our own auto-configuration class and exclude the Sharding JDBC one.

    @SpringBootApplication(exclude = ShardingSphereAutoConfiguration.class)
    @Import({DynamicDataSourceAutoConfiguration.class})
    public class ShardingRunApplication {

        public static void main(String[] args) {
            SpringApplication.run(ShardingRunApplication.class, args);
        }
    }

Finally, add the annotation to the Repository written earlier:

    public interface BOrderRepository extends JpaRepository<BOrder, Long> {

        // Run this query on the native data source named slave0 instead of the Sharding data source.
        @DS("slave0")
        @Query(value = "SELECT * FROM (SELECT id,CASE WHEN company_id =1 THEN '小' WHEN company_id=4 THEN '中' ELSE '大' END AS com,user_id as userId FROM b_order0) t WHERE t.com ='中'", nativeQuery = true)
        List<Map<String, Object>> findOrders();
    }

Calling the endpoint again, the query now succeeds.
