# For txt input files whose lines follow the format: name,subject,score
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split

# MySQL connection settings (the MySQL JDBC driver jar must be available to Spark)
PROP = {}
PROP['driver'] = 'com.mysql.jdbc.Driver'
PROP['user'] = 'root'
PROP['password'] = 'password'
URL = 'jdbc:mysql://localhost:3306/spark'
TABLE = 'test'

def test_insert_mysql(df: DataFrame, batch_id):  # target table test: name, subject, score
    # foreachBatch hands over each micro-batch as a plain DataFrame plus its batch id,
    # so the regular batch JDBC writer can be used here
    df.write.jdbc(url=URL, table=TABLE, properties=PROP, mode='append')

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    # File source: Spark processes each file in the directory exactly once
    # (an alternative CSV-reader sketch follows this listing)
    lines = spark.readStream.text(path='/opt/tmp/', wholetext=False)
    stu_info = lines.select(split(lines.value, ",").alias("info"))
    stu = stu_info.select(stu_info['info'][0].alias('name'),
                          stu_info['info'][1].alias('subject'),
                          stu_info['info'][2].alias('score'))
    # A temp view can be registered here for more complex SQL, or the computation
    # can be done directly with .select / .groupBy etc. (see the aggregation sketch below)
    """
    stu.createOrReplaceTempView("student")
    result = spark.sql("select * from student")
    query = result.writeStream.outputMode('update').foreachBatch(test_insert_mysql).trigger(processingTime="8 seconds").start()
    query.awaitTermination()
    """
    query = stu.writeStream.outputMode('update').foreachBatch(test_insert_mysql).trigger(processingTime="8 seconds").start()
    query.awaitTermination()
    # Note: three output modes: complete (for aggregations), update (session windows not supported), and append
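To illustrate the complete output mode mentioned in the final note, the sketch below aggregates the same stu DataFrame (average score per subject) and writes the result to MySQL through the same foreachBatch pattern. This is a minimal sketch rather than part of the original script: the summary table name 'subject_avg', the cast of score to double, and the overwrite write mode are illustrative assumptions.

from pyspark.sql.functions import avg

def write_summary(df, batch_id):
    # assumed summary table; overwrite so it always holds the latest complete result
    df.write.jdbc(url=URL, table='subject_avg', properties=PROP, mode='overwrite')

# score arrives as a string after split, so cast before averaging
avg_scores = stu.groupBy('subject').agg(avg(stu['score'].cast('double')).alias('avg_score'))

query = (avg_scores.writeStream
         .outputMode('complete')  # complete mode re-emits the full aggregated result on each trigger
         .foreachBatch(write_summary)
         .trigger(processingTime="8 seconds")
         .start())
query.awaitTermination()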
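The text-plus-split step can also be replaced by the streaming CSV reader, which parses the three columns directly; the streaming CSV source needs the schema supplied up front. A minimal sketch, assuming the same /opt/tmp/ directory; keeping every column as a string mirrors the split version and is an assumption.

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('name', StringType()),
    StructField('subject', StringType()),
    StructField('score', StringType()),
])
# yields a streaming DataFrame with name/subject/score columns, equivalent to the split version
stu = spark.readStream.csv('/opt/tmp/', schema=schema, sep=',')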
