当前位置:   article > 正文

使用Canal通过RocketMQ同步Mysql和Redis_canal配置rocketmq

canal配置rocketmq

使用Canal通过RocketMQ同步Mysql和Redis



一、简介

1.1、首先将Master的binary-log的日志文件打开,MySQL会将所有的DDL,DML,TCL,写入到BinaryLog的日志文件中

​ 1.2、canal的工作原理:把自己伪装成MySQL的从机(slave),模拟MySQL的slave的交互协议,向MySQL master发送请求,MySQL master收到canal发送过来的请求,开始推送binary log 给canal,然后canal解析二进制日志(binary log)再发送到存储目的地

二、canal能做什么?

canal数据同步不是全量的,而是增量的,基于binary log 增量订阅和消费,具体可以做以下几个事情

  1. ​ 数据库镜像
  2. ​ 数据库实时备份
  3. ​ 索引构建和实时维护
  4. ​ 业务缓存
  5. ​ 带业务逻辑的增量数据处理

大概的步骤就是,使用canal配置MySQL数据库和RocketMQ,将binaryLog中的增量发送到MQ当中,再由MQ推送到Redis中,从而实现Mysql和Redis之间的数据同步。

三、具体步骤

开启Mysql binarylog的日志

1. 在window中找到my.ini配置文件

  ```ini
  [mysqld]
  #开启bInlog
  log-bin=mysql-bin
  #给mysql服务指定一个唯一的ID
  server-id=7
  #以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
  binlog-format=ROW
  ```
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2. 在linux中找到/etc/my.cof文件

  ```ini
  [mysqld]
  #开启bInlog
  log-bin=mysql-bin
  #给mysql服务指定一个唯一的ID
  server-id=7
  #以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
  binlog-format=ROW
  ```
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3. 创建一个数据库(testcanal),并创建一个表user(uid和username)

 create database testcanal;
 create table user(uid int primary key auto_increment,username varchar(50));
  • 1
  • 2

4. 创建一个用户,并给其授权,在工作中不可能将root权限交其使用

#创建用户cannal,密码为canal
CREATE USER canal IDENTIFIED BY 'canal';
#把所有权限赋予canal
grant all privileges on *.* to 'canal'@'%';
  • 1
  • 2
  • 3
  • 4

5. 安装Canal 下载地址

在这里插入图片描述

6. 然后需要修改的是instance 配置文件 : conf/example/instance.properties

#你的数据库的配置文件(注意如果连接失败,关闭防火墙)
canal.instance.master.address=10.70.20.25:3306
#用户名
canal.instance.dbUsername = canal
#密码
canal.instance.dbPassword = canal
#这个是你发送消息的topic
canal.mq.topic=example
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. 接着修改canal 配置文件 conf/canal.properties

#本次使用的RocketMQ
canal.serverMode = rocketMQ
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
# RocketMQ的地址(注意如果连接失败,关闭防火墙)
rocketmq.namesrv.addr = 10.70.20.25:9876
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

8. 将RockectMQ和Canal进行启动。

查看日志
[root@localhost canal]# tail -f logs/example/example.log
  • 1
  • 2

9. 这样就代表可以啦

10 在MySQL数据库中增加一条数据

在这里插入图片描述

11 在MQ的客户端界面查看消息

在这里插入图片描述

此时,就到了第二步,将MQ中生产的消息,到redis中去消费了。即将数据同步到redis中

三、在JAVA中进行实时监听

添加依赖

<dependencies>
 	<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <!--整合Redis , 底层可以用jedis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
    </dependencies>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

yml文件

spring:
  redis:
    host: 10.70.20.25
    password: 123456
    port: 6379
    timeout: 20000
    jedis:
      pool:
        max-active: 100 #连接池最大连接数
        max-wait: -1 #连接池最大阻塞等待时间(-1标识没有限制)
        max-idle: 10 #连接池中最大空闲连接
        min-idle: 0 #连接池中最小空闲连接
    database: 5
rocketmq:
  name-server: 10.70.20.25:9876
  # 是否开启自动配置
  producer:
    enable-msg-trace: true
    group: "test"
    # 消息最大长度 默认 1024 * 4 (4M)
    max-message-size: 4096
    # 发送消息超时时间,默认 3000
    send-message-timeout: 3000
    # 发送消息失败重试次数,默认2
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

编写实体类来封装MQ的中的消息

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private Integer uid;
    private String username;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

MQ中的消息内容进行封装

import com.gc.canal.bean.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class CanalSynDto {
    private List<User> data;
    private String database;
    private String table;
    private String type;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

编写MQ消费者代码,把MQ消息封装成 CanalSynDto 对象,然后取到data数据,再根据SQL的类型(insert,delete,update)对Redis进行数据同步


import com.alibaba.fastjson.JSON;
import com.gc.canal.util.CanalSynDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @Author gaochuang
 * @Description //TODO
 * @Date {2023-02-24},{17:56}
 * @Param {UseCanal}
 **/
@Slf4j
@Component
//对应了canal的instance.properties 中的canal.mq.topic=example
@RocketMQMessageListener(topic = "example", //TOPIC主题,
        selectorExpression="*" //tag标签
        ,consumerGroup = "test"
        ,messageModel = MessageModel.CLUSTERING
)
public class CanalSynListenner  implements RocketMQListener<MessageExt> {

    //注入Redis API
    @Resource
    private RedisTemplate<Object,Object> redisTemplate;

    @Override
    public void onMessage(MessageExt message) {
        try {
            //拿到MQ中的消息内容
            String json = new String(message.getBody(), "utf-8");
            //把数据转为实体类
            CanalSynDto canalSynDto = JSON.parseObject(json, CanalSynDto.class);
            log.info("canal同步 {}", canalSynDto);
            //如果是INSERT或者UPDATE,直接往Redis添加
            if(canalSynDto.getType().equals("INSERT") || canalSynDto.getType().equals("UPDATE")){
                //insert就添加,update就覆盖
                canalSynDto.getData().forEach(employee -> {
                    //以  ID为key,把对象存储到Redis中
                    redisTemplate.opsForValue().set("ID:"+employee.getUid(),employee);
                });
                //删除命令
            }else if (canalSynDto.getType().equals("DELETE")){
                canalSynDto.getData().forEach(employee -> {
                    //以  ID为key,把对象从Redis中删除
                    redisTemplate.delete("ID:"+employee.getUid());
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

插入数据

在这里插入图片描述

四、最后结果

在这里插入图片描述

总结

canal的好处是对业务代码没有入侵,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,是很多企业比较常见的数据同步方案。canal的部署也是支持高可用的,可以做集群

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号