赞
踩
随机森林是由多个决策树构成的森林,算法分类结果由决策树的投票结果得到,其属于集成学习中的bagging方法。算法的主要原理如下:
算法流程如下:
弹性分布式缓存(ResilientDistributed Dataset,RDD),是Spark最核心的抽象概念,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD可以被加载内存中,每次对RDD数据集的操作之后的结果直接存放在内存中,当后续需要被调用时直接内存输入,省去了Map Reduce大量的磁盘IO操作。
RDD的操作分为两类:Transaction与Action。其中Transaction是惰性执行的,惰性执行表示真正需要时才被执行,这里是在需要具体的Action去触发才会开始执行,每个Action的触发都会提交一个Job。如图2.1所示,首先通过textfile操作从外部存储HDFS中读取文件,构建两个RDD实例A和实例C,然后A做flatMap和Map转换操作,对C做Map型操作和reduceByKey转换操作,最后对得到的B和E两个做联合操作,并通过saveAsSequenceFlie操作将最终的F实例持久化到外部存储系统HDFS上。
图2.1 RDD操作流程示意图
由此可见,Spark在内存中分布式的存储和迭代并行处理数据,最终将数据整合并保存起来。RDD的依赖分为窄依赖和宽依赖,窄依赖指父RDD的每个分区都只被子RDD的一个分区所使用,例如map、filter;宽依赖指父RDD的分区被多个子RDD的分区所依赖,例如groupByKey,reduceByKey等操作。如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。 这种划分有两个用处。首先,窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,可以在 filter之后执行 map。其次,窄依赖支持更高效的故障还原。因为对于窄依赖,只有丢失的父 RDD的分区需要重新计算。而对于宽依赖,一个结点的故障可能导致来自所有父RDD的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce会持久化 map 的输出一样。
图2.2 RDD的窄依赖和宽依赖
如图2.2所示,其中每一个方框表示一个RDD,其内部阴影表示RDD的分区。对于窄依赖,可以进行pipeline操作,即允许在单个集群节点流水线式地执行,这个节点可以计算所有父级分区。对RDD依赖可以从下面两个方面理解:
(1). 依赖本身是描述两个RDD之间的关系,但一个RDD可以与多个RDD有有依赖关系;
(2). 宽依赖和窄依赖的判断:在RDD的各个分区中对父RDD的分区的依赖关系。
Spark将数据在分布式环境下分区,然后将作业转化为DAG,并分阶段进行DAG的调用和任务的分布式并行处理。
图2.3 DAG典型示意图
1) 数据集简介
本数据集采用Kaggle上欧洲信用卡使用者的消费数据creditcard.csv,共有284807条样本数据,其中492条消费记录属于信用卡诈骗,其余均为正常交易样本数据,数据属性特征序列:time,v1,v2,…,v28,amount和类别标签class,1表示欺骗样本,0表示正常,在实际情况中,由于诈骗的样本案例较少,实际正常的样本过多所以该样本存在严重的不平衡现象,采用一般的诸如:SVM,朴素贝叶斯等分类算法会造成严重过拟合导致预测结果偏向于正常样本,而随机森林是采用决策树融合的方式,其两大机制样本随机抽样和属性随机抽样能够很好的针对不平衡样本进行分类决策,同时减少过拟合。
运行环境配置如下:
表3.2 运行环境配置
应用 | 版本 |
Windows | 10 |
Spark | 2.3.0 |
Python | 3.5 |
Jupyter notebook | 4.3.0 |
pyspark | / |
Java | 1.8 |
Hadoop | 2.7.6 |
2) 算法实现:
算法实现步骤:
3) 算法输出执行效果:
图3.1算法执行总结
表3.2 模型评估
模型精度 | 99.967% |
训练时间 | 8.978s |
召回率 | 83% |
ROC | 95.073 |
附录代码:
- from pyspark.context import SparkContext
-
- from pyspark.sql.session import SparkSession
-
- #通用配置
-
- CSV_PATH = "creditcard.csv"
-
- APP_NAME = "Random Forest Example"
-
- SPARK_URL = "local[*]"
-
- RANDOM_SEED = 13579
-
- TRAINING_DATA_RATIO = 0.7
-
- RF_NUM_TREES = 3
-
- RF_MAX_DEPTH = 4
-
- RF_NUM_BINS = 32
-
-
- #数据集加载
-
- spark = SparkSession.builder \
-
- .appName(APP_NAME) \
-
- .master(SPARK_URL) \
-
- .getOrCreate()
-
-
- df = spark.read \
-
- .options(header = "true", inferschema = "true") \
-
- .csv(CSV_PATH)
-
-
- print("Total number of rows: %d" %df.count())
-
-
- #将DataFrame数据类型转换成RDD的DataFrame
-
- from pyspark.mllib.linalg import Vectors
-
- from pyspark.mllib.regression importLabeledPoint
-
-
- transformed_df = df.rdd.map(lambdarow:LabeledPoint(row[-1],Vectors.dense(row[0:-1])))
-
- splits =[TRAINING_DATA_RATIO,1.0-TRAINING_DATA_RATIO]
-
- training_data,test_data =transformed_df.randomSplit(splits,RANDOM_SEED)
-
-
- print("Number of training setrow:%d"%training_data.count())
-
- print("Number oftest set rows:%d"%test_data.count())
-
-
- #随机森林算法模型训练
-
- from pyspark.mllib.tree import RandomForest
-
- from time import *
-
- start_time = time()
-
- model =RandomForest.trainClassifier(training_data,numClasses=2,categoricalFeaturesInfo={},\
-
- numTrees=RF_NUM_TREES,featureSubsetStrategy='auto',impurity="gini",\
-
- maxDepth=RF_MAX_DEPTH,maxBins=RF_NUM_BINS,seed=RANDOM_SEED)
-
-
- end_time = time()
-
- elapsed_time = end_time - start_time
-
- print("Time to train model:%.3fseconds"% elapsed_time)
-
-
- #预测和计算准确度
-
- predictions =model.predict(test_data.map(lambda x:x.features))
-
- labels_and_predictions = test_data.map(lambdax:x.label).zip(predictions)
-
- acc = labels_and_predictions.filter(lambda x:x[0] == x[1]).count() / float(test_data.count())
-
- print("Modelaccuracy:%.3f%%"%(acc*100))
-
-
- #模型评估
-
- from pyspark.mllib.evaluation import BinaryClassificationMetrics
-
-
- strart_time = time()
-
- metrics =BinaryClassificationMetrics(labels_and_predictions)
-
- print("Area under Predicton/Recall(PR)curve:%.f"%(metrics.areaUnderPR*100))
-
- print("Area under Receiver OperatingCharacteristic(ROC) curve:%.3f"%(metrics.areaUnderROC*100))
-
-
- end_time = time()
-
- elapsed_time = end_time - start_time
-
- print("Time to evaluate model:%.3fseconds"%elapsed_time)
-

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。