当前位置:   article > 正文

2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)_sink.parallelism

sink.parallelism

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

6.2、创建 hdfs sink表用于写出到hdfs

6.3、insert into 写入到 hdfs_sink_table

6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

  1. CREATE TABLE filesystem_table (
  2. id INT,
  3. name STRING,
  4. ds STRING
  5. ) partitioned by (ds) WITH (
  6. 'connector' = 'filesystem',
  7. -- 本地文件系统
  8. 'path' = 'file:///URI',
  9. -- HDFS文件系统
  10. 'path' = 'hdfs://URI',
  11. -- 阿里云对象存储
  12. 'path' = 'oss://URI',
  13. 'format' = 'json'
  14. );

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

  1. CREATE TABLE filesystem_table_file_format (
  2. id INT,
  3. name STRING,
  4. ds STRING
  5. ) partitioned by (ds) WITH (
  6. 'connector' = 'filesystem',
  7. -- 指定文件格式类型
  8. 'format' = 'json|csv|orc|raw'
  9. );

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

  1. -- 目录监控
  2. drop table filesystem_source_table;
  3. CREATE TABLE filesystem_source_table (
  4. id INT,
  5. name STRING,
  6. `file.name` STRING NOT NULL METADATA
  7. ) WITH (
  8. 'connector' = 'filesystem',
  9. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  10. 'format' = 'json',
  11. 'source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
  12. );
  13. -- 持续读取
  14. select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

  1. -- 可用的Metadata
  2. drop table filesystem_source_table_read_metadata;
  3. CREATE TABLE filesystem_source_table_read_metadata (
  4. id INT,
  5. name STRING,
  6. `file.path` STRING NOT NULL METADATA,
  7. `file.name` STRING NOT NULL METADATA,
  8. `file.size` BIGINT NOT NULL METADATA,
  9. `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
  10. ) WITH (
  11. 'connector' = 'filesystem',
  12. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  13. 'format' = 'json'
  14. );
  15. select * from filesystem_source_table_read_metadata;

运行结果:


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

  1. -- 创建分区表
  2. drop table filesystem_source_table_partition;
  3. CREATE TABLE filesystem_source_table_partition (
  4. id INT,
  5. name STRING,
  6. ds STRING
  7. ) partitioned by (ds) WITH (
  8. 'connector' = 'filesystem',
  9. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  10. 'partition.default-name' = 'default_partition',
  11. 'format' = 'json'
  12. );
  13. -- 动态分区写入
  14. insert into filesystem_source_table_partition
  15. SELECT * FROM (VALUES
  16. (1,'a','20231010')
  17. , (2,'b','20231010')
  18. , (3,'c','20231011')
  19. , (4,'d','20231011')
  20. , (5,'e','20231012')
  21. , (6,'f','20231012')
  22. ) AS user1 (id,name,ds);
  23. -- 静态分区写入
  24. insert into filesystem_source_table_partition partition(ds = '20231010')
  25. SELECT * FROM (VALUES
  26. (1,'a')
  27. , (2,'b')
  28. , (3,'c')
  29. , (4,'d')
  30. , (5,'e')
  31. , (6,'f')
  32. ) AS user1 (id,name);
  33. -- 查询分区表数据
  34. select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

  1. CREATE TABLE hdfs_sink_table (
  2. `log` STRING,
  3. `dt` STRING, -- 分区字段,天
  4. `hour` STRING -- 分区字段,小时
  5. ) partitioned by (dt,`hour`) WITH (
  6. 'connector' = 'filesystem',
  7. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  8. 'sink.parallelism' = '2', -- 指定sink算子并行度
  9. 'format' = 'raw'
  10. );

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)

6.1、创建 kafka source表用于读取kafka

  1. -- TODO 创建读取kafka表时,同时读取kafka元数据字段
  2. drop table kafka_source_table;
  3. CREATE TABLE kafka_source_table(
  4. `log` STRING,
  5. `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
  6. ) WITH (
  7. 'connector' = 'kafka',
  8. 'topic' = '20231017',
  9. 'properties.bootstrap.servers' = 'worker01:9092',
  10. 'properties.group.id' = 'FlinkConsumer',
  11. 'scan.startup.mode' = 'earliest-offset',
  12. 'format' = 'raw'
  13. );

6.2、创建 hdfs sink表用于写出到hdfs

  1. drop table hdfs_sink_table;
  2. CREATE TABLE hdfs_sink_table (
  3. `log` STRING,
  4. `dt` STRING, -- 分区字段,天
  5. `hour` STRING -- 分区字段,小时
  6. ) partitioned by (dt,`hour`) WITH (
  7. 'connector' = 'filesystem',
  8. 'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  9. 'sink.parallelism' = '2', -- 指定sink算子并行度
  10. 'format' = 'raw'
  11. );

6.3、insert into 写入到 hdfs_sink_table

  1. -- 流式 sql,插入文件系统表
  2. insert into hdfs_sink_table
  3. select
  4. log
  5. ,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
  6. ,DATE_FORMAT(`timestamp`,'HH') as `hour`
  7. from kafka_source_table;

6.4、查询 hdfs_sink_table

  1. -- 批式 sql,使用分区修剪进行选择
  2. select * from hdfs_sink_table;

6.5、创建hive表,指定local

  1. create table `kafka_to_hive` (
  2. `log` string comment '日志数据'
  3. comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string)
  4. row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
  5. LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/718956
推荐阅读
相关标签
  

闽ICP备14008679号