Let X be a continuous random variable. X follows the logistic distribution if X has the following distribution function and density function:

$$F(x) = P(X \leqslant x) = \frac{1}{1+e^{-(x-u)/\gamma}}$$

$$f(x) = F'(x) = \frac{e^{-(x-u)/\gamma}}{\gamma\left(1+e^{-(x-u)/\gamma}\right)^2}$$

where u is the location parameter and γ > 0 is the shape parameter.
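As a quick illustration (not part of the original post's code), here is a minimal Scala sketch that evaluates F(x) and f(x) for a given location u and shape γ; the object and function names are my own:

```scala
object LogisticDistribution {
  // CDF of the logistic distribution: F(x) = 1 / (1 + exp(-(x - u) / gamma))
  def cdf(x: Double, u: Double = 0.0, gamma: Double = 1.0): Double =
    1.0 / (1.0 + math.exp(-(x - u) / gamma))

  // PDF, i.e. F'(x) = exp(-(x - u) / gamma) / (gamma * (1 + exp(-(x - u) / gamma))^2)
  def pdf(x: Double, u: Double = 0.0, gamma: Double = 1.0): Double = {
    val e = math.exp(-(x - u) / gamma)
    e / (gamma * math.pow(1.0 + e, 2))
  }

  def main(args: Array[String]): Unit = {
    println(cdf(0.0)) // 0.5  -- the CDF equals 1/2 at the location parameter
    println(pdf(0.0)) // 0.25 -- the density peaks at x = u with value 1/(4 * gamma)
  }
}
```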
Logistic Regression is one of the most widely used learning algorithms. It is mainly used for binary classification, although it can also be extended to multi-class problems. In a classification problem the variable we want to predict is discrete; taking binary classification as an example, the output is either 0 or 1, so we want a hypothesis function whose output always lies between 0 and 1. A plain linear model can produce predictions outside the range [0, 1], so we feed the linear model's output into a non-linear transformation that squeezes it into [0, 1]. That function is the Sigmoid function, which corresponds to the logistic distribution. Its output is then no longer the predicted label itself, but the probability that the example is positive.
$$g(z) = \frac{1}{1+e^{-z}}$$
$$h_\theta(x) = g(\theta^T x)$$
In logistic regression we predict:
When $h_\theta(x) \geq 0.5$, predict $y = 1$; when $h_\theta(x) < 0.5$, predict $y = 0$.

Equivalently, since $g(z) \geq 0.5$ exactly when $z \geq 0$: when $\theta^T x \geq 0$, predict $y = 1$; when $\theta^T x < 0$, predict $y = 0$.
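A minimal, self-contained Scala sketch of the hypothesis and the 0.5 decision rule (the names `sigmoid`, `hypothesis` and `predict` here are my own, separate from the Spark implementation further down):

```scala
object SigmoidExample {
  // g(z) = 1 / (1 + e^{-z})
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // h_theta(x) = g(theta^T x)
  def hypothesis(theta: Array[Double], x: Array[Double]): Double =
    sigmoid(theta.zip(x).map { case (t, xi) => t * xi }.sum)

  // predict 1 when h_theta(x) >= 0.5, i.e. when theta^T x >= 0
  def predict(theta: Array[Double], x: Array[Double]): Int =
    if (hypothesis(theta, x) >= 0.5) 1 else 0

  def main(args: Array[String]): Unit = {
    val theta = Array(-1.0, 2.0) // hypothetical learned weights
    val x     = Array(1.0, 0.8)  // one feature vector (first entry = intercept term)
    println(hypothesis(theta, x)) // probability of the positive class
    println(predict(theta, x))    // 1, because theta^T x = 0.6 >= 0
  }
}
```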
The cost function over the m training examples is:

$$J(\theta) = \frac{1}{m} \sum_{i=1}^m \mathrm{Cost}(h_\theta(x^{(i)}), y^{(i)})$$
where the per-example cost is

$$\mathrm{Cost}(h_\theta(x^{(i)}), y^{(i)}) = \begin{cases} -\log(h_\theta(x)) & \text{if } y = 1 \\ -\log(1-h_\theta(x)) & \text{if } y = 0 \end{cases}$$
The figure below illustrates the relationship between $h_\theta(x)$ and $\mathrm{Cost}(h_\theta(x), y)$: the cost approaches 0 as the prediction approaches the true label and grows without bound as it approaches the wrong label.
Because $y$ only takes the values 0 or 1, the two cases can be written as a single expression:

$$\mathrm{Cost}(h_\theta(x^{(i)}), y^{(i)}) = -y\log(h_\theta(x)) - (1-y)\log(1-h_\theta(x))$$
Substituting this into the cost function gives:
$$J(\theta) = -\frac{1}{m}\sum_{i=1}^m \left[ y^{(i)} \log(h_\theta(x^{(i)})) + (1-y^{(i)})\log(1-h_\theta(x^{(i)})) \right]$$
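As a small sanity check (not from the original post), here is a Scala sketch that evaluates the per-example cross-entropy cost and averages it into $J(\theta)$ over a toy data set; `crossEntropy` and `costJ` are hypothetical helper names:

```scala
object CostExample {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Cost(h, y) = -y*log(h) - (1-y)*log(1-h)
  def crossEntropy(h: Double, y: Double): Double =
    -y * math.log(h) - (1.0 - y) * math.log(1.0 - h)

  // J(theta) = (1/m) * sum_i Cost(h_theta(x_i), y_i)
  def costJ(theta: Array[Double], xs: Array[Array[Double]], ys: Array[Double]): Double = {
    val costs = xs.zip(ys).map { case (x, y) =>
      val h = sigmoid(theta.zip(x).map { case (t, xi) => t * xi }.sum)
      crossEntropy(h, y)
    }
    costs.sum / costs.length
  }

  def main(args: Array[String]): Unit = {
    val xs = Array(Array(1.0, 2.0), Array(1.0, -1.0)) // toy features, first column = intercept
    val ys = Array(1.0, 0.0)                          // toy labels
    println(costJ(Array(0.0, 1.0), xs, ys))           // average log loss for theta = (0, 1)
  }
}
```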
With this cost function in hand, we can use gradient descent to find the parameters that minimize it.
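For reference, differentiating $J(\theta)$ gives the standard gradient and update rule (not spelled out in the original post, but it is what the code below implements):

$$\frac{\partial J(\theta)}{\partial \theta_j} = \frac{1}{m}\sum_{i=1}^m \left( h_\theta(x^{(i)}) - y^{(i)} \right) x_j^{(i)}, \qquad \theta_j := \theta_j - \alpha \frac{\partial J(\theta)}{\partial \theta_j}$$

where $\alpha$ is the learning rate. The Spark/Breeze implementation that follows computes exactly this quantity: each row contributes `ftsVal.toArray.map(_ * (sigmoid - y))`, the contributions are averaged over the DataFrame, and the weight vector is updated with the learning rate `lrate`.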
```scala
package CH6_LogisticsRegression

import org.apache.spark.sql.functions.{col, mean, udf}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorAssembler}
import org.apache.spark.sql.{DataFrame, SparkSession}
import breeze.linalg.{DenseVector => densevector}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer.{mean => summaryMean}
import org.apache.spark.ml.util.Identifiable
import scala.beans.BeanProperty

/**
  * Created by WZZC on 2019/12/9
  **/
case class LRModel(data: DataFrame) {

  private val spark: SparkSession = data.sparkSession
  import spark.implicits._

  @BeanProperty var itr: Int = 40            // number of iterations
  @BeanProperty var lrate: Double = 0.05     // learning rate
  @BeanProperty var error: Double = 1e-3     // convergence threshold on the change of the loss
  @BeanProperty var fts: Array[String] = _   // feature column names
  @BeanProperty var labelColName: String = _ // label column name

  var w: densevector[Double] = _             // learned weight vector

  private val ftsName: String = Identifiable.randomUID("LRModel")
  private val indexedLabel: String = Identifiable.randomUID("indexedLabel")

  // lazy so that it is only built after setLabelColName has been called
  private lazy val stringIndexer: StringIndexerModel = new StringIndexer()
    .setInputCol(labelColName)
    .setOutputCol(indexedLabel)
    .fit(data)

  // assemble the feature columns into a single vector column
  def dataTransForm(df: DataFrame): DataFrame = {
    new VectorAssembler()
      .setInputCols(fts)
      .setOutputCol(ftsName)
      .transform(df)
  }

  // sigmoid function
  def sigmoid(x: Double): Double = 1 / (1 + math.exp(-x))

  // h_theta(x) = sigmoid(w . x)
  def sigmoidUdf(initW: densevector[Double]) =
    udf((ftsVal: Vector) => {
      val d = initW.dot(densevector(ftsVal.toArray))
      sigmoid(d)
    })

  // per-example score used to monitor convergence
  // (note: this is the likelihood y*h + (1-y)*(1-h), not the log loss derived above)
  def lossUdf =
    udf((sigmoid: Double, y: Double) => y * sigmoid + (1 - y) * (1 - sigmoid))

  // per-example gradient contribution: (h_theta(x) - y) * x
  def gradientDescentUdf =
    udf((ftsVal: Vector, y: Double, sigmoid: Double) => {
      val gd: Array[Double] = ftsVal.toArray.map(_ * (sigmoid - y))
      Vectors.dense(gd)
    })

  // prediction: 1 when w . x >= 0, i.e. when sigmoid(w . x) >= 0.5
  def predictUdf(w: densevector[Double]) =
    udf((ftsVal: Vector) => {
      val d: Double = w.dot(densevector(ftsVal.toArray))
      if (d >= 0) 1.0 else 0.0
    })

  private def fitModel = {
    var currentLoss: Double = Double.MaxValue // current loss value
    var change: Double = error + 0.1          // change of the loss between two iterations
    var i = 0                                 // iteration counter

    // initialise the weight vector with random values
    var initW: densevector[Double] = densevector.rand[Double](fts.length)

    while (change > error & i < itr) {
      val vecDf: DataFrame = dataTransForm(this.data)

      val sigmoidDf = stringIndexer
        .transform(vecDf)
        .select(ftsName, indexedLabel)
        .withColumn("sigmoid", sigmoidUdf(initW)(col(ftsName)))
        .cache()

      val loss = sigmoidDf
        .select(lossUdf($"sigmoid", col(indexedLabel)) as "loss")
        .agg(mean($"loss"))
        .head
        .getDouble(0)

      change = math.abs(currentLoss - loss)
      currentLoss = loss

      // average gradient over all examples
      val gdVector: Vector = sigmoidDf
        .select(
          gradientDescentUdf(col(ftsName), col(indexedLabel), $"sigmoid") as "gd"
        )
        .agg(summaryMean($"gd") as "gd")
        .head
        .getAs[Vector]("gd")

      // gradient descent update: w := w - lrate * gradient
      initW -= densevector(gdVector.toArray.map(_ * lrate))

      sigmoidDf.unpersist()
      i += 1
    }

    (initW, currentLoss)
  }

  def fit = { w = fitModel._1 }

  def predict(df: DataFrame): DataFrame = {
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(stringIndexer.labels)

    val vecDf: DataFrame = dataTransForm(df)
    val preDf = vecDf.withColumn("prediction", predictUdf(w)(col(ftsName)))

    labelConverter
      .transform(preDf)
      .drop(ftsName, "prediction")
  }
}
```
```scala
import org.apache.spark.sql.SparkSession

/**
  * Created by WZZC on 2019/12/9
  **/
object lrRunner {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[*]")
      .getOrCreate()

    val iris = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv("F:\\DataSource\\iris2.csv")

    val model: LRModel = LRModel(iris)
    model.setLabelColName("class")
    model.setFts(iris.columns.filterNot(_ == "class"))

    model.fit

    model.predict(iris).show()

    spark.stop()
  }
}
```
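A note on the input: since the prediction UDF only outputs 0 or 1, iris2.csv is presumably a two-class subset of the iris data with a string `class` column and numeric feature columns; adjust the path and column names for your own data.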
References:
《统计学习方法》 (Statistical Learning Methods)
https://study.163.com/course/courseMain.htm?courseId=1004570029