赞
踩
import random
import secrets
import string
import threading
import time
from datetime import datetime
from datetime import timedelta

import pymysql
import schedule
from flask import Flask, render_template, request, jsonify, redirect, session
from flask_cors import CORS
-
# Flask application instance.
app = Flask(__name__)
# Enable CORS so the API can be called from other origins.
CORS(app)

app.config.update(
    SECRET_KEY='man_what_can_i_say',
    # Sessions stay valid for 30 minutes.
    PERMANENT_SESSION_LIFETIME=timedelta(minutes=30),
)

# u_ids of the users who have already clocked in today.
user_attendance_day_list = []
-
# Request hook: runs before every request.
@app.before_request
def before_request():
    """Mark the session permanent and refresh it for logged-in users.

    The lifetime itself comes from ``PERMANENT_SESSION_LIFETIME`` in the app
    config, so it is no longer re-assigned on every request.
    """
    session.permanent = True

    # login() stores the account under the 'user' key (there is no 'user_id'
    # key), so that is the key that identifies an active login here.
    if 'user' in session:
        session.modified = True
-
##
# Database module
##

# MySQL connection settings.
DATABASE_CONFIG = dict(
    host='127.0.0.1',
    user='root',
    password='Your_Password',
    database='sql_web',
    # DictCursor makes every row come back as a dict keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)
-
def create_connection():
    """Open and return a new MySQL connection built from DATABASE_CONFIG."""
    connection = pymysql.connect(**DATABASE_CONFIG)
    return connection
-
def create_users_table():
    """Create the ``users`` table if it does not exist yet.

    Columns, in order: id, username, password, phone, department, post,
    attendance days, leave days, absence days, performance bonus, absence
    deduction, other deductions, gross pay and after-tax pay.

    The basic salary is deliberately not stored here: it is tied to the
    department/post pair and can be changed, so it is always looked up in
    ``department_salary`` to keep the profile page and the salary scheme in
    agreement.
    """
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    u_id SMALLINT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(255) NOT NULL,
                    password VARCHAR(255) NOT NULL,
                    phone VARCHAR(255) NOT NULL,
                    department_id TINYINT NOT NULL,
                    user_post_id TINYINT NOT NULL,
                    attendance_day TINYINT DEFAULT 0,
                    leave_day TINYINT DEFAULT 0,
                    absence_day TINYINT DEFAULT 0,
                    achievement_bouns INT DEFAULT 0,
                    absence_nopay INT DEFAULT 0,
                    other_nopay INT DEFAULT 0,
                    gross_pay INT DEFAULT 0,
                    after_tax_salary INT DEFAULT 0
                )
            ''')
        conn.commit()
    finally:
        # Always release the connection, even when the statement fails.
        conn.close()
-
def create_department_salary_table():
    """Create the ``department_salary`` table if it does not exist yet."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS department_salary(
                    id SMALLINT AUTO_INCREMENT PRIMARY KEY,
                    department VARCHAR(255) NOT NULL,
                    position VARCHAR(255) NOT NULL,
                    basic_salary INT DEFAULT 0,
                    overtime_pay INT DEFAULT 0,
                    other_salary INT DEFAULT 0,
                    d_id SMALLINT NOT NULL,
                    p_id SMALLINT NOT NULL
                )
            ''')
        conn.commit()
    finally:
        # Always release the connection, even when the statement fails.
        conn.close()
-
def create_user_attendance_table(u_id):
    """Create the per-user attendance table ``attendance_table<u_id>``.

    The table has a ``date_time`` column and an ``attendance_value`` column
    where 1 means clocked in, 0 means on leave and -1 means absent.

    Raises:
        ValueError/TypeError: if ``u_id`` cannot be converted to an integer.
    """
    # The name is interpolated into DDL, so force it to be purely numeric.
    table_name = "attendance_table" + str(int(u_id))
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS `{}`(
                    date_time DATE,
                    attendance_value SMALLINT
                )
            '''.format(table_name))
        print("考勤表创建")
        conn.commit()
    finally:
        conn.close()
-
def get_user(username, password):
    """Return the ``users`` row matching username and password, or None.

    NOTE(review): passwords are compared as stored, i.e. in plain text.
    """
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT * FROM users WHERE username = %s AND password = %s',
                (username, password),
            )
            return cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
-
def user_delete_sql(u_id):
    """Delete a user row and drop that user's attendance table.

    Raises:
        ValueError/TypeError: if ``u_id`` cannot be converted to an integer.
    """
    # The table name is interpolated into DDL, so force it to be numeric.
    table_name = "attendance_table" + str(int(u_id))
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            # Remove the row from the users table first ...
            cursor.execute('DELETE FROM users WHERE u_id = %s', (u_id,))
            # ... then the attendance table. IF EXISTS keeps the delete from
            # failing for users whose table was never created.
            cursor.execute('DROP TABLE IF EXISTS `{}`'.format(table_name))
        conn.commit()
    finally:
        conn.close()
-
def user_update_sql(u_id, username, password, phone, department_id, user_post_id, gross_pay):
    """Overwrite the editable columns of the user identified by ``u_id``."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'UPDATE users SET username=%s,password=%s,phone=%s,department_id=%s,'
                'user_post_id=%s,gross_pay=%s WHERE u_id=%s',
                (username, password, phone, department_id, user_post_id, gross_pay, u_id),
            )
        conn.commit()
    finally:
        # Close the connection even if the update raises.
        conn.close()
-
def user_list_sql():
    """Return every row of the ``users`` table."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
-
def user_information_sql(u_id):
    """Return the ``users`` row for ``u_id``, or None when there is none."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM users WHERE u_id = %s', (u_id,))
            return cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
-
def user_add_sql(username, password, phone, department_id, user_post_id, gross_pay, after_tax_salary):
    """Insert a new user row, creating the ``users`` table first if needed."""
    create_users_table()
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'INSERT INTO users(username,password,phone,department_id,user_post_id,'
                'gross_pay,after_tax_salary) '
                'VALUES(%s,%s,%s,%s,%s,%s,%s)',
                (username, password, phone, department_id, user_post_id,
                 gross_pay, after_tax_salary),
            )
        conn.commit()
    finally:
        # Close the connection even if the insert raises.
        conn.close()
-
def user_search_sql(username):
    """Return all ``users`` rows whose username equals ``username``."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM users WHERE username=%s', (username,))
            return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
-
def system_overview_sql():
    """Return ``(staff_total, salary_total)`` for the overview page.

    ``staff_total`` is the number of rows in ``users`` and ``salary_total``
    the sum of their ``gross_pay``. COALESCE turns the NULL that SUM yields
    on an empty table into 0.
    """
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT COUNT(*) AS staff_total, '
                'COALESCE(SUM(gross_pay), 0) AS salary_total FROM users'
            )
            row = cursor.fetchone()
    finally:
        # The original never closed this connection.
        conn.close()
    return row['staff_total'], row['salary_total']
-
def automatic_salary_allocation(department_id, user_post_id):
    """Look up the salary components for a department/post pair.

    Departments 1-6 combined with posts 1-4 map onto ``department_salary``
    ids 1-24 in row-major order (department 1 -> ids 1-4, department 2 ->
    ids 5-8, ...). Any other combination uses id 25.

    Returns:
        The row with ``basic_salary``, ``overtime_pay`` and ``other_salary``,
        or None when no such row exists.
    """
    create_department_salary_table()
    if department_id in range(1, 7) and user_post_id in range(1, 5):
        id_ = (int(department_id) - 1) * 4 + int(user_post_id)
    else:
        id_ = 25
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT basic_salary,overtime_pay,other_salary FROM department_salary WHERE id=%s',
                (id_,),
            )
            return cursor.fetchone()
    finally:
        # The original never closed this connection.
        conn.close()
-
def public_search_one_sql(variable_one, variable_two, variable_three, variable_four):
    """Run ``SELECT <col> FROM <table> WHERE <key_col> = <value>``.

    Args:
        variable_one: column to select.
        variable_two: table name.
        variable_three: column used in the WHERE clause.
        variable_four: value the WHERE column must equal.

    Returns:
        The first matching row, or None when nothing matches.
    """
    # Identifiers cannot be bound as parameters; double any backtick so a
    # name cannot break out of its quoting.
    column, table, key_column = (
        str(name).replace('`', '``')
        for name in (variable_one, variable_two, variable_three)
    )
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                f'SELECT `{column}` FROM `{table}` WHERE `{key_column}`=%s',
                (variable_four,),
            )
            return cursor.fetchone()
    finally:
        conn.close()
-
def public_update_one_sql(variable_one, variable_two, variable_three, variable_four, variable_five):
    """Run ``UPDATE <table> SET <col> = <value> WHERE <key_col> = <key>``.

    Args:
        variable_one: table name.
        variable_two: column to set.
        variable_three: column used in the WHERE clause.
        variable_four: new value for the column.
        variable_five: value the WHERE column must equal.
    """
    # Identifiers cannot be bound as parameters; double any backtick so a
    # name cannot break out of its quoting.
    table, column, key_column = (
        str(name).replace('`', '``')
        for name in (variable_one, variable_two, variable_three)
    )
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                f'UPDATE `{table}` SET `{column}`= %s WHERE `{key_column}`=%s',
                (variable_four, variable_five),
            )
        conn.commit()
    finally:
        conn.close()
-
def public_search_one_no_condition(variable_one, variable_two):
    """Return column ``variable_one`` of every row in table ``variable_two``."""
    # Identifiers cannot be bound as parameters; double any backtick so a
    # name cannot break out of its quoting.
    column = str(variable_one).replace('`', '``')
    table = str(variable_two).replace('`', '``')
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(f'SELECT `{column}` FROM `{table}`')
            return cursor.fetchall()
    finally:
        conn.close()
-
def timing_insert_attendance_table_sql():
    """Scheduled job: open a new attendance day for every user.

    Each user gets a row for today that defaults to absent (-1); clocking in
    later updates that row. The in-memory list of users who already clocked
    in is reset as well.
    """
    global now_date
    print("timing函数调用")
    # Refresh the module-level date. It is otherwise only computed once at
    # start-up, which would stamp every later day with the start-up date.
    now_date = get_current_time()
    user_attendance_day_list.clear()
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT u_id FROM users')
            for row in cursor.fetchall():
                create_user_attendance_table(row['u_id'])
                table_name = 'attendance_table' + str(row['u_id'])
                cursor.execute(
                    f'INSERT INTO `{table_name}`(date_time,attendance_value) VALUES(%s,-1)',
                    (str(now_date),),
                )
        conn.commit()
    finally:
        conn.close()
-
def get_all_users_u_id():
    """Return one row per user containing only its ``u_id``."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT u_id FROM users')
            return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
-
def salary_composition_list():
    """Return the salary components of every department/post combination."""
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT id,basic_salary,overtime_pay,other_salary,d_id,p_id FROM department_salary'
            )
            return cursor.fetchall()
    finally:
        # The original never closed this connection.
        conn.close()
-
def modify_salary_sql():
    """Placeholder for the admin "modify department base salary" query.

    Not implemented yet; always returns 1.
    """
    placeholder_result = 1
    return placeholder_result
-
def create_password(length=8):
    """Generate a random password of ``length`` characters.

    Characters are drawn from ASCII letters, digits and punctuation using
    the ``secrets`` module, because ``random`` is not suitable for
    security-sensitive values such as passwords.
    """
    all_characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(all_characters) for _ in range(length))
-
def _after_tax_salary(gross_pay):
    """Return the after-tax salary for ``gross_pay`` using this file's brackets."""
    if gross_pay <= 5000:
        # Nothing above the 5000 threshold, so nothing is taxed. The original
        # sent these salaries through the 10% branch by mistake.
        return gross_pay
    if gross_pay <= 15000:
        return gross_pay - (gross_pay - 5000) * 0.05
    return gross_pay - (gross_pay - 5000) * 0.1 - 350


def user_achievement_bouns_or_other_nopay(u_id, new_salary):
    """Record a manual gross-pay change as a bonus or as a deduction.

    When ``new_salary`` is lower than the stored gross pay the difference is
    written to ``other_nopay``; when it is higher the difference is written
    to ``achievement_bouns``. In both cases ``after_tax_salary`` is
    recomputed from ``new_salary``. Nothing is written when the salary is
    unchanged.
    """
    print("user_achievement_bouns_or_other_nopay函数调用")
    old_salary = public_search_one_sql('gross_pay', 'users', 'u_id', u_id)['gross_pay']
    print(old_salary)
    print(new_salary)
    if old_salary == new_salary:
        return
    if old_salary > new_salary:
        public_update_one_sql('users', 'other_nopay', 'u_id', old_salary - new_salary, u_id)
    else:
        public_update_one_sql('users', 'achievement_bouns', 'u_id', new_salary - old_salary, u_id)
    public_update_one_sql('users', 'after_tax_salary', 'u_id',
                          _after_tax_salary(new_salary), u_id)
-
def user_salary_algorithm():
    """Scheduled job: fine every user who has not clocked in today.

    Every user whose u_id is missing from ``user_attendance_day_list`` is
    treated as absent: 100 is added to ``absence_nopay``, 100 is taken off
    ``gross_pay`` and ``after_tax_salary`` is recomputed from the reduced
    gross pay.

    The original guarded the body with ``if get_current_time():``, which is
    always true (a non-empty date string), so the guard is dropped; the
    "after 08:00" timing is provided by the scheduler.
    """
    all_user_ids = {row['u_id'] for row in get_all_users_u_id()}
    # Users that exist but did not clock in are the absent ones.
    absent_user_ids = all_user_ids - set(user_attendance_day_list)
    for u_id in absent_user_ids:
        old_absence_nopay = public_search_one_sql('absence_nopay', 'users', 'u_id', u_id)['absence_nopay']
        gross_pay = public_search_one_sql('gross_pay', 'users', 'u_id', u_id)['gross_pay']
        new_gross_pay = gross_pay - 100

        # Accumulate the absence fine and lower the gross pay.
        public_update_one_sql('users', 'absence_nopay', 'u_id', 100 + old_absence_nopay, u_id)
        public_update_one_sql('users', 'gross_pay', 'u_id', new_gross_pay, u_id)

        if new_gross_pay <= 5000:
            # Nothing above the 5000 threshold, so nothing is taxed.
            after_tax_salary = new_gross_pay
        elif new_gross_pay <= 15000:
            after_tax_salary = new_gross_pay - (new_gross_pay - 5000) * 0.05
        else:
            after_tax_salary = new_gross_pay - (new_gross_pay - 5000) * 0.1 - 350
        # The original wrote this value into 'absence_nopay', overwriting the
        # fine that had just been stored.
        public_update_one_sql('users', 'after_tax_salary', 'u_id', after_tax_salary, u_id)
-
def user_table_about_attendance_column():
    """Scheduled job: fold today's attendance value into the ``users`` counters.

    For every user the row for ``now_date`` in their attendance table is
    read and one of ``attendance_day`` (value > 0), ``leave_day`` (value == 0)
    or ``absence_day`` (value < 0) is incremented. Users without a row for
    that date are skipped.
    """
    user_ids = [row['u_id'] for row in get_all_users_u_id()]
    conn = create_connection()
    try:
        with conn.cursor() as cursor:
            for u_id in user_ids:
                table_name = 'attendance_table' + str(u_id)
                row = public_search_one_sql('attendance_value', table_name, 'date_time', now_date)
                if row is None or row['attendance_value'] is None:
                    # No record for this date; nothing to count.
                    continue
                attendance_value = row['attendance_value']
                if attendance_value > 0:
                    column_name = 'attendance_day'
                elif attendance_value == 0:
                    column_name = 'leave_day'
                else:
                    column_name = 'absence_day'
                cursor.execute(
                    f'UPDATE users SET `{column_name}`=`{column_name}`+1 WHERE u_id=%s',
                    (u_id,),
                )
        conn.commit()
    finally:
        conn.close()
-
def get_current_time():
    """Return the current local date formatted as ``YYYY-MM-DD``."""
    return datetime.now().strftime("%Y-%m-%d")
-
def run_schedule():
    """Run due ``schedule`` jobs forever, polling once per second."""
    poll_interval_seconds = 1
    while True:
        schedule.run_pending()
        time.sleep(poll_interval_seconds)
-
# Page routes
@app.route("/")
def index():
    """Render the overview page, or redirect to the login page."""
    if not session.get('logged_in'):
        return redirect("/login_interface")

    u_id = session['user']['u_id']
    staff_total, salary_total = system_overview_sql()
    # SUM() is NULL/None when there are no users.
    salary_total = salary_total or 0
    # Average wage is total salary divided by head count (the original had
    # the operands swapped); guard against an empty users table.
    average_wage = salary_total / staff_total if staff_total else 0
    return render_template("index.html", staff_total=staff_total, salary_total=salary_total,
                           average_wage=average_wage, u_id=u_id)
-
@app.route("/login_interface")
def login_interface():
    """Serve the login page."""
    login_page = render_template("login.html")
    return login_page
-
@app.route("/login", methods=['POST'])
def login():
    """Validate the posted credentials and start a session on success.

    Expects a JSON body with ``Username`` and ``Password``. Responds with a
    JSON success message, or with HTTP 401 when the credentials do not match
    a user (including when the body is missing or not JSON).
    """
    create_users_table()
    # silent=True yields None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True) or {}
    username = data.get('Username')
    password = data.get('Password')
    user = get_user(username, password)
    if not user:
        return jsonify({'success': False, 'message': '无效的用户名或密码'}), 401

    # Keep the essentials in the session so later requests can identify the
    # account and its post.
    session['user'] = {
        'username': username,
        'u_id': user['u_id'],
        'user_post_id': user['user_post_id']
    }
    session['logged_in'] = True
    return jsonify({'success': True, 'message': '登录成功'})
-
@app.route("/logout")
def logout():
    """Clear the login data from the session and go back to the index."""
    for session_key in ('user', 'logged_in'):
        session.pop(session_key, None)
    return redirect("/")
-
@app.route("/manage_interface")
def manage_interface():
    """Serve the user-management page to users whose post id is <= 1.

    Visitors who are not logged in are redirected to the login page; logged
    in users with another post get HTTP 403. The original returned None in
    that case, which Flask rejects as an invalid response.
    """
    if not session.get('logged_in'):
        return redirect("/login_interface")
    if session['user']['user_post_id'] <= 1:
        return render_template("manage_interface.html")
    return jsonify({'success': False, 'message': '权限不足'}), 403
-
@app.route("/manage_user_list")
def manage_interface_user_list():
    """Return every user row as JSON for the management page.

    NOTE(review): no login or permission check is performed here.
    """
    return jsonify(user_list_sql())
-
@app.route("/user_add", methods=['POST'])
def user_add():
    """Create a user from the posted JSON with an auto-assigned salary.

    The gross pay is the sum of basic salary, overtime pay and other
    allowances configured for the user's department and post; the password
    is generated randomly. Responds with HTTP 400 when no salary row exists
    for the department/post pair.
    """
    data = request.json
    username = data.get('Username')
    password = create_password()
    phone = data.get('Phone')
    department_id = data.get('Department_id')
    user_post_id = data.get('Position_id')

    salary_row = automatic_salary_allocation(int(department_id), int(user_post_id))
    if not salary_row:
        # The original crashed with a TypeError when the row was missing.
        return jsonify({'success': False, 'message': '未找到对应的工资配置'}), 400

    gross_pay = (salary_row['basic_salary'] + salary_row['overtime_pay']
                 + salary_row['other_salary'])
    if gross_pay <= 5000:
        # Nothing above the 5000 threshold, so nothing is taxed.
        after_tax_salary = gross_pay
    elif gross_pay <= 15000:
        after_tax_salary = gross_pay - (gross_pay - 5000) * 0.05
    else:
        after_tax_salary = gross_pay - (gross_pay - 5000) * 0.1 - 350

    user_add_sql(username, password, phone, department_id, user_post_id, gross_pay, after_tax_salary)
    return jsonify({'success': True, 'message': '添加成功'})
-
@app.route("/user_update", methods=['POST'])
def user_update():
    """Apply the posted edits to a user and record the salary change."""
    payload = request.json
    u_id = int(payload.get('u_id'))
    username, password, phone = (
        payload.get('username'),
        payload.get('password'),
        payload.get('phone'),
    )
    department_id = int(payload.get('department_id'))
    user_post_id = int(payload.get('user_post_id'))
    gross_pay = int(payload.get('gross_pay'))

    # Book the difference as bonus/deduction before the row is overwritten.
    user_achievement_bouns_or_other_nopay(u_id, gross_pay)
    user_update_sql(u_id, username, password, phone, department_id, user_post_id, gross_pay)
    return jsonify({'success': True, 'message': '编辑成功'})
-
@app.route("/user_delete", methods=['POST'])
def user_delete():
    """Delete the user whose id is posted as ``userId``."""
    target_id = int(request.json.get('userId'))
    user_delete_sql(target_id)
    return jsonify({'success': True, 'message': '删除成功'})
-
@app.route("/user_search", methods=['POST'])
def user_search():
    """Search users by the exact username posted as ``Uname``.

    The matching rows are returned under ``data``; the original ran the
    query on a connection it never closed and discarded the rows.
    """
    data = request.json
    username = data.get('Uname')
    # Reuse the shared query helper instead of a second inline copy.
    search_result_list = user_search_sql(username)
    return jsonify({'success': True, 'message': '搜索成功', 'data': search_result_list})
-
-
@app.route("/profile_interface")
def profile_interface():
    """Render the profile page of the logged-in user, else go to login."""
    if not session.get('logged_in'):
        return redirect("/login_interface")

    user_info = user_information_sql(session['user']['u_id'])
    # Salary components come from the department/post configuration rather
    # than from the user row.
    salary_parts = automatic_salary_allocation(int(user_info['department_id']),
                                               int(user_info['user_post_id']))
    template_values = {
        'username': str(user_info['username']),
        'phone': user_info['phone'],
        'department_id': user_info['department_id'],
        'user_post_id': user_info['user_post_id'],
        'basic_salary': salary_parts['basic_salary'],
        'overtime_pay': salary_parts['overtime_pay'],
        'other_salary': salary_parts['other_salary'],
        'attendance_day': user_info['attendance_day'],
        'leave_day': user_info['leave_day'],
        'absence_day': user_info['absence_day'],
        'achievement_bouns': user_info['achievement_bouns'],
        'absence_nopay': user_info['absence_nopay'],
        'other_nopay': user_info['other_nopay'],
        'gross_pay': user_info['gross_pay'],
        'after_tax_salary': user_info['after_tax_salary'],
    }
    return render_template("profile.html", **template_values)
-
@app.route("/attendance_function", methods=['POST'])
def attendance_function():
    """Record today's attendance action for the logged-in user.

    The posted ``action`` is stored as the attendance value of the row for
    ``now_date`` in the user's attendance table. A user can do this only
    once per day.
    """
    data = request.json
    u_id = session['user']['u_id']
    if u_id in user_attendance_day_list:
        return jsonify({'success': True, 'message': '今日已打卡'})

    # Validate and persist first; only then mark the user as done, so a bad
    # request or a DB error does not block a later retry.
    attendance_value = int(data.get('action'))
    attendance_table = 'attendance_table' + str(u_id)
    public_update_one_sql(attendance_table, 'attendance_value', 'date_time', attendance_value, now_date)
    user_attendance_day_list.append(u_id)
    return jsonify({'success': True, 'message': '今日打卡成功'})
-
@app.route("/finance_department")
def finance_interface():
    """Serve the finance page to users whose post id is <= 2.

    Visitors who are not logged in are redirected to the login page; logged
    in users with another post get HTTP 403. The original returned None in
    that case, which Flask rejects as an invalid response.
    """
    if not session.get('logged_in'):
        return redirect("/login_interface")
    if session['user']['user_post_id'] <= 2:
        return render_template("finance_department.html")
    return jsonify({'success': False, 'message': '权限不足'}), 403
-
@app.route("/reimbursement_interface")
def reimbursement_interface():
    """Serve the reimbursement page."""
    page = render_template("reimbursement.html")
    return page
-
@app.route("/reimbursement")
def reimbursement():
    """Placeholder for the reimbursement feature.

    Responds with HTTP 501 until it is implemented. The original returned
    the integer 1, which Flask does not accept as a view return value.
    """
    return jsonify({'success': False, 'message': '功能暂未实现'}), 501
-
@app.route("/salary_composition")
def salary_composition():
    """Render the salary-scheme page with all department/post salary rows."""
    salary_rows = salary_composition_list()
    return render_template("salary_composition.html", salary_composition_list=salary_rows)
-
@app.route("/salary_composition_change")
def salary_composition_change():
    """Placeholder for editing the salary scheme.

    Responds with HTTP 501 until it is implemented. The original returned
    the integer 1, which Flask does not accept as a view return value.
    """
    return jsonify({'success': False, 'message': '功能暂未实现'}), 501
-
@app.route("/logistics_procurement_interface")
def logistics_procurement_interface():
    """Serve the logistics procurement page."""
    page = render_template("logistics_procurement.html")
    return page
-
# Date string (YYYY-MM-DD) captured at start-up and read by the attendance code.
now_date = get_current_time()

# Daily jobs: open the attendance day, fine absentees, update the counters.
for run_at, job in (
    ("08:00", timing_insert_attendance_table_sql),
    ("08:01", user_salary_algorithm),
    ("08:02", user_table_about_attendance_column),
):
    schedule.every().day.at(run_at).do(job)

# Run the scheduler on a daemon thread so it ends with the main thread.
schedule_thread = threading.Thread(target=run_schedule, daemon=True)
schedule_thread.start()

if __name__ == '__main__':
    app.run(debug=True)
略
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。