赞
踩
# Batch job: read rows from the MySQL table `parent_info` over JDBC and append
# them to the Hive table `sink_parent_info` through a HiveCatalog.
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table.catalog import HiveCatalog
from pyflink.table import SqlDialect

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

# Arguments: catalog name, default database, directory holding the Hive conf.
catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")

# Register the catalog and make it the current catalog of the session.
t_env.register_catalog("myhive", catalog)
t_env.use_catalog("myhive")

# Switch to the Hive dialect so the sink DDL below is parsed as Hive DDL.
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create the sink table in the current catalog/database if it is missing.
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
etl_date STRING
,id BIGINT
,user_id BIGINT
,height DECIMAL(5,2)
,weight DECIMAL(5,2)
)
""")

# Back to the default (Flink) dialect: the JDBC source table uses a Flink
# `WITH (...)` clause. It is TEMPORARY, so it lives only in this session.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql("""
CREATE TEMPORARY TABLE source_parent_info(
id bigint
,user_id bigint
,height decimal(5,2)
,weight decimal(5,2)
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")

# The sink has five columns (etl_date first) while the source has four, so the
# SELECT must supply etl_date itself; otherwise the query schema does not match
# the sink schema. The load date is used here.
# NOTE(review): confirm that the current date is the intended etl_date value.
# .wait() blocks until the submitted insert job has finished.
t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
CAST(CURRENT_DATE AS STRING) AS etl_date
,id
,user_id
,height
,weight
FROM source_parent_info
""").wait()

参考文档:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。