Flink stream processing flow: Environment → Source → Transformation → Sink.
Source categories: collection-based sources (fromElements / fromCollection / fromParallelCollection), file-based sources (readTextFile / readFile), socket-based sources (socketTextStream), and connector sources such as the Kafka consumer shown later.
The relevant (decompiled) source of StreamExecutionEnvironment is as follows:
package org.apache.flink.streaming.api.scala

@org.apache.flink.annotation.Public
class StreamExecutionEnvironment(javaEnv : org.apache.flink.streaming.api.environment.StreamExecutionEnvironment) extends scala.AnyRef {
  def fromElements[T](data : T*)(implicit evidence$1 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromCollection[T](data : scala.Seq[T])(implicit evidence$2 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromCollection[T](data : scala.Iterator[T])(implicit evidence$3 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromParallelCollection[T](data : org.apache.flink.util.SplittableIterator[T])(implicit evidence$4 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def readTextFile(filePath : _root_.scala.Predef.String) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
  def readTextFile(filePath : _root_.scala.Predef.String, charsetName : _root_.scala.Predef.String) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
  def readFile[T](inputFormat : org.apache.flink.api.common.io.FileInputFormat[T], filePath : _root_.scala.Predef.String)(implicit evidence$5 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  @org.apache.flink.annotation.PublicEvolving
  def socketTextStream(hostname : _root_.scala.Predef.String, port : scala.Int, delimiter : scala.Char = { /* compiled code */ }, maxRetry : scala.Long = { /* compiled code */ }) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
}
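A minimal usage sketch of the source methods above (the file path, host and port are placeholder values, not taken from this post):

import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// from an in-memory collection
val fromElems: DataStream[String] = env.fromElements("hello", "flink")
val fromColl: DataStream[Int] = env.fromCollection(List(1, 2, 3))

// from a text file (placeholder path)
val fromFile: DataStream[String] = env.readTextFile("/tmp/words.txt")

// from a socket (placeholder host and port)
val fromSocket: DataStream[String] = env.socketTextStream("localhost", 9999)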
getExecutionEnvironment creates an execution environment that represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment; if the program is submitted to a cluster from the command-line client, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which kind of environment to return based on how the program is being run, and it is the most commonly used way to create an execution environment.
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// or, for stream processing, create a StreamExecutionEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
If no parallelism is set, the value configured in flink-conf.yaml is used; the default is 1.
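The parallelism can also be set explicitly on the environment, which overrides the flink-conf.yaml default for this job; a minimal sketch (the value 4 is just an illustrative choice):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4) // explicit job-level parallelism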
createLocalEnvironment returns a local execution environment; the default parallelism has to be specified when calling it.
val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment(1)
// or, for stream processing, create a StreamExecutionEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1)
createRemoteEnvironment returns a cluster execution environment and submits the Jar to the remote server. The JobManager's hostname/IP and port must be specified when calling it, along with the Jar to run on the cluster.
val env: ExecutionEnvironment = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
// or, for stream processing, create a StreamExecutionEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.7.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.7.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-connector-kafka_2.12</artifactId>-->
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.7.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

package com.lxk.util

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object FlinkKafkaUtil {
  val prop = new Properties()

  prop.setProperty("bootstrap.servers", "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092")
  // zookeeper.connect is only needed by the legacy 0.8 consumer; the 0.10 consumer ignores it
  prop.setProperty("zookeeper.connect", "192.168.18.103:2181,192.168.18.104:2181,192.168.18.105:2181")
  prop.setProperty("group.id", "gmall")

  def getConsumer(topic: String): FlinkKafkaConsumer010[String] = {
    // Consume data from Kafka.
    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val myKafkaConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }
}

package com.lxk.service

import com.lxk.util.FlinkKafkaUtil
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object StartupApp {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node03", 6123,"F:\\10_code\\scala\\flink2020\\target\\flink-streaming-scala_2.11-1.0-SNAPSHOT.jar")

    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")

    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    dstream.print()

    environment.execute()
  }
}

package com.lxk.kafka.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.alibaba.fastjson.JSON;
import com.lxk.kafka.FlinkBean;

public class KafkaProducerTestFlink implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final static Random random = new Random();

    public KafkaProducerTestFlink(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092");
        // acks=0: the producer does not wait for any acknowledgement from Kafka.
        // acks=1: the leader writes the record to its local log but does not wait for acknowledgement from the other replicas.
        // acks=all: the leader waits for all in-sync replicas to acknowledge the record. The record is not lost
        //           unless the whole Kafka cluster goes down; this is the strongest available guarantee.
        props.put("acks", "all");
        // If set to a value greater than 0, the client resends records whose send failed.
        props.put("retries", 0);
        // Records destined for the same partition are batched together, reducing the number of network requests.
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    public void run() {
        int messageNo = 1;
        try {
            for (;;) {
                FlinkBean bean = userlogs();
                String userLog = JSON.toJSONString(bean);
                producer.send(new ProducerRecord<String, String>(topic, "key_" + messageNo, userLog));
                System.out.println("send:\t" + userLog);
                // stop after sending 20 records
                if (messageNo % 20 == 0) {
                    System.out.println("sent " + messageNo + " records");
                    break;
                }
                messageNo++;
                // Utils.sleep(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTestFlink test = new KafkaProducerTestFlink("GMALL_STARTUP");
        Thread thread = new Thread(test);
        thread.start();
    }

    // Generate a random test record.
    private static FlinkBean userlogs() {
        String[] channelNames = new String[] { "tencent", "wandoujia", "xiaomiMall", "HuaweiMall" };
        String[] areaNames = new String[] { "Beijing", "Shanghai", "Nanjing", "Shenzhen", "Hangzhou" };
        String[] osNames = new String[] { "android", "ios" };
        String[] verNames = new String[] { "1.1.0", "1.1.1", "1.1.2", "1.1.3" };
        String dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());

        FlinkBean bean = new FlinkBean();
        bean.setDateToday(dateToday);
        bean.setArea(areaNames[random.nextInt(5)]);
        bean.setUid((long) random.nextInt(2000));
        bean.setOs(osNames[random.nextInt(2)]);
        bean.setChannel(channelNames[random.nextInt(4)]);
        bean.setVer(verNames[random.nextInt(4)]);
        bean.setTimestamp(new Date().getTime());

        return bean;
    }
}
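The FlinkBean class referenced by the producer is not shown in the post. A minimal sketch of what it might look like, with the field names inferred from the setters used above; it is written in Scala with @BeanProperty so it exposes the Java-style getters/setters that fastjson and the producer rely on (a plain Java POJO with the same accessors would work just as well):

package com.lxk.kafka

import scala.beans.BeanProperty

// Hypothetical sketch: fields inferred from setDateToday, setArea, setUid, setOs, setChannel, setVer, setTimestamp.
class FlinkBean {
  @BeanProperty var dateToday: String = _
  @BeanProperty var area: String = _
  @BeanProperty var uid: Long = _
  @BeanProperty var os: String = _
  @BeanProperty var channel: String = _
  @BeanProperty var ver: String = _
  @BeanProperty var timestamp: Long = _
}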

How the consumer's offsets are committed back to Kafka depends on the checkpointing setup:
1. Checkpointing enabled: offsets are committed when a checkpoint completes.
// default: private boolean enableCommitOnCheckpoints = true;
consumer010.setCommitOffsetsOnCheckpoints(true);
2. Checkpointing enabled, but commit-on-checkpoint disabled: the consumer group's offsets are never committed.
consumer010.setCommitOffsetsOnCheckpoints(false);
3. Checkpointing disabled: offset committing falls back to the Kafka client's auto-commit.
kafkaProps.setProperty("enable.auto.commit", "true");
// how often to commit the offsets of consumed messages
kafkaProps.put("auto.commit.interval.ms", "1000");
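Putting case 1 together with the StartupApp job above, a minimal sketch (the 5000 ms checkpoint interval is just an illustrative value):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// take a checkpoint every 5 seconds; offsets are committed to Kafka on each completed checkpoint
env.enableCheckpointing(5000)

val consumer010: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
consumer010.setCommitOffsetsOnCheckpoints(true) // the default, shown explicitly for clarity

env.addSource(consumer010).print()
env.execute()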