
Spring Boot + Flink Integration (Schneider PLC IoT Data Collection)

Prerequisite: Kafka installed in a Linux environment

Background:

A Schneider PLC (TM200C16R) runs a data-collection program and is connected to the local network. A Spring Boot application subscribes to the MQTT topic, forwards the messages to Kafka, and Flink consumes them and persists them into a MySQL database.

Wireshark packet capture:

MQTTBox subscription test:

Known parameters:

Broker IP: 139.220.193.14

Port: 1883

Application account: admin@tenlink

Application password: Tenlink@123

IoT device account: 202303171001

IoT device password: 03171001

Topics:

202303171001/p (publish topic: published by the device, received by the application)

202303171001/s (subscribe topic: published by the application, received by the device)

Subscribing to MQTT (prerequisite: Kafka is already running and the plc_thoroughfare topic exists; a sketch for creating the topic is shown below)
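
Before starting the subscriber, the topic can be created either with Kafka's own kafka-topics.sh script or programmatically. Below is a minimal AdminClient sketch; the single partition and replication factor of 1 are assumptions for a one-broker test setup, and the bootstrap address should match the YAML configuration.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreatePlcTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Replace with the actual Kafka broker address used in the YAML configuration
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1: assumptions for a single-broker test setup
            NewTopic topic = new NewTopic("plc_thoroughfare", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}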

  • maven pom

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
  • YAML configuration

spring:
  kafka:
    bootstrap-servers: ip:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

## Custom properties
kafka:
  topics:
    # Kafka topic
    plc1: plc_thoroughfare
plc:
  broker: tcp://139.220.193.14:1883
  subscribe-topic: 202303171001/p
  username: admin@tenlink
  password: Tenlink@123
  client-id: subscribe_client
  • Subscribe to MQTT and forward the payload to the Kafka topic

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * PLC message subscriber
 */
@Component
public class SubscribeSample {

    private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${kafka.topics.plc1}")
    private String plc1;
    @Value("${plc.broker}")
    private String broker;
    @Value("${plc.subscribe-topic}")
    private String subscribeTopic;
    @Value("${plc.username}")
    private String username;
    @Value("${plc.password}")
    private String password;
    @Value("${plc.client-id}")
    private String clientId;

    @PostConstruct
    public void plcGather() {
        int qos = 0;
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                MqttClient client = null;
                try {
                    client = new MqttClient(broker, clientId, new MemoryPersistence());
                    // Connection options
                    MqttConnectOptions options = new MqttConnectOptions();
                    options.setUserName(username);
                    options.setPassword(password.toCharArray());
                    options.setConnectionTimeout(60);
                    options.setKeepAliveInterval(60);
                    // Callbacks
                    client.setCallback(new MqttCallback() {
                        public void connectionLost(Throwable cause) {
                            System.out.println("connectionLost: " + cause.getMessage());
                        }

                        public void messageArrived(String topic, MqttMessage message) {
                            String data = new String(message.getPayload());
                            kafkaTemplate.send(plc1, data).addCallback(success -> {
                                // Topic the message was sent to
                                String kafkaTopic = success.getRecordMetadata().topic();
                                // Partition the message was sent to
                                // int partition = success.getRecordMetadata().partition();
                                // Offset of the message within the partition
                                // long offset = success.getRecordMetadata().offset();
                                log.info("MQTT message {} forwarded to Kafka topic {}", data, kafkaTopic);
                            }, failure -> {
                                throw new RuntimeException("Failed to send message: " + failure.getMessage());
                            });
                        }

                        public void deliveryComplete(IMqttDeliveryToken token) {
                            log.info("deliveryComplete---------{}", token.isComplete());
                        }
                    });
                    client.connect(options);
                    client.subscribe(subscribeTopic, qos);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
}
  • Collected-message test (a successful run looks like the screenshot below, with the payload already forwarded to the Kafka topic; a minimal test publisher is sketched after this item)
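
If no live PLC is available, the pipeline can still be exercised by publishing a sample payload to the device's publish topic with a small Paho client. This is only a sketch: it reuses the broker address and credentials listed above, and the JSON payload is a made-up placeholder rather than real PLC register data.

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishTestSample {
    public static void main(String[] args) throws Exception {
        // Broker and credentials taken from the parameters listed above
        MqttClient client = new MqttClient("tcp://139.220.193.14:1883", "publish_test_client", new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin@tenlink");
        options.setPassword("Tenlink@123".toCharArray());
        client.connect(options);

        // Hypothetical payload; a real PLC would publish its own register data
        MqttMessage message = new MqttMessage("{\"test\":\"hello-plc\"}".getBytes());
        message.setQos(0);
        client.publish("202303171001/p", message);

        client.disconnect();
        client.close();
    }
}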

Flink consuming the Kafka data

  • maven pom

<!-- Utility libraries: begin -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.26</version>
</dependency>
<!-- Utility libraries: end -->
<!-- Flink dependencies: begin -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Flink Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Flink JSON format support -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Flink JDBC connector (MySQL) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<!-- Flink dependencies: end -->
<!-- Spring Data JPA -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
  • YAML configuration

# Server port
server:
  port: 8222
spring:
  kafka:
    bootstrap-servers: ip:9092
    consumer:
      group-id: kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/ceshi?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: root
    druid:
      initial-size: 5 # number of physical connections created at initialization
      min-idle: 5 # minimum number of connections kept in the pool
      maxActive: 20 # maximum number of connections in the pool
      maxWait: 60000 # maximum wait time when acquiring a connection, in milliseconds
      timeBetweenEvictionRunsMillis: 60000 # interval between checks for idle connections to close, in milliseconds
      minEvictableIdleTimeMillis: 300000 # minimum time a connection may stay idle in the pool, in milliseconds
      validationQuery: SELECT 1 # SQL used to validate connections
      testWhileIdle: true # when borrowing, validate connections idle longer than timeBetweenEvictionRunsMillis with validationQuery
      testOnBorrow: false # validate with validationQuery on borrow; true lowers performance
      testOnReturn: false # validate with validationQuery on return; true lowers performance
      poolPreparedStatements: true # enable PSCache and set its size per connection
      maxPoolPreparedStatementPerConnectionSize: 20 # must be > 0 to enable PSCache; a value > 0 turns poolPreparedStatements on automatically. Druid does not have Oracle's PSCache memory problem, so a larger value such as 100 is fine
      filters: stat,wall,slf4j # monitoring filters; removing them disables SQL statistics in the console, 'wall' is the SQL firewall
      # enable mergeSql and slow-SQL logging via connectProperties
      connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
  jpa:
    hibernate:
      ddl-auto: none
    show-sql: true
    repositories:
      packages: com.hzh.demo.domain.*
# Custom properties
customer:
  # Flink settings
  flink:
    # feature switch
    plc-status: true
    plc-topic: plc_thoroughfare
# Scheduled task that periodically removes expired data
task:
  plc-time: 0 0/1 * * * ?
  • Table schema

-- plc_test definition
CREATE TABLE `plc_test` (
  `pkid` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'primary key id',
  `json_str` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'JSON payload',
  `create_time` bigint NOT NULL COMMENT 'creation time',
  PRIMARY KEY (`pkid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='PLC data test table';
  • Application entry point

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJpaRepositories(basePackages = "repository basePackages")
@EntityScan("entity basePackages")
@EnableScheduling
public class PLCStorageApplication {

    public static void main(String[] args) {
        SpringApplication.run(PLCStorageApplication.class, args);
    }
}
  • Entity class

import lombok.Builder;
import lombok.Data;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * PLC entity
 */
@Table(name = "plc_test")
@Data
@Builder
@Entity
public class PLCDomain implements Serializable {

    private static final long serialVersionUID = 4122384962907036649L;

    @Id
    @Column(name = "pkid")
    public String id;
    @Column(name = "json_str")
    public String jsonStr;
    @Column(name = "create_time")
    private Long createTime;

    public PLCDomain(String id, String jsonStr, Long createTime) {
        this.id = id;
        this.jsonStr = jsonStr;
        this.createTime = createTime;
    }

    public PLCDomain() {
    }
}
  • JPA repository interface

import com.hzh.demo.domain.PLCDomain;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface PLCRepository extends JpaRepository<PLCDomain, String> {
}
  • ApplicationContext helper (ApplicationContextAware). Because of initialization order, Flink operators cannot rely on Spring bean injection, so this utility class is used to look up beans from the application context.

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

@Component
public class ApplicationContextProvider implements ApplicationContextAware {

    /**
     * The application context instance
     */
    private static ApplicationContext applicationContext;

    /**
     * Get the ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextProvider.applicationContext = applicationContext;
    }

    /**
     * Get a bean by name.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * Get a bean by class.
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * Get a bean by name and class.
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    /**
     * Resolve a localized message for the current locale.
     */
    public static String getMessage(String code, Object[] args) {
        return getApplicationContext().getMessage(code, args, LocaleContextHolder.getLocale());
    }

    /**
     * Resolve a localized message with a default fallback.
     */
    public static String getMessage(String code, Object[] args, String defaultMessage) {
        return getApplicationContext().getMessage(code, args, defaultMessage,
                LocaleContextHolder.getLocale());
    }
}
  • Flink third-party output (MySQL sink)

import com.hzh.demo.config.ApplicationContextProvider;
import com.hzh.demo.domain.PLCDomain;
import com.hzh.demo.repository.PLCRepository;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Writes records to MySQL
 */
@Component
@ConditionalOnProperty(name = "customer.flink.plc-status")
public class MysqlSink implements SinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);

    @Override
    public void invoke(String value, Context context) throws Exception {
        long currentTime = context.currentProcessingTime();
        PLCDomain build = PLCDomain.builder()
                .id(UUID.randomUUID().toString().replaceAll("-", ""))
                .jsonStr(value)
                .createTime(currentTime)
                .build();
        // Look the repository up at runtime: the sink is serialized by Flink,
        // so it cannot carry an injected Spring bean.
        PLCRepository repository = ApplicationContextProvider.getBean(PLCRepository.class);
        repository.save(build);
        log.info("Persisted record: {}", build);
        SinkFunction.super.invoke(value, context);
    }
}
  • Flink job subscribing to the Kafka topic and reading the stream

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * Reads data from the Kafka topic
 */
@Component
@ConditionalOnProperty(name = "customer.flink.plc-status")
public class FlinkReceivingPLC {

    private static final Logger log = LoggerFactory.getLogger(FlinkReceivingPLC.class);

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${customer.flink.plc-topic}")
    private String topic;
    @Value("${spring.kafka.consumer.group-id:kafka}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer:org.apache.kafka.common.serialization.StringDeserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer:org.apache.kafka.common.serialization.StringDeserializer}")
    private String valueDeserializer;

    /**
     * Builds and starts the streaming job.
     */
    @PostConstruct
    public void execute() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // global parallelism
        env.setParallelism(1);

        Properties properties = new Properties();
        // Kafka broker addresses (IP or hostname), comma separated
        properties.setProperty("bootstrap.servers", kafkaServer);
        // Kafka consumer group.id
        properties.setProperty("group.id", groupId);
        properties.setProperty("key.deserializer", keyDeserializer);
        properties.setProperty("value.deserializer", valueDeserializer);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);
        stream
                // key by the payload itself
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // optional processing operator
                // .process(new MyKeyedProcessFunction())
                // third-party output: persist to MySQL
                .addSink(new MysqlSink());

        // run the job on a separate thread so @PostConstruct does not block application startup
        new Thread(() -> {
            try {
                env.execute("PLCPersistenceJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}
  • Expired-data cleanup mechanism (to make testing convenient, the task runs frequently and data expires quickly; a leaner variant is sketched after the code below)

import com.hzh.demo.repository.PLCRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Scheduled task configuration
 */
@Component
@Configuration
public class QutrzConfig {

    private static final Logger log = LoggerFactory.getLogger(QutrzConfig.class);

    @Autowired
    private PLCRepository plcRepository;

    /**
     * Data cleanup task
     */
    @Scheduled(cron = "${task.plc-time}")
    private void PLCCleaningMechanism() {
        log.info("Running cleanup task: {}", "PLCCleaningMechanism");
        long currentTimeMillis = System.currentTimeMillis();
        Optional.of(this.plcRepository.findAll()).ifPresent(list -> {
            list.forEach(plc -> {
                Long createTime = plc.getCreateTime();
                // records older than one minute are considered expired
                if ((currentTimeMillis - createTime) > (1000 * 60 * 1)) {
                    this.plcRepository.delete(plc);
                    log.info("Expired record removed: {}", plc);
                }
            });
        });
    }
}
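
Loading every row with findAll() and deleting one record at a time is fine for a demo, but it scans the whole table on every run. A leaner variant, assuming Spring Data JPA derived delete queries, would add a method such as deleteByCreateTimeLessThan to PLCRepository (an illustrative addition, not part of the original project) and remove all expired rows in one call:

// Illustrative addition to PLCRepository (derived delete query, runs in a transaction):
@Transactional
long deleteByCreateTimeLessThan(Long createTime);

// The scheduled method could then shrink to:
@Scheduled(cron = "${task.plc-time}")
private void PLCCleaningMechanism() {
    long threshold = System.currentTimeMillis() - 1000 * 60;
    long removed = plcRepository.deleteByCreateTimeLessThan(threshold);
    log.info("Expired records removed: {}", removed);
}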
  • Test results

  • Data persisted in MySQL (a quick read-back sketch follows)
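
Besides inspecting the table in a MySQL client, the persisted rows can be read back through the same JPA repository with a small startup runner. This is a hedged sketch: the class name is hypothetical, and it simply logs whatever is currently in plc_test.

import com.hzh.demo.repository.PLCRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class PlcDataChecker implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(PlcDataChecker.class);

    private final PLCRepository plcRepository;

    public PlcDataChecker(PLCRepository plcRepository) {
        this.plcRepository = plcRepository;
    }

    @Override
    public void run(String... args) {
        // Log the row count and dump the current contents of plc_test for a quick sanity check
        log.info("plc_test row count: {}", plcRepository.count());
        plcRepository.findAll().forEach(plc -> log.info("row: {}", plc));
    }
}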
