
Learning Flink: Processing Streams from Different Sources and Writing Them to Different Sinks

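Contents

I. Standalone Installation
II. Working with Flink in IDEA
    (1) Adding Dependencies
    (2) Data Sources (Source)
        1. Loading from elements
        2. Loading from a collection
        3. Loading from a file
        4. Loading from a socket port
        5. Loading from a Kafka topic (important and commonly used)
        6. Loading from a custom source
    (3) Outputs (Sink)
        1. Reading a file, processing it, and writing to another file
        2. Source: file, Sink: MySQL
        3. Reading from Kafka, processing, and writing to MySQL
        4. Reading from a Kafka topic, processing, and writing to another topic
        5. Reading from a Kafka topic, processing, and writing to HBase
        6. Reading from a Kafka topic, processing, and writing to Redis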


I. Standalone Installation

Installing Flink in standalone mode only requires extracting the archive.

    [root@ant168 install]# ls
    flink-1.13.2-bin-scala_2.12.tgz  mongodb-linux-x86_64-4.0.10.tgz
    kafka_2.12-2.8.0.tgz             zookeeper-3.4.5-cdh5.14.2.tar.gz
    [root@ant168 install]# tar -zxf /opt/install/flink-1.13.2-bin-scala_2.12.tgz -C /opt/soft/
    # Start the local Flink cluster
    [root@ant168 flink-1.13.2]# ./bin/start-cluster.sh
    [root@ant168 flink-1.13.2]# jps
    9050 Jps
    1628 StandaloneSessionClusterEntrypoint
    1903 TaskManagerRunner

Web UI: localhost:8081

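II. Working with Flink in IDEA

(1) Adding Dependencies

Create a Maven project using the quickstart archetype, then add the following to pom.xml.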

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
      <flink.version>1.13.2</flink.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
      </dependency>
      <!-- Flink core, streaming and client -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.21</version>
      </dependency>
      <!-- Kafka connector (declared once; the original listed it twice) -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
      </dependency>
      <!--<dependency>-->
      <!--<groupId>org.apache.bahir</groupId>-->
      <!--<artifactId>flink-connector-redis_2.12</artifactId>-->
      <!--<version>1.0</version>-->
      <!--</dependency>-->
      <!-- Scala APIs -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <!-- Hadoop and HBase clients -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.3.5</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.3.5</version>
      </dependency>
    </dependencies>

(2) Data Sources (Source)

1. Loading from elements

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SourceTest {
      def main(args: Array[String]): Unit = {
        // 1. Create the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // set the parallelism

        // 2. Add a source: load a fixed set of elements
        val stream1: DataStream[Any] = env.fromElements(1, 2, 3, 4, 5, "hello")

        // 3. Print the stream and run the job
        stream1.print()
        env.execute("sourcetest")
      }
    }

Output:

    1
    2
    3
    4
    5
    hello

2. Loading from a collection

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    // Case class representing a temperature sensor reading
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object SourceTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val dataList = List(
          SensorReading("sensor_1", 1684201947, 36.8),
          SensorReading("sensor_2", 1684202000, 35.7),
          SensorReading("sensor_3", 1684202064, 36.3),
          SensorReading("sensor_4", 1684202064, 35.8)
        )
        val stream1: DataStream[SensorReading] = env.fromCollection(dataList)

        stream1.print()
        env.execute("sourcetest")
      }
    }

Output:

    SensorReading(sensor_1,1684201947,36.8)
    SensorReading(sensor_2,1684202000,35.7)
    SensorReading(sensor_3,1684202064,36.3)
    SensorReading(sensor_4,1684202064,35.8)

3. Loading from a file

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SourceTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the file line by line as strings
        val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
        val stream1: DataStream[String] = env.readTextFile(path)

        stream1.print()
        env.execute("sourcetest")
      }
    }

Output:

    sensor_1,1684201947,36.8
    sensor_7,1684202000,17.7
    sensor_4,1684202064,20.3
    sensor_2,1684202064,35.8

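4. Loading from a socket port

The virtual machine needs the nc command; skip this step if it is already installed.

    yum -y install nc

A listener must be running on the port before the job starts (for example, nc -lk 7777 on ant168); each line typed into it becomes one record.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SourceTest {
      def main(args: Array[String]): Unit = {
        // 1. Create the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // 2. Read text lines from host ant168, port 7777
        val stream1: DataStream[String] = env.socketTextStream("ant168", 7777)

        stream1.print()
        env.execute("sourcetest")
      }
    }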

5. Loading from a Kafka topic (important and commonly used)

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import java.util.Properties

    object SourceTest {
      def main(args: Array[String]): Unit = {
        // 1. Create the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // 2. Configure the Kafka consumer
        val properties = new Properties()
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ant168:9092")
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sensorgroup1")

        // 3. Subscribe to the "sensor" topic
        val stream1: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
        // Note: after a restart, records that were already consumed are not read again

        stream1.print()
        env.execute("sourcetest")
      }
    }

Output:

    hello
    world

Kafka setup and test messages:

    # 1. Check that ZooKeeper is running, then start Kafka
    zkServer.sh status
    nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &
    # 2. Create the topic
    kafka-topics.sh --create --zookeeper ant168:2181 --topic sensor --partitions 1 --replication-factor 1
    # 3. Start producing messages
    kafka-console-producer.sh --topic sensor --broker-list ant168:9092
    >hello
    >world

6. Loading from a custom source

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import scala.util.Random

    // Case class representing a temperature sensor reading
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object SourceTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read from the custom source
        val stream1: DataStream[SensorReading] = env.addSource(new MySensorSource)

        stream1.print()
        env.execute("sourcetest")
      }
    }

    // Custom source that emits a random reading every 500 ms
    class MySensorSource extends SourceFunction[SensorReading] {
      // Flipped by cancel() so that run() can leave its loop
      @volatile private var running = true

      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val random = new Random()
        while (running) {
          val i: Int = random.nextInt()
          sourceContext.collect(SensorReading("generated: " + i, 1, 1))
          // The original placed this sleep after the loop, where it was never reached
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    }

Output:

    SensorReading(generated: -439723144,1,1.0)
    SensorReading(generated: -937590179,1,1.0)
    SensorReading(generated: -40987764,1,1.0)
    SensorReading(generated: 525868361,1,1.0)
    SensorReading(generated: -840926328,1,1.0)
    SensorReading(generated: -998392768,1,1.0)
    SensorReading(generated: -1308765349,1,1.0)
    SensorReading(generated: -806454922,1,1.0)

(3) Outputs (Sink)

1. Reading a file, processing it, and writing to another file

    import source.SensorReading
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
        val stream1: DataStream[String] = env.readTextFile(path)

        // Parse each CSV line into a SensorReading
        val dataStream: DataStream[SensorReading] = stream1.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
        })

        // dataStream.print()
        // writeAsCsv is deprecated
        // dataStream.writeAsCsv("D:\\javaseprojects\\flinkstu\\resources\\out.txt")

        // Write each record as one text line using its toString representation
        dataStream.addSink(
          StreamingFileSink.forRowFormat(
            new Path("D:\\javaseprojects\\flinkstu\\resources\\out1.txt"),
            new SimpleStringEncoder[SensorReading]()
          ).build()
        )

        env.execute("sinktest")
      }
    }

Contents written under out1.txt (StreamingFileSink treats the path as a base directory and writes part files inside it):

    SensorReading(sensor_1,1684201947,36.8)
    SensorReading(sensor_7,1684202000,17.7)
    SensorReading(sensor_4,1684202064,20.3)
    SensorReading(sensor_2,1684202064,35.8)
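
The map step above assumes every line has exactly three well-formed fields; a malformed line would throw an exception and fail the job. Below is a minimal sketch of a more defensive parser in plain Scala, with no Flink dependency. The SensorParser object and its parse method are hypothetical names introduced for illustration and are not part of the original project.

```scala
import scala.util.Try

// Same shape as the SensorReading case class used throughout this article.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorParser {
  // Returns None instead of throwing when a line is malformed.
  def parse(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      // Exactly three fields with a non-empty id; number parsing may still fail.
      case Array(id, ts, temp) if id.nonEmpty =>
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

In the job, something like stream1.flatMap(line => SensorParser.parse(line).toList) could then take the place of the map, silently dropping bad lines. Whether dropping is acceptable, or bad lines should be logged or routed elsewhere, depends on the use case.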

2. Source: file, Sink: MySQL

    import source.SensorReading
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import java.sql.{Connection, DriverManager, PreparedStatement}

    /**
     * Writes the data processed by Flink into MySQL.
     */
    object JdbcSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the data from a file
        val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
        val stream1: DataStream[String] = env.readTextFile(path)

        // Parse each CSV line into a SensorReading
        val dataStream: DataStream[SensorReading] = stream1.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
        })

        dataStream.addSink(new MyJdbcSink)
        env.execute("jdbc sink test")
      }
    }

    class MyJdbcSink extends RichSinkFunction[SensorReading] {
      var connection: Connection = _
      var insertState: PreparedStatement = _
      var updateState: PreparedStatement = _

      // Open the connection and prepare both statements once per sink instance
      override def open(parameters: Configuration): Unit = {
        connection = DriverManager.getConnection("jdbc:mysql://192.168.180.141:3306/kb21?useSSL=false", "root", "root")
        insertState = connection.prepareStatement("insert into sensor_temp(id,temp) values (?,?)")
        updateState = connection.prepareStatement("update sensor_temp set temp=? where id=?")
      }

      override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
        // Try the update first
        updateState.setDouble(1, value.temperature)
        updateState.setString(2, value.id)
        val i: Int = updateState.executeUpdate()
        println(i)
        // If the table has no row for this id, the update affects 0 rows, so insert instead.
        // If a row already exists, the update affects 1 row and nothing more is needed.
        if (i == 0) {
          insertState.setString(1, value.id)
          insertState.setDouble(2, value.temperature)
          insertState.execute()
        }
      }

      override def close(): Unit = {
        insertState.close()
        updateState.close()
        connection.close()
      }
    }

Input data (D:\javaseprojects\flinkstu\resources\sensor.txt):

    sensor_1,1684201947,36.8
    sensor_2,1684202000,17.7
    sensor_1,1684202064,20.3
    sensor_2,1684202068,35.8

Table setup and query in DataGrip:

    drop table if exists sensor_temp;
    create table sensor_temp(
      id varchar(32),
      temp double
    );
    select * from sensor_temp;

Because the sink updates an existing row before falling back to an insert, the table ends up holding only the latest temperature for each sensor id.
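
The update-then-insert logic in invoke can be modeled without a database to see why the table ends up with one row per sensor. The sketch below is plain Scala; the UpsertModel object is a hypothetical stand-in for the sensor_temp table and is not code from the original project. It assumes the readings arrive in file order, which holds here because the parallelism is 1.

```scala
object UpsertModel {
  // Models sensor_temp as id -> temp, applying readings in arrival order.
  // An existing id is overwritten (the UPDATE branch); a new id is added (the INSERT branch).
  def applyAll(readings: Seq[(String, Double)]): Map[String, Double] =
    readings.foldLeft(Map.empty[String, Double]) { case (table, (id, temp)) =>
      table.updated(id, temp)
    }

  // The four readings from sensor.txt in this section, as (id, temperature).
  val input: Seq[(String, Double)] = Seq(
    "sensor_1" -> 36.8,
    "sensor_2" -> 17.7,
    "sensor_1" -> 20.3,
    "sensor_2" -> 35.8
  )
}
```

UpsertModel.applyAll(UpsertModel.input) yields sensor_1 -> 20.3 and sensor_2 -> 35.8, the last reading for each id. If id were declared as a primary key, MySQL's INSERT ... ON DUPLICATE KEY UPDATE could likely replace the two prepared statements with one, though the table definition in this article has no such key.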

3. Reading from Kafka, processing, and writing to MySQL

    import source.SensorReading
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import java.sql.{Connection, DriverManager, PreparedStatement}
    import java.util.Properties

    /**
     * Reads from Kafka, processes the data in Flink, and writes it into MySQL.
     */
    object KafkaToMysqlSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // set the parallelism

        // Configure the Kafka consumer
        val properties = new Properties()
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ant168:9092")
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sensorgroup1")

        // Subscribe to the topic
        val stream1: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

        // Parse each message into a SensorReading
        val dataStream: DataStream[SensorReading] = stream1.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
        })

        // Write the parsed records into MySQL
        dataStream.addSink(new MysqlSink)
        env.execute("kafka sink test")
      }
    }

    class MysqlSink extends RichSinkFunction[SensorReading] {
      var connection: Connection = _
      var insertState: PreparedStatement = _
      var updateState: PreparedStatement = _

      override def open(parameters: Configuration): Unit = {
        connection = DriverManager.getConnection("jdbc:mysql://192.168.180.141:3306/kb21?useSSL=false", "root", "root")
        insertState = connection.prepareStatement("insert into sensor_temp(id,temp) values (?,?)")
        updateState = connection.prepareStatement("update sensor_temp set temp=? where id=?")
      }

      override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
        updateState.setDouble(1, value.temperature)
        updateState.setString(2, value.id)
        val i: Int = updateState.executeUpdate()
        println(i)
        // If the table has no row for this id, the update affects 0 rows, so insert instead.
        // If a row already exists, the update affects 1 row and nothing more is needed.
        if (i == 0) {
          insertState.setString(1, value.id)
          insertState.setDouble(2, value.temperature)
          insertState.execute()
        }
      }

      override def close(): Unit = {
        insertState.close()
        updateState.close()
        connection.close()
      }
    }

Messages produced to Kafka:

    [root@ant168 opt]# kafka-console-producer.sh --topic sensor --broker-list ant168:9092
    >sensor_1,1684201947,36.8
    >sensor_1,1684201947,36.10
    >sensor_2,1684202068,35.8


4. Reading from a Kafka topic, processing, and writing to another topic

    import source.SensorReading
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import java.util.Properties

    /**
     * Reads from Kafka, processes the data in Flink, and writes it back to Kafka.
     */
    object KafkaToKafkaSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // set the parallelism

        // Configure the Kafka consumer
        val properties = new Properties()
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ant168:9092")
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sensorgroup1")

        // Subscribe to the source topic
        val stream1: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

        // Parse each message, then convert it back to a string for the output topic
        val dataStream: DataStream[String] = stream1.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString()
        })

        // Write the processed records to the "sensorsinkout" topic
        dataStream.addSink(
          new FlinkKafkaProducer[String]("ant168:9092", "sensorsinkout", new SimpleStringSchema())
        )
        env.execute("kafka sink test")
      }
    }

Note: by default the consumer resumes from the offsets committed for its consumer group, and auto.offset.reset falls back to latest when no committed offset exists. If the job is interrupted while the producer keeps sending, the records sent during the interruption are therefore still consumed once the job restarts.

5. Reading from a Kafka topic, processing, and writing to HBase

Create the namespace, table, and column family in HBase:

    hbase(main):002:0> create_namespace 'ha'
    hbase(main):004:0> create 'ha:test','sensor'
    hbase(main):006:0> list_namespace_tables 'ha'

Flink code:

    import source.SensorReading
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.hadoop.conf
    import org.apache.hadoop.hbase.client.{BufferedMutator, BufferedMutatorParams, ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName, client}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import java.util.Properties

    /**
     * Processes the stream and writes it into HBase.
     */
    object KafkaToHBaseSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // set the parallelism

        // Configure the Kafka consumer
        val properties = new Properties()
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ant168:9092")
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sensorgroup1")

        // Subscribe to the topic
        val stream1: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

        // Parse each message into a SensorReading
        val dataStream: DataStream[SensorReading] = stream1.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
        })

        // Write the parsed records into HBase
        dataStream.addSink(new MyHBaseSink)
        env.execute("HBase sink test")
      }
    }

    class MyHBaseSink extends RichSinkFunction[SensorReading] {
      var connection: client.Connection = _
      var mutator: BufferedMutator = _

      override def open(parameters: Configuration): Unit = {
        val configuration: conf.Configuration = HBaseConfiguration.create()
        configuration.set(HConstants.HBASE_DIR, "hdfs://lxm147:9000/hbase")
        configuration.set(HConstants.ZOOKEEPER_QUORUM, "lxm147")
        configuration.set(HConstants.CLIENT_PORT_STR, "2181")
        connection = ConnectionFactory.createConnection(configuration)

        val params = new BufferedMutatorParams(TableName.valueOf("ha:test"))
        params.writeBufferSize(10 * 1024 * 1024) // flush once the buffer reaches 10 MB
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L) // or after 5 s, whichever comes first
        mutator = connection.getBufferedMutator(params)
      }

      override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
        println(value)
        // Row key is the concatenation of id, temperature and timestamp
        val put = new Put(Bytes.toBytes(value.id + value.temperature + value.timestamp))
        put.addColumn("sensor".getBytes(), "id".getBytes(), value.id.getBytes())
        put.addColumn("sensor".getBytes(), "timestamp".getBytes(), value.timestamp.toString.getBytes())
        put.addColumn("sensor".getBytes(), "temperature".getBytes(), value.temperature.toString.getBytes())
        mutator.mutate(put)
        // Flushing on every record makes writes visible immediately but bypasses the buffer settings above
        mutator.flush()
      }

      override def close(): Unit = {
        // Close the mutator first so pending writes are flushed, then the connection
        mutator.close()
        connection.close()
      }
    }

Messages produced to Kafka:

    [root@ant168 flink-1.13.2]# kafka-console-producer.sh --topic sensor --broker-list ant168:9092
    >sensor_4 , 1684202064, 27.3
    >sensor_2,1684202068, 35.8

Table contents in HBase:

    hbase(main):010:0> scan 'ha:test'
    ROW                      COLUMN+CELL
    sensor_235.81684202068   column=sensor:id, timestamp=2023-05-17T09:44:41.123, value=sensor_2
    sensor_235.81684202068   column=sensor:temperature, timestamp=2023-05-17T09:44:41.123, value=35.8
    sensor_235.81684202068   column=sensor:timestamp, timestamp=2023-05-17T09:44:41.123, value=1684202068
    sensor_427.31684202064   column=sensor:id, timestamp=2023-05-17T09:21:41.475, value=sensor_4
    sensor_427.31684202064   column=sensor:temperature, timestamp=2023-05-17T09:21:41.475, value=27.3
    sensor_427.31684202064   column=sensor:timestamp, timestamp=2023-05-17T09:21:41.475, value=1684202064
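
The row keys in the scan output are the plain concatenation id + temperature + timestamp, which is hard to read and can be ambiguous: sensor_2 at 35.8 and a hypothetical sensor_23 at 5.8 with the same timestamp would produce the same key. Below is a minimal sketch of a delimited key, assuming ids never contain the '#' character. RowKeys and its methods are hypothetical helpers for illustration, not part of the original code.

```scala
object RowKeys {
  private val Sep = "#"

  // Row key of the form "<id>#<timestamp>": the separator keeps ids from running into the next field.
  def rowKey(id: String, timestamp: Long): String = s"$id$Sep$timestamp"

  // The concatenation used by the sink in this section, kept here for comparison.
  def concatKey(id: String, temperature: Double, timestamp: Long): String =
    id + temperature + timestamp
}
```

Leaving the temperature out of the key is a design choice: two readings with the same id and timestamp would then overwrite each other rather than create two rows, which may or may not be what is wanted.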

6. Reading from a Kafka topic, processing, and writing to Redis

Add the Redis connector dependency:

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.12</artifactId>
      <version>1.1.0</version>
    </dependency>

Note that the example below reads its input from a socket (port 7777) rather than from Kafka; the Redis sink is attached the same way for either source.

    package nj.zb.kb21.sink

    import nj.zb.kb21.source.SensorReading
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

    /**
     * Writes the data processed by Flink into Redis.
     */
    object RedisSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // set the parallelism

        // Read text lines from the socket and parse them into SensorReading
        val inputStream: DataStream[String] = env.socketTextStream("192.168.180.141", 7777)
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
        })

        // Redis connection settings
        val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
          .setHost("192.168.180.141")
          .setPort(6379)
          .setTimeout(30000)
          .build()

        dataStream.addSink(new RedisSink[SensorReading](redisConf, new MyRedisMapper))
        env.execute("redis sink test")
      }
    }

    class MyRedisMapper extends RedisMapper[SensorReading] {
      // Store every record in the Redis hash named "sensor"
      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "sensor")
      }

      // Hash field: the sensor id
      override def getKeyFromData(data: SensorReading): String = {
        data.id
      }

      // Hash value: "timestamp:temperature"
      override def getValueFromData(data: SensorReading): String = {
        data.timestamp + ":" + data.temperature
      }
    }
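
With RedisCommand.HSET and the additional key "sensor", each record should translate into a command of the form HSET sensor <id> <timestamp:temperature>, so a later reading for the same id overwrites the earlier field value. Below is a plain-Scala sketch of that mapping. RedisModel is a hypothetical helper for illustration only; it does not talk to Redis and is not part of the connector.

```scala
// Same shape as the SensorReading case class used throughout this article.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RedisModel {
  // Mirrors MyRedisMapper: command, hash name, field (the id) and value for one record.
  def hsetArgs(r: SensorReading): Seq[String] =
    Seq("HSET", "sensor", r.id, s"${r.timestamp}:${r.temperature}")
}
```

For the reading sensor_1,1684201947,36.8 this gives the arguments HSET, sensor, sensor_1, 1684201947:36.8. Running HGETALL sensor in redis-cli afterwards is one way to check what the job actually wrote.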
