当前位置:   article > 正文

Flink的Transformations(2)_flink transformations

flink transformations

环境描述

这里练习Flink中的keyBy、reduce方法

<!-- Build properties: source encoding plus the Flink, Java and Scala binary versions. -->
<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<!-- Referenced as ${flink.version} by the Flink dependencies. -->
		<flink.version>1.9.0</flink.version>
		<java.version>1.8</java.version>
		<!-- Used as the artifactId suffix of Scala-dependent Flink artifacts. -->
		<scala.binary.version>2.11</scala.binary.version>
		<!-- Compiler source/target levels both follow ${java.version}. -->
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
<!-- Dependencies for the Flink keyBy/reduce exercise. -->
<dependencies>
		<!-- Flink Java API; version comes from ${flink.version}. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Flink streaming API; the artifactId carries the Scala binary version suffix. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Lombok; NOTE(review): not referenced by the Java example in this article. -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.16.18</version>
		</dependency>
		<!-- SLF4J binding for log4j 1.2. -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
		</dependency>
		<!-- log4j 1.x logging implementation. -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

java代码

package org.feng.transform;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Exercise for {@code keyBy()} and {@code reduce()} on a Flink {@code DataStream}.
 *
 * <p>Note that the summation here is a continuous (rolling) sum: a result is emitted
 * as elements arrive rather than once at the end.
 *
 * <p>Created by Feng on 2019/12/6 14:48; the project's name is flink.
 *
 * @author Feng
 */
public class Transformations2 {
    /**
     * Builds a small stream of (name, number) tuples, keys it by name, sums the
     * numbers per key in two equivalent ways and prints both result streams.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source stream of Tuple2 elements; the key "feng" appears twice.
        DataStream<Tuple2<String, Integer>> pairs = env.fromElements(
                Tuple2.of("feng", 1),
                Tuple2.of("so", 2),
                Tuple2.of("shuai", 3),
                Tuple2.of("bi", 4),
                Tuple2.of("feng", 5));

        // Partition by the first field of the tuple.
        // When keyBy is applied to a user-defined POJO type, that type must override hashCode;
        // keyBy cannot be applied to array types.
        KeyedStream<Tuple2<String, Integer>, Tuple> byName = pairs.keyBy(0);

        // Variant 1: sum the second field within each key using an anonymous class.
        ReduceFunction<Tuple2<String, Integer>> anonymousSum =
                new ReduceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 8214548377661004442L;

                    @Override
                    public Tuple2<String, Integer> reduce(
                            Tuple2<String, Integer> acc, Tuple2<String, Integer> next) {
                        return Tuple2.of(acc.f0, acc.f1 + next.f1);
                    }
                };
        byName.reduce(anonymousSum)
                .map(summed -> "匿名类:" + summed)
                .print()
                .setParallelism(1);

        // Variant 2: the same reduction expressed as a lambda.
        ReduceFunction<Tuple2<String, Integer>> lambdaSum =
                (acc, next) -> Tuple2.of(acc.f0, acc.f1 + next.f1);
        byName.reduce(lambdaSum)
                .map(summed -> "lambda:" + summed)
                .print()
                .setParallelism(1);

        // Submit the job.
        env.execute();
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

执行结果

匿名类:(feng,1)
匿名类:(feng,6)
匿名类:(so,2)
lambda:(feng,1)
lambda:(feng,6)
lambda:(shuai,3)
lambda:(bi,4)
匿名类:(shuai,3)
匿名类:(bi,4)
lambda:(so,2)

Process finished with exit code 0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/972447
推荐阅读
相关标签
  

闽ICP备14008679号