赞
踩
目录
3.3 env.executeAsync("jobName")
通常会将一个计算任务,称为 application 或者 job
Flink application 的构成 :
1. 获取 flink应用程序 的执行环境对象(Execution Environment)
2. 指定读取数据源(Source)
3. 定义基于数据的转换操作(Transformations)
4. 定义计算结果的输出位置(Sink)
5. 触发程序执行(Execute)
传送门:看这里
Flink程序和Spark程序一样,都是延迟执行;当Driver程序的main方法被执行时,数据加载和转换不会直接发生。只有当指定到 env.execute() 时,计算任务才会被触发执行。
功能说明:
触发程序执行,并等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。
功能说明:
触发程序执行,不会等待作业完成,触发作业异步执行。
它会返回一个 JobClient
,你可以通过它与刚刚提交的作业进行通信。
需求描述:统计每个小时内的每个用户的活跃次数
代码示例(看不懂没关系,继续看后续的文章,将会使你豁然开朗):
开发语言:java1.8
flink版本:1.17.0
- package com.baidu.showcase;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- import java.util.HashMap;
-
- /*
- * 需求描述:By用户统计每个小时内的活跃次数
- *
- * */
- public class CountUvByHour {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境(Execution Environment)
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // 2.读取数据源(Source)
- DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
-
- // 3.基于source做转换操作(Transformations)
- SingleOutputStreamOperator<String> resultDs = source.map(new MapFunction<String, Tuple2<String, Long>>() {
- @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- }
- )
- // 添加水位线
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- .<Tuple2<String, Long>>forMonotonousTimestamps()
- .withTimestampAssigner(
- (event, timestamp) -> event.f1
- )
- )
- // 根据User分组
- .keyBy(e -> e.f0)
- // 基于事件时间设置10s的滚动窗口
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // 对窗口内用户的活跃次数计数
- .process(
- new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
- @Override
- public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
- long start = context.window().getStart();
- long end = context.window().getEnd();
-
- HashMap<String, Long> map = new HashMap<>();
- for (Tuple2<String, Long> element : elements) {
- map.put(element.f0
- , map.getOrDefault(element.f0, 0L) + 1L);
- }
-
- StringBuffer record = new StringBuffer();
- //record.append("============================================\n");
- record.append("窗口时间范围:[" + start + "," + end + ")\n");
- record.append("当前窗口用户活跃次数:" + map + "\n");
- out.collect(record.toString());
- }
- }
- );
-
- // 4.将计算结果的输出位置(Sink)
- resultDs.print();
-
- // 5.触发程序执行(Execute)
- env.execute();
- }
- }
运行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。