赞
踩
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sand</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <!-- NOTE(review): this property is not referenced below; log4j-core pins 2.17.2
             directly. Confirm which version is intended and align them. -->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Flink core / runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- The dependency is available only for stable releases,
                 SNAPSHOT dependency need build by yourself. -->
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
            <scope>provided</scope>
        </dependency>

        <!-- Declared once each. The original POM declared mysql-connector-java twice
             (5.1.49 and 8.0.30) and fastjson twice (1.2.75 and 2.0.32); Maven resolves
             same-POM duplicates to the last declaration, so keeping 8.0.30 / 2.0.32
             preserves the effective dependency tree. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <!-- Logging.
             NOTE(review): slf4j-log4j12 binds SLF4J to log4j 1.x while log4j-core is
             log4j 2.x; both backends on the classpath is usually unintended — confirm
             which one this project actually uses. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Strip signature files so the shaded jar is not rejected. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.sand.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving
                     some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
package com.sand;

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/**
 * Static connection configuration for the MySQL CDC source: host, port,
 * credentials, the captured database and its table list.
 *
 * @author zdd
 */
public class CDCKit {

    public static void main(String[] args) {
        // Quick utility: print the JVM temp directory used for checkpoints.
        String tempDir = System.getProperty("java.io.tmpdir");
        System.out.println("tempDir = " + tempDir);
    }

    // Database (schema) to capture.
    private static final String database = "byyy_iowtb_wms_test";

    // Tables to capture within the database above.
    private static final List<String> tableList = Arrays.asList(
            "inv_tt_stock_info",
            "base_tm_sku",
            "base_tm_third_sku_certificate",
            "base_tm_sku_gsp"
    );

    // MySQL server host.
    private static final String hostname = "192.168.111.107";

    // MySQL server port.
    private static final int port = 3306;

    // MySQL user name.
    private static final String username = "test_cdc";

    // MySQL password.
    private static final String password = "Test_cdc@123";

    public static String getDatabase() {
        return database;
    }

    /**
     * Builds the comma-separated, fully-qualified table list expected by the
     * CDC source, e.g. {@code "db.table1,db.table2"}.
     *
     * @return the joined list, or {@code null} when no tables are configured
     */
    public static String getTableList() {
        // Plain null/empty check replaces the former commons-collections
        // CollectionUtils.isEmpty call — same behavior, no extra dependency.
        if (tableList == null || tableList.isEmpty()) {
            return null;
        }
        StringJoiner stringJoiner = new StringJoiner(",");
        for (String tableName : tableList) {
            stringJoiner.add(getDatabase() + "." + tableName);
        }
        return stringJoiner.toString();
    }

    public static String getHostname() {
        return hostname;
    }

    public static int getPort() {
        return port;
    }

    public static String getUsername() {
        return username;
    }

    public static String getPassword() {
        return password;
    }
}
package com.sand;

import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.Objects;
import java.util.Properties;

/**
 * Local Flink job that captures MySQL change events via Flink CDC and forwards
 * each JSON change record to {@link MysqlSink}.
 *
 * <p>On startup it looks for the most recent retained checkpoint under
 * {@code <java.io.tmpdir>/ck} and, if one exists, resumes from it via the
 * {@code execution.savepoint.path} configuration key.
 */
public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        // Checkpoint root under the system temp directory. Using File(parent, child)
        // guarantees a separator between the two; the previous string concatenation
        // ("java.io.tmpdir" + "ck") produced "/tmpck" on Linux.
        String ckRoot = new File(System.getProperty("java.io.tmpdir"), "ck").getAbsolutePath();

        String latestCheckpoint = getLatestCheckpoint();
        System.out.println("latestCheckpoint = " + latestCheckpoint);

        Configuration configuration = new Configuration();
        if (StringUtils.isNotBlank(latestCheckpoint)) {
            // Resume the job from the most recent retained checkpoint.
            configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        // Checkpoint every 60 seconds with exactly-once semantics.
        env.enableCheckpointing(1000L * 60);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep the last checkpoint when the job is cancelled so it can be resumed.
        // (The original called this twice; once is sufficient.)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart up to 3 times, 2 seconds apart, before failing the job.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // Local filesystem state backend rooted at the checkpoint directory.
        env.setStateBackend(new FsStateBackend("file:///" + ckRoot));

        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");
        // Emit DECIMAL columns as strings to avoid binary Debezium encoding.
        properties.setProperty("decimal.handling.mode", "string");

        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(CDCKit.getHostname())
                .port(CDCKit.getPort())
                .databaseList(CDCKit.getDatabase())
                .tableList(CDCKit.getTableList())
                .username(CDCKit.getUsername())
                .password(CDCKit.getPassword())
                .scanNewlyAddedTableEnabled(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Full snapshot first, then stream the binlog.
                .startupOptions(StartupOptions.initial())
                .debeziumProperties(properties)
                .build();

        // Read from the CDC source and hand each record to the sink.
        env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source")
                .addSink(new MysqlSink());

        env.execute();
    }

    /**
     * Scans {@code <java.io.tmpdir>/ck} for the newest {@code chk-*} checkpoint
     * directory, deletes every job directory except the one containing it, and
     * returns its absolute path.
     *
     * @return absolute path of the newest checkpoint, or {@code null} if none exists
     */
    private static String getLatestCheckpoint() {
        File ckDir = new File(System.getProperty("java.io.tmpdir"), "ck");
        File[] jobDirs = ckDir.listFiles();
        if (jobDirs == null) {
            return null;
        }

        String path = null;
        long lastModified = 0;
        for (File jobDir : jobDirs) {
            if (!jobDir.isDirectory()) {
                continue;
            }
            File[] candidates = jobDir.listFiles();
            if (candidates == null) {
                continue;
            }
            // Track the most recently modified chk-* directory across all jobs.
            for (File candidate : candidates) {
                if (!candidate.isDirectory() || !candidate.getName().startsWith("chk-")) {
                    continue;
                }
                if (candidate.lastModified() > lastModified) {
                    lastModified = candidate.lastModified();
                    path = candidate.getAbsolutePath();
                }
            }
        }

        if (StringUtils.isEmpty(path)) {
            return null;
        }

        // Parent job directory of the newest checkpoint. The original used
        // path.substring(0, path.lastIndexOf("\\")), which only works on Windows
        // (on Linux lastIndexOf returns -1 and substring throws);
        // File.getParent() is separator-agnostic.
        String keepDir = new File(path).getParent();
        for (File jobDir : jobDirs) {
            if (jobDir.isDirectory() && !Objects.equals(jobDir.getAbsolutePath(), keepDir)) {
                FileUtil.del(jobDir);
            }
        }
        return path;
    }
}
package com.sand;
/**
* @author zdd
*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {
@Override
public void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {
System.out.println("value = " + value);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。