赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
创建 RedisMessageListenerConfig之间要先自定义定义一个接口RedisPubSub接口,这个接口用于处理收到的信息,如果实现发布订阅只需实现这个接口即可。
public interface RedisPubSub {
/**
* 接收消息
* @param message
*/
void receiveMessage(String message);
/**
* 发布订阅监听的topic key
* @return
*/
CacheKeyEnum getCacheKeyEnum();
}
public enum CacheKeyEnum { /** * 消息订阅 */ PUBSUB_QUEUE("pubsub:queue" , 0L), ; // 缓存键名 private String key; /** * 过期时间,单位秒 * 0 表示不过期 */ private Long expireTime; //省略getter、setter }
RedisMessageListenerConfig.java
@Component public class RedisMessageListenerConfig { // 如果项目中没有RedisPubSub实现类,启动会报错,所以设置required = false @Autowired(required = false) private Set<RedisPubSub> redisPubSubs; /** * 创建连接工厂 * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , Map<? extends MessageListener, Collection<? extends Topic>> listenerAdapters){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageListeners(listenerAdapters); return container; } @Bean public Map<MessageListener, Collection<Topic>> listenerAdapters(){ if (!CollectionUtils.isEmpty(redisPubSubs)) { Map<MessageListener, Collection<Topic>> map = new HashMap<>(redisPubSubs.size()); for (RedisPubSub redisPubSub : redisPubSubs) { final CacheKeyEnum cacheKeyEnum = redisPubSub.getCacheKeyEnum(); // redis会利用反射调用receiveMessage方法 final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisPubSub, "receiveMessage"); messageListenerAdapter.afterPropertiesSet(); map.put(messageListenerAdapter , Collections.singletonList(new PatternTopic(cacheKeyEnum.getKey()))); } return map; } return Collections.emptyMap(); } }
准备redis工具类:
@Slf4j @Component public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 消息发布 * @param cacheKeyEnum * @param message */ public void publish(CacheKeyEnum cacheKeyEnum , Object message){ redisTemplate.convertAndSend(cacheKeyEnum.getKey() , message); } /** * 反序列化redis数据 * @param value * @param <T> * @return */ public <T> T deserialize(String value){ final RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer(); final Object deserialize = valueSerializer.deserialize(value.getBytes()); return deserialize == null ? null : (T) deserialize; } }
@Component public class PubsubQueue implements RedisPubSub { @Autowired private RedisUtil redisUtil; /** * 接收消息 * * @param message */ @Override public void receiveMessage(String message) { System.out.println(message); final UserEntity deserialize = redisUtil.deserialize(message); System.out.println("getId="+deserialize.getId()); } /** * 发布订阅监听的topic key * * @return */ @Override public CacheKeyEnum getCacheKeyEnum() { return CacheKeyEnum.PUBSUB_QUEUE; } }
@Test
public void testRedisQueue(){
UserEntity userEntity = new UserEntity();
userEntity.setId(123456L);
redisUtil.publish(CacheKeyEnum.PUBSUB_QUEUE , userEntity);
}
控制台输出:
使用redis发布订阅的注意点:
RedisPubSub 中receiveMessage接收的参数是String类型,redis在发布订阅中接收到的对象是字节数组,控制台打印是一个json格式的,如果redis用的是默认的JdkSerializationRedisSerializer序列化类,直接想通过将String转成JSON是不行的,会报错的,而且如果用的是jdk的序列化类,要发布的对象必须实现Serializable接口,否则也会报错。
所以可以通过上面的redisUtil中的反序列化方法来进行对象的转化,这样不管是不是用的是什么序列化类都不会报错。
Redis发布订阅的缺点:
消息不持久化,一旦订阅者没收到消息或者重启,消息将丢失。
相对于专业的消息中间件来说,Redis的发布订阅相对简单,慎用即可。
能力一般,水平有限,如有错误,请多指出。
如果对你有用点个关注给个赞呗
更多文章可以关注一下我的微信公众号suncodernote
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。