赞
踩
需求场景:分布式项目中,每个子项目有各自的 user 数据库, 在综合管理系统中存放这所有用户信息, 为了保持综合管理系统用户的完整性,
子系统添加用户后将用户信息以json格式保存至redis,然后发布到消息到消息通道,综合管理系统监控到子系统发布的消息前往redis
获取出用户信息保存到自己的数据库
application.yml
spring: redis: #数据库索引 database: 5 host: 127.0.0.1 port: 6379 password: 123456 jedis: pool: #最大连接数 max-active: 8 #最大阻塞等待时间(负数表示没限制) #最大空闲 max-idle: 8 #最小空闲 min-idle: 0
集成redis , 初始化redis组件
import com.bigcustomer.utils.redisUtil.RedisService; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; @Configuration @EnableCaching//开启注解 public class RedisConfig extends CachingConfigurerSupport { private static Logger logger = LoggerFactory.getLogger(RedisConfig.class); // 自定义的配置类, 存放了通道地址 @Autowired private BaseConfig baseConfig; /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 //初始化监听器 */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //配置监听通道 container.addMessageListener(listenerAdapter, new PatternTopic(baseConfig.getRedisAisle()));// 通道的名字 logger.info("初始化监听成功,监听通道:【"+baseConfig.getRedisAisle()+"】"); return container; } /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 利用反射来创建监听到消息之后的执行方法 */ @Bean MessageListenerAdapter listenerAdapter(RedisService receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } // /** // *@参数 // *@返回值 // *@创建人 cx // *@创建时间 // *@描述 控制线程用的 // */ // @Bean // Receiver receiver(CountDownLatch latch) { // return new Receiver(latch); // } // // @Bean // CountDownLatch latch() { // return new CountDownLatch(1); // } /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 //使用默认的工厂初始化redis操作String模板 */ @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 //使用默认的工厂初始化redis操作map模板 */ @Bean RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(connectionFactory); template.setKeySerializer(jackson2JsonRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashKeySerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
操作string 和 map 的dao封装
import com.alibaba.fastjson.JSON; import com.bigcustomer.biguser.service.BigUserService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; @Component public class MyRedisDao { private static Logger logger = LoggerFactory.getLogger(BigUserService.class); @Resource private StringRedisTemplate template; @Resource private RedisTemplate redisTemplate; //大客户信息同步到redis时保存的map的key private final String BIG_USER_REDIS_KEY = "CM:CHANNELCUSTOMER"; /** * @参数 * @返回值 * @创建人 cx * @创建时间 * @描述 大客户添加成功后存到redis */ public boolean setMap(Map<String , Object> map) { try { redisTemplate.opsForHash().putAll(BIG_USER_REDIS_KEY , map); logger.info("同步大客户信息到redis 成功!userId【" + map.get("funiqueid")+ "】"); return true; } catch (Exception e) { e.printStackTrace(); } logger.info("同步大客户信息到redis 失败!userId【" + map.get("funiqueid")+ "】"); return false; } public Object getMap(String key) { try { Object o = redisTemplate.opsForHash().get(BIG_USER_REDIS_KEY, key); if (null != o) { return o; } } catch (Exception e) { e.printStackTrace(); } logger.info("获取大客户信息到失败!"); return null; } /** * @参数 * @返回值 存在 = true , 不纯在false * @创建人 cx * @创建时间 * @描述 判断是否存在 该key对应的值 */ public boolean isNull(String key) { return template.hasKey(key); } /** * @参数 * @返回值 * @创建人 cx * @创建时间 * @描述 设置值 和 过期时间 单位秒 */ public boolean setValue(String key, Object val, long expire) { if (!this.isNull(key)) { //不存在 String jsonString = JSON.toJSONString(val); template.opsForValue().set(key, jsonString, expire); logger.info("***************************成功在缓存中插入:" + key); return true; } else { logger.info("***************************【" + key + "】已经存在缓存"); return false; } } /** * @参数 * @返回值 * @创建人 cx * @创建时间 * @描述 删除 */ public boolean del(String key) { return template.delete(key); } /** * @参数 * @返回值 * @创建人 cx * @创建时间 * @描述 插入直接覆盖 */ public boolean setValue(String key, Object val) { //不存在 String jsonString = JSON.toJSONString(val); // 去掉多余的 “ String replace = jsonString.replace("\"", ""); template.opsForValue().set(key, replace); logger.info("***************************成功在缓存中插入:" + key); return true; } /** * @参数 * @返回值 * @创建人 cx * @创建时间 * @描述 获取对应key 的值 */ public String getValue(String key) { if (!this.isNull(key)) { //不存在 logger.info("***************************【" + key + "】不存在缓存"); return null; } else { return template.opsForValue().get(key);//根据key获取缓存中的val } } }
消息发布和监听的服务类
package com.bigcustomer.utils.redisUtil; import com.bigcustomer.configs.BaseConfig; import huashitech.kissucomponent.service.BaseService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Service public class RedisService extends BaseService { @Autowired private StringRedisTemplate template; @Autowired private BaseConfig baseConfig; @Autowired RedisService redisService; /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 向默认通道发送消息 */ public void setMessage( Long funiqueid) { template.convertAndSend(baseConfig.getRedisAisle(), baseConfig.getRedisMessageName() +funiqueid); } /** *@参数 *@返回值 *@创建人 cx *@创建时间 *@描述 接受监听到的消息 */ public void receiveMessage(String message) { logger.info("接收redis通道消息:"+message); } }
使用
dao.getTransactionManager().doTransaction((TransactionStatus s) -> {
//插入数据库
int insert = dao.insert(tbCmChannelcustomerModel);
// 加入缓存
HashMap<String, Object> map = new HashMap<>();
map.put(tbCmChannelcustomerModel.getFuniqueid().toString()
, JSON.toJSONString(tbCmChannelcustomerModel));
redisDao.setMap(map);
// 发布redis通知消息
redisService.setMessage(tbCmChannelcustomerModel.getFuniqueid());
});
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。