赞
踩
Flink 中你可以使用 StreamExecutionEnvironment.getExecutionEnvironment创建流处理的执行环境
Flink 中你可以使用 StreamExecutionEnvironment.addSource(source) 来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction来自定义非并行的source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:
基于本地集合的source(Collection-based-source)
基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
自定义的source(Custom-source)
代码示例
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import scala.collection.immutable.{Queue, Stack} import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} object StreamingDemoFromCollectionSource { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment //0.用element创建DataStream(fromElements) val ds0: DataStream[String] = senv.fromElements("spark", "flink") ds0.print() //1.用Tuple创建DataStream(fromElements) val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink")) ds1.print() //2.用Array创建DataStream val ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink")) ds2.print() //3.用ArrayBuffer创建DataStream val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink")) ds3.print() //4.用List创建DataStream val ds4: DataStream[String] = senv.fromCollection(List("spark", "flink")) ds4.print() //5.用List创建DataStream val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink")) ds5.print() //6.用Vector创建DataStream val ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink")) ds6.print() //7.用Queue创建DataStream val ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink")) ds7.print() //8.用Stack创建DataStream val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink")) ds8.print() //9.用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生成不必要的集合) val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink")) ds9.print() //10.用Seq创建DataStream val ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink")) ds10.print() //11.用Set创建DataStream(不支持) //val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink")) //ds11.print() //12.用Iterable创建DataStream(不支持) //val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink")) //ds12.print() //13.用ArraySeq创建DataStream val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink")) ds13.print() //14.用ArrayStack创建DataStream val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink")) ds14.print() //15.用Map创建DataStream(不支持) //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink")) //ds15.print() //16.用Range创建DataStream val ds16: DataStream[Int] = senv.fromCollection(Range(1, 9)) ds16.print() //17.用fromElements创建DataStream val ds17: DataStream[Long] = senv.generateSequence(1, 9) ds17.print() senv.execute(this.getClass.getName) } }
Flink的流处理可以直接通过readTextFile()方法读取文件来创建数据源,方法如下:
object DataSource_CSV {
def main(args: Array[String]): Unit = {
// 1. 获取流处理运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 读取文件
val textDataStream: DataStream[String] = env.readTextFile("hdfs://node01:8020/flink-datas/score.csv")
// 3. 打印数据
textDataStream.print()
// 4. 执行程序
env.execute()
}
}
上面两种方式创建的数据源一般都是固定的.如果需要源源不断的产生数据,可以使用socket的方式来获取数据,通过调用socketTextStream()方法
示例
编写Flink程序,接收socket的单词数据,并以空格进行单词拆分打印。
步骤
获取流处理运行环境
构建socket流数据源,并指定IP地址和端口号
对接收到的数据进行空格拆分
打印输出
启动执行
在Linux中,使用nc -lk 端口号监听端口,并发送单词
**安装nc: ** yum install -y nc
nc -lk 9999 监听9999端口的信息
object SocketSource {
def main(args: Array[String]): Unit = {
//1. 获取流处理运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 构建socket流数据源,并指定IP地址和端口号
// hadoop hadoop hive spark
val socketDataStream: DataStream[String] = env.socketTextStream("node01", 9999)
// 3. 转换,以空格拆分单词
val mapDataSet: DataStream[String] = socketDataStream.flatMap(_.split(" "))
// 4. 打印输出
mapDataSet.print()
// 5. 启动执行
env.execute("WordCount_Stream")
}
}
我们也可以通过去实现SourceFunction或者它的子类RichSourceFunction类来自定义实现一些自定义的source,Kafka创建source数据源类FlinkKafkaConsumer010也是采用类似的方式。
示例:
自定义数据源, 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
要求:
随机生成订单ID(UUID)
随机生成用户ID(0-2)
随机生成订单金额(0-100)
时间戳为当前系统时间
开发步骤:
创建订单样例类
获取流处理环境
创建自定义数据源
打印数据
执行任务
object StreamFlinkSqlDemo { // 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳) case class Order(id: String, userId: Int, money: Long, createTime: Long) def main(args: Array[String]): Unit = { // 1. 获取流处理运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 创建一个自定义数据源 val orderDataStream = env.addSource(new RichSourceFunction[Order] { override def run(ctx: SourceFunction.SourceContext[Order]): Unit = { // 使用for循环生成1000个订单 for (i <- 0 until 1000) { // 随机生成订单ID(UUID) val id = UUID.randomUUID().toString // 随机生成用户ID(0-2) val userId = Random.nextInt(3) // 随机生成订单金额(0-100) val money = Random.nextInt(101) // 时间戳为当前系统时间 val timestamp = System.currentTimeMillis() // 收集数据 ctx.collect(Order(id, userId, money, timestamp)) // 每隔1秒生成一个订单 TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = () }) // 3. 打印数据 orderDataStream.print() // 4. 执行程序 env.execute() } }
我们可以通过使用FlinkKafkaConsumer010来从Kafka中获取消息:
示例:
使用Flink流处理方式,读取Kafka的数据,并打印.
开发步骤:
**Kafka相关操作: **
创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181
模拟生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafkatopic
模拟消费者
kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181
代码
import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.apache.kafka.clients.CommonClientConfigs object DataSource_kafka { def main(args: Array[String]): Unit = { //1. 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2. 指定kafka数据流的相关信息 val kafkaCluster = "node01:9092,node02:9092,node03:9092" val kafkaTopicName = "kafkatopic" //3. 创建kafka数据流 val properties = new Properties() properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster) val kafka010 = new FlinkKafkaConsumer010[String](kafkaTopicName, new SimpleStringSchema(), properties) //4. 添加数据源addSource(kafka010) val text: DataStream[String] = env.addSource(kafka010) //5. 打印数据 text.print() //6. 执行任务 env.execute("flink-kafka-wordcunt") } }
上面我们已经使用了自定义数据源和Flink自带的Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据的 Source。
示例
自定义数据源, 读取MySql数据库(test)表(user)数据.
| id | username | password | name |
|---|---|---|---|
| 1 | zhangsan | 111111 | 张三 |
| 2 | lisi | 222222 | 李四 |
| 3 | wangwu | 333333 | 王五 |
| 4 | zhaoliu | 444444 | 赵六 |
| 5 | tianqi | 555555 | 田七 |
相关依赖
<!-- 指定mysql-connector的依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
开发步骤
代码
package com.itheima.stream import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object DataSource_mysql { def main(args: Array[String]): Unit = { // 1. 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 设置并行度 env.setParallelism(1) // 3. 添加自定义MySql数据源 val source = env.addSource(new MySql_source) // 4. 打印结果 source.print() // 5. 执行任务 env.execute() } } class MySql_source extends RichSourceFunction[(Int, String, String, String)] { override def run(ctx: SourceContext[(Int, String, String, String)]): Unit = { // 1. 加载MySql驱动 Class.forName("com.mysql.jdbc.Driver") // 2. 链接MySql var connection: Connection = DriverManager.getConnection("jdbc:mysql:///test", "root", "123456") // 3. 创建PreparedStatement val sql = "select id , username , password , name from user" var ps: PreparedStatement = connection.prepareStatement(sql) // 4. 执行Sql查询 val queryRequest = ps.executeQuery() // 5. 遍历结果 while (queryRequest.next()) { val id = queryRequest.getInt("id") val username = queryRequest.getString("username") val password = queryRequest.getString("password") val name = queryRequest.getString("name") // 收集数据 ctx.collect((id, username, password, name)) } } override def cancel(): Unit = {} }
和DataSet批处理一样,DataStream也包括一系列的Transformation操作.
流数据处理和批数据处理有很多操作是类似的,所以就不再一 一讲解。我们主要讲解,和批处理不一样的一些操作。
按照指定的key来进行分流,类似于批处理中的groupBy。可以按照索引名/字段名来指定分组的字段.
示例
读取socket数据源, 进行单词的计数
开发步骤
代码
/** * KeyBy算子的使用 */ object Transformation_KeyBy { def main(args: Array[String]): Unit = { // 1.获取流处理运行环境 val senv = StreamExecutionEnvironment.getExecutionEnvironment // 2.设置并行度 senv.setParallelism(3) //3. 获取Socket数据源 val stream = senv.socketTextStream("node01", 9999, '\n') //4. 转换操作,以空格切分,每个元素计数1,以单词分组,累加 val text = stream.flatMap(_.split("\\s")) .map((_,1)) //TODO 逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的 .keyBy(_._1) //TODO 这里的sum并不是分组去重后的累加值,如果统计去重后累加值,则使用窗口函数 .sum(1) //5. 打印到控制台 text.print() //6. 执行任务 senv.execute() } }
Connect用来将两个DataStream组装成一个ConnectedStreams。它用了两个泛型,即不要求两个dataStream的element是同一类型。这样我们就可以把不同的数据组装成同一个结构.
示例
读取两个不同类型的数据源,使用connect进行合并打印。
开发步骤
自定义数据源
/** * 创建自定义并行度为1的source * 实现从1开始产生递增数字 */ class MyLongSourceScala extends SourceFunction[Long] { var count = 1L var isRunning = true override def run(ctx: SourceContext[Long]) = { while (isRunning) { ctx.collect(count) count += 1 TimeUnit.SECONDS.sleep(1) } } override def cancel() = { isRunning = false } } /** * 创建自定义并行度为1的source * 实现从1开始产生递增字符串 */ class MyStringSourceScala extends SourceFunction[String] { var count = 1L var isRunning = true override def run(ctx: SourceContext[String]) = { while (isRunning) { ctx.collect("str_" + count) count += 1 TimeUnit.SECONDS.sleep(1) } } override def cancel() = { isRunning = false } }
代码
object StreamingDemoConnectScala { def main(args: Array[String]): Unit = { // 1. 创建流式处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 添加两个自定义数据源 val text1: DataStream[Long] = env.addSource(new MyLongSourceScala) val text2: DataStream[String] = env.addSource(new MyStringSourceScala) // 3. 使用connect合并两个数据流,创建ConnectedStreams对象 val connectedStreams: ConnectedStreams[Long, String] = text1.connect(text2) // 4. 遍历ConnectedStreams对象,转换为DataStream val result: DataStream[Any] = connectedStreams.map(line1 => { line1 }, line2 => { line2 }) // 5. 打印输出,设置并行度为1 result.print().setParallelism(1) // 6. 执行任务 env.execute("StreamingDemoWithMyNoParallelSourceScala") } }
split就是将一个DataStream分成多个流,用SplitStream来表示
DataStream → SplitStream
select就是获取分流后对应的数据,跟split搭配使用,从SplitStream中选择一个或多个流
SplitStream → DataStream
示例
加载本地集合(1,2,3,4,5,6), 使用split进行数据分流,分为奇数和偶数. 并打印奇数结果
开发步骤
代码:
/** * 演示Split和Select方法 * Split: DataStream->SplitStream * Select: SplitStream->DataStream */ object SplitAndSelect { def main(args: Array[String]): Unit = { // 1. 创建批处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 设置并行度 env.setParallelism(1) // 3. 加载本地集合 val elements: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6) // 4. 数据分流,分为奇数和偶数 val split_data: SplitStream[Int] = elements.split( (num: Int) => num % 2 match { case 0 => List("even") case 1 => List("odd") } ) // 5. 获取分流后的数据 val even: DataStream[Int] = split_data.select("even") val odd: DataStream[Int] = split_data.select("odd") val all: DataStream[Int] = split_data.select("odd", "even") // 6. 打印数据 odd.print() // 7. 执行任务 env.execute() } }
Flink将数据进行sink操作到本地文件/本地集合/HDFS等和之前的批处理操作一致.这里重点说下sink到Kafka以及MySQL的操作
kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
示例
读取MySql的数据, 落地到Kafka中
开发步骤
FlinkKafkaProducer010代码
object DataSink_kafka { def main(args: Array[String]): Unit = { // 1. 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 设置并行度 env.setParallelism(1) // 3. 添加自定义MySql数据源 val source: DataStream[(Int, String, String, String)] = env.addSource(new MySql_source) // 4. 转换元组数据为字符串 val strDataStream: DataStream[String] = source.map( line => line._1 + line._2 + line._3 + line._4 ) //5. 构建FlinkKafkaProducer010 val p: Properties = new Properties p.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092") val sink = new FlinkKafkaProducer010[String]("test2", new SimpleStringSchema(), p) // 6. 添加sink strDataStream.addSink(sink) // 7. 执行任务 env.execute("flink-kafka-wordcount") } }
示例
加载下列本地集合,导入MySql中
List(
(10, "dazhuang", "123456", "大壮"),
(11, "erya", "123456", "二丫"),
(12, "sanpang", "123456", "三胖")
)
开发步骤
RichSinkFunctionopen方法,获取Connection和PreparedStatementinvoke方法,执行插入操作close方法,关闭连接操作代码
object DataSink_MySql { def main(args: Array[String]): Unit = { //1.创建流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.准备数据 val value: DataStream[(Int, String, String, String)] = env.fromCollection(List( (10, "dazhuang", "123456", "大壮"), (11, "erya", "123456", "二丫"), (12, "sanpang", "123456", "三胖") )) // 3. 添加sink value.addSink(new MySql_Sink) //4.触发流执行 env.execute() } } // 自定义落地MySql的Sink class MySql_Sink extends RichSinkFunction[(Int, String, String, String)] { private var connection: Connection = null private var ps: PreparedStatement = null override def open(parameters: Configuration): Unit = { //1:加载驱动 Class.forName("com.mysql.jdbc.Driver") //2:创建连接 connection = DriverManager.getConnection("jdbc:mysql:///test", "root", "123456") //3:获得执行语句 val sql = "insert into user(id , username , password , name) values(?,?,?,?);" ps = connection.prepareStatement(sql) } override def invoke(value: (Int, String, String, String)): Unit = { try { //4.组装数据,执行插入操作 ps.setInt(1, value._1) ps.setString(2, value._2) ps.setString(3, value._3) ps.setString(4, value._4) ps.executeUpdate() } catch { case e: Exception => println(e.getMessage) } } //关闭连接操作 override def close(): Unit = { if (connection != null) { connection.close() } if (ps != null) { ps.close() } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。