
Flink Transformation Operators with Examples

Map

DataStream → DataStream: takes one element and produces one element.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.generateSequence(1, 10)
  val streamMap = stream.map { x => x * 2 }
  streamMap.print()
  env.execute("FirstJob")
Note: with stream.print(), the number at the beginning of each output line indicates which parallel subtask produced that line.
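The prefix is tied to the operator parallelism. A minimal sketch (the job name is arbitrary); when the print sink runs with parallelism 1, the prefix is normally omitted:

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // With a single subtask there is only one writer, so no "n> " prefix is printed.
  env.setParallelism(1)
  env.generateSequence(1, 10).map(_ * 2).print()
  env.execute("PrintPrefixDemo")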
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.operators.MapOperator;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.utils.ParameterTool;

  import java.util.Random;

  public class StuScore {
    private static Random rand = new Random();

    public static void main(String[] args) throws Exception {
      ParameterTool params = ParameterTool.fromArgs(args);
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().setGlobalJobParameters(params);

      DataSet<String> text;
      if (params.has("input")) {
        text = env.readTextFile(params.get("input")); // e.g. F:\date\flinkdata\stu.txt
      } else {
        System.out.println("Please check your input");
        return;
      }

      // Map each line (a student name) to a (name, random score) pair.
      MapOperator<String, Tuple2<String, Integer>> stuscore = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
          return new Tuple2<>(s, rand.nextInt(100) + 1);
        }
      });

      if (params.has("output")) {
        stuscore.writeAsCsv(params.get("output")); // e.g. F:\date\flinkdata\personinput\A
        env.execute("StuScore");
      } else {
        System.out.println("Printing to stdout");
        stuscore.print();
      }
    }
  }

FlatMap

DataStream → DataStream: takes one element and produces zero, one, or more elements.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
  val streamFlatMap = stream.flatMap {
    x => x.split(" ")
  }
  streamFlatMap.print()
  env.execute("FirstJob")
Filter

DataStream → DataStream: evaluates a boolean for each element and keeps only the elements for which the function returns true. The example below keeps the odd numbers:

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.generateSequence(1, 10)
  val streamFilter = stream.filter {
    // keep the odd numbers
    x => (x % 2 != 0)
  }
  streamFilter.print()
  env.execute("FirstJob")
 

Connect

DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After being connected, the two streams merely live in the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
  val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
  val streamCollect = env.fromCollection(List(1, 2, 3, 4))
  // Swapping streamMap and streamCollect does not change the result.
  val streamConnect = streamMap.connect(streamCollect)
  streamConnect.map(item => println(item), item => println(item))
  env.execute("FirstJob")

CoMap, CoFlatMap

ConnectedStreams → DataStream: operates on a ConnectedStreams with the same semantics as map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams.

  import org.apache.flink.streaming.api.scala._

  // CoMap: apply a different map function to each of the two connected streams.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
  val streamFlatMap = stream1.flatMap(x => x.split(" "))
  val stream2 = env.fromCollection(List(1, 2, 3, 4))
  val streamConnect = streamFlatMap.connect(stream2)
  val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
  )
  streamCoMap.print()
  env.execute("FirstJob")

  // ========================
  // CoFlatMap: apply a different flatMap function to each of the two connected streams.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.readTextFile("test.txt")
  val stream2 = env.readTextFile("test1.txt")
  val streamConnect = stream1.connect(stream2)
  val streamCoFlatMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
  )
  streamCoFlatMap.print()
  env.execute("FirstJob")

Split

DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some criterion. Note: this code alone does not produce any output; combine it with Select (next section) to actually run it.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
  val streamFlatMap = stream.flatMap(x => x.split(" "))
  val streamSplit = streamFlatMap.split(
    num =>
      // Strings equal to "hadoop" form one DataStream, everything else forms another.
      (num.equals("hadoop")) match {
        case true => List("hadoop")
        case false => List("other")
      }
  )
  env.execute("FirstJob")

Select

SplitStream → DataStream: selects one or more DataStreams from a SplitStream.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
  val streamFlatMap = stream.flatMap(x => x.split(" "))
  val streamSplit = streamFlatMap.split(
    num =>
      (num.equals("hadoop")) match {
        case true => List("hadoop")
        case false => List("other")
      }
  )
  val hadoop = streamSplit.select("hadoop")
  val other = streamSplit.select("other")
  other.print()
  env.execute("FirstJob")

Union

DataStream → DataStream: unions two or more DataStreams, producing a new DataStream that contains the elements of all of them. Note: if you union a DataStream with itself, every element appears twice in the resulting stream (see the short sketch after the example below).

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1 = env.readTextFile("test.txt")
  val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
  val stream2 = env.readTextFile("test1.txt")
  val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
  val streamUnion = streamFlatMap1.union(streamFlatMap2)
  streamUnion.print()
  env.execute("FirstJob")
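A minimal sketch of the self-union behavior mentioned above (the element values and job name are made up for illustration):

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val nums = env.fromCollection(List(1, 2, 3))
  // Unioning a stream with itself: each of 1, 2, 3 shows up twice in the output.
  nums.union(nums).print()
  env.execute("SelfUnionDemo")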

KeyBy

DataStream → KeyedStream: logically partitions a stream into disjoint partitions, each containing the elements with the same key; internally this is implemented with hash partitioning. When keying by field position, as in the example below, the input must be a Tuple type.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("test.txt")
  val streamFlatMap = stream.flatMap {
    x => x.split(" ")
  }
  val streamMap = streamFlatMap.map {
    x => (x, 1)
  }
  val streamKeyBy = streamMap.keyBy(0)
  streamKeyBy.print()
  env.execute("FirstJob")

Reduce

KeyedStream → DataStream: a rolling aggregation on a keyed data stream. It combines the current element with the result of the previous aggregation and produces a new value; the returned stream contains the result of every aggregation step, not only the final one.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
  val streamReduce = stream.reduce(
    (item1, item2) => (item1._1, item1._2 + item2._2)
  )
  streamReduce.print()
  env.execute("FirstJob")

Fold

KeyedStream → DataStream: a rolling fold with an initial value on a keyed data stream. It combines the current element with the result of the previous fold and produces a new value; the returned stream contains the result of every fold step, not only the final one.

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
  val streamFold = stream.fold(100)(
    (begin, item) => (begin + item._2)
  )
  streamFold.print()
  env.execute("FirstJob")

Aggregations

KeyedStream → DataStream: rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, while minBy returns the element whose field holds the minimum value (the same applies to max and maxBy). The returned stream contains the result of every aggregation step, not only the final one.

  keyedStream.sum(0)
  keyedStream.sum("key")
  keyedStream.min(0)
  keyedStream.min("key")
  keyedStream.max(0)
  keyedStream.max("key")
  keyedStream.minBy(0)
  keyedStream.minBy("key")
  keyedStream.maxBy(0)
  keyedStream.maxBy("key")

  // Example: rolling sum per key.
  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
  val streamReduce = stream.sum(1)
  streamReduce.print()
  env.execute("FirstJob")
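The min/minBy difference can be seen on a small keyed stream. A minimal sketch (the sample tuples and job name are made up for illustration):

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // (key, value, tag) – the tag makes it visible which original element is emitted.
  val data = env.fromCollection(List(("a", 3, "x"), ("a", 1, "y"), ("a", 2, "z"))).keyBy(0)
  // min(1): only the aggregated field is guaranteed to hold the running minimum;
  // the other fields are typically taken from the first element seen for the key.
  data.min(1).print()
  // minBy(1): the whole element that currently holds the minimum value is emitted.
  data.minBy(1).print()
  env.execute("MinVsMinBy")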

The operators before Reduce can be applied directly to a stream because they are not aggregating operations. From Reduce onward, however, you will notice that although an aggregation operator can be applied directly to an unbounded stream, it emits the result of every aggregation step, which is usually not what we want. In practice, the aggregating operators reduce, fold, and the aggregations are meant to be used together with a Window; only in combination with a Window do they produce the desired result.
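A minimal sketch of reduce combined with a window (the socket source on port 9999 and the 5-second tumbling window are arbitrary choices for illustration):

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val wordCounts = env.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .keyBy(0)
    // Aggregate per key within each 5-second window instead of over the unbounded stream.
    .timeWindow(Time.seconds(5))
    .reduce((a, b) => (a._1, a._2 + b._2))
  wordCounts.print()
  env.execute("WindowedWordCount")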
