In the previous section we covered what ID Mapping is. As the name suggests, the objects we operate on are IDs and the action is mapping: we want to link up the IDs a user leaves behind on different platforms and devices, so that we can profile the user more accurately. In other words, we want to connect a user's data across all dimensions in order to serve the business and the user better.
We also introduced the common IDs, such as login ID and device ID, plus identifiers like ID-card number and phone number, any of which can identify a user. In this section we look at how to actually do ID Mapping, keeping the theory brief.
One point worth noting: once ID Mapping is done we build One ID, that is, we generate a new ID for each set of mapped IDs. This new ID is the One ID, and once it exists we can use it to connect all of our business systems. One ID is covered in the next article.
Graph computing plays a pivotal role in finance, the internet industry, social networks, and content recommendation. The field is very large, arguably a discipline in its own right, and cannot be covered in a sentence or two, so if you are interested you can dig into the literature for more applications. Below we look at how to use graph computing to implement our ID Mapping.
Graph theory has many classic applications, such as shortest-path planning, PageRank, and connectivity. ID Mapping relies on connectivity, so let's briefly introduce the concept.
In graph theory, the connected graph is defined in terms of connectivity. In an undirected graph G, if there is a path from vertex i to vertex j (and then necessarily also from j to i), then i and j are said to be connected. If G is a directed graph, all edges on the path connecting i and j must point in the same direction. If every pair of vertices in the graph is connected, the graph is called a connected graph; for a directed graph it is called a strongly connected graph (note: a directed graph needs paths in both directions). Connectivity is a fundamental property of a graph.
Connected component: a maximal connected subgraph of an undirected graph G is called a connected component (or connected branch) of G. A connected graph has exactly one connected component, itself; a disconnected undirected graph has several. This solves our problem: a graph can contain several connected components, each connected component is the information set of one user, and the individual vertices inside it are that user's different IDs.
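To make the idea concrete, here is a minimal, self-contained sketch of connected components in GraphX (the vertex IDs and names are made up for illustration): two small clusters of IDs form two connected components, and connectedComponents labels every vertex with the smallest vertex ID in its component.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object ToyConnectivity {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toy-cc").master("local[1]").getOrCreate()
    val sc = spark.sparkContext

    // Two components: {1, 2, 3} and {10, 11}
    val vertices = sc.parallelize(Seq(
      (1L, "u_001"), (2L, "imei_x"), (3L, "mac_x"),
      (10L, "u_002"), (11L, "imei_y")))
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, ""), Edge(2L, 3L, ""), Edge(10L, 11L, "")))

    // Every vertex is paired with the smallest VertexId in its component
    Graph(vertices, edges).connectedComponents().vertices
      .collect().foreach(println)
    // prints (1,1) (2,1) (3,1) (10,10) (11,10)

    spark.stop()
  }
}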
That wraps up the theory. The core concept is the connected graph: our goal is to use a graph-computing tool to compute graph connectivity, find the relationships between the various ID identifiers, and thereby recognize which identifiers belong to the same person. The end product of ID Mapping is an ID mapping dictionary:
ID guid
idx01 -> gid01
idy01 -> gid01
idz01 -> gid01
idx02 -> gid01
Here guid is the identifier of a connected component; we can set aside how it is produced for now. What matters is that IDs sharing the same guid, i.e. IDs in the same connected component, belong to the same user; in this example those are idx01, idy01, idz01, and idx02.
Or a dictionary of this shape:
guid ids
gid01 idx01,idy01,idz01,idx02
Here we implement this with Spark's GraphX component; for details on using it, see the Spark GraphX programming guide. Below is our test data: each JSON line is one record of user data, and our job is to link these records together.
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","imsi":"imsi_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_002"}}
{"name":"zs","uid":"u_001","phone":{"mac":"mac_zs_002","imsi":"imsi_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_001","mac":"mac_ls_001","imsi":"imsi_ls_001","androidId":"androidId_ls_001","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_002","mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_002","uuid":"uuid_ls_002"}}
Here is the code implementation:
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder()
    .appName("id-mapping")
    .master("local[1]")
    .getOrCreate()
  import spark.implicits._

  // Read the raw data
  val rawData = spark.read.textFile("data/graphx/idmapping/idmapping.txt")
  val data: RDD[Array[String]] = rawData.rdd.map(line => {
    // Parse each line into a JSON object
    val jsonObj = JSON.parseObject(line)
    val uid = jsonObj.getString("uid")
    // Take the phone object, which holds the user's various identifiers, i.e. the IDs
    val phoneObj = jsonObj.getJSONObject("phone")
    val imei = phoneObj.getOrDefault("imei", "").toString
    val mac = phoneObj.getOrDefault("mac", "").toString
    val imsi = phoneObj.getOrDefault("imsi", "").toString
    val androidId = phoneObj.getOrDefault("androidId", "").toString
    val deviceId = phoneObj.getOrDefault("deviceId", "").toString
    val uuid = phoneObj.getOrDefault("uuid", "").toString
    Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
  })

  // Build the vertex set
  val vertices: RDD[(Long, String)] = data.flatMap(arr => {
    // Graph requires Long vertex IDs; hashCode can in fact collide,
    // a problem we will solve in the One ID article
    for (id <- arr) yield (id.hashCode.toLong, id)
  }).distinct

  // Build the edge set; the double for-loop links up the different IDs of one user
  val edges: RDD[Edge[String]] = data.flatMap(arr => {
    for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
      yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
  })
    .map(edge => (edge, 1)).reduceByKey(_ + _)
    .map(x => x._1)

  // Build a graph from the vertex set and the edge set
  val graph = Graph(vertices, edges)

  // Run the connected-components algorithm; the resulting VertexRDD[VertexId]
  // holds tuples of (vertex ID, smallest vertex ID in its component)
  val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
  val firstIds = res.toDF("id_hashcode", "guid_hashcode")
  firstIds.show(10000, false)
}
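A side note on the hashCode caveat flagged in the comments: String.hashCode is only 32 bits and can collide. A minimal sketch of one way to lower the risk, assuming Guava is on the classpath (the helper name id64 is our own), is a 64-bit Murmur3 hash:

import com.google.common.hash.Hashing
import java.nio.charset.StandardCharsets

// 64-bit Murmur3 hash of an ID string; far less collision-prone than
// String.hashCode, though the One ID article addresses this properly
def id64(id: String): Long =
  Hashing.murmur3_128().hashString(id, StandardCharsets.UTF_8).asLong()

You would then build the vertices and edges with id64(id) in place of id.hashCode.toLong.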
Let's look at the output:
+-----------+-------------+
|id_hashcode|guid_hashcode|
+-----------+-------------+
|110929768  |-1782473361  |
|2140645736 |2140645736   |
|-1884715312|-1884715312  |
|-1381665247|-1381665247  |
|1985563774 |1985563774   |
|-1483907198|-1483907198  |
|-1908595409|-1908595409  |
|-1419274017|-1782473361  |
|-1115460503|-1782473361  |
|-1419274018|-1419274018  |
|-1018465903|-1908595409  |
|-1483907197|-1483907197  |
|-1782473362|-1782473362  |
|1985563773 |-1782473361  |
|-1782473361|-1782473361  |
|-714652389 |-714652389   |
|2140645735 |-1782473361  |
|-714652388 |-1908595409  |
|110929767  |-1908595409  |
|-1908595408|-1908595408  |
|-1381665248|-1908595409  |
|-1753513447|-1908595409  |
|-1018465904|-1018465904  |
|-1884715311|-1884715311  |
+-----------+-------------+
As you can see, the output is not very readable, because these relationships are between the hashcodes of the IDs; we still need to look up each vertex's original ID by its hashcode.
// Run the connected-components algorithm; the resulting VertexRDD[VertexId]
// holds tuples of (vertex ID, smallest vertex ID in its component)
val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
val firstIds = res.toDF("id_hashcode", "guid_hashcode")
val verticesDF = vertices.toDF("id_hashcode", "id")
firstIds.createOrReplaceTempView("ids")
verticesDF.createOrReplaceTempView("vertices")

// SQL implementation
spark.sql(
  """
    |select
    |  b.id, c.id as guid
    |from
    |  ids a
    |inner join
    |  vertices b
    |on
    |  a.id_hashcode = b.id_hashcode
    |inner join
    |  vertices c
    |on
    |  a.guid_hashcode = c.id_hashcode
    |""".stripMargin)
  .show()
Here we used Spark SQL syntax; you could equally use the DataFrame API or the RDD API (a DataFrame sketch follows after the result below). Let's look at the result:
+----------------+-----------+
|              id|       guid|
+----------------+-----------+
| deviceId_ls_001|imei_ls_001|
|androidId_ls_001|imei_ls_001|
|           u_002|imei_ls_001|
|     uuid_ls_001|imei_ls_001|
|     imei_ls_001|imei_ls_001|
|      mac_ls_001|imei_ls_001|
| deviceId_ls_002|imei_ls_001|
|androidId_ls_002|imei_ls_001|
|     uuid_ls_002|imei_ls_001|
|     imsi_ls_001|imei_ls_001|
|     imsi_ls_002|imei_ls_001|
|     imei_ls_002|imei_ls_001|
|     imsi_zs_002|uuid_zs_001|
|androidId_zs_001|uuid_zs_001|
|     imsi_zs_001|uuid_zs_001|
|      mac_zs_002|uuid_zs_001|
|     imei_zs_001|uuid_zs_001|
|     uuid_zs_002|uuid_zs_001|
|     imei_zs_002|uuid_zs_001|
|           u_001|uuid_zs_001|
+----------------+-----------+
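As mentioned, the same join can also be written with the DataFrame API. Here is a minimal sketch of that alternative, reusing the firstIds and verticesDF DataFrames defined earlier:

// DataFrame-API equivalent of the SQL join above
val mappingDF = firstIds
  .join(verticesDF, "id_hashcode") // resolve the readable id
  .join(
    verticesDF
      .withColumnRenamed("id_hashcode", "guid_hashcode")
      .withColumnRenamed("id", "guid"), // resolve the readable guid
    "guid_hashcode")
  .select("id", "guid")

mappingDF.show()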
This output matches the first dictionary form described above. Now let's produce the other dictionary form:
val mapping = spark.sql(
  """
    |select
    |  b.id, c.id as guid
    |from
    |  ids a
    |inner join
    |  vertices b
    |on
    |  a.id_hashcode = b.id_hashcode
    |inner join
    |  vertices c
    |on
    |  a.guid_hashcode = c.id_hashcode
    |""".stripMargin)
mapping.createOrReplaceTempView("mapping")

spark.sql(
  """
    |select
    |  guid, collect_list(id) as ids
    |from
    |  mapping
    |group by
    |  guid
    |""".stripMargin)
  .show(false)
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|guid |ids |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|imei_ls_001|[deviceId_ls_001, androidId_ls_001, u_002, uuid_ls_001, imei_ls_001, mac_ls_001, deviceId_ls_002, androidId_ls_002, uuid_ls_002, imsi_ls_001, imsi_ls_002, imei_ls_002]|
|uuid_zs_001|[imsi_zs_002, androidId_zs_001, imsi_zs_001, mac_zs_002, imei_zs_001, uuid_zs_002, imei_zs_002, u_001, uuid_zs_001, mac_zs_001, deviceId_zs_001, androidId_zs_002] |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Suppose a person occasionally logs in once from a friend's phone: their uid would then become linked to the friend's various device IDs, and we would wrongly judge the two users to be one person. The solution is to drop weak links and keep strong ones: when building the edges we can filter by a threshold.
val edges: RDD[Edge[String]] = data.flatMap(arr => {
  for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
    yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
})
  .map(edge => (edge, 1)).reduceByKey(_ + _)
  // keep only strong links: pairs of IDs seen together more than twice
  .filter { case (_, cnt) => cnt > 2 }
  .map(x => x._1)
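A variation of our own (not from the original code, so treat it as a sketch): keep the co-occurrence count as the edge attribute, so the graph retains the link weights and the threshold can be applied, and tuned, at graph level via GraphX's subgraph:

// Carry the co-occurrence count in the edge attribute
val weightedEdges: RDD[Edge[Int]] = data.flatMap(arr => {
  for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
    yield ((arr(i).hashCode.toLong, arr(j).hashCode.toLong), 1)
})
  .reduceByKey(_ + _)
  .map { case ((src, dst), cnt) => Edge(src, dst, cnt) }

// Drop weak links at graph level; the threshold stays easy to adjust
val strongGraph = Graph(vertices, weightedEdges)
  .subgraph(epred = triplet => triplet.attr > 2)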