
Flink Stream Processing API: Source


Flink stream processing flow:

Source types:

  1. From an existing collection: val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718159, 4), SensorReading("sensor_2", 1547718261, 1)))
  2. From individual elements of any type: env.fromElements(1, 34, "date")
  3. From a file: val stream = env.readTextFile("D:\\test.txt")
  4. From a socket stream: val textDstream: DataStream[String] = env.socketTextStream("node03", 7777) (on Linux, run: nc -lk 7777)
  5. DataStream connectors (the unbounded Kafka source is covered in detail below)
  6. A custom source implementing the SourceFunction interface (see the sketch after this list); more details: https://blog.csdn.net/qq_40713537/article/details/102698549
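
As a quick illustration of item 6, here is a minimal sketch of a custom source, assuming a case class SensorReading(id: String, timestamp: Long, temperature: Double) like the one in item 1; the class name MySensorSource is hypothetical:

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

// Hypothetical custom source that emits one random SensorReading per second.
class MySensorSource extends SourceFunction[SensorReading] {

  // Flag used by cancel() to stop the emit loop.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      ctx.collect(SensorReading("sensor_" + rand.nextInt(10), System.currentTimeMillis(), rand.nextDouble() * 100))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

// Usage: val stream = env.addSource(new MySensorSource())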

The decompiled source signatures are as follows:

package org.apache.flink.streaming.api.scala

@org.apache.flink.annotation.Public
class StreamExecutionEnvironment(javaEnv : org.apache.flink.streaming.api.environment.StreamExecutionEnvironment) extends scala.AnyRef {
  def fromElements[T](data : T*)(implicit evidence$1 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromCollection[T](data : scala.Seq[T])(implicit evidence$2 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromCollection[T](data : scala.Iterator[T])(implicit evidence$3 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def fromParallelCollection[T](data : org.apache.flink.util.SplittableIterator[T])(implicit evidence$4 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  def readTextFile(filePath : _root_.scala.Predef.String) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
  def readTextFile(filePath : _root_.scala.Predef.String, charsetName : _root_.scala.Predef.String) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
  def readFile[T](inputFormat : org.apache.flink.api.common.io.FileInputFormat[T], filePath : _root_.scala.Predef.String)(implicit evidence$5 : org.apache.flink.api.common.typeinfo.TypeInformation[T]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }
  @org.apache.flink.annotation.PublicEvolving
  def socketTextStream(hostname : _root_.scala.Predef.String, port : scala.Int, delimiter : scala.Char = { /* compiled code */ }, maxRetry : scala.Long = { /* compiled code */ }) : org.apache.flink.streaming.api.scala.DataStream[_root_.scala.Predef.String] = { /* compiled code */ }
}
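
For reference, a minimal runnable sketch of the socket source from item 4; it assumes a host named node03 with `nc -lk 7777` running, and the object name SocketSourceApp (placed in the same package as the later examples) is hypothetical:

package com.lxk.service

import org.apache.flink.streaming.api.scala._

object SocketSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Each line typed into `nc -lk 7777` on node03 arrives as one String record.
    val textDstream: DataStream[String] = env.socketTextStream("node03", 7777)
    textDstream.print()
    env.execute("SocketSourceApp")
  }
}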

1. Environment

1.1 getExecutionEnvironment (most commonly used)

Creates an execution environment that represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment; if the program is invoked from the command-line client to be submitted to a cluster, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the job is run, which makes it the most common way to create an execution environment.

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

// Or create a streaming execution environment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

If no parallelism is set, the value configured in flink-conf.yaml is used; its default is 1. You can also set the parallelism explicitly on the environment, as in the sketch below.
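
A minimal sketch of overriding the configured parallelism (the value 4 is just an example):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4) // overrides the flink-conf.yaml default for this job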

1.2 createLocalEnvironment

Returns a local execution environment; the default parallelism must be specified in the call.

val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment(1)

// Or create a streaming execution environment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1)

1.3 createRemoteEnvironment

Returns a cluster execution environment and submits the JAR to the remote server. When calling it you must specify the JobManager's IP and port, as well as the JAR to run on the cluster.

val env: ExecutionEnvironment = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

// Or create a streaming execution environment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

2. Source (DataStream Connector)

2.1 Create a Kafka utility class

  •  Add the dependency JARs to pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-connector-kafka_2.12</artifactId>-->
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
  •  The FlinkKafkaUtil utility class

package com.lxk.util

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object FlinkKafkaUtil {
  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092")
  prop.setProperty("zookeeper.connect", "192.168.18.103:2181,192.168.18.104:2181,192.168.18.105:2181")
  prop.setProperty("group.id", "gmall")

  def getConsumer(topic: String): FlinkKafkaConsumer010[String] = {
    // Consume data from Kafka.
    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val myKafkaConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }
}

2.2 Add the business main class StartupApp

package com.lxk.service

import com.lxk.util.FlinkKafkaUtil
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object StartupApp {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node03", 6123, "F:\\10_code\\scala\\flink2020\\target\\flink-streaming-scala_2.11-1.0-SNAPSHOT.jar")
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
    dstream.print()
    environment.execute()
  }
}

2.3 Testing

  • Use a Kafka producer to simulate user logs from a production environment
package com.lxk.kafka.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.alibaba.fastjson.JSON;
import com.lxk.kafka.FlinkBean;

public class KafkaProducerTestFlink implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final static Random random = new Random();

    public KafkaProducerTestFlink(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092");
        // acks=0: the producer does not wait for any acknowledgement from Kafka.
        // acks=1: Kafka writes the message to the leader's local log but does not wait for the other replicas.
        // acks=all: the leader waits until all followers have replicated the message; the message cannot be
        //           lost unless every machine in the Kafka cluster goes down. This is the strongest guarantee.
        props.put("acks", "all");
        // A value greater than 0 makes the client retry messages that failed to send.
        props.put("retries", 0);
        // When several messages target the same partition, the producer batches them into fewer
        // network requests, which improves both client and producer efficiency.
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    public void run() {
        int messageNo = 1;
        try {
            for (;;) {
                FlinkBean bean = userlogs();
                String userLog = JSON.toJSONString(bean);
                producer.send(new ProducerRecord<String, String>(topic, "key_" + messageNo, userLog));
                System.out.println("send:\t" + userLog);
                // Stop after sending 20 messages.
                if (messageNo % 20 == 0) {
                    System.out.println("Successfully sent " + messageNo + " messages");
                    break;
                }
                messageNo++;
                // Utils.sleep(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTestFlink test = new KafkaProducerTestFlink("GMALL_STARTUP");
        Thread thread = new Thread(test);
        thread.start();
    }

    // Generate a random log record.
    private static FlinkBean userlogs() {
        String[] channelNames = new String[] { "tencent", "wandoujia", "xiaomiMall", "HuaweiMall" };
        String[] areaNames = new String[] { "Beijing", "Shanghai", "Nanjing", "Shenzhen", "Hangzhou" };
        String[] osNames = new String[] { "android", "ios" };
        String[] verNames = new String[] { "1.1.0", "1.1.1", "1.1.2", "1.1.3" };
        String dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        FlinkBean bean = new FlinkBean();
        bean.setDateToday(dateToday);
        bean.setArea(areaNames[random.nextInt(5)]);
        bean.setUid((long) random.nextInt(2000));
        bean.setOs(osNames[random.nextInt(2)]);
        bean.setChannel(channelNames[random.nextInt(4)]);
        bean.setVer(verNames[random.nextInt(4)]);
        bean.setTimestamp(new Date().getTime());
        return bean;
    }
}

  • Data received by StartupApp

2.4 How Flink + Kafka achieves exactly-once semantics

  • Flink uses checkpoints to record how far the data has been processed.
  • The JobManager coordinates checkpointing across the TaskManagers; checkpoints are stored in a StateBackend. The default StateBackend is in-memory, but it can be switched to a file-based backend (local disk or HDFS) for durable storage.
  • Execution effectively follows a two-phase commit: each operator performs a "pre-commit" as it completes, and only once the sink finishes is the "commit" confirmed; if execution fails, the pre-commit is discarded.
  • On failure, the job is restored from the StateBackend, and only operations whose commit was confirmed are recovered. A minimal checkpointing sketch follows this list.
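
A minimal sketch of enabling checkpointing on the environment from section 2.2; the 60-second interval and the HDFS checkpoint path are illustrative assumptions:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode

// Take an exactly-once checkpoint every 60 seconds.
environment.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
// Persist checkpoints to HDFS instead of the default in-memory StateBackend (the path is an assumption).
environment.setStateBackend(new FsStateBackend("hdfs://node03:8020/flink/checkpoints"))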

2.5 Flink offers three ways to commit consumer-group offsets when consuming Kafka data

1. Checkpointing enabled: offsets are committed after each checkpoint completes (see the combined sketch after option 3).

// Default: private boolean enableCommitOnCheckpoints = true;

consumer010.setCommitOffsetsOnCheckpoints(true);

2. Checkpointing enabled but checkpoint commits disabled: consumer-group offsets are not committed.

consumer010.setCommitOffsetsOnCheckpoints(false);

3. Checkpointing disabled: rely on the Kafka client's auto-commit.

kafkaProps.setProperty("enable.auto.commit", "true");
// How often the offsets of consumed messages are committed.
kafkaProps.put("auto.commit.interval.ms", "1000");
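
Putting option 1 together with the utility class from section 2.1, a minimal sketch; the object name OffsetCommitApp and the 5-second checkpoint interval are illustrative assumptions:

package com.lxk.service

import com.lxk.util.FlinkKafkaUtil
import org.apache.flink.streaming.api.scala._

object OffsetCommitApp {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // With checkpointing enabled, offsets are committed after each completed checkpoint.
    environment.enableCheckpointing(5000L)

    val consumer010 = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    consumer010.setCommitOffsetsOnCheckpoints(true) // the default, shown explicitly (option 1)

    environment.addSource(consumer010).print()
    environment.execute("OffsetCommitApp")
  }
}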
