当前位置:   article > 正文

1、 怎样使用Java代码来编写Flink流式计算任务_flink java 代码

flink java 代码

目录

1、Flink application 的构成

2、创建执行环境

3、触发程序执行

3.1 Flink 任务触发说明

3.2 env.execute("jobName")

3.3 env.executeAsync("jobName")


1、Flink application 的构成

通常会将一个计算任务,称为 application 或者 job

Flink application 的构成 :

        1. 获取 flink应用程序 的执行环境对象(Execution Environment)
        2. 指定读取数据源(Source)
        3. 定义基于数据的转换操作(Transformations)
        4. 定义计算结果的输出位置(Sink)
        5. 触发程序执行(Execute)


2、创建执行环境

传送门:看这里


3、触发程序执行

3.1 Flink 任务触发说明

          Flink程序和Spark程序一样,都是延迟执行;当Driver程序的main方法被执行时,数据加载和转换不会直接发生。只有当指定到 env.execute() 时,计算任务才会被触发执行。

3.2 env.execute("jobName")

功能说明:

        触发程序执行,并等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

3.3 env.executeAsync("jobName")

功能说明: 

        触发程序执行,不会等待作业完成触发作业异步执行。

        它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。


4、这是一个完整的入门案例

需求描述:统计每个小时内的每个用户的活跃次数

代码示例(看不懂没关系,继续看后续的文章,将会使你豁然开朗): 

开发语言:java1.8

flink版本:1.17.0

  1. package com.baidu.showcase;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  9. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  10. import org.apache.flink.streaming.api.windowing.time.Time;
  11. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  12. import org.apache.flink.util.Collector;
  13. import java.util.HashMap;
  14. /*
  15. * 需求描述:By用户统计每个小时内的活跃次数
  16. *
  17. * */
  18. public class CountUvByHour {
  19. public static void main(String[] args) throws Exception {
  20. // 1.获取执行环境(Execution Environment)
  21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. env.setParallelism(4);
  23. // 2.读取数据源(Source)
  24. DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
  25. // 3.基于source做转换操作(Transformations)
  26. SingleOutputStreamOperator<String> resultDs = source.map(new MapFunction<String, Tuple2<String, Long>>() {
  27. @Override
  28. public Tuple2 map(String value) throws Exception {
  29. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
  30. }
  31. }
  32. )
  33. // 添加水位线
  34. .assignTimestampsAndWatermarks(
  35. WatermarkStrategy
  36. .<Tuple2<String, Long>>forMonotonousTimestamps()
  37. .withTimestampAssigner(
  38. (event, timestamp) -> event.f1
  39. )
  40. )
  41. // 根据User分组
  42. .keyBy(e -> e.f0)
  43. // 基于事件时间设置10s的滚动窗口
  44. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  45. // 对窗口内用户的活跃次数计数
  46. .process(
  47. new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
  48. @Override
  49. public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
  50. long start = context.window().getStart();
  51. long end = context.window().getEnd();
  52. HashMap<String, Long> map = new HashMap<>();
  53. for (Tuple2<String, Long> element : elements) {
  54. map.put(element.f0
  55. , map.getOrDefault(element.f0, 0L) + 1L);
  56. }
  57. StringBuffer record = new StringBuffer();
  58. //record.append("============================================\n");
  59. record.append("窗口时间范围:[" + start + "," + end + ")\n");
  60. record.append("当前窗口用户活跃次数:" + map + "\n");
  61. out.collect(record.toString());
  62. }
  63. }
  64. );
  65. // 4.将计算结果的输出位置(Sink)
  66. resultDs.print();
  67. // 5.触发程序执行(Execute)
  68. env.execute();
  69. }
  70. }

运行结果:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/972477
推荐阅读
相关标签
  

闽ICP备14008679号