赞
踩
这里练习Flink中的keyBy、reduce方法
<!-- Build properties: pin encoding, Flink/Java/Scala versions, compiler level -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <!-- Flink core Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink DataStream API (artifact is suffixed with the Scala binary version) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Lombok: compile-time code generation (getters/setters/etc.) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <!-- Logging: SLF4J bound to Log4j 1.x -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
package org.feng.transform; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Created by Feng on 2019/12/6 14:48 * CurrentProject's name is flink * 练习:keyBy() , reduce() * 注意这里的求和运算是持续求和计算 * @author Feng */ public class Transformations2 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 构建类型为 Tuple2 的数据流 DataStream<Tuple2<String, Integer>> tuple2DataStream = env .fromElements(new Tuple2<>("feng", 1), new Tuple2<>("so", 2), new Tuple2<>("shuai",3), new Tuple2<>("bi", 4), new Tuple2<>("feng", 5)); // 按照 Tuple2 的第一个元素分区 // KeyBy 操作用于用户自定义的 POJOs 类型时,该自定义类型必须重写 hashCode 方法; // KeyBy 操作不能用于数组类型。 KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = tuple2DataStream.keyBy(0); // 将同一分区的Tuple2的第二个元素累加 // 写法1: tuple2TupleKeyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 8214548377661004442L; @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }).map(value -> "匿名类:" + value).print().setParallelism(1); // 写法2: tuple2TupleKeyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)) .map(value -> "lambda:" + value) .print() .setParallelism(1); // 执行 env.execute(); } }
匿名类:(feng,1)
匿名类:(feng,6)
匿名类:(so,2)
lambda:(feng,1)
lambda:(feng,6)
lambda:(shuai,3)
lambda:(bi,4)
匿名类:(shuai,3)
匿名类:(bi,4)
lambda:(so,2)
Process finished with exit code 0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。