赞
踩
目录
在学习的过程中遇到了 File 和 Kafka 数据源读写(source 和 sink)的问题。我使用的 Flink 版本是 1.13.3,发现 TableEnvironment 的 connect() 方法已被标记为过时(deprecated),于是去官网查看了推荐的方式——使用 DDL(通过 executeSql 执行 CREATE TABLE 语句)来定义表。下面用代码分别实现旧写法和新写法:
参考官方文档:
CSV Format:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/csv/
Kafka SQL Connector:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/
本地数据源 sensor.txt(第一行只是字段说明,实际文件中不应包含这行表头:Flink 的 csv format 不会跳过表头,"timeStamp" 无法解析为 BIGINT 会导致作业报错):
- id,timeStamp,temperature
- sensor_1,1,1
- sensor_1,2,2
- sensor_1,3,32
- sensor_1,4,4
- sensor_1,5,5
- sensor_1,22,6
- sensor_1,23,99
- sensor_2,25,13
- sensor_2,32,40.2
- sensor_3,40,42
添加依赖
- <!-- CSV format: required by 'format'='csv' in the DDL examples and by the legacy Csv descriptor -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-csv</artifactId>
- <version>1.13.3</version>
- </dependency>
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- // Legacy descriptor API (TableEnvironment.connect, deprecated in Flink 1.13):
- // expose sensor.txt as the temporary table "sensor" and count readings per id.
-
- // Step 1: table schema (column name -> type)
- Schema sensorSchema = new Schema()
-         .field("id", DataTypes.STRING())
-         .field("timeStamp", DataTypes.BIGINT())
-         .field("temperature", DataTypes.FLOAT());
-
- // Step 2: CSV layout - fields are separated by ',', records by '\n'
- Csv csvFormat = new Csv()
-         .fieldDelimiter(',')
-         .lineDelimiter("\n");
-
- // Step 3: register the file as a temporary (dynamic) table named "sensor"
- tEnv.connect(new FileSystem().path("..\\resources\\sensor.txt"))
-         .withSchema(sensorSchema)
-         .withFormat(csvFormat)
-         .createTemporaryTable("sensor");
-
- // Step 4: count per id; a grouped aggregate emits updates, hence the retract stream
- Table countPerId = tEnv.from("sensor")
-         .groupBy($("id"))
-         .select($("id"), $("id").count().as("cnt"));
- tEnv.toRetractStream(countPerId, Row.class).print("cnt");
-
- env.execute();

- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- // Recommended replacement for connect(): declare the file-backed table with DDL.
- // 'csv.field-delimiter' splits each record into fields; records are read line by line,
- // so no line delimiter has to be configured.
- String sourceDDL = "create table sensor_source(id STRING," +
-         "`timeStamp` BIGINT," +
-         "temperature DOUBLE" +
-         ") with (" +
-         "'connector'='filesystem'," +
-         "'format'='csv'," +
-         "'csv.field-delimiter'=','," +
-         "'path'='..\\resources\\sensor.txt'" +
-         ")";
- tEnv.executeSql(sourceDDL);
-
- // Count readings per sensor id; updates arrive as retract (+/-) messages.
- Table sensorTable = tEnv.from("sensor_source");
- Table countPerId = sensorTable.groupBy($("id")).select($("id"), $("id").count().as("cnt"));
- tEnv.toRetractStream(countPerId, Row.class).print("cnt");
-
- env.execute();

实现结果
- cnt> (true,+I[sensor_1, 1])
- cnt> (false,-U[sensor_1, 1])
- cnt> (true,+U[sensor_1, 2])
- cnt> (false,-U[sensor_1, 2])
- cnt> (true,+U[sensor_1, 3])
- cnt> (false,-U[sensor_1, 3])
- cnt> (true,+U[sensor_1, 4])
- cnt> (false,-U[sensor_1, 4])
- cnt> (true,+U[sensor_1, 5])
- cnt> (false,-U[sensor_1, 5])
- cnt> (true,+U[sensor_1, 6])
- cnt> (false,-U[sensor_1, 6])
- cnt> (true,+U[sensor_1, 7])
- cnt> (true,+I[sensor_2, 1])
- cnt> (false,-U[sensor_2, 1])
- cnt> (true,+U[sensor_2, 2])
- cnt> (true,+I[sensor_3, 1])
-

- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- //step-1 build a bounded in-memory source of sensor readings
- DataStreamSource<SensorReading> waterSensorStream =
-         env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
-                 new SensorReading("sensor_1", 2000L, 20D),
-                 new SensorReading("sensor_2", 3000L, 30D),
-                 new SensorReading("sensor_1", 4000L, 40D),
-                 new SensorReading("sensor_1", 5000L, 50D),
-                 new SensorReading("sensor_2", 6000L, 60D));
-
- //step-2 keep only the sensor_1 readings
- Table resTable = tEnv.fromDataStream(waterSensorStream)
-         .where($("id").isEqual("sensor_1"))
-         .select($("id"), $("timeStamp"), $("temperature"));
-
- //step-3 legacy descriptor API: register output/sink.txt as a CSV sink table
- Schema schema = new Schema()
-         .field("id", DataTypes.STRING())
-         .field("timeStamp", DataTypes.BIGINT())
-         .field("temperature", DataTypes.DOUBLE());
-
- tEnv.connect(new FileSystem().path("output/sink.txt"))
-         .withFormat(new Csv().fieldDelimiter(','))
-         .withSchema(schema)
-         .createTemporaryTable("sensor_sink");
-
- // executeInsert() submits the INSERT job on its own and returns without waiting for it;
- // await() blocks until the job has finished, so a local run cannot exit before the file
- // is written. NOTE: await() throws InterruptedException/ExecutionException - the
- // enclosing method is assumed to declare `throws Exception`.
- resTable.executeInsert("sensor_sink").await();
- /*
-  * Do NOT call env.execute() here: the job was already submitted by executeInsert(), and no
-  * DataStream operator/sink was added to env itself, so env.execute() would fail
-  * ("No operators defined in streaming topology").
-  */

- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- //step-1 build a bounded in-memory source of sensor readings
- DataStreamSource<SensorReading> waterSensorStream =
-         env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
-                 new SensorReading("sensor_1", 2000L, 20D),
-                 new SensorReading("sensor_2", 3000L, 30D),
-                 new SensorReading("sensor_1", 4000L, 40D),
-                 new SensorReading("sensor_1", 5000L, 50D),
-                 new SensorReading("sensor_2", 6000L, 60D));
-
- //step-2 keep only the sensor_1 readings
- Table resTable = tEnv.fromDataStream(waterSensorStream)
-         .where($("id").isEqual("sensor_1"))
-         .select($("id"), $("timeStamp"), $("temperature"));
-
- //step-3 DDL replacement for connect(): a CSV file sink. Columns are filled by position
- // from the query result; they are named like the source schema for consistency.
- tEnv.executeSql(
-         "create table file_sink(" +
-         "`id` STRING," +
-         "`timeStamp` BIGINT," +
-         "`temperature` DOUBLE" +
-         ")with(" +
-         "'connector' = 'filesystem'," +
-         "'format'='csv'," + // 'json' can be used here as well
-         // The two options below are read-side (parsing) options: they only take effect
-         // when this table is read as a source, not when it is written.
-         // On a parse error, skip the field/row instead of failing (default false = fail);
-         // a skipped field is set to null.
-         "'csv.ignore-parse-errors' = 'true'," +
-         // Allow comment lines starting with '#' (default false); csv.ignore-parse-errors
-         // must also be enabled so that empty lines are tolerated.
-         "'csv.allow-comments' = 'true'," +
-         "'path'='output/sink_new.txt'" +
-         ")");
-
- // executeInsert() submits the job asynchronously; await() blocks until it finishes so a
- // local run does not exit before the file is written (enclosing method is assumed to
- // declare `throws Exception`). env.execute() must not be called: env has no operators.
- resTable.executeInsert("file_sink").await();

实现结果

添加依赖
- <!-- Kafka connector: 'connector'='kafka' in DDL and the legacy Kafka descriptor -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.12</artifactId>
- <version>1.13.3</version>
- </dependency>
- <!-- JSON format: required by 'format'='json' and by the legacy Json descriptor -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>1.13.3</version>
- </dependency>
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- // Legacy descriptor API: consume JSON messages from the Kafka topic "sensor"
- // and count messages per id.
- Schema sensorSchema = new Schema()
-         .field("id", DataTypes.STRING())
-         .field("timeStamp", DataTypes.BIGINT())
-         .field("temperature", DataTypes.FLOAT());
-
- // Connection settings: version-agnostic ("universal") connector, start at the latest offset.
- Kafka kafkaSource = new Kafka()
-         .version("universal")
-         .topic("sensor")
-         .startFromLatest()
-         .property("group.id", "bigdata")
-         .property("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
-
- tEnv.connect(kafkaSource)
-         .withFormat(new Json())
-         .withSchema(sensorSchema)
-         .createTemporaryTable("sensor");
-
- Table countPerId = tEnv.from("sensor")
-         .groupBy($("id"))
-         .select($("id"), $("id").count().as("cnt"));
-
- tEnv.toRetractStream(countPerId, Row.class).print();
- env.execute();

- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- // DDL replacement for the connect(new Kafka()...) source.
- // The json format matches JSON keys to column names, so the columns are named exactly like
- // the keys of the legacy schema (id / timeStamp / temperature); differently named columns
- // would silently be read as null.
- // 'scan.startup.mode' = 'latest-offset' mirrors .startFromLatest() of the legacy example
- // (the DDL default is 'group-offsets').
- tEnv.executeSql(
-         "create table kafka_source(" +
-         "`id` STRING," +
-         "`timeStamp` BIGINT," +
-         "`temperature` DOUBLE" +
-         ")with(" +
-         "'connector' = 'kafka'," +
-         "'topic' = 'sensor'," +
-         "'properties.bootstrap.servers' = 'hadoop102:9092'," +
-         "'properties.group.id' = 'bigdata'," +
-         "'scan.startup.mode' = 'latest-offset'," +
-         "'format' = 'json'," + // messages in the topic must be JSON
-         // on a parse error, skip the field/row instead of failing the job
-         "'json.ignore-parse-errors' = 'true'" +
-         ")");
-
- Table table = tEnv.from("kafka_source")
-         .groupBy($("id"))
-         .select($("id"), $("id").count().as("cnt"));
-
- tEnv.toRetractStream(table, Row.class).print();
-
- env.execute();

实现结果
这里注意:由于表的 format 配置为 json,Kafka 中的消息必须是 JSON 格式;并且 JSON format 按字段名匹配,消息中的 key 需要与表的字段名一致(区分大小写),缺失的字段会被解析为 null。

- (true,+I[sensor_1, 1])
- (false,-U[sensor_1, 1])
- (true,+U[sensor_1, 2])
- (true,+I[sensor_2, 1])
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- //step-1 build a bounded in-memory source of sensor readings
- DataStreamSource<SensorReading> waterSensorStream =
-         env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
-                 new SensorReading("sensor_1", 2000L, 20D),
-                 new SensorReading("sensor_2", 3000L, 30D),
-                 new SensorReading("sensor_1", 4000L, 40D),
-                 new SensorReading("sensor_1", 5000L, 50D),
-                 new SensorReading("sensor_2", 6000L, 60D));
-
- //step-2 keep only the sensor_1 readings
- Table sensorTable = tEnv.fromDataStream(waterSensorStream);
- Table resultTable = sensorTable
-         .where($("id").isEqual("sensor_1"))
-         .select($("id"), $("timeStamp"), $("temperature"));
-
- //step-3 legacy descriptor API: register the Kafka topic "sensor" as a JSON sink table;
- // each row is written as a JSON object with the keys id / timeStamp / temperature.
- Schema schema = new Schema()
-         .field("id", DataTypes.STRING())
-         .field("timeStamp", DataTypes.BIGINT())
-         .field("temperature", DataTypes.DOUBLE());
-
- tEnv.connect(new Kafka()
-         .version("universal")
-         .topic("sensor")
-         .sinkPartitionerRoundRobin() // distribute records over Kafka partitions round-robin
-         .property("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092"))
-         .withFormat(new Json())
-         .withSchema(schema)
-         .createTemporaryTable("kafka_sink");
-
- // executeInsert() submits the INSERT job and returns without waiting; await() blocks until
- // the bounded job has finished writing (enclosing method is assumed to declare
- // `throws Exception`). env.execute() must not be called: env has no operators of its own.
- resultTable.executeInsert("kafka_sink").await();

- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- //step-1 build a bounded in-memory source of sensor readings
- DataStreamSource<SensorReading> waterSensorStream =
-         env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
-                 new SensorReading("sensor_1", 2000L, 20D),
-                 new SensorReading("sensor_2", 3000L, 30D),
-                 new SensorReading("sensor_1", 4000L, 40D),
-                 new SensorReading("sensor_1", 5000L, 50D),
-                 new SensorReading("sensor_2", 6000L, 60D));
-
- //step-2 keep only the sensor_1 readings
- Table sensorTable = tEnv.fromDataStream(waterSensorStream);
- Table resultTable = sensorTable
-         .where($("id").isEqual("sensor_1"))
-         .select($("id"), $("timeStamp"), $("temperature"));
-
- //step-3 DDL replacement for the connect(new Kafka()...) sink. Columns are filled by position
- // from the query, but the json format writes their names as JSON keys, so they are named
- // like the legacy sink schema (id / timeStamp / temperature) to produce the same messages.
- tEnv.executeSql(
-         "create table kafka_sink(" +
-         "`id` STRING," +
-         "`timeStamp` BIGINT," +
-         "`temperature` DOUBLE" +
-         ")with(" +
-         "'connector' = 'kafka'," +
-         "'topic' = 'sensor'," +
-         "'properties.bootstrap.servers' = 'hadoop102:9092'," +
-         "'properties.group.id' = 'bigdata'," +
-         // mirrors .sinkPartitionerRoundRobin() of the legacy example
-         "'sink.partitioner' = 'round-robin'," +
-         "'format' = 'json'," +
-         "'json.ignore-parse-errors' = 'true'" +
-         ")");
- /*
-  * Conclusion
-  * The WITH options do not decide whether a table is a source or a sink; the same DDL can be
-  * used for both. What decides it is how the table is used: tEnv.from("kafka_source") reads
-  * from a table, executeInsert("kafka_sink") writes to it. Options that only apply to one
-  * side (e.g. properties.group.id / json.ignore-parse-errors when reading, sink.partitioner
-  * when writing) are simply not used on the other side.
-  */
-
- // executeInsert() submits the job and returns immediately; await() waits for it to finish
- // (enclosing method is assumed to declare `throws Exception`).
- resultTable.executeInsert("kafka_sink").await();

实现结果

至此,实验结束,当作一个学习的记录吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。