
Flink CDC: Connecting PostgreSQL to Kafka

Recently at work I needed to pull data from a PostgreSQL business database for real-time processing. After some research, I settled on Flink's CDC feature. I hit a few pitfalls along the way, so I'm recording them here for future reference.

Versions

  Component     Version
  Flink         1.12.2
  PostgreSQL    10.10
  Kafka         2.12-2.4.0
  Scala         2.11.12

Before starting, the following JARs are required:

flink-sql-connector-postgres-cdc-1.2.0.jar  --  Flink CDC connection to PostgreSQL
flink-format-changelog-json-1.2.0.jar  --  formatting when Flink sinks data to Kafka
flink-connector-kafka_2.12-1.12.2.jar  --  Flink connection to Kafka
flink-connector-jdbc_2.12-1.12.2.jar  --  Flink connection over JDBC (PostgreSQL, MySQL)

Download link for the JARs, Flink, and PostgreSQL: https://pan.baidu.com/s/1LSPDjwfcnuNRc9kuj3uSMA (extraction code: 1234)
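To make these connectors visible to Flink, one common approach (a sketch, assuming a standalone cluster and that the JARs sit in the current directory) is to copy them into Flink's lib directory and restart the cluster:

  # assumed paths; adjust to where the JARs were downloaded and Flink is installed
  $ cp flink-sql-connector-postgres-cdc-1.2.0.jar \
       flink-format-changelog-json-1.2.0.jar \
       flink-connector-kafka_2.12-1.12.2.jar \
       flink-connector-jdbc_2.12-1.12.2.jar $FLINK_HOME/lib/
  # note: the JDBC connector also needs the PostgreSQL JDBC driver JAR on this classpath
  $ $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh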

Steps 1-6 cover installing PostgreSQL; if it is already installed, skip straight to step 7.

1. Install the dependencies PostgreSQL needs

  $ sudo apt-get install libreadline-dev
  $ sudo apt-get install zlib1g
  $ sudo apt-get install zlib1g-dev

2. Unpack, then compile and install PostgreSQL

  $ tar -zxf postgresql-10.10.tar.gz
  $ cd postgresql-10.10
  $ ./configure
  $ make && sudo make install
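A quick sanity check that the build landed under /usr/local/pgsql (the default install prefix):

  # prints the client version string if the install succeeded
  $ /usr/local/pgsql/bin/psql --version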

3. Create a postgres user, give it ownership of the data directory, and initialize the cluster

  $ sudo adduser postgres
  $ cd /usr/local/pgsql
  $ sudo mkdir data
  $ sudo chown postgres /usr/local/pgsql/data
  $ su postgres
  $ /usr/local/pgsql/bin/initdb -D /usr/local/pgsql/data

4. Start the database (as the postgres user; PostgreSQL refuses to run as root)

  $ cd /usr/local/pgsql/bin
  $ su postgres
  $ ./pg_ctl start -D /usr/local/pgsql/data
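To confirm the server actually came up:

  # should report "pg_ctl: server is running" along with the postmaster PID
  $ ./pg_ctl status -D /usr/local/pgsql/data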

5. Create a symlink for pg_ctl; connect to the database; set the postgres user's password; quit

  $ sudo ln -s /usr/local/pgsql/bin/pg_ctl /usr/bin/pg_ctl
  $ ./psql
  ALTER USER postgres WITH PASSWORD 'postgres';
  \q

6. Configure remote access

$ vim /usr/local/pgsql/data/pg_hba.conf
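pg_hba.conf needs a rule admitting remote clients; a typical entry allowing password logins from any IPv4 address (an illustrative choice here; narrow the address range for anything beyond a test machine) is:

  # TYPE  DATABASE  USER  ADDRESS      METHOD
  host    all       all   0.0.0.0/0    md5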

Then edit the postgresql.conf file, setting the following:

  $ vim /usr/local/pgsql/data/postgresql.conf
  listen_addresses = '*'
  port = 5432
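These settings only take effect after the restart in step 7; once restarted, remote access can be verified from another machine (a sketch, using the server address that appears later in this article):

  # should prompt for the password set in step 5
  $ psql -h 192.168.1.88 -p 5432 -U postgres -d postgres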

At this point, the PostgreSQL installation is complete.

7. Enable logical decoding (the PostgreSQL counterpart of MySQL's binlog replication) and restart the database

  $ vim /usr/local/pgsql/data/postgresql.conf
  wal_level = logical
  $ su postgres
  $ pg_ctl restart -D /usr/local/pgsql/data
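A quick check that the setting took effect after the restart:

  # should print: logical
  $ /usr/local/pgsql/bin/psql -U postgres -c "SHOW wal_level;"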

8. Table DDL in PostgreSQL

  -- source table (read from)
  CREATE TABLE "public"."cdc_pg_source" (
    "id" int2 NOT NULL,
    "age" int2 DEFAULT NULL,
    "name" varchar(255) COLLATE "pg_catalog"."default" DEFAULT NULL,
    CONSTRAINT "cdc_pg_source_pkey" PRIMARY KEY ("id")
  );
  -- sink table (written to)
  CREATE TABLE "public"."cdc_pg_sink" (
    "id" int2 NOT NULL,
    "age" int2 DEFAULT NULL,
    "name" varchar(255) COLLATE "pg_catalog"."default" DEFAULT NULL,
    CONSTRAINT "cdc_pg_sink_pkey" PRIMARY KEY ("id")
  );

9. Set the tables' replica identity. FULL means update and delete events carry the previous values of all columns.

  ALTER TABLE public.cdc_pg_source REPLICA IDENTITY FULL;
  ALTER TABLE public.cdc_pg_sink REPLICA IDENTITY FULL;
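To verify, relreplident in pg_class should now be 'f' (FULL) for both tables:

  # 'f' = FULL, 'd' = default (primary key columns only)
  $ /usr/local/pgsql/bin/psql -U postgres -c \
      "SELECT relname, relreplident FROM pg_class WHERE relname IN ('cdc_pg_source', 'cdc_pg_sink');"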

10. Start the Flink cluster and enter the Flink SQL client

  $ $FLINK_HOME/bin/start-cluster.sh
  $ $FLINK_HOME/bin/sql-client.sh embedded

11. Read from PostgreSQL, write to PostgreSQL

In the Flink SQL client, create tables mapped to the PostgreSQL tables and submit the job. (sink.buffer-flush.max-rows sets the maximum number of records the sink buffers before flushing.)

  -- mapped source table in PostgreSQL
  CREATE TABLE cdc_pg_source (
    id INT,
    age INT,
    name STRING
  ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '192.168.1.88',
    'port' = '5432',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'username' = 'postgres',
    'password' = 'postgres',
    'table-name' = 'cdc_pg_source',
    'decoding.plugin.name' = 'pgoutput',
    'debezium.slot.name' = 'cdc_pg_source'
  );

  -- mapped sink table in PostgreSQL
  CREATE TABLE cdc_pg_sink (
    id INT,
    age INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://192.168.1.88:5432/postgres',
    'username' = 'postgres',
    'password' = 'postgres',
    'table-name' = 'cdc_pg_sink',
    'sink.buffer-flush.max-rows' = '1'
  );

  -- the Flink job itself
  INSERT INTO cdc_pg_sink SELECT * FROM cdc_pg_source;

The Flink web UI shows the job running.
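A quick end-to-end test (a sketch, assuming psql access as set up earlier): insert a row into the source table, then confirm the pipeline upserted it into the sink:

  # write a test row to the source table...
  $ /usr/local/pgsql/bin/psql -U postgres -c \
      "INSERT INTO public.cdc_pg_source (id, age, name) VALUES (1, 20, 'tom');"
  # ...then it should show up in the sink table within moments
  $ /usr/local/pgsql/bin/psql -U postgres -c "SELECT * FROM public.cdc_pg_sink;"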

12. Read from PostgreSQL, write to Kafka

Note: the value of debezium.slot.name must be unique per connector; otherwise you get an error like: Failed to start replication stream at LSN{0/1608BC8}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.

In step 11 (PostgreSQL to PostgreSQL), the table 'cdc_pg_source' was already mapped with 'debezium.slot.name' = 'cdc_pg_source'; when the same table is sinked to Kafka as well, 'debezium.slot.name' must be given a fresh value.
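The slots the server currently holds can be inspected from psql, and a stale one dropped; a sketch (dropping a slot that a running connector still uses will break that job):

  # list replication slots; 'active' shows whether a connector holds the slot
  $ /usr/local/pgsql/bin/psql -U postgres -c \
      "SELECT slot_name, plugin, active FROM pg_replication_slots;"
  # drop a leftover slot only if no running job still uses it
  $ /usr/local/pgsql/bin/psql -U postgres -c \
      "SELECT pg_drop_replication_slot('cdc_pg_source');"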

  -- mapped source table in PostgreSQL
  CREATE TABLE cdc_pg_source_kafka (
    id INT,
    age INT,
    name STRING
  ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '192.168.1.88',
    'port' = '5432',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'username' = 'postgres',
    'password' = 'postgres',
    'table-name' = 'cdc_pg_source',
    'decoding.plugin.name' = 'pgoutput',
    'debezium.slot.name' = 'cdc_pg_source_kafka'
  );

  -- table that sinks to Kafka
  CREATE TABLE pg_to_kafka (
    id INT,
    age INT,
    name STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'pg_to_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.1.88:9092',
    'format' = 'changelog-json'
  );

  -- write to Kafka
  INSERT INTO pg_to_kafka SELECT * FROM cdc_pg_source_kafka;

The Flink web UI shows the job running.

Listing the Kafka topics shows that the topic created by the Flink SQL job now exists.

Consuming that topic prints the change data; both checks are sketched below.
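A sketch using the stock Kafka CLI tools, assuming Kafka is installed at $KAFKA_HOME (an assumed path) and listening on 192.168.1.88:9092 as configured above:

  # list topics; pg_to_kafka should appear
  $ $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.88:9092
  # consume the changelog-json records from the beginning of the topic
  $ $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.88:9092 \
      --topic pg_to_kafka --from-beginning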

Done.