PyFlink: Receiving Kafka Data in Real Time and Writing It to MySQL

from pyflink.table import BatchTableEnvironment, EnvironmentSettings, SqlDialect
from pyflink.table.catalog import HiveCatalog

# Batch mode with the Blink planner
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

# HiveCatalog(catalog_name, default_database, hive_conf_dir)
catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")

# Use the Hive dialect to create the sink table in the Hive catalog
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
etl_date STRING
,id BIGINT
,user_id BIGINT
,height DECIMAL(5,2)
,weight DECIMAL(5,2)
)
""")

# Switch back to the default dialect to declare the MySQL (JDBC) source table
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql("""
CREATE TEMPORARY TABLE source_parent_info(
id bigint
,user_id bigint
,height decimal(5,2)
,weight decimal(5,2)
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")

# Copy the MySQL rows into the Hive table and block until the job finishes.
# The sink has five columns, so etl_date must be supplied as well;
# using the load date here is an assumption.
t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
CAST(CURRENT_DATE AS STRING) AS etl_date
,id
,user_id
,height
,weight
FROM source_parent_info
""").wait()
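
For the Kafka-to-MySQL direction named in the title, the same pattern should carry over with a Kafka source table and a JDBC sink table. The following is only a minimal sketch under several assumptions: it uses the newer option keys (`'connector' = 'kafka'`, `'connector' = 'jdbc'`) rather than the legacy `connector.type` keys in the listing above, and the topic name, broker address, consumer group, and the table names `kafka_parent_info` and `mysql_parent_info` are hypothetical placeholders. The helpers `render_with_clause`, `create_table_ddl`, and `run_pipeline` are illustrative and not part of PyFlink.

```python
def render_with_clause(options):
    """Render a dict of connector options as the body of a Flink SQL WITH clause."""
    return ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())


def create_table_ddl(table_name, columns, options):
    """Build a CREATE TEMPORARY TABLE statement from (name, type) pairs and options."""
    cols = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TEMPORARY TABLE {table_name} (\n{cols}\n) WITH (\n"
        f"{render_with_clause(options)}\n)"
    )


# Same columns as the parent_info table used in the listing above.
COLUMNS = [
    ("id", "BIGINT"),
    ("user_id", "BIGINT"),
    ("height", "DECIMAL(5,2)"),
    ("weight", "DECIMAL(5,2)"),
]

# Kafka source: topic, brokers, and group id are hypothetical placeholders.
KAFKA_SOURCE_DDL = create_table_ddl("kafka_parent_info", COLUMNS, {
    "connector": "kafka",
    "topic": "parent_info",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "pyflink_demo",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
})

# MySQL sink over JDBC: connection values mirror the masked ones in the article.
MYSQL_SINK_DDL = create_table_ddl("mysql_parent_info", COLUMNS, {
    "connector": "jdbc",
    "url": "jdbc:mysql://xxxx:3306/xxxx",
    "table-name": "parent_info",
    "driver": "com.mysql.cj.jdbc.Driver",
    "username": "root",
    "password": "xxxx",
    "sink.buffer-flush.interval": "1s",
})

INSERT_SQL = (
    "INSERT INTO mysql_parent_info "
    "SELECT id, user_id, height, weight FROM kafka_parent_info"
)


def run_pipeline(t_env):
    """Register both tables on a table environment and submit the insert job."""
    t_env.execute_sql(KAFKA_SOURCE_DDL)
    t_env.execute_sql(MYSQL_SINK_DDL)
    return t_env.execute_sql(INSERT_SQL)
```

To try it, create a table environment in streaming mode and call `run_pipeline(t_env)`. The Kafka and JDBC connector JARs and the MySQL driver would need to be on the Flink classpath. A streaming job of this kind does not finish on its own, so calling `.wait()` on the result blocks until the job is cancelled.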

References:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830
