当前位置:   article > 正文

Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报_kafka向topic里面发送数据

kafka向topic里面发送数据

1. 项目背景

资产可能会遭受各种网络攻击,安全事件和安全告警就是已经被攻击的资产产生的日志,一条攻击链路可能会经过多个资产,由此产生的日志为安全事件,而具体某一个被攻击的资产产生的日志为安全告警。一个安全事件关联多个安全告警,安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。

SIR和XDR是两个不同的产品,SIR平台是安全事件协同响应平台,能够根据安全告警和安全事件日志对已经遭受攻击的资产进行处置闭环,处置完成后需要修改安全事件和安全告警日志的处置状态。

但是XDR产品无法对安全事件和安全告警处置闭环,因此需要将XDR平台的安全告警和安全事件数据接入到SIR平台处置闭环,待处置完成后将数据的处置状态同步给XDR平台,保证两个平台的数据的处置状态一致;

所以下面要做的就是XDR平台的数据上报:

将XDR平台的安全告警和安全事件数据发送到指定的topic中,SIR平台通过Python脚本从topic中取出数据将安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。

2. 依赖和配置

相关文章:Kafka实战 - 01 自定义 SpringBoot Starter 实现 Kafka 的自动配置

在 ngsoc-open 服务中引入自定义的 ngsoc-common-kafka 的依赖:

<dependency>
    <groupId>com.hh</groupId>
    <artifactId>ngsoc-common-kafka</artifactId>
    <version>3.0.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

kafka的相关配置:项目的配置中心使用的confd,#{{}}是相关配置文件语法,不影响

ngsoc:
  kafka:
    clusters:
      - name: ngsoc
        #{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}
        bootstrap-servers: #{{range $data.route}}
          - '127.0.0.1:9092' #{{end}}
        topics:
          - name: NGSOC_APP_ALARM
            partition: 1
            replication: 1
          - name: NGSOC_APP_INCIDENT
            partition: 1
            replication: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. 生产者配置 KafkaConfiguration

@Configuration
public class KafkaConfiguration {

    @Bean("jsonKafkaTemplate")
    public KafkaTemplate<String, Object> jsonKafkaTemplate(ProducerFactory<String, Object> pf) {
        Map<String, Object> config = Map.of(
                // 3次重试
                ProducerConfig.RETRIES_CONFIG, "3",
                // 5ms批量发送
                ProducerConfig.LINGER_MS_CONFIG, "500",
                // JSON序列化
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );

        return new KafkaTemplate<>(pf, config );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

4. 同步数据Topic枚举 SyncDataTopicEnum

public enum SyncDataTopicEnum {

    /**
     * 安全告警
     */
    ALARM(0, "NGSOC_APP_ALARM"),

    /**
     * 安全事件
     */
    INCIDENT(1, "NGSOC_APP_INCIDENT");

    private final int type;
    private final String name;

    SyncDataTopicEnum(int type, String name) {
        this.type = type;
        this.name = name;
    }

    public static String getTopicNameByType(int type) {
        for (SyncDataTopicEnum constants : SyncDataTopicEnum.values()) {
            if (type == constants.type) {
                return constants.name;
            }
        }
        return null;
    }

    public static boolean contains(int type) {
        for (SyncDataTopicEnum value : SyncDataTopicEnum.values()) {
            if (type == value.type) {
                return true;
            }
        }
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

5. 请求体 DataSyncQo

@Data
public class DataSyncQo implements ValidateAble {

    @ApiModelProperty("数据类型,0-安全事件,1-安全告警")
    @NotNull(message = "data.type.must.be.not.null")
    private Integer type;

    @ApiModelProperty("具体数据")
    @NotEmpty(message = "data.must.not.empty")
    private List<Object> data;

    private final int DATA_MAX_SIZE = 200;

    @Override
    public void validate() throws ValidateException {
        if (!SyncDataTopicEnum.contains(type)) {
            throw new ValidateException("type.of.data.is.not.valid");
        }

        if (data.size() > DATA_MAX_SIZE) {
            throw new ValidateException("data.size.limit");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

6. 同步数据控制层 AppSyncDataController

@RestController
@RequestMapping("/api/v1/app")
public class AppSyncDataController {

    @Setter(onMethod_ = @Autowired)
    private IXdrDataSyncService incidentUploadService;

    @OpenApi
    @PostMapping("/syncData")
    @CheckValidateAble
    @OperateLog(target = "app.data", action = "xdr.data.sync")
    public ApiResponse<Object> upload(@RequestHeader("Authorization") String key, @Validated @RequestBody DataSyncQo qo) {
        return incidentUploadService.upload(key, qo);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

7. 同步数据业务层 XdrDataSyncServiceImpl

@Slf4j
@Service
public class XdrDataSyncServiceImpl implements IXdrDataSyncService {

    @Autowired
    @Qualifier("jsonKafkaTemplate")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public ApiResponse<Object> upload(String key, DataSyncQo qo) {
        // 根据同步数据类型获取topic名称
        String topicName = Objects.requireNonNull(SyncDataTopicEnum.getTopicNameByType(qo.getType()));
        // 将同步数据发送到topic中
        pushToTopic(topicName, qo);
        return ApiResponse.ok();
    }

    private void pushToTopic(String topicName, DataSyncQo qo) {
 log.info("[XDR数据上报]XDR数据[{}]正在进行上报,数据长度为:{}", topicName, qo.getData().size());
        for (Object data : qo.getData()) {
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, data);
            future.addCallback(new ListenableFutureCallback<>() {
                @Override
                public void onFailure(@NotNull Throwable t) {
                    log.error("[XDR数据上报]设备上传的数据打入Kafka异常:", t);
                }

                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    // 正确不做记录
                }
            });
        }
        log.info("[XDR数据上报]XDR数据[{}]上报行为执行完毕", topicName);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/707539
推荐阅读
相关标签
  

闽ICP备14008679号