赞
踩
使用kafka需要先安装zookeeper作为注册中心,在Mac上可以先安装homebrew然后再使用homebrew作为工具安装kafka和zookeeper
brew install kafka
brew install zookeeper
进入 /usr/local/Cellar下分别启动kafka和zookeeper
启动命令为
zookeeper命令为
zkServer start
kafka的各种命令为
Kafka 服务端启动
kafka-server-start /usr/local/etc/kafka/server.properties
kafka生产者启动
kafka-console-producer --broker-list localhost:9092 --topic pktest
kafka消费者启动
kafka-console-consumer --bootstrap-server localhost:9092 --topic pktest
kafka创建topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest
kafka展示topic列表
kafka-topics --list --zookeeper localhost:2181
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/** Flink streaming job that reads string records from the Kafka topic `pktest`
  * and prints each record via `DataStream.print()`.
  */
object StreamingJob {

  /** Builds the Kafka -> print pipeline and submits it for execution.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Brings the implicit TypeInformation derivation into scope for addSource.
    import org.apache.flink.api.scala._

    val topic = "pktest"

    // Kafka client settings: broker address and the consumer group to join.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "test")

    // Each Kafka record value is deserialized as a plain string.
    val data = env.addSource(
      new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
    )
    data.print()

    // Nothing runs until execute() is called.
    env.execute("StreamingJob")
  }
}
上述即为Flink作为Kafka消费者的例子。我们在启动producer的窗口中向topic pktest输入内容,即可在控制台中看到输出结果
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/** Flink streaming job that reads text lines from a local socket
  * (localhost:9999) and writes each line to the Kafka topic `pktest`.
  */
object KafkaConnectorProducer {

  /** Builds the socket -> Kafka pipeline and submits it for execution.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    // Source: text lines from a socket on localhost:9999.
    val data = env.socketTextStream("localhost", 9999)

    val topic = "pktest"

    // Kafka client settings: only the broker address is set for the producer.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    // Consumer-side setup left disabled for reference:
    //properties.setProperty("group.id","test")
    //env.addSource(new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),properties))

    // Sink: wrap the plain string schema so it satisfies the keyed-schema constructor.
    val kafkaSink = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties
    )
    data.addSink(kafkaSink)

    // Nothing runs until execute() is called.
    env.execute("KafkaConnectorProducer")
  }
}
此例子使用 nc -lk 9999 作为数据源(端口需与代码中 socketTextStream 使用的 9999 一致),经过Flink包装后传输到对应的Topic。在 nc -lk 9999 的命令行窗口输入一些字符并回车,即可在对应的kafka消费者命令窗口看到消费的数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。