
PySpark: Saving Structured Streaming Results to a MySQL Database

The script below reads student records (one "name,subject,score" line at a time) from a directory with Structured Streaming and appends each micro-batch to a MySQL table through foreachBatch.
# Input: text files under /opt/tmp/ where each line has the form  name,subject,score
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split

# MySQL connection settings
PROP = {}
PROP['driver'] = 'com.mysql.jdbc.Driver'
PROP['user'] = 'root'
PROP['password'] = 'password'
URL = 'jdbc:mysql://localhost:3306/spark'
TABLE = 'test'

def test_insert_mysql(df: DataFrame, batch_id):
    # Called once per micro-batch; appends the batch to the MySQL table test(name, subject, score)
    df.write.jdbc(url=URL, table=TABLE, properties=PROP, mode='append')

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    # Spark reads each file in the directory only once
    lines = spark.readStream.text(path='/opt/tmp/', wholetext=False)
    # Split each line into [name, subject, score]
    stu_info = lines.select(split(lines.value, ",").alias("info"))
    stu = stu_info.select(stu_info['info'][0].alias('name'),
                          stu_info['info'][1].alias('subject'),
                          stu_info['info'][2].alias('score'))
    # Optionally register a temp view here: a temp view lets you run more complex SQL,
    # or you can compute with DataFrame methods such as .select() / .groupBy()
    """
    stu.createOrReplaceTempView("student")
    result = spark.sql("select * from student")
    query = result.writeStream.outputMode('update').foreachBatch(test_insert_mysql).trigger(processingTime="8 seconds").start()
    query.awaitTermination()
    """
    query = stu.writeStream.outputMode('update').foreachBatch(test_insert_mysql).trigger(processingTime="8 seconds").start()
    query.awaitTermination()

# Note: writeStream has three output modes: complete (used with aggregations),
# update (not supported with session windows), and append.
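As the commented-out block above suggests, you can run richer logic (SQL on a temp view, or .select()/.groupBy()) before writing. Below is a minimal sketch of an aggregation variant, continuing from the script above and reusing spark, stu, URL and PROP; the target table name avg_score and the cast of score to int are my assumptions, not part of the original. Because the query aggregates, it uses complete mode, the mode meant for aggregations.

from pyspark.sql.functions import avg

def insert_avg(df: DataFrame, batch_id):
    # complete mode delivers the full aggregated result every batch,
    # so overwrite the (hypothetical) avg_score table instead of appending
    df.write.jdbc(url=URL, table='avg_score', properties=PROP, mode='overwrite')

# average score per subject; score arrives as a string, so cast it first (assumption)
avg_df = stu.groupBy('subject').agg(avg(stu['score'].cast('int')).alias('avg_score'))

query = avg_df.writeStream.outputMode('complete') \
    .foreachBatch(insert_avg) \
    .trigger(processingTime="8 seconds").start()
query.awaitTermination()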
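Also note that df.write.jdbc only works when the MySQL JDBC connector is on Spark's classpath, for example by passing the jar with spark-submit --jars or by letting Spark pull it from Maven. A minimal sketch of the latter, where the coordinates mysql:mysql-connector-java:8.0.33 are an assumption to adjust to your environment (with Connector/J 8+ the driver class is usually com.mysql.cj.jdbc.Driver rather than com.mysql.jdbc.Driver):

from pyspark.sql import SparkSession

# assumption: let Spark download the MySQL connector from Maven at startup
spark = (SparkSession.builder
         .appName("stream_to_mysql")
         .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33")
         .getOrCreate())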
