springboot
vue.js
element-ui
spark
hadoop
LSTM sentiment analysis model
KNN / CNN convolutional neural network / linear regression
Collaborative filtering (user-based + item-based; see the sketch after this list)
MLP neural network
SVD deep-learning model
echarts
Python crawler
mysql
neo4j
Password change with SMS verification code
4 deep-learning / machine-learning recommendation algorithms
Deep-learning prediction algorithm
Baidu AI recognition
Alipay sandbox payment
Data visualization dashboard
Knowledge-graph recommendation / visualization
Doctor review sentiment analysis
30+ innovation points
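To make the collaborative-filtering item above concrete, here is a minimal sketch of user-based and item-based scoring over a small in-memory rating matrix. The matrix values and indices are made up for illustration; the real project would load user-to-doctor ratings from MySQL, which is not shown in the source.

# Minimal user-based + item-based collaborative filtering sketch.
# ASSUMPTION: a small dense user x item rating matrix (0 = unrated);
# the real system would load ratings (user -> doctor scores) from MySQL.
import numpy as np

ratings = np.array([            # rows = users, cols = items (e.g. doctors)
    [5, 3, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [0, 0, 5, 4],
], dtype=float)

def cosine_sim(m):
    """Pairwise cosine similarity between the rows of m."""
    norm = np.linalg.norm(m, axis=1, keepdims=True)
    norm[norm == 0] = 1e-9                 # avoid division by zero
    unit = m / norm
    return unit @ unit.T

def user_based_scores(user):
    """Predict this user's scores from similar users' ratings."""
    sim = cosine_sim(ratings)[user]        # similarity to every user
    sim[user] = 0                          # exclude the user themself
    return sim @ ratings / (np.abs(sim).sum() + 1e-9)

def item_based_scores(user):
    """Predict this user's scores from item-item similarities."""
    sim = cosine_sim(ratings.T)            # item-item similarity matrix
    return ratings[user] @ sim / (np.abs(sim).sum(axis=0) + 1e-9)

user = 0
unseen = ratings[user] == 0                # only recommend unrated items
print("user-based pick:", np.where(unseen, user_based_scores(user), -np.inf).argmax())
print("item-based pick:", np.where(unseen, item_based_scores(user), -np.inf).argmax())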
Crawler code
#!/usr/bin/env python3
# coding=utf-8
import requests
import json
import pymysql

# Database configuration
host = "bigdata"
port = 3306
user = "root"
password = "123456"
# database = "MetroSwiping"
database = "MetroData"
timer = 5  # seconds to wait before reconnecting after a failure

# API configuration
appKey = "3ef4a029a8fe405a94c2c97d977dc2f9"
rows = 500  # page size; 1,337,000 records in total, i.e. 2,674 pages of 500
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36",
}
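The snippet above only sets up configuration; the fetch loop itself is not shown in the source. Below is a minimal sketch of how the paginated pull and MySQL insert could look. The endpoint URL, the response JSON layout, and the target table and columns are assumptions for illustration; the column names are borrowed from the Flink source table defined in the next section.

# Hypothetical paginated fetch loop built on the config above.
# ASSUMPTIONS: the endpoint URL, the response JSON layout, and the
# target table/columns are illustrative only; adapt to the real API.
import time

API_URL = "https://example.org/open-data/api"  # placeholder endpoint

conn = pymysql.connect(host=host, port=port, user=user,
                       password=password, database=database, charset="utf8mb4")

def fetch_page(page: int) -> list:
    """Fetch one page of records; retry after `timer` seconds on failure."""
    params = {"appKey": appKey, "page": page, "rows": rows}
    while True:
        try:
            resp = requests.get(API_URL, params=params, headers=headers, timeout=10)
            resp.raise_for_status()
            return resp.json().get("data", [])   # assumed response shape
        except (requests.RequestException, json.JSONDecodeError):
            time.sleep(timer)                    # reconnect interval from config

total_pages = 2674                               # 1,337,000 records / 500 per page
with conn.cursor() as cur:
    for page in range(1, total_pages + 1):
        for rec in fetch_page(page):
            # Column names follow the Flink source table below (assumed).
            cur.execute(
                "INSERT INTO swiping(card_no, deal_date, deal_type, deal_money) "
                "VALUES (%s, %s, %s, %s)",
                (rec.get("card_no"), rec.get("deal_date"),
                 rec.get("deal_type"), rec.get("deal_money")))
        conn.commit()
conn.close()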
Data analysis code
package com.metroData;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * DQL (data query language):       SELECT
 * DML (data manipulation language): INSERT / DELETE / UPDATE
 * DDL (data definition language):   CREATE TABLE / VIEW / INDEX
 * DCL (data control language):      GRANT / ROLLBACK / COMMIT
 */
// Pipeline: read raw data from the source, transform it, then sink the results to MySQL.
public class RealtimeAnalysis {

    public static void main(String[] args) throws InterruptedException {
        // Execution environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // When Flink SQL sinks to MySQL, a null value in a NOT NULL column fails the
        // insert; the option below drops such rows instead:
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.sink.not-null-enforcer", "drop");

        // Source table: metro swiping records from Kafka
        String kafkaSource = "CREATE TABLE MetroSwipingData (\n" +
                "  card_no VARCHAR,\n" +
                "  deal_date VARCHAR,\n" +
                "  deal_type VARCHAR,\n" +
                "  deal_money DECIMAL(16,2),\n" +
                "  deal_value DECIMAL(16,2),\n" +
                "  equ_no VARCHAR,\n" +
                "  company_name VARCHAR,\n" +
                "  station VARCHAR,\n" +
                "  car_no VARCHAR,\n" +
                "  conn_mark VARCHAR,\n" +
                "  close_date VARCHAR\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'MetroSwipingData',\n" +
                "  'properties.bootstrap.servers' = 'bigdata:9092',\n" +
                "  'properties.group.id' = 'testGroup123',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";

        // Sink for debugging: print to the console
        String sinkPrint = "CREATE TABLE sinkPrint (\n" +
                "  station STRING,\n" +
                "  amount DECIMAL(16,2)\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // Sink table (MySQL): cumulative revenue per station
        String sink_station_amount = "CREATE TABLE station_amount (\n" +
                "  station STRING,\n" +
                "  amount DECIMAL(16,2),\n" +
                "  PRIMARY KEY (station) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'station_amount',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Sink table (MySQL): entries and exits per station
        String sink_station_peopleNum = "CREATE TABLE station_peopleNum (\n" +
                "  station STRING,\n" +
                "  enterNum INT,\n" +
                "  outNum INT,\n" +
                "  PRIMARY KEY (station) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'station_peopleNum',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Sink table (MySQL): revenue per line
        String sink_line_amount = "CREATE TABLE line_amount (\n" +
                "  line STRING,\n" +
                "  amount DECIMAL(16,2),\n" +
                "  PRIMARY KEY (line) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'line_amount',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Sink table (MySQL): daily passenger volume (currently disabled)
        // String sink_data_peopleNum = "CREATE TABLE data_peopleNum (\n" +
        //         "  dt STRING,\n" +
        //         "  peopleNum BIGINT,\n" +
        //         "  PRIMARY KEY (dt) NOT ENFORCED\n" +
        //         ") WITH (\n" +
        //         "  'connector' = 'jdbc',\n" +
        //         "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
        //         "  'table-name' = 'data_peopleNum',\n" +
        //         "  'username' = 'root',\n" +
        //         "  'password' = '123456',\n" +
        //         "  'connection.max-retry-timeout' = '60s',\n" +
        //         "  'sink.max-retries' = '3',\n" +
        //         "  'lookup.max-retries' = '3'\n" +
        //         ")";

        // Sink table (MySQL): cumulative gross revenue
        String sink_total_amount = "CREATE TABLE total_amount (\n" +
                "  id INT,\n" +
                "  total_amount DECIMAL(16,2),\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'total_amount',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Sink table (MySQL): cumulative actual receipts
        String sink_real_amount = "CREATE TABLE real_amount (\n" +
                "  id INT,\n" +
                "  real_amount DECIMAL(16,2),\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'real_amount',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Sink table (MySQL): cumulative total ridership
        String sink_people_cnt = "CREATE TABLE people_cnt (\n" +
                "  id INT,\n" +
                "  people_cnt BIGINT,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://bigdata:3306/report?useSSL=false&useUnicode=true&characterEncoding=utf-8',\n" +
                "  'table-name' = 'people_cnt',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'connection.max-retry-timeout' = '60s',\n" +
                "  'sink.max-retries' = '3',\n" +
                "  'lookup.max-retries' = '3'\n" +
                ")";

        // Cumulative revenue per station
        String station_amount = "insert into station_amount select station, sum(deal_money) from MetroSwipingData group by station";

        // Cumulative entries and exits per station
        String station_peopleNum = "insert into station_peopleNum select\n" +
                "station,\n" +
                "sum(case when deal_type='地铁入站' then 1 else 0 end) as enterNum,\n" +
                "sum(case when deal_type='地铁出站' then 1 else 0 end) as outNum\n" +
                "from MetroSwipingData group by station";

        // Revenue per line (company_name identifies the line)
        String line_amount = "insert into line_amount select company_name, sum(deal_value) as amount from MetroSwipingData group by company_name";

        // Daily passenger volume (currently disabled)
        // String data_peopleNum = "insert into data_peopleNum select close_date, count(DISTINCT card_no) as peopleNum from MetroSwipingData group by close_date";

        // Cumulative gross revenue
        String total_amount = "insert into total_amount select 1 as id, sum(deal_value) as total_amount from MetroSwipingData";

        // Cumulative actual receipts
        String real_amount = "insert into real_amount select 1 as id, sum(deal_money) as real_amount from MetroSwipingData";

        // Cumulative total ridership (one entry event per ride)
        String people_cnt = "insert into people_cnt select 1 as id, count(1) as people_cnt from MetroSwipingData where deal_type='地铁入站'";

        // Start the job
        System.out.println("========================================== Flink job starting... ==========================================");

        // Create the source table
        System.out.println("Creating source table [MetroSwipingData]...");
        tableEnv.executeSql(kafkaSource);
        System.out.println("Source table [MetroSwipingData] created...");

        // Create the sink tables
        System.out.println("Creating sink tables...");
        tableEnv.executeSql(sink_station_amount);
        tableEnv.executeSql(sink_station_peopleNum);
        tableEnv.executeSql(sink_line_amount);
        // tableEnv.executeSql(sink_data_peopleNum);
        tableEnv.executeSql(sink_total_amount);
        tableEnv.executeSql(sink_real_amount);
        tableEnv.executeSql(sink_people_cnt);
        System.out.println("Sink tables [station_amount | station_peopleNum | line_amount | total_amount | real_amount | people_cnt] created...");

        // Run the computation logic
        System.out.println("Executing the computation logic...");
        tableEnv.executeSql(station_amount);
        tableEnv.executeSql(station_peopleNum);
        tableEnv.executeSql(line_amount);
        // tableEnv.executeSql(data_peopleNum);
        tableEnv.executeSql(total_amount);
        tableEnv.executeSql(real_amount);
        tableEnv.executeSql(people_cnt);
        System.out.println("========================================== Flink job started successfully... ==========================================");
    }
}
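The Flink job above reads JSON records from the Kafka topic MetroSwipingData on bigdata:9092. For completeness, here is a minimal sketch of a producer that could feed that topic. It uses the kafka-python package, and the sample record values are placeholders for illustration; the real pipeline would publish the crawled swiping rows.

# Minimal Kafka producer sketch for the MetroSwipingData topic.
# ASSUMPTION: uses the kafka-python package; the record values below are
# placeholders, only the field names come from the Flink source table.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="bigdata:9092",
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
)

record = {                        # field names match the Flink source table
    "card_no": "684xxxxxxx",      # placeholder card number
    "deal_date": "2018-09-01 07:32:00",
    "deal_type": "地铁入站",       # entry event, as matched by the Flink SQL
    "deal_money": 0.00,
    "deal_value": 0.00,
    "equ_no": "263xxxxxx",
    "company_name": "xx号线",      # placeholder line name
    "station": "xx站",            # placeholder station name
    "car_no": "",
    "conn_mark": "0",
    "close_date": "2018-09-01",
}

producer.send("MetroSwipingData", record)
producer.flush()                  # make sure the message is actually sent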
Computer science graduation project: a Hadoop + Spark knowledge-graph doctor recommendation system, with outpatient-visit prediction, medical data visualization, medical big-data analysis, and a doctor-data crawler.