1. A dynamic table is an abstraction; it is not materialized. It is the core concept behind Flink's Table API and SQL support for streams. A dynamic table changes over time; querying it yields a continuous query whose result is again a dynamic table.
2. Hive, by contrast, requires the metastore service to be started before entering its CLI, and the data it queries does not change while it is being read.
3. Unless the input is a bounded stream, a continuous query never stops.
4. The stream is converted into (defined as) a dynamic table, a continuous query is computed on that table to produce a new dynamic table, and the result is finally converted back into a stream. The continuous query never stops and updates its result table whenever the input table is updated.
5. Update queries and append-only queries convert into different kinds of streams; for example, a tumbling event-time window never updates a window's result, it only appends.
6. Queries that produce updates have to maintain more state.
7. Stream types: Append-only, Retract, Upsert (see the conversion sketch after the code example below).
8. Create a TableEnvironment.
Build a stream, then convert it into a table:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo2DAtaStreamToTable {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // create the Flink SQL streaming environment
    val table: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // build a stream
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    // define a table on the stream; it is an append-only dynamic table
    val wordsTable: Table = table.fromDataStream(
      wordsDS, // the input stream
      Schema
        .newBuilder()
        .column("f0", DataTypes.STRING()) // the table schema
        .build()
    )
Flink SQL can be written in two ways: SQL (the main focus) and the DSL (based on tables or views, rarely used in practice).
1. Run a continuous query on the table (DSL):
val countTable: Table =wordsTable
.groupBy($"f0")
.select($"f0",$"f0".count())
2. Or create a temporary view and write SQL:
table.createTemporaryView("word",wordsTable)
val countTable: Table = table.sqlQuery(
"""
|select f0,count(1) as con
|from
|word
|group by f0
|
|""".stripMargin)
val countDS: DataStream[Row] = countTable.toChangelogStream
countDS.print()
env.execute()
}
}
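A minimal sketch of how the kind of result table determines the stream conversion, reusing wordsTable and countTable from the example above: an append-only table can be converted with toDataStream, while the aggregated (updating) countTable has to go through toChangelogStream, as in the code above.

    // append-only table -> plain DataStream of insert-only rows
    val appendDS: DataStream[Row] = table.toDataStream(wordsTable)
    // updating table -> changelog stream (+I / -U / +U rows); toDataStream would fail here
    val changelogDS: DataStream[Row] = countTable.toChangelogStream
    appendDS.print("append")
    changelogDS.print("changelog")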
Create a pure Flink SQL table environment (TableEnvironment, without a DataStream environment):
val settings: EnvironmentSettings =EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val table: TableEnvironment =TableEnvironment.create(settings)
The datagen connector can only be used as a source table:
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- number of rows generated per second
'fields.id.length' = '5', -- length limit of the field
'fields.name.length' = '3',
'fields.age.min' = '1', -- minimum value
'fields.age.max' = '100' -- maximum value
)
-- A computed column such as ts AS localtimestamp can be added; localtimestamp returns the current timestamp
executeSql can run only one SQL statement per call; write every additional statement in its own table.executeSql(""" ... """.stripMargin) call (a sketch follows).
Create a print table based on the schema of an existing table; the print connector can only be used for sink tables.
-- LIKE: create a table based on an existing one
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE datagen (EXCLUDING ALL)
Write data into the table with INSERT:
insert into print_table select * from datagen
If the query transforms the data (for example an aggregation), the sink table's fields have to be declared manually (see the sketch after the DDL):
CREATE TABLE print_table (
age INT,
num BIGINT
)
WITH ('connector' = 'print')
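A sketch of an aggregation whose output matches the manually declared (age INT, num BIGINT) schema, assuming the datagen table and the TableEnvironment from above:

    table.executeSql(
      """
        |insert into print_table
        |select age, count(1) as num
        |from datagen
        |group by age
        |""".stripMargin)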
CREATE TABLE blackhole_table
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL)
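The blackhole connector simply discards everything written to it, which makes it useful for throughput tests; a sketch using the datagen table above:

    table.executeSql("insert into blackhole_table select * from datagen")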
Maven dependencies for the Kafka connector and the CSV format:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.15.0</version>
</dependency>
CREATE TABLE student_kafka (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter' = ',', -- field delimiter of the csv data
  'csv.ignore-parse-errors' = 'true', -- on dirty data, fill the fields with null instead of failing
  'csv.allow-comments' = 'true' -- skip lines starting with # (this does not skip dirty rows)
)
Option reference:
scan.startup.mode:
earliest-offset: read all data from the beginning of the topic
latest-offset: read only the latest data, i.e. data produced after the job starts
group-offsets (default): read based on the consumer group; within a group each record is consumed only once; if the group does not exist yet, start from the latest data
timestamp: start reading from a given timestamp (see the sketch below)
specific-offsets: start reading from given offsets
format:
csv: text format; fields are mapped by position and Flink SQL parses them automatically
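A sketch of the timestamp startup mode, assuming the same topic and brokers as above; 'scan.startup.timestamp-millis' carries the start timestamp in milliseconds (the value here is only an example), and specific-offsets is configured the same way via 'scan.startup.specific-offsets':

    table.executeSql(
      """
        |CREATE TABLE student_kafka_ts (
        |  id STRING,
        |  name STRING,
        |  age INT,
        |  gender STRING,
        |  clazz STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'student',
        |  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        |  'scan.startup.mode' = 'timestamp',
        |  'scan.startup.timestamp-millis' = '1658546198000', -- example start timestamp (ms)
        |  'format' = 'csv'
        |)
        |""".stripMargin)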
The result of a SQL query is either an update stream or an append-only stream; when Flink writes the result to Kafka, the two cases are handled differently.
Read from Kafka, transform, and write back into Kafka.
Kafka sink table (an append-only stream written to Kafka):
CREATE TABLE student_kafka_sink (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka', -- supports append-only streams only
  'topic' = 'student_nan', -- write to a new topic; it is created automatically
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'format' = 'csv',
  'csv.field-delimiter' = '\t' -- field delimiter of the csv data
)
--------------------------------------------------------------------------
Note: a continuous query without aggregation returns an append-only dynamic table, which can be written to Kafka directly; a query that does something like count returns an updating table that keeps emitting new results per group.
-- take the students whose gender is '男' (male) and write them to Kafka
insert into student_kafka_sink
select * from student_kafka
where gender = '男'
Create the sink table (an update stream written to Kafka with upsert-kafka):
CREATE TABLE gender_num_sink (
  gender STRING,
  num BIGINT,
  PRIMARY KEY (gender) NOT ENFORCED -- an update stream requires a unique primary key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'gender_num',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
)
--------------------------------------------------------------------------
-- Execute the SQL:
-- write the update stream to Kafka,
-- using the unique primary key as the Kafka message key
-- and the row data as the Kafka message value
insert into gender_num_sink
select gender, count(1) as num
from student_kafka -- the kafka source table
where gender is not null
group by gender
--------------------------------------------------------------------------
-- Inspect the data from the console (consumer):
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic gender_num
-- The data can also be consumed programmatically (a consumer sketch follows), which lets you read the key and treat the latest value per key as the current total.
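A minimal consumer sketch in Scala using the standard kafka-clients API, reading both key and value from the gender_num topic; the group id gender_num_reader is only an example name:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object GenderNumConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    props.put("group.id", "gender_num_reader") // example group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("gender_num"))

    while (true) {
      // each record carries the gender as key and the latest count as value
      val records = consumer.poll(Duration.ofSeconds(1)).asScala
      for (record <- records) {
        println(s"key=${record.key()}, value=${record.value()}")
      }
    }
  }
}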
Check the reported error to see which jar is missing; to run the job on the cluster, first upload the Kafka dependency jar to Flink's lib directory:
flink-sql-connector-kafka-1.15.0.jar
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.0</version>
</dependency>
A JDBC source is a bounded stream. Fields are mapped by name and type, so the field names and types of the Flink SQL table must match the database table (for example varchar maps to STRING); the official docs list the full type mapping.
------------------------------------------------------------------------
jdbc source:
CREATE TABLE student_mysql (
  id BIGINT,
  name STRING,
  age BIGINT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/bigdata',
  'table-name' = 'students',
  'username' = 'root',
  'password' = '123456'
)
------------------------------------------------------------------------
-- create the print sink table
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE student_mysql (EXCLUDING ALL)

-- execute the sql
insert into print_table select * from student_mysql
Example: consume data from Kafka, count the number of students per class in real time, and save the result to MySQL.
1. The primary key must be declared in two places: in the sink table, and in the table created in the database itself; otherwise duplicate rows appear.
2. The Kafka source is an unbounded stream, so the result is an updating table: every new record is grouped and re-counted.
----------------------------------------------------
-- flink sql kafka source table: students
CREATE TABLE student_kafka (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter' = ',', -- field delimiter of the csv data
  'csv.ignore-parse-errors' = 'true', -- on dirty data, fill the fields with null
  'csv.allow-comments' = 'true' -- skip lines starting with #
)

jdbc sink:
CREATE TABLE clazz_num_mysql (
  clazz STRING,
  num BIGINT,
  PRIMARY KEY (clazz) NOT ENFORCED -- update rows by primary key
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'clazz_num', -- the table has to be created manually in the database (see the sketch below)
  'username' = 'root',
  'password' = '123456'
)

-- the following query returns an update stream; Flink SQL updates rows by primary key automatically
insert into clazz_num_mysql
select clazz, count(1) as num
from student_kafka
where clazz is not null
group by clazz
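A sketch of creating the clazz_num table in MySQL up front via plain JDBC (the MySQL driver on the classpath and the VARCHAR(100) column type are assumptions; the mysql CLI works just as well):

import java.sql.DriverManager

object CreateClazzNumTable {
  def main(args: Array[String]): Unit = {
    // connection parameters taken from the jdbc sink definition above
    val conn = DriverManager.getConnection(
      "jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8",
      "root", "123456")
    val stmt = conn.createStatement()
    // the primary key must match the PRIMARY KEY of the flink sql sink table
    stmt.execute(
      """
        |CREATE TABLE IF NOT EXISTS clazz_num (
        |  clazz VARCHAR(100),
        |  num BIGINT,
        |  PRIMARY KEY (clazz)
        |)
        |""".stripMargin)
    stmt.close()
    conn.close()
  }
}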
The filesystem connector supports local files, HDFS and other file systems.
It works in streaming mode (continuous results) and batch mode (final result only).
File source:
CREATE TABLE student_file (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'filesystem', -- required: connector type
  'path' = 'data/students.txt', -- required: path
  'format' = 'csv' -- required: the filesystem connector needs an explicit format
)
-- with the csv format, fields are mapped by position
----------------------------------------------------
-- print sink
CREATE TABLE print_table (
  clazz STRING,
  num BIGINT
) WITH ('connector' = 'print')
----------------------------------------------------
-- execute the sql
insert into print_table
select clazz, count(1) as num
from student_file
group by clazz
Source table: randomly generated data
CREATE TABLE datagen (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING,
  ts AS localtimestamp
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '500', -- number of rows generated per second
  'fields.id.length' = '5', -- length limit of the field
  'fields.name.length' = '3',
  'fields.gender.length' = '1',
  'fields.clazz.length' = '5',
  'fields.age.min' = '1', -- minimum value
  'fields.age.max' = '100' -- maximum value
)
----------------------------------------------------
-- create the file sink table
CREATE TABLE file_sink (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING,
  `day` STRING,
  `hour` STRING
) PARTITIONED BY (`day`,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'data/flink_sink',
  'format' = 'csv',
  'sink.rolling-policy.file-size' = '100kb' -- roll over to a new file at this size, default 128MB
)
----------------------------------------------------
-- execute the sql
insert into file_sink
select id, name, age, gender, clazz,
       DATE_FORMAT(ts, 'yyyy-MM-dd') as `day`,
       DATE_FORMAT(ts, 'HH') as `hour`
from datagen
HBase review (web UI on port 16010):
A column-oriented database; when creating a table you specify column families rather than columns, and columns are specified when data is inserted.
Queries by rowKey over massive data return with millisecond-level latency.
Data is stored both in memory and on HDFS.
When data is written it is sorted by rowKey and indexed, which improves query performance.
HBase splits a table into multiple regions to achieve distributed storage.
1. Add the dependency (download it with Maven in IDEA):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4</artifactId>
    <version>1.15.0</version>
</dependency>
2. Add a build plugin so the dependency is bundled into the jar, then upload the flink-hbase-1.0-jar-with-dependencies.jar dependency jar to Flink's lib directory.
3. That jar is one we built ourselves with Maven, because the dependency jar provided on the official site has issues.
yarn application -list
# stop
yarn application -kill application_1658546198162_0009
# start
yarn-session.sh -d
4. Start ZooKeeper first, then HBase:
start-hbase.sh
The HBase connector can be used both for writing and for reading data; reading HBase from Flink is also a bounded stream (a read-back sketch follows the example below).
-- source table
CREATE TABLE student_kafka (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter' = ',', -- field delimiter of the csv data
  'csv.ignore-parse-errors' = 'true', -- on dirty data, fill the fields with null
  'csv.allow-comments' = 'true' -- skip lines starting with #
)
-----------------------------------------------------------------------
-- sink table
CREATE TABLE student_hbase (
  id STRING,
  info ROW<name STRING, age INT, gender STRING, clazz STRING>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'student',
  'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'
);
------------------------------------------------------------------------
-- the table has to be created in hbase first
hbase shell
create 'student','info'
------------------------------------------------------------------------
-- write the data into hbase
insert into student_hbase
select id, ROW(name, age, gender, clazz) as info
from student_kafka;
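Since the same table definition also works as a bounded source (as noted above), a sketch of scanning the HBase table back and printing it, assuming a TableEnvironment named table and the student_hbase table defined above:

    // full scan of the hbase table as a bounded source; print the result to the console
    table.executeSql("select * from student_hbase").print()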
Add the dependency for the JSON format:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.15.0</version>
</dependency>
-- source table
CREATE TABLE student_file_json (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'filesystem', -- required: connector type
  'path' = 'data/students.json', -- required: path
  'format' = 'json', -- required: the filesystem connector needs an explicit format
  'json.ignore-parse-errors' = 'true' -- skip dirty data
)
----------------------------------------------------
-- sink table
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE student_file_json (EXCLUDING ALL)
----------------------------------------------------
-- execute the sql
insert into print_table select * from student_file_json
-- source table
CREATE TABLE student_file_json (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'filesystem', -- required: connector type
  'path' = 'data/students.json', -- required: path
  'format' = 'json', -- required: the filesystem connector needs an explicit format
  'json.ignore-parse-errors' = 'true'
)
----------------------------------------------------
-- kafka sink
CREATE TABLE student_kafka_sink (
  id STRING,
  name STRING,
  age INT,
  gender STRING,
  clazz STRING
) WITH (
  'connector' = 'kafka', -- only supports append-only streams
  'topic' = 'student_flink_json',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'format' = 'json'
)
----------------------------------------------------
-- execute the sql
insert into student_kafka_sink select * from student_file_json