赞
踩
Kafaka消息队列框架、不用框架也可以解决消息队列问题,利用阻塞队列产生消息系统
阻塞队列比较简单、理解了阻塞队列,可以更加直观的理解Kafaka底层逻辑
Java自带的API
面试常问:写一个生产者消费者实现
public class Test { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingDeque<>(10); Producer p = new Producer(queue); Consumer c = new Consumer(queue); new Thread(p,"producer").start(); new Thread(c,"consumer").start(); } } class Consumer implements Runnable { private BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { while(true){ Thread.sleep(20); System.out.println("消费者消费了:" + queue.take()); } }catch (InterruptedException e) { e.printStackTrace(); } } } class Producer implements Runnable{ private BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 5; i++) { String tmp = "a product " + i + " from:" + Thread.currentThread().getName(); System.out.println("生产者生产了:" + tmp); queue.put(tmp); Thread.sleep(20); } }catch (InterruptedException e) { e.printStackTrace(); } } }
Kafka 性能最好的消息队列服务器,能够处理TB级异步消息函数
服务器自动给某些用户发消息,系统消息
数据量大、频率高
Kafka简介
Kafka特点
高吞吐量、消息持久化、高可靠性、高扩展性。
将消息存在硬盘上、而不是简单的存在内存里,容量大:可持久化存储、可存储海量数据
对硬盘读取的效率取决于读取硬盘的方式,若对硬盘进行顺序读取,而不是随机存取,性能甚至由于内存读取 性能高
分布式的集群部署==》高可靠性、高扩展性
目前最为流行、性能最好的消息队列框架
Kafka术语
消息队列实现的方式:①点对点的方式 ②发布订阅模式
Kafka术语解释
Kafka相关链接:官网
1.config目录下zookeeper.properties修改
2.server.properties
1.启动Zookeeper
2.启动Kafka
启动完成后就会出现我们设置的文件夹
3.创建主题
查看所有主题判断是否创建成功
4.往主题上发送消息
发两条消息
5.消费者接收消息
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
@SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTest { @Autowired KafkaProducer kafkaProducer; @Autowired KafkaConsumer kafkaConsumer; @Test public void testKafka(){ kafkaProducer.sendMessage("test","hello world"); kafkaProducer.sendMessage("test","I love java"); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } } } @Component class KafkaProducer{ @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic,String content){ kafkaTemplate.send(topic,content); } } @Component class KafkaConsumer{ @KafkaListener(topics = { "test"}) public void handleMessage(ConsumerRecord record){ System.out.println(record.value()); } }
生产者主动发消息,消费者被动接收消息
频繁操作、保证性能==》Kafaka消息队列
可以定义三种不同的主题、一旦事件发生,将消息包装到队列中。消费者继续处理该业务的后续过程、生产者可以继续别的业务
生产者和消费者处理业务是并发的,可以同时处理事务【异步】
在Kafaka的基础上,以事件进行封装
public class Event { //张三给李四点赞---userId是张三,entityUserId是李四 private String topic; private int userId; private int entityType; private int entityId; private int entityUserId; //额外的数据放在Map中,具备一定的扩展性 private Map<String,Object> data = new HashMap<>(); public String getTopic() { return topic; } public Event setTopic(String topic) { //改造,set主题之后,返回当前对象,方便set其它数据 //因为参数很多,最好不用参数构造器 this.topic = topic; return this; } public int getUserId() { return userId; } public Event setUserId(int userId) { this.userId = userId; return this; } public int getEntityType() { return entityType; } public Event setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityId() { return entityId; } public Event setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityUserId() { return entityUserId; } public Event setEntityUserId(int entityUserId) { this.entityUserId = entityUserId; return this; } public Map<String, Object> getData() { return data; } public Event setData(String key,String object) { this.data.put(key,object); return this; } }
新建event包
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//处理事件,本质就是发送消息
public void fireEvent(Event event){
//将事件发送到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
@Component public class EventConsumer implements CommunityContant { //记日志 private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); //消息最终是要往message表中插入数据的 @Autowired private MessageService messageService; @KafkaListener(topics = { TOPIC_LIKE,TOPIC_COMMENT,TOPIC_FOLLOW}) public void handleCommentMessage(ConsumerRecord record){ if(record==null||record.value()==null){ logger.error("消息的内容为空"); return; } //将JSON字符串 解析为对象 Event event = JSONObject.parseObject(record.value().toString(),Event.class); if(event==null){ logger.error("消息格式错误"); return; } //发送站内通知,主要是构造message对象 Message message = new Message(); //User表中id为1代表系统用户 message.setFromId(SYSTEM_ID); message.setToId(event.getEntityUserId()); message.setConversationId(event.getTopic());//存的是主题 message.setCreateTime(new Date()
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。