StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "host",                 // JobManager hostname
        1234,                   // JobManager port
        "path/to/jarFile.jar"   // JAR file to ship to the JobManager
);
// batch environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
bin/flink run -Dexecution.runtime-mode=BATCH ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.execute();
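
To show how these pieces fit together, here is a minimal sketch of a complete job that sets BATCH execution mode in code and then submits with execute(). The class name and the sample data are illustrative only; any bounded source works.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switch the job to BATCH execution; appropriate here because the source below is bounded
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // A bounded, in-memory source used purely for illustration
        env.fromElements("Mary", "Bob", "Alice", "Cary").print();

        // Nothing runs until execute() is called
        env.execute("batch-mode-example");
    }
}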
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));

        stream.print("Kafka");
        env.execute();
    }
}

package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author lss
 * @date 2022/11/3 17:18
 * @description custom source that emits simulated click events
 */
public class ClickSource implements SourceFunction<Event> {
    // Flag that controls whether data generation keeps running
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Randomly pick values from the fixed candidate sets
        Random random = new Random();
        String[] users = {"Mary", "Bob", "Alice", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1"};
        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // Emit one click event per second so the output is easy to observe
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
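
The Event type referenced above lives in com.atmk.stream.entity and is not shown in this post. Judging from the constructor call in ClickSource, it is a plain POJO with a user, a url, and a timestamp; the sketch below is an assumption for completeness, and the field names are inferred, not taken from the project.

package com.atmk.stream.entity;

// Minimal sketch of the Event POJO assumed by ClickSource; the real class in the
// project may use different field names or carry extra methods.
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // Flink POJO types need a public no-argument constructor
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
    }
}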

package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lss
 * @date 2022/11/3 17:26
 * @description read from the custom ClickSource
 */
public class SourceCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the custom source function by passing it to addSource
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        stream.print("SourceCustom");
        env.execute();
    }
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceThrowException {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ClickSource implements the non-parallel SourceFunction interface, so asking for
        // a parallelism of 2 here makes Flink reject the job with an exception.
        env.addSource(new ClickSource()).setParallelism(2).print();
        env.execute();
    }
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public class ParallelSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ParallelSourceFunction allows a parallelism greater than 1
        env.addSource(new CustomSource()).setParallelism(2).print();
        env.execute();
    }

    public static class CustomSource implements ParallelSourceFunction<Integer> {
        private boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Sample output (the leading "2>" is the index of the print-sink subtask that emitted the record):

2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389