赞
踩
1.1、首先将Master的binary-log的日志文件打开,MySQL会将所有的DDL,DML,TCL,写入到BinaryLog的日志文件中
1.2、canal的工作原理:把自己伪装成MySQL的从机(slave),模拟MySQL的slave的交互协议,向MySQL master发送请求,MySQL master收到canal发送过来的请求,开始推送binary log 给canal,然后canal解析二进制日志(binary log)再发送到存储目的地
canal数据同步不是全量的,而是增量的,基于binary log 增量订阅和消费,具体可以做以下几个事情
大概的步骤就是,使用canal配置MySQL数据库和RocketMQ,将binaryLog中的增量发送到MQ当中,再由MQ推送到Redis中,从而实现Mysql和Redis之间的数据同步。
```ini
[mysqld]
#开启bInlog
log-bin=mysql-bin
#给mysql服务指定一个唯一的ID
server-id=7
#以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
binlog-format=ROW
```
```ini
[mysqld]
#开启bInlog
log-bin=mysql-bin
#给mysql服务指定一个唯一的ID
server-id=7
#以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
binlog-format=ROW
```
create database testcanal;
create table user(uid int primary key auto_increment,username varchar(50));
#创建用户cannal,密码为canal
CREATE USER canal IDENTIFIED BY 'canal';
#把所有权限赋予canal
grant all privileges on *.* to 'canal'@'%';

#你的数据库的配置文件(注意如果连接失败,关闭防火墙)
canal.instance.master.address=10.70.20.25:3306
#用户名
canal.instance.dbUsername = canal
#密码
canal.instance.dbPassword = canal
#这个是你发送消息的topic
canal.mq.topic=example
#本次使用的RocketMQ
canal.serverMode = rocketMQ
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
# RocketMQ的地址(注意如果连接失败,关闭防火墙)
rocketmq.namesrv.addr = 10.70.20.25:9876
查看日志
[root@localhost canal]# tail -f logs/example/example.log



<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.3</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <!--整合Redis , 底层可以用jedis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> </dependencies>
spring: redis: host: 10.70.20.25 password: 123456 port: 6379 timeout: 20000 jedis: pool: max-active: 100 #连接池最大连接数 max-wait: -1 #连接池最大阻塞等待时间(-1标识没有限制) max-idle: 10 #连接池中最大空闲连接 min-idle: 0 #连接池中最小空闲连接 database: 5 rocketmq: name-server: 10.70.20.25:9876 # 是否开启自动配置 producer: enable-msg-trace: true group: "test" # 消息最大长度 默认 1024 * 4 (4M) max-message-size: 4096 # 发送消息超时时间,默认 3000 send-message-timeout: 3000 # 发送消息失败重试次数,默认2 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private Integer uid;
private String username;
}
import com.gc.canal.bean.User; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List; @AllArgsConstructor @NoArgsConstructor @Data public class CanalSynDto { private List<User> data; private String database; private String table; private String type; }
import com.alibaba.fastjson.JSON; import com.gc.canal.util.CanalSynDto; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Author gaochuang * @Description //TODO * @Date {2023-02-24},{17:56} * @Param {UseCanal} **/ @Slf4j @Component //对应了canal的instance.properties 中的canal.mq.topic=example @RocketMQMessageListener(topic = "example", //TOPIC主题, selectorExpression="*" //tag标签 ,consumerGroup = "test" ,messageModel = MessageModel.CLUSTERING ) public class CanalSynListenner implements RocketMQListener<MessageExt> { //注入Redis API @Resource private RedisTemplate<Object,Object> redisTemplate; @Override public void onMessage(MessageExt message) { try { //拿到MQ中的消息内容 String json = new String(message.getBody(), "utf-8"); //把数据转为实体类 CanalSynDto canalSynDto = JSON.parseObject(json, CanalSynDto.class); log.info("canal同步 {}", canalSynDto); //如果是INSERT或者UPDATE,直接往Redis添加 if(canalSynDto.getType().equals("INSERT") || canalSynDto.getType().equals("UPDATE")){ //insert就添加,update就覆盖 canalSynDto.getData().forEach(employee -> { //以 ID为key,把对象存储到Redis中 redisTemplate.opsForValue().set("ID:"+employee.getUid(),employee); }); //删除命令 }else if (canalSynDto.getType().equals("DELETE")){ canalSynDto.getData().forEach(employee -> { //以 ID为key,把对象从Redis中删除 redisTemplate.delete("ID:"+employee.getUid()); }); } } catch (Exception e) { e.printStackTrace(); } } }


canal的好处是对业务代码没有入侵,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,是很多企业比较常见的数据同步方案。canal的部署也是支持高可用的,可以做集群
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。