# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical
# Increase the maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
max_replication_slots = 20 # max number of replication slots
# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20 # max number of walsender processes
# Terminate replication connections that have been inactive for longer than this; a somewhat larger value is reasonable (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables
wal_level is the only setting that must be changed; the others are optional. If more than 10 tables will be synchronized, raise them to suitable values.
Editing postgresql.conf requires a restart of the PostgreSQL service to take effect, so the change is usually made during an off-peak period.
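Once PostgreSQL has been restarted, it is worth confirming that the new values are actually in effect before starting any CDC job. The sketch below assumes the PostgreSQL JDBC driver is on the classpath; the class name CheckPgCdcSettings and the connection details are placeholders for illustration only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckPgCdcSettings {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; replace with your own host, database and credentials
        String url = "jdbc:postgresql://pgr-***.pg.rds.aliyuncs.com:1921/jarvis_ticket";
        try (Connection conn = DriverManager.getConnection(url, "***", "***");
             Statement st = conn.createStatement()) {
            // SHOW returns the effective value of a server parameter
            try (ResultSet rs = st.executeQuery("SHOW wal_level")) {
                if (rs.next()) {
                    System.out.println("wal_level = " + rs.getString(1)); // should print "logical"
                }
            }
            try (ResultSet rs = st.executeQuery("SHOW max_replication_slots")) {
                if (rs.next()) {
                    System.out.println("max_replication_slots = " + rs.getString(1));
                }
            }
        }
    }
}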
-- Create a new PostgreSQL user
CREATE USER hadoop WITH PASSWORD '***';
-- Grant the user replication privileges
ALTER ROLE hadoop REPLICATION;
-- Grant the user access to the database
GRANT CONNECT ON DATABASE hadoop TO hadoop;
-- Grant the user SELECT on all tables in the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO hadoop;
-- Set puballtables to true for existing publications
UPDATE pg_publication SET puballtables = true WHERE pubname IS NOT NULL;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
SELECT * FROM pg_publication_tables;
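To confirm that the new user can connect and that the publication covers the expected tables, the same catalog query can also be run from the application side over JDBC. The snippet below is only a sketch; the host, port, table name and masked credentials are the placeholder values used elsewhere in this article, and the PostgreSQL JDBC driver is assumed to be available.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CheckPublication {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; replace with your own
        String url = "jdbc:postgresql://pgr-***.pg.rds.aliyuncs.com:1921/jarvis_ticket";
        try (Connection conn = DriverManager.getConnection(url, "hadoop", "***");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT pubname, schemaname, tablename FROM pg_publication_tables WHERE tablename = ?")) {
            ps.setString(1, "t_category");
            try (ResultSet rs = ps.executeQuery()) {
                // Each row is one table covered by a publication
                while (rs.next()) {
                    System.out.printf("%s -> %s.%s%n",
                            rs.getString("pubname"), rs.getString("schemaname"), rs.getString("tablename"));
                }
            }
        }
    }
}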
import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.yogorobot.gmall.realtime.function.MyDebezium;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.Properties;

public class Flink_CDCWIthProduct {

    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();

    // Purpose: read PostgreSQL data in real time
    public static void main(String[] args) throws Exception {
        //TODO Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "never");
        properties.setProperty("debezium.slot.name", "pg_cdc");
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        // Enable periodic heartbeat records through the connector configuration
        properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

        //TODO Create the Flink-PgSQL-CDC source that reads from the production PostgreSQL database
        SourceFunction<String> pgsqlSource = PostgreSQLSource.<String>builder()
                .hostname("pgr-***.pg.rds.aliyuncs.com")
                .port(1921)
                .database("jarvis_ticket")              // database to capture
                .schemaList("jarvis_ticket")            // schema to capture
                .tableList("jarvis_ticket.t_category")  // table to capture
                .username("***")
                .password("***")
                // Custom deserialization schema
                .deserializer(new MyDebezium())
                // Standard logical decoding output plugin
                .decodingPluginName("pgoutput")
                // Debezium connector properties
                .debeziumProperties(properties)
                .build();

        //TODO Read data from PostgreSQL with the CDC source
        DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);

        //TODO Write the data to Kafka
        //pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));

        //TODO Print to the console
        pgsqlDS.print();

        //TODO Execute the job
        env.execute();
    }
}
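The Kafka sink above is commented out because the MyKafkaUtil helper it refers to is not shown in this article. A minimal sketch of what such a helper might look like is given below, assuming the flink-connector-kafka dependency is available; the broker list is a hypothetical placeholder and the producer properties should be tuned for your environment.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {

    // Hypothetical broker addresses; replace with your Kafka cluster
    private static final String BROKERS = "kafka01:9092,kafka02:9092,kafka03:9092";

    // Returns a simple at-least-once String sink for the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}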
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class MyDebezium implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1. Create a JSONObject to hold the final assembled record
        JSONObject result = new JSONObject();

        //2. Get the schema and table name from the record topic
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        // schema name
        String schema = split[1];
        // table name
        String tableName = split[2];

        //3. Get the record value
        Struct value = (Struct) sourceRecord.value();

        //4. Get the "before" image
        Struct structBefore = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (structBefore != null) {
            Schema schemas = structBefore.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                beforeJson.put(field.name(), structBefore.get(field));
            }
        }

        //5. Get the "after" image
        Struct structAfter = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (structAfter != null) {
            Schema schemas = structAfter.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                afterJson.put(field.name(), structAfter.get(field));
            }
        }

        // Derive the operation type from the presence of the before/after images
        String type = "update";
        if (structBefore == null) {
            type = "insert";
        }
        if (structAfter == null) {
            type = "delete";
        }

        // Assemble the output JSON
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // Emit the record downstream
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}