
Integrating Kafka with Flume

1. Flume as a Kafka producer

        Kafka serves as the Flume sink: Flume tails local files and produces the events into a Kafka topic.

1.1 Flume configuration file

vim $kafka/jobs/flume-kafka.conf

# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
# File that records the last read position of each tailed file; its location can be left unchanged
a1.sources.r1.positionFile = /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 = /export/server/flume/job/data/.*log.*
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
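Before starting the agent, the monitored directory must exist and contain files whose names match the filegroup regexes above. A minimal sketch (the directory path comes from the config; the file names are hypothetical examples):

# Create the monitored directory and two files matching the filegroup patterns
mkdir -p /export/server/flume/job/data
touch /export/server/flume/job/data/file2.txt   # matches f1: .*file.*
touch /export/server/flume/job/data/app.log     # matches f2: .*log.*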

1.2 Start the Flume agent

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/flume-kafka.conf
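If automatic topic creation is disabled on the brokers, the target topic should be created before events arrive. A sketch using the standard Kafka CLI (the partition and replication counts are illustrative assumptions, not from the original setup):

kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092 --topic customers --partitions 3 --replication-factor 2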

1.3 Start a Kafka consumer

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic customers --from-beginning

1.4 Produce data

Append data to one of the monitored files:

[ljr@node1 data]$ echo hello >> file2.txt
[ljr@node1 data]$ echo ============== >> file2.txt

Check the Kafka consumer output.
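If everything is wired correctly, the console consumer should print the appended lines (expected output, reconstructed from the test input above):

hello
==============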

The appended lines show up in the consumer, confirming that Flume works as a Kafka producer.

2. Flume as a Kafka consumer

        Kafka serves as the Flume source: Kafka-side producers feed data into a topic, and Flume consumes it.

2.1 Flume configuration file

vim $kafka/jobs/kafka-flume1.conf

# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Must not exceed the channel's transactionCapacity (100 below)
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = node1:9092,node2:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# Note: transactionCapacity must not be smaller than the source's batchSize (50 above)
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
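By default the Kafka source starts from the latest committed offset for its consumer group. Flume passes any kafka.consumer.* property through to the underlying Kafka consumer, so to also replay messages already in the topic one could add the following line (a standard Kafka consumer setting, not part of the original config):

a1.sources.r1.kafka.consumer.auto.offset.reset = earliest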

2.2 Start the Flume agent

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf
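The logger sink writes events to Flume's log file. To see them directly in the terminal, the agent can instead be started with console logging enabled (a common Flume option, added here as a suggestion):

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf -Dflume.root.logger=INFO,console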

2.3 Start a Kafka producer and produce data

kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

 

Check the Flume console output.
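After typing a line such as hello into the producer, the logger sink prints each event roughly like this (format illustrative; the hex dump is how Flume's logger sink renders the event body):

Event: { headers:{...} body: 68 65 6C 6C 6F    hello }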

The produced messages appear in the Flume log, confirming that Flume works as a Kafka consumer.
