当前位置:   article > 正文

Flink入门(二)(使用kafka作为sink和source)_flink对接kafka作为source使用

flink对接kafka作为source使用

在Mac电脑上安装使用kafka

使用kafka需要先安装zookeeper作为注册中心,在Mac上可以先安装homebrew然后再使用homebrew作为工具安装kafka和zookeeper

brew install kafka
brew install zookeeper

进入 /usr/local/Cellar下分别启动kafka和zookeeper
启动命令为
zookeeper命令为

zkServer start

kafka的各种命令为

Kafka 服务端启动启动
kafka-server-start /usr/local/etc/kafka/server.properties

kafka生产者启动
kafka-console-producer --broker-list localhost:9092 --topic first

kafka消费者启动
kafka-console-consumer --bootstrap-server localhost:9092 --topic pktest

kafka创建topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest

kafka展示topic列表
kafka-topics --list --zookeeper localhost:2181

Scala 使用Kafka作为Source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object StreamingJob {
  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    var topic ="pktest"

    var  properties = new Properties()
    properties.setProperty("bootstrap.servers","127.0.0.1:9092")
    properties.setProperty("group.id","test")

    val  data = env.addSource(new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),properties))
    data.print()
    env.execute("StreamingJob")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

上述即为Kafka作为消费者的例子,我们在启动producer的窗口中进行输入,即可在控制台中看到输出结果

使用Kafka作为生产者

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object KafkaConnectorProducer {
  def main(args: Array[String]): Unit = {


    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._


    var data = env.socketTextStream("localhost",9999)

    var topic ="pktest"

    var  properties = new Properties()
    properties.setProperty("bootstrap.servers","127.0.0.1:9092")
    //properties.setProperty("group.id","test")

    //env.addSource(new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),properties))
    var kafkaSink = new FlinkKafkaProducer[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),properties)
    data.addSink(kafkaSink)
    env.execute("KafkaConnectorProducer")
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

此例子使用nc -lk 9000 作为数据源。经过Flink包装后传输到对应的Topic在 nc -lk 9000的命令行窗口输入一些字符并回车。即可在对应的kafka消费者命令窗口看到消费的数据

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44159
推荐阅读
相关标签
  

闽ICP备14008679号