当前位置:   article > 正文

python 版-MapReduce综合应用案例 — 电信数据清洗

python 版-mapreduce综合应用案例 — 电信数据清洗

评测之前先在命令行启动hadoop:start-all.sh;

dbhelper.py

# dbhelper.py
 
import pymysql
import sys
import codecs
 
class DBHelper:
 
    def get_connection():
        # 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
        ########  Begin   ############
       
        conn = pymysql.connect(host='localhost',port=3306,\
                        user='root',passwd='123123',\
                        charset='utf8mb4',db='mydb')
       
        ########  End    ############    
        return conn
 
    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            #从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
            ############  Begin ###################
            
            cur.execute("select CodeNum,Address from allregion")
 
            for s in cur.fetchall():
                regions[s[0]] = s[1]           
            
            ############  End    #################
        conn.close()
        return regions
 
    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
        #从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
        ############  Begin ###################
         
            cur.execute("select phone,trueName from userphone")
 
            for t in cur.fetchall():
                userphones[t[0]] = t[1] 
 
        ############  End    #################
        conn.close()
        return userphones
 
def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones() 
 
if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

mapper.py

#! /usr/bin/python3
#
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time
 
# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()
 
def main():
    # 正确输出utf-8编码的汉字
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
 
def mapper(line):
    # 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
    # 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
    ##########  begin      ##############
    
    items = line.split(',')
 
    caller = userphones.get(items[0])
    reciever = userphones.get(items[1])
    begin_time = int(items[2])
    end_time = int(items[3])
    caller_address = regions.get(items[4])
    reciever_address = regions.get(items[5])
 
    print(caller,reciever,sep=',',end=',')
    print(','.join(items[:2]),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(begin_time)),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(end_time)),end=',')
    print(str(end_time - begin_time),end=',')
    print(caller_address,reciever_address,sep=',')
        
    ###########  End  #################
 
if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号