当前位置:   article > 正文

Windows系统安装Flink及实现MySQL之间数据同步_flink windows安装

flink windows安装

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行,并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理,并且是一个面向流处理和批处理的分布式计算框架,将批处理看作一种特殊的有界流。

Flink的主要特点包括:

  1. 事件驱动型:Flink是一个事件驱动型的应用,可以从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
  2. 支持有状态计算:Flink提供了Extactor-once语义及checkpoint机制,支持带有事件操作的流处理和窗口处理,以及灵活的窗口处理(如时间窗口、大小窗口等)。
  3. 轻量级容错处理:Flink使用savepoint进行错误恢复,可以在出现故障时快速恢复任务。
  4. 高吞吐、低延迟、高性能:Flink的设计目标是在保证数据处理稳定性的同时,实现高吞吐、低延迟、高性能的流处理。
  5. 支持大规模集群模式:Flink支持在yarn、Mesos、k8s等大规模集群环境中运行。
  6. 支持多种编程语言:Flink对java、scala、python都提供支持,但最适合使用java进行开发。

        Flink的应用场景非常广泛,可以用于实时流数据的分析计算、实时数据与维表数据关联计算、实时数仓建设、ETL(提取-转换-加载)多存储系统之间进行数据转化和迁移等场景。同时,Flink也适用于事件驱动型应用场景,如以kafka为代表的消息队列等。

1.Winows系统安装Flink

下载地址:Downloads | Apache Flink

选择 Apache Flink 1.16.0 - 2022-10-28 (Binaries

下载 flink-1.16.0-bin-scala_2.12.tgz

在Flink安装路径/bin/目录下创建start-cluster.bat,代码如下:

  1. ::###############################################################################
  2. :: Licensed to the Apache Software Foundation (ASF) under one
  3. :: or more contributor license agreements. See the NOTICE file
  4. :: distributed with this work for additional information
  5. :: regarding copyright ownership. The ASF licenses this file
  6. :: to you under the Apache License, Version 2.0 (the
  7. :: "License"); you may not use this file except in compliance
  8. :: with the License. You may obtain a copy of the License at
  9. ::
  10. :: http://www.apache.org/licenses/LICENSE-2.0
  11. ::
  12. :: Unless required by applicable law or agreed to in writing, software
  13. :: distributed under the License is distributed on an "AS IS" BASIS,
  14. :: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. :: See the License for the specific language governing permissions and
  16. :: limitations under the License.
  17. ::###############################################################################
  18. @echo off
  19. setlocal EnableDelayedExpansion
  20. SET bin=%~dp0
  21. SET FLINK_HOME=%bin%..
  22. SET FLINK_LIB_DIR=%FLINK_HOME%\lib
  23. SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
  24. SET FLINK_CONF_DIR=%FLINK_HOME%\conf
  25. SET FLINK_LOG_DIR=%FLINK_HOME%\log
  26. SET JVM_ARGS=-Xms1024m -Xmx1024m
  27. SET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*
  28. SET logname_jm=flink-%username%-jobmanager.log
  29. SET logname_tm=flink-%username%-taskmanager.log
  30. SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
  31. SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
  32. SET outname_jm=flink-%username%-jobmanager.out
  33. SET outname_tm=flink-%username%-taskmanager.out
  34. SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
  35. SET out_tm=%FLINK_LOG_DIR%\%outname_tm%
  36. SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
  37. SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
  38. :: Log rotation (quick and dirty)
  39. CD "%FLINK_LOG_DIR%"
  40. for /l %%x in (5, -1, 1) do (
  41. SET /A y = %%x+1
  42. RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
  43. RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
  44. RENAME "%outname_jm%.%%x" "%outname_jm%.!y!" 2> nul
  45. RENAME "%outname_tm%.%%x" "%outname_tm%.!y!" 2> nul
  46. )
  47. RENAME "%logname_jm%" "%logname_jm%.0" 2> nul
  48. RENAME "%logname_tm%" "%logname_tm%.0" 2> nul
  49. RENAME "%outname_jm%" "%outname_jm%.0" 2> nul
  50. RENAME "%outname_tm%" "%outname_tm%.0" 2> nul
  51. DEL "%logname_jm%.6" 2> nul
  52. DEL "%logname_tm%.6" 2> nul
  53. DEL "%outname_jm%.6" 2> nul
  54. DEL "%outname_tm%.6" 2> nul
  55. for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
  56. if not defined FOUND (
  57. echo java.exe was not found in PATH variable
  58. goto :eof
  59. )
  60. echo Starting a local cluster with one JobManager process and one TaskManager process.
  61. echo You can terminate the processes via CTRL-C in the spawned shell windows.
  62. echo Web interface by default on http://localhost:8081/.
  63. start /b java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
  64. start /b java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1
  65. endlocal

使用CMD窗口,在Flink安装路径/bin目录下启动start-cluster.bat

访问http://localhost:8081,界面如下:

2.使用Flink实现MySQL数据库之间数据同步(JAVA)

<flink.version>1.16.0</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>

1.创建Flink流处理运行环境。

2.设置流处理并发数。

3.设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步。

4.在Flink中创建中间同步数据库。

5.在Flink中创建中间表flink_source,来源于MySQL表source,(注意connector为mysql-cdc)。

6.在Flink中创建中间表flink_sink,来源于MySQL表sink。

7.将Flink中间表来源表数据写入flink_sink表,Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步。

  1. package com.demo.flink;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. public class FlinkCdcMySql {
  5. public static void main(String[] args) {
  6. System.out.println("==========start run FlinkCdcMySql#main.");
  7. // 创建Flink流处理运行环境
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);
  10. // 设置流处理并发数
  11. env.setParallelism(3);
  12. // 设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步
  13. env.enableCheckpointing(5000);
  14. final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  15. // 在Flink中创建中间同步数据库
  16. tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_test");
  17. // 在Flink中创建中间表flink_source,来源于MySQL表source
  18. // 注意connector为mysql-cdc
  19. tEnv.executeSql("CREATE TABLE flink_test.flink_source (\n" +
  20. " id int,\n" +
  21. " name varchar(255),\n" +
  22. " create_time TIMESTAMP\n," + // Flink不支持datetime格式
  23. " PRIMARY KEY (id) NOT ENFORCED" + //主键必须标明NOT ENFORCED
  24. ") WITH (\n" +
  25. " 'connector' = 'mysql-cdc',\n" +
  26. " 'hostname' = '127.0.0.1',\n" +
  27. " 'database-name' = 'flink-source',\n" +
  28. " 'table-name' = 'source',\n" +
  29. " 'username' = 'root',\n" +
  30. " 'password' = 'root'\n" +
  31. ")");
  32. // 在Flink中创建中间表flink_sink,来源于MySQL表sink
  33. tEnv.executeSql("CREATE TABLE flink_test.flink_sink (\n" +
  34. " id int,\n" +
  35. " name varchar(255),\n" +
  36. " create_time TIMESTAMP\n," +
  37. " PRIMARY KEY (id) NOT ENFORCED" +
  38. ") WITH (\n" +
  39. " 'connector' = 'jdbc',\n" +
  40. " 'url' = 'jdbc:mysql://127.0.0.1:3306/flink-sink',\n" +
  41. " 'table-name' = 'sink',\n" +
  42. " 'driver' = 'com.mysql.jdbc.Driver',\n" +
  43. " 'username' = 'root',\n" +
  44. " 'password' = 'root'\n" +
  45. ")");
  46. // Table transactions = tEnv.from("flink_source");
  47. // transactions.executeInsert("flink_sink");
  48. System.out.println("==========begin Mysql data cdc.");
  49. // 将Flink中间表来源表数据写入flink_sink表
  50. // Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步
  51. tEnv.executeSql("INSERT INTO flink_test.flink_sink(id, name, create_time)\n" +
  52. "select id, name, create_time\n" +
  53. "from flink_test.flink_source\n");
  54. System.out.println("==========continue Mysql data cdc.");
  55. }
  56. }

git代码地址:

flink-cdc-MySQL: FlinkCDC实现MySQL之间数据同步

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/764645
推荐阅读
相关标签
  

闽ICP备14008679号