赞
踩
约5千万条出租车gps数据存储在mysql中,数据按照一定时间频率记录,想要获取出其中的订单数量。
(数据示例)
plate_number是车牌号,state=1表示车上有人。数据先按照车辆分组,然后按时间排序,最后选出连续的序列state=1作为一个订单
由于数据在mysql中被分成了200+个表,有两种处理方式:
2021.10.30
需求:一天的出租车轨迹表被分成了几百个表,需要将这些表合并
思路:读出这个数据库中所有的表名,循环将所有表合并到一个新表中(使用存储过程)
学习过程:
# 两个表的联合查询(UNION):合并两个查询的结果,并去掉结果中的重复行
select * from table union select * from table2
# 两个表的联合查询(UNION ALL):合并两个查询的结果,保留重复行
select * from table union all select * from table2
comment:这样要手写200多条语句,不现实,应该使用循环,并直接从数据库中获取表名
存储过程
DELIMITER $$

USE `taxigps`$$

DROP PROCEDURE IF EXISTS `test11`$$

CREATE DEFINER=`root`@`%` PROCEDURE `test11`()
BEGIN
    -- Purpose: append the rows of every table in schema `taxigps`
    -- (except the target table `taxi` itself) to `taxi`.

    -- Set to 1 by the NOT FOUND handler once the cursor is exhausted.
    DECLARE stopflag INT DEFAULT 0;
    -- Name of the source table currently being copied.
    DECLARE tablename VARCHAR(64);

    -- Cursor over the names of all source tables in the schema.
    DECLARE tablename_cur CURSOR FOR
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'taxigps'
          AND table_name != 'taxi';

    -- A FETCH past the last row raises NOT FOUND; record it and continue so
    -- the WHILE condition below can end the loop.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET stopflag = 1;

    OPEN tablename_cur;
    -- Prime the loop with the first table name.
    FETCH tablename_cur INTO tablename;

    WHILE stopflag = 0 DO
        -- A table name cannot be a bound parameter, so the statement is built
        -- as text. The identifier is backtick-quoted (embedded backticks
        -- doubled) so unusual table names cannot break or alter the statement.
        SET @sql = CONCAT(
            'INSERT INTO `taxi` SELECT * FROM `',
            REPLACE(tablename, '`', '``'),
            '`'
        );
        PREPARE ss FROM @sql;
        EXECUTE ss;
        -- Release the prepared statement explicitly after each use.
        DEALLOCATE PREPARE ss;

        -- Advance to the next table name.
        FETCH tablename_cur INTO tablename;
    END WHILE;

    CLOSE tablename_cur;
END$$

DELIMITER ;
""" @author:HY @time:2021/10/31:14:51 """ from threading import Thread import threading from time import sleep, ctime import pandas as pd import time import pickle def solve_file(name, file): for index, row in file.iterrows(): date, taxi_time, _, plate_number, lng, lat, _, _, state, _ = row if plate_number in taxi_dic.keys(): taxi_dic[plate_number][taxi_time] = state else: taxi_dic[plate_number] = {taxi_time: state} df = pd.read_csv('test.csv', header=None, sep='\t') # df = pd.read_csv('20160920_taxigps.csv', header=None, sep='\t') line_num = len(df) taxi_dic = {} time1 = time.time() thread_num = 1 for i in range(thread_num): i = i + 1 exec('divide_file' + str(i) + ' = int(line_num * ' + str(i) + ' / thread_num)') if i == 1: exec('df' + str(i) + ' = df[divide_file1:]') elif i == thread_num: exec('df' + str(i) + ' = df[:divide_file' + str(thread_num) + ']') else: exec('df' + str(i) + ' = df[divide_file' + str(i-1) + ':divide_file' + str(i) + ']') for i in range(thread_num): exec('t' + str(i) + ' = threading.Thread(target=solve_file, args=('+ str(i) + ', df'+ str(i) + '))') exec('t' + str(i) + '.start()') exec('t' + str(i) + '.join()') time2 = time.time() with open('多线程-车辆数据字典.pickle', 'wb') as f: pickle.dump(taxi_dic, f) taxi_dic.clear() time3 = time.time() solve_file('all',df) time4 = time.time() with open('单线程-车辆数据字典.pickle', 'wb') as f: pickle.dump(taxi_dic, f) print(f'多线程聚合耗时:time={time2-time1},单线程聚合耗时:time={time4-time3}')
需求:3.2获得了每个车辆的数据,对于每个车辆这里需要按时间排序,然后生成一个订单
思路:多线程遍历每个车,寻找连续的state=1序列(实测多线程没有单线程快:这是CPU密集型计算,CPython的全局解释器锁GIL使多个线程无法并行执行Python字节码,还多了线程切换和加锁的开销)
学习过程:
关于锁:一个锁用来保护一份共享数据;同一时刻只能有一个线程持有这个锁,持有者释放(release)之后,其他线程才能获取(acquire)它
锁的描述
字典按照key排序:
sort_date = sorted(data.items(), key=lambda item: item[0])
""" @author:HY @time:2021/11/1:10:34 """ import pickle import pandas as pd from tqdm import tqdm import threading class Node: def __init__(self, value): self.value = value self.next = None class Queue: """ 队列用于存储每个线程要处理的数据 """ def __init__(self): self.head = None self.tail = None self.length = 0 def pop(self): if self.head is None: return None a = self.head if self.head.next is None: self.head = self.tail = None else: self.head = self.head.next self.length -= 1 return a def push(self, node): if self.head is None: self.head = self.tail = node else: self.tail.next = node self.tail = node self.length += 1 class Request: def __init__(self, start_time, s_lng, s_lat): self.start_time = start_time self.s_lng = s_lng self.s_lat = s_lat self.e_lng = None self.e_lat = None self.end_time = None class MY_Server(threading.Thread): def __init__(self, name, value=Queue()): threading.Thread.__init__(self) self.mName = name self.mEvent = threading.Event() self.data_queue = value self.is_running = False self.recieving = True def run(self): while self.recieving or self.data_queue.length > 0: # 逐个车遍历。当该线程还在接受车辆或者队列还有的数据的时候都不停遍历 data = self.data_queue.pop() # data是一个列表,每个元素是[time, state,lng,lat] while data is None and self.recieving: self.is_running = False data = self.data_queue.pop() if data is None: break self.is_running = True # 排序字典:按时间排序 sort_date = sorted(data.value, key=lambda item: item[0]) vehicle_req_list = self.get_request(sort_date) # 将该车的订单加入列表 lock.acquire() req_list.extend(vehicle_req_list) lock.release() def get_request(self, sort_data): """ sort_data是一个列表,每个元素是一个列表 now, state, lng, lat """ last = [None, 0, None, None] request_list = [] one_request = None for s in sort_data: now, state, lng, lat = s # 此刻状态 last_now, last_state, last_lng, last_lat = last # 之前状态 if last_state == 0: # 之前是没有人的状态,找1建立request if state == 0: continue # 中间的0全部跳过 elif state == 1: # 找到了一个首字母1,建立request并且last记录为当前 one_request = Request(now, lng, lat) last = [now, state, lng, lat] elif last_state 
== 1: # 有人的状态,找0前面的1 if state == 1: # 不断记录直到最后一个 last = [now, state, lng, lat] elif state == 0: one_request.end_time = last_now one_request.e_lng = last_lng one_request.e_lat = last_lat request_list.append(one_request) return request_list def pause(self): self.mEvent.clear() def resume(self): self.mEvent.set() import time if __name__ == "__main__": time1 = time.time() # 数据获取 # all_dic = {'a': {30: 1, 40: 0, 50: 1}, 'b': {30: 1, 40: 0, 50: 0}} with open('多线程-车辆数据字典.pickle', 'rb') as f: all_dic = pickle.load(f) # 生成一个锁对象 lock = threading.Lock() # 结果集 req_list = [] # 线程数量 thread_num = 1 # 开启线程 # t0 = MY_Server(0) # t0.start() # # t1 = MY_Server(1) # t1.start() # # t2 = MY_Server(2) # t2.start() for i in range(thread_num): # 12个线程 exec('t' + str(i) + ' = MY_Server(i)') exec('t' + str(i) + '.start()') # 遍历数据输入给每个线程 taxi_num = 0 for _, value in all_dic.items(): # value是字典,key为时间,value为别的 turn_to_whom = taxi_num % thread_num n = Node(value) exec('t' + str(turn_to_whom) + '.data_queue.push(n)') # 告诉所有线程数据传送完毕 for i in range(thread_num): # 12个线程 exec('t' + str(i) + '.recieving = False') # join() for i in range(thread_num): # 12个线程 exec('t' + str(i) + '.join()') time2 = time.time() print(len(req_list)) print('耗时', time2 - time1) # 保存订单数据 with open('shenzhen_req.pickle', 'wb') as f: pickle.dump(req_list, f)
最后发现不如单线程跑得快,因此最终程序为
""" @author:HY @time:2021/11/1:16:51 """ import pandas as pd import time import pickle from tqdm import tqdm import datetime class Request: def __init__(self, start_time, s_lng, s_lat): self.start_time = start_time self.s_lng = s_lng self.s_lat = s_lat self.e_lng = None self.e_lat = None self.end_time = None def solve_file(name, file): taxi_dic = {} for index, row in tqdm(file.iterrows(), desc='处理成逐个车辆'): date, taxi_time, _, plate_number, lng, lat, _, _, state, _ = row if plate_number in taxi_dic.keys(): taxi_dic[plate_number].append([taxi_time, state, lng, lat]) else: taxi_dic[plate_number] = [[taxi_time, state, lng, lat],] print('车辆数目', len(taxi_dic.keys())) with open('数据文件/单线程-车辆数据字典.pickle', 'wb') as f: pickle.dump(taxi_dic, f) return taxi_dic def sum_request(taxi_dic): req_list = [] for _, value in tqdm(taxi_dic.items(), desc='处理每个车辆'): sort_date = sorted(value, key=lambda item: item[0]) vehicle_req_list = get_request(sort_date) req_list.extend(vehicle_req_list) # 保存订单数据 with open('数据文件/shenzhen_req.pickle', 'wb') as f: pickle.dump(req_list, f) def get_request(sort_data): """ sort_data是一个列表,每个元素是一个列表 now, state, lng, lat """ last = [None, 0, None, None] request_list = [] one_request = None for s in sort_data: now, state, lng, lat = s # 此刻状态 last_now, last_state, last_lng, last_lat = last # 之前状态 if last_state == 0: # 之前是没有人的状态,找1建立request if state == 0: continue # 中间的0全部跳过 elif state == 1: # 找到了一个首字母1,建立request并且last记录为当前 one_request = Request(now, lng, lat) last = [now, state, lng, lat] elif last_state == 1: # 有人的状态,找0前面的1 if state == 0: one_request.end_time = last_now one_request.e_lng = last_lng one_request.e_lat = last_lat request_list.append(one_request) last = [now, state, lng, lat] last_now, last_state, last_lng, last_lat = last # 之前状态 if last_state == 1: one_request.end_time = last_now one_request.e_lng = last_lng one_request.e_lat = last_lat request_list.append(one_request) return request_list def get_file(): """ 处理原文件获得订单 :return: """ time1 = 
time.time() df = pd.read_csv('20160920_taxigps.csv', header=None, sep='\t') time2 = time.time() taxi_dic = solve_file('all', df) time3 = time.time() sum_request(taxi_dic) time4 = time.time() print(f'读数据时间{time2-time1},处理成单车辆时间{time3-time2},统计订单时间{time4-time3}')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。