
Reading PostgreSQL Data in Real Time with Flink CDC

I. Prerequisites

1. Modify postgresql.conf


# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical

# Increase the maximum number of replication slots (default 10); by default Flink CDC uses one slot per table
max_replication_slots = 20 # max number of replication slots

# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slot setting above
max_wal_senders = 20 # max number of walsender processes

# Terminate replication connections that have been idle longer than this value; a somewhat larger value is safer (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables the timeout
Only wal_level must be changed; the other parameters are optional, but if you synchronize more than 10 tables you should raise them to suitable values.

Changes to postgresql.conf only take effect after the PostgreSQL service is restarted, so they are usually applied during off-peak hours.
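
After restarting, you can confirm that the new values are in effect. Below is a minimal JDBC sketch for this check; the PostgreSQL JDBC driver on the classpath and the connection details are assumptions for illustration, not part of the original setup.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckWalConfig {
    public static void main(String[] args) throws Exception {
        // Placeholder host, database and credentials; replace with your own
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "***");
             Statement stmt = conn.createStatement()) {
            String[] params = {"wal_level", "max_replication_slots", "max_wal_senders", "wal_sender_timeout"};
            for (String param : params) {
                // SHOW returns the current value of a server configuration parameter
                try (ResultSet rs = stmt.executeQuery("SHOW " + param)) {
                    if (rs.next()) {
                        System.out.println(param + " = " + rs.getString(1));
                    }
                }
            }
        }
    }
}

If wal_level still prints replica, the service has not yet been restarted with the new configuration.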

2. Create a user and grant replication privileges

-- Create a new user
CREATE USER hadoop WITH PASSWORD '***';
-- Grant the user replication privileges
ALTER ROLE hadoop replication;
-- Grant the user connect privileges on the database
GRANT CONNECT ON DATABASE hadoop TO hadoop;
-- Grant the user SELECT on all tables in the current schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO hadoop;
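
To double-check the replication grant, you can query pg_roles from any client. A small sketch in the same spirit as the check above (connection details are again placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckReplicationRole {
    public static void main(String[] args) throws Exception {
        // rolreplication becomes true once ALTER ROLE hadoop replication has been executed
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/hadoop", "postgres", "***");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT rolreplication FROM pg_roles WHERE rolname = 'hadoop'")) {
            if (rs.next()) {
                System.out.println("hadoop has replication privilege: " + rs.getBoolean(1));
            }
        }
    }
}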

3. Publish the tables

-- Set puballtables to true for existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Create a publication covering all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
  • 6

II. Java code example

import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.yogorobot.gmall.realtime.function.MyDebezium;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.Properties;

public class Flink_CDCWIthProduct {
    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
    // Purpose: test reading PostgreSQL data in real time
    public static void main(String[] args) throws Exception {
        
        //TODO Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "never");
        properties.setProperty("debezium.slot.name", "pg_cdc");
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        // Enable periodic heartbeat records via the connector configuration properties
        properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

        //TODO Create the Flink PostgreSQL CDC source that reads from the production PostgreSQL database
        SourceFunction<String> pgsqlSource = PostgreSQLSource.<String>builder()
                .hostname("pgr-***.pg.rds.aliyuncs.com")
                .port(1921)
                .database("jarvis_ticket") // monitor postgres database
                .schemaList("jarvis_ticket")  // monitor inventory schema
                .tableList("jarvis_ticket.t_category") // monitor products table
                .username("***")
                .password("***")
                // Custom deserializer
                .deserializer(new MyDebezium())
                // Standard logical decoding output plugin
                .decodingPluginName("pgoutput")
                // Debezium properties
                .debeziumProperties(properties)
                .build();

        //TODO Read data from PostgreSQL using the CDC source
        DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);

        //TODO Write the data to Kafka
        //pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));

        //TODO Print to the console
        pgsqlDS.print();

        //TODO Execute the job
        env.execute();
    }
}
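
The Kafka sink above is left commented out, and the MyKafkaUtil helper is not shown in this post. A minimal sketch of what such a helper could look like, assuming the flink-connector-kafka dependency is available; the broker address is made up for illustration:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {
    // Hypothetical broker address; replace with your own Kafka cluster
    private static final String BOOTSTRAP_SERVERS = "kafka-host:9092";

    // Returns a simple string sink for the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}

With a helper like this in place, the commented-out addSink line can be enabled to forward the change records to Kafka.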


III. MyDebezium code example

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class MyDebezium implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1. Create a JSONObject to hold the final assembled record
        JSONObject result = new JSONObject();

        //2. Get the schema and table name from the topic
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");

        // Schema name
        String schema = split[1];
        // Table name
        String tableName = split[2];


        //3. Get the record value
        Struct value = (Struct) sourceRecord.value();

        //4. Get the "before" data
        Struct structBefore = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (structBefore != null) {
            Schema schemas = structBefore.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                beforeJson.put(field.name(), structBefore.get(field));
            }
        }

        //5. Get the "after" data
        Struct structAfter = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (structAfter != null) {
            Schema schemas = structAfter.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                afterJson.put(field.name(), structAfter.get(field));
            }
        }

        //6. Determine the operation type: no "before" means insert, no "after" means delete, otherwise update
        String type = "update";
        if (structBefore == null) {
            type = "insert";
        }
        if (structAfter == null) {
            type = "delete";
        }

        //7. Put everything into the result JSONObject
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //8. Emit the record downstream
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
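
For reference, an insert into jarvis_ticket.t_category would be emitted by this deserializer as a JSON string shaped roughly like the following (the column names and values are made up for illustration):

{"schema":"jarvis_ticket","tableName":"t_category","before":{},"after":{"id":1,"name":"demo"},"type":"insert"}

For a delete, the "after" object would be empty and type would be "delete".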
