
Spark ML Feature Extraction, Transformation, and Selection in Detail

I. Feature Extraction

1. TF-IDF (Term Frequency-Inverse Document Frequency)

        TF (term frequency): HashingTF and CountVectorizer are used to generate term-frequency (TF) vectors. HashingTF is a Transformer that takes sets of terms and converts them into fixed-length feature vectors. It relies on the hashing trick: each raw feature is mapped to an index by applying a hash function, and term frequencies are then computed over the mapped indices. This avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collisions, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo is used to turn the hash value into a column index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly onto the columns. The default feature dimension is 2^18 = 262,144. An optional binary toggle parameter controls the term-frequency counts: when set to true, all non-zero frequency counts are set to 1, which is especially useful for discrete probabilistic models that model binary rather than integer counts.

        IDF (inverse document frequency): IDF is an Estimator that is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors and scales each column; intuitively, it down-weights columns that appear frequently in the corpus.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
    import org.apache.spark.sql.SparkSession

    object TFIDF {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("TFIDF")
          .master("local[*]")
          .getOrCreate()
        // Set the Spark log4j level in code
        spark.sparkContext.setLogLevel("WARN")
        val sentenceData = spark.createDataFrame(Seq(
          (0.0, "Hi I heard about Spark"),
          (0.0, "I wish Java could use case classes"),
          (1.0, "Logistic regression models are neat")
        )).toDF("label", "sentence")
        val tokenizer = new Tokenizer()
          .setInputCol("sentence")
          .setOutputCol("words")
        val wordData = tokenizer.transform(sentenceData)
        wordData.show()
        val hashingTF = new HashingTF()
          .setInputCol("words")
          .setOutputCol("rawFeatures")
          .setNumFeatures(20)
        val featurizedData = hashingTF.transform(wordData)
        featurizedData.show()
        val idf = new IDF()
          .setInputCol("rawFeatures")
          .setOutputCol("features")
        val idfModel = idf.fit(featurizedData)
        val rescaledData = idfModel.transform(featurizedData)
        rescaledData.show()
      }
    }

The output is as follows:

    +-----+--------------------+--------------------+
    |label| sentence| words|
    +-----+--------------------+--------------------+
    | 0.0|Hi I heard about ...|[hi, i, heard, ab...|
    | 0.0|I wish Java could...|[i, wish, java, c...|
    | 1.0|Logistic regressi...|[logistic, regres...|
    +-----+--------------------+--------------------+

    +-----+--------------------+--------------------+--------------------+
    |label| sentence| words| rawFeatures|
    +-----+--------------------+--------------------+--------------------+
    | 0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[0,5,9,17],[1...|
    | 0.0|I wish Java could...|[i, wish, java, c...|(20,[2,7,9,13,15]...|
    | 1.0|Logistic regressi...|[logistic, regres...|(20,[4,6,13,15,18...|
    +-----+--------------------+--------------------+--------------------+

    +-----+--------------------+--------------------+--------------------+--------------------+
    |label| sentence| words| rawFeatures| features|
    +-----+--------------------+--------------------+--------------------+--------------------+
    | 0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[0,5,9,17],[1...|(20,[0,5,9,17],[0...|
    | 0.0|I wish Java could...|[i, wish, java, c...|(20,[2,7,9,13,15]...|(20,[2,7,9,13,15]...|
    | 1.0|Logistic regressi...|[logistic, regres...|(20,[4,6,13,15,18...|(20,[4,6,13,15,18...|
    +-----+--------------------+--------------------+--------------------+--------------------+

2. Word2Vec

        Word2Vec is an Estimator that takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of all the words in the document; this vector can then be used as a feature for prediction, document-similarity calculations, and so on.

Example:

    package sparkml

    import org.apache.spark.ml.feature.Word2Vec
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.ml.linalg.Vector

    object Word2vec {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Word2vec")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val documentDF = spark.createDataFrame(Seq(
          "Hi I heard about Spark".split(" "),
          "I wish Java could use case classes".split(" "),
          "Logistic regression models are neat".split(" ")
        ).map(Tuple1.apply)).toDF("text")
        val word2vec = new Word2Vec()
          .setInputCol("text")
          .setOutputCol("result")
          .setVectorSize(3)
          .setMinCount(0)
        val model = word2vec.fit(documentDF)
        val result = model.transform(documentDF)
        result.show(false)
        result.collect().foreach {
          case Row(text: Seq[_], features: Vector) =>
            println(s"Text: [${text.mkString(",")}] => \nVector: $features\n")
        }
      }
    }

The output is as follows:

    +------------------------------------------+----------------------------------------------------------------+
    |text |result |
    +------------------------------------------+----------------------------------------------------------------+
    |[Hi, I, heard, about, Spark] |[-0.008142343163490296,0.02051363289356232,0.03255096450448036] |
    |[I, wish, Java, could, use, case, classes]|[0.043090314205203734,0.035048123182994974,0.023512658663094044]|
    |[Logistic, regression, models, are, neat] |[0.038572299480438235,-0.03250147425569594,-0.01552378609776497]|
    +------------------------------------------+----------------------------------------------------------------+

    Text: [Hi,I,heard,about,Spark] =>
    Vector: [-0.008142343163490296,0.02051363289356232,0.03255096450448036]

    Text: [I,wish,Java,could,use,case,classes] =>
    Vector: [0.043090314205203734,0.035048123182994974,0.023512658663094044]

    Text: [Logistic,regression,models,are,neat] =>
    Vector: [0.038572299480438235,-0.03250147425569594,-0.01552378609776497]

3. CountVectorizer

        CountVectorizer and CountVectorizerModel convert a collection of text documents into vectors of token counts. When a prior dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary and generate a CountVectorizerModel. The model produces a sparse representation of each document over the vocabulary, which can then be passed to other algorithms such as LDA. During fitting, CountVectorizer selects the top vocabSize terms ordered by term frequency across the corpus. The optional parameter minDF also affects the fitting process by specifying the minimum number of documents (or fraction, if less than 1.0) in which a term must appear to be included in the vocabulary. Another optional binary toggle parameter controls the output vector: if set to true, all non-zero counts are set to 1, which is useful for discrete probabilistic models that model binary rather than integer counts.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{CountVectorizerModel, CountVectorizer}
    import org.apache.spark.sql.SparkSession

    object CountVectorizer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("CountVectorizer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val df = spark.createDataFrame(Seq(
          (0, Array("a", "b", "c")),
          (1, Array("a", "b", "b", "c", "a"))
        )).toDF("id", "words")
        // Fit a CountVectorizerModel from the corpus
        val cvModel: CountVectorizerModel = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")
          .setVocabSize(3)
          .setMinDF(2)
          .fit(df)
        // Alternatively, define a CountVectorizerModel with an a-priori vocabulary
        val cvm = new CountVectorizerModel(Array("a", "b", "c"))
          .setInputCol("words")
          .setOutputCol("features")
        cvModel.transform(df).show(false)
        cvm.transform(df).show(false)
      }
    }

The output is as follows:

    +---+---------------+-------------------------+
    |id |words |features |
    +---+---------------+-------------------------+
    |0 |[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
    |1 |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
    +---+---------------+-------------------------+

    +---+---------------+-------------------------+
    |id |words |features |
    +---+---------------+-------------------------+
    |0 |[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
    |1 |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
    +---+---------------+-------------------------+

II. Feature Transformation

1. Tokenizer

        Tokenization is the process of splitting text (such as a sentence) into individual words. In Spark ML, the Tokenizer class provides this functionality. RegexTokenizer provides more advanced tokenization based on regular-expression (regex) matching. By default, the parameter "pattern" (default regex: "\\s+") is used as the delimiter to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes the tokens rather than the delimiters, so that all matches are returned as the tokenization result.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object Tokenizer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Tokenizer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val sentenceDataFrame = spark.createDataFrame(Seq(
          (0, "Hi I heard about Spark"),
          (1, "I wish Java could use case classes"),
          (2, "Logistic,regression,models,are,neat")
        )).toDF("id", "sentence")
        val tokenizer = new Tokenizer()
          .setInputCol("sentence")
          .setOutputCol("words")
        val regexTokenizer = new RegexTokenizer()
          .setInputCol("sentence")
          .setOutputCol("words")
          // .setPattern("\\w") // alternatively .setPattern("\\w+").setGaps(false)
        val countTokens = udf { (words: Seq[String]) => words.length }
        val tokenized = tokenizer.transform(sentenceDataFrame)
        tokenized.show(false)
        tokenized.select("sentence", "words")
          .withColumn("tokens", countTokens(col("words"))).show(false)
        val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
        regexTokenized.select("sentence", "words")
          .withColumn("tokens", countTokens(col("words"))).show(false)
      }
    }

The output is as follows:

    +---+-----------------------------------+------------------------------------------+
    |id |sentence |words |
    +---+-----------------------------------+------------------------------------------+
    |0 |Hi I heard about Spark |[hi, i, heard, about, spark] |
    |1 |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
    |2 |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat] |
    +---+-----------------------------------+------------------------------------------+

    +-----------------------------------+------------------------------------------+------+
    |sentence |words |tokens|
    +-----------------------------------+------------------------------------------+------+
    |Hi I heard about Spark |[hi, i, heard, about, spark] |5 |
    |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7 |
    |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat] |1 |
    +-----------------------------------+------------------------------------------+------+

    +-----------------------------------+------------------------------------------+------+
    |sentence |words |tokens|
    +-----------------------------------+------------------------------------------+------+
    |Hi I heard about Spark |[hi, i, heard, about, spark] |5 |
    |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7 |
    |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat] |1 |
    +-----------------------------------+------------------------------------------+------+

2. StopWordsRemover (Stop-Word Removal)

        Stop words are words that occur frequently in documents but carry little meaning, so they should not be fed into the algorithm.

Example:

    package sparkml

    import org.apache.spark.ml.feature.StopWordsRemover
    import org.apache.spark.sql.SparkSession

    object StopWordsRemover {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("StopWordsRemover")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val dataset = spark.createDataFrame(Seq(
          (0, Seq("I", "saw", "the", "red", "baloon")),
          (1, Seq("Mary", "had", "a", "little", "lamb"))
        )).toDF("id", "raw")
        val remover = new StopWordsRemover()
          .setInputCol("raw")
          .setOutputCol("filtered")
        remover.transform(dataset).show()
      }
    }

The output is as follows:

    +---+--------------------+--------------------+
    | id| raw| filtered|
    +---+--------------------+--------------------+
    | 0|[I, saw, the, red...| [saw, red, baloon]|
    | 1|[Mary, had, a, li...|[Mary, little, lamb]|
    +---+--------------------+--------------------+

3. N-gram

        An n-gram is a sequence of n words, for some integer n. NGram can be used to transform input features into n-grams. Its input is a sequence of strings, and the parameter n specifies the number of words in each n-gram. The output consists of a sequence of n-grams, where each n-gram is a string of n consecutive words separated by spaces. If the input sequence contains fewer than n words, the NGram output is empty.

Example:

    package sparkml

    import org.apache.spark.ml.feature.NGram
    import org.apache.spark.sql.SparkSession

    object Ngram {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Ngram")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val dataset = spark.createDataFrame(Seq(
          (0, Array("I", "saw", "the", "red", "baloon")),
          (1, Array("Mary", "had", "a", "little", "lamb")),
          (2, Array("xzw", "had", "as", "age", "qwe"))
        )).toDF("id", "words")
        val ngram = new NGram()
          .setN(2)
          .setInputCol("words")
          .setOutputCol("ngrams")
        val ngramDF = ngram.transform(dataset)
        ngramDF.select("ngrams").show(false)
      }
    }

The output is as follows:

    +----------------------------------------+
    |ngrams |
    +----------------------------------------+
    |[I saw, saw the, the red, red baloon] |
    |[Mary had, had a, a little, little lamb]|
    |[xzw had, had as, as age, age qwe] |
    +----------------------------------------+

4. Binarizer (Binarization)

        Binarization is the process of thresholding numerical features into binary features.

Example:

    package sparkml

    import org.apache.spark.ml.feature.Binarizer
    import org.apache.spark.sql.SparkSession

    object Binarizer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Binarizer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
        val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
        val binarizer: Binarizer = new Binarizer()
          .setInputCol("feature")
          .setOutputCol("binarized_feature")
          .setThreshold(0.5)
        val binarizerDataFrame = binarizer.transform(dataFrame)
        println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
        binarizerDataFrame.show(false)
      }
    }

The output is as follows:

    Binarizer output with Threshold = 0.5
    +---+-------+-----------------+
    |id |feature|binarized_feature|
    +---+-------+-----------------+
    |0 |0.1 |0.0 |
    |1 |0.8 |1.0 |
    |2 |0.2 |0.0 |
    +---+-------+-----------------+

5. PCA (Principal Component Analysis)

        PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. The PCA class trains a model that projects vectors into a lower-dimensional space, which makes it a simple way to reduce the dimensionality of a set of variables. The following example shows how to project 5-dimensional feature vectors onto 3-dimensional principal components.

Example:

    package sparkml

    import org.apache.spark.ml.feature.PCA
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object PCA {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("PCA")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Array(
          Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
          Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
          Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
        )
        val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
        val pca = new PCA()
          .setInputCol("features")
          .setOutputCol("pcafeatures")
          .setK(3)
          .fit(df)
        val result = pca.transform(df)
          .select("pcafeatures")
        result.show(false)
      }
    }

The output is as follows:

    +-----------------------------------------------------------+
    |pcafeatures |
    +-----------------------------------------------------------+
    |[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
    |[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
    |[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
    +-----------------------------------------------------------+

6. PolynomialExpansion (Polynomial Expansion)

        Polynomial expansion is the process of expanding features into a polynomial space, which is formed by an n-degree combination of the original dimensions.

Example:

    package sparkml

    import org.apache.spark.ml.feature.PolynomialExpansion
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object PolynomialExpansion {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("PolynomialExpansion")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Array(
          Vectors.dense(2.0, 1.0),
          Vectors.dense(0.0, 0.0),
          Vectors.dense(3.0, -1.0)
        )
        val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
        val polyExpansion = new PolynomialExpansion()
          .setInputCol("features")
          .setOutputCol("polyFeatures")
          .setDegree(3)
        val polyDF = polyExpansion.transform(df)
        polyDF.show(false)
      }
    }

The output is as follows:

    +----------+------------------------------------------+
    |features |polyFeatures |
    +----------+------------------------------------------+
    |[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0] |
    |[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] |
    |[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
    +----------+------------------------------------------+

7. Discrete Cosine Transform (DCT)

        The discrete cosine transform converts a length-N real-valued sequence in the time domain into a length-N real-valued sequence in the frequency domain, similar to the discrete Fourier transform. The DCT class provides this functionality, scaling the transformed result by 1/sqrt(2) so that the output has the same length as the time-domain input and the representing matrix of the transform is unitary. No shift is applied to the transformed sequence, i.e. the input sequence and the output correspond one to one.

        The DCT is a Fourier-related transform similar to the discrete Fourier transform, but it uses only real numbers. A DCT is equivalent to a DFT of roughly twice the length operating on a real even function (since the Fourier transform of a real even function is itself real and even). DCTs are widely used in signal processing and image processing for lossy compression of signals and images (both still and moving).

Example:

    package sparkml

    import org.apache.spark.ml.feature.DCT
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object DCT {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("DCT")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Array(
          Vectors.dense(0.0, 1.0, -2.0, 3.0),
          Vectors.dense(2.0, 0.0, 3.0, 4.0),
          Vectors.dense(4.0, 0.0, 0.0, 6.0)
        )
        val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
        val dct = new DCT()
          .setInputCol("features")
          .setOutputCol("featuresdct")
          .setInverse(false)
        val dctDF = dct.transform(df)
        dctDF.select("featuresdct").show(false)
      }
    }

The output is as follows:

    +----------------------------------------------------------------+
    |featuresdct |
    +----------------------------------------------------------------+
    |[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
    |[4.5,-2.118357115095672,1.5000000000000002,1.418648347168368] |
    |[5.0,-1.3065629648763766,5.000000000000001,-0.5411961001461971] |
    +----------------------------------------------------------------+

8. StringIndexer (String-to-Index Transformation)

        StringIndexer encodes a string column of labels into a column of label indices. The indices lie in [0, numLabels), where numLabels is the number of distinct labels in the column, and they are ordered by label frequency, so the most frequent label gets index 0. If the input column is numeric, it is first cast to string and then indexed. If a downstream pipeline stage (e.g. an Estimator or Transformer) needs to use the indexed label column, the input column of that stage must be set to the name of the indexed column; in most cases this is done with setInputCol.

9. IndexToString (Index-to-String Transformation)

        As the counterpart of StringIndexer, IndexToString maps a column of label indices back to the original label strings. A common use case is to produce indexed labels with StringIndexer, train a model on those indices, and then map the predicted indices back to the original label strings with IndexToString.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
    import org.apache.spark.sql.SparkSession

    object StringToIndexer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("StringToIndexer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val df = spark.createDataFrame(Seq(
          (0, "a"),
          (1, "b"),
          (2, "c"),
          (3, "a"),
          (4, "a"),
          (5, "c")
        )).toDF("id", "category")
        // StringIndexer
        val indexer = new StringIndexer()
          .setInputCol("category")
          .setOutputCol("categoryIndex")
          .fit(df)
        val indexed = indexer.transform(df)
        indexed.show()
        // IndexToString
        val converter = new IndexToString()
          .setInputCol("categoryIndex")
          .setOutputCol("origCategory")
        val converted = converter.transform(indexed)
        converted.select("id", "categoryIndex", "origCategory").show()
      }
    }

The output is as follows:

    +---+--------+-------------+
    | id|category|categoryIndex|
    +---+--------+-------------+
    | 0| a| 0.0|
    | 1| b| 2.0|
    | 2| c| 1.0|
    | 3| a| 0.0|
    | 4| a| 0.0|
    | 5| c| 1.0|
    +---+--------+-------------+

    +---+-------------+------------+
    | id|categoryIndex|origCategory|
    +---+-------------+------------+
    | 0| 0.0| a|
    | 1| 2.0| b|
    | 2| 1.0| c|
    | 3| 0.0| a|
    | 4| 0.0| a|
    | 5| 1.0| c|
    +---+-------------+------------+

10. OneHotEncoder (One-Hot Encoding)

        One-hot encoding maps a column of label indices to a column of binary vectors, each with at most a single one-value. This encoding allows algorithms that expect continuous features to make use of categorical features.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
    import org.apache.spark.sql.SparkSession

    object OneHotEncoder {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("StringToIndexer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val df = spark.createDataFrame(Seq(
          (0, "a"),
          (1, "b"),
          (2, "c"),
          (3, "a"),
          (4, "a"),
          (5, "c")
        )).toDF("id", "category")
        // StringIndexer
        val indexer = new StringIndexer()
          .setInputCol("category")
          .setOutputCol("categoryIndex")
          .fit(df)
        val indexed = indexer.transform(df)
        val encoder = new OneHotEncoder()
          .setInputCol("categoryIndex")
          .setOutputCol("categoryVec")
        val encoded = encoder.transform(indexed)
        encoded.show()
      }
    }

The output is as follows:

    +---+--------+-------------+-------------+
    | id|category|categoryIndex| categoryVec|
    +---+--------+-------------+-------------+
    | 0| a| 0.0|(2,[0],[1.0])|
    | 1| b| 2.0| (2,[],[])|
    | 2| c| 1.0|(2,[1],[1.0])|
    | 3| a| 0.0|(2,[0],[1.0])|
    | 4| a| 0.0|(2,[0],[1.0])|
    | 5| c| 1.0|(2,[1],[1.0])|
    +---+--------+-------------+-------------+

11. VectorIndexer (Vector Indexing)

        VectorIndexer indexes the categorical (discrete) features in a dataset of Vectors. It can automatically decide which features are categorical and convert their original values to category indices. Specifically, it takes an input column of type Vector and a parameter maxCategories; it decides which features are categorical based on the number of distinct values, declaring categorical those features with at most maxCategories distinct values; it computes 0-based category indices for each categorical feature; and it indexes those features, converting the original values to indices. Indexing categorical features allows algorithms such as decision trees and tree ensembles to handle categorical features appropriately, improving performance.
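
A minimal sketch of how VectorIndexer can be used. The toy dataset, the object name VectorIndexerExample, and the maxCategories value are illustrative assumptions, not taken from the original article:

    package sparkml

    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object VectorIndexerExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("VectorIndexerExample")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        // Toy data: the first feature has only two distinct values (0.0 and 1.0),
        // so with maxCategories = 2 it is treated as categorical; the others stay continuous.
        val data = Seq(
          (0, Vectors.dense(0.0, 10.5, 3.0)),
          (1, Vectors.dense(1.0, 12.0, 5.5)),
          (2, Vectors.dense(0.0, 9.8, 7.1))
        )
        val df = spark.createDataFrame(data).toDF("id", "features")
        val indexer = new VectorIndexer()
          .setInputCol("features")
          .setOutputCol("indexedFeatures")
          .setMaxCategories(2)
        val indexerModel = indexer.fit(df)
        // categoryMaps lists which feature positions were treated as categorical
        println(s"Categorical feature indices: ${indexerModel.categoryMaps.keys.mkString(", ")}")
        indexerModel.transform(df).show(false)
      }
    }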

12. Interaction

        Interaction is a Transformer that takes Vector or double-valued columns and generates a single vector column containing the products of all combinations of one value from each input column. For example, if you have two vector-typed input columns, each of dimension 3, you will get a 9-dimensional vector as the output column.

Example:

    package sparkml

    import org.apache.spark.ml.feature.{Interaction, VectorAssembler}
    import org.apache.spark.sql.SparkSession

    object Interaction {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Interaction")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val df = spark.createDataFrame(Seq(
          (1, 1, 2, 3, 8, 4, 5),
          (2, 4, 3, 8, 7, 9, 8),
          (3, 6, 1, 9, 2, 3, 6),
          (4, 10, 8, 6, 9, 4, 5),
          (5, 9, 2, 7, 10, 7, 3),
          (6, 1, 1, 4, 2, 8, 4)
        )).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
        val assembler1 = new VectorAssembler()
          .setInputCols(Array("id2", "id3", "id4"))
          .setOutputCol("vec1")
        val assembled1 = assembler1.transform(df)
        val assembler2 = new VectorAssembler()
          .setInputCols(Array("id5", "id6", "id7"))
          .setOutputCol("vec2")
        val assembled2 = assembler2.transform(assembled1)
          .select("id1", "vec1", "vec2")
        val interaction = new Interaction()
          .setInputCols(Array("id1", "vec1", "vec2"))
          .setOutputCol("interactedCol")
        val interacted = interaction.transform(assembled2)
        interacted.show(truncate = false)
      }
    }

The output is as follows:

    +---+--------------+--------------+------------------------------------------------------+
    |id1|vec1 |vec2 |interactedCol |
    +---+--------------+--------------+------------------------------------------------------+
    |1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0] |
    |2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0] |
    |3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0] |
    |4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]|
    |5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] |
    |6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0] |
    +---+--------------+--------------+------------------------------------------------------+

13. Normalizer (p-Norm Normalization)

        Normalizer is a Transformer that normalizes each feature vector to have unit p-norm; the parameter p (2 by default) specifies the p-norm used for normalization. Normalizing the input data standardizes it and often improves the behavior of downstream machine-learning algorithms.

Example:

    package sparkml

    import org.apache.spark.ml.feature.Normalizer
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object Norm {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("norm")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (0, Vectors.dense(0.0, 1.0, -2.0)),
          (1, Vectors.dense(2.0, 0.0, 3.0)),
          (2, Vectors.dense(4.0, 10.0, 2.0))
        )
        val df = spark.createDataFrame(data).toDF("id", "features")
        // Normalize each vector using the L^1 norm
        val normalizer = new Normalizer()
          .setInputCol("features")
          .setOutputCol("normFeatures")
          .setP(1.0)
        val l1NormData = normalizer.transform(df)
        l1NormData.show()
        // Normalize each vector using the L^inf norm
        val lInfNormData = normalizer.transform(df, normalizer.p -> Double.PositiveInfinity)
        lInfNormData.show()
      }
    }

The output is as follows:

    +---+--------------+--------------------+
    | id| features| normFeatures|
    +---+--------------+--------------------+
    | 0|[0.0,1.0,-2.0]|[0.0,0.3333333333...|
    | 1| [2.0,0.0,3.0]| [0.4,0.0,0.6]|
    | 2|[4.0,10.0,2.0]| [0.25,0.625,0.125]|
    +---+--------------+--------------------+

    +---+--------------+--------------------+
    | id| features| normFeatures|
    +---+--------------+--------------------+
    | 0|[0.0,1.0,-2.0]| [0.0,0.5,-1.0]|
    | 1| [2.0,0.0,3.0]|[0.66666666666666...|
    | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
    +---+--------------+--------------------+

14. StandardScaler (Standardization)

        StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes the following parameters:
        withStd: true by default. Scales the data to unit standard deviation.
        withMean: false by default. Centers the data with the mean before scaling. This builds a dense output, so take care when applying it to sparse input.
        StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset so that its features have unit standard deviation and/or zero mean.
        Note that if the standard deviation of a feature is zero, the default value 0.0 is returned for that feature in the output vector. A minimal sketch is shown below.
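
A minimal sketch of StandardScaler usage (the toy vectors mirror the MinMaxScaler and MaxAbsScaler examples below; the object name StandardScalerExample is an illustrative assumption):

    package sparkml

    import org.apache.spark.ml.feature.StandardScaler
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object StandardScalerExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("StandardScalerExample")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (0, Vectors.dense(0.0, 1.0, -2.0)),
          (1, Vectors.dense(2.0, 0.0, 3.0)),
          (2, Vectors.dense(4.0, 10.0, 2.0))
        )
        val df = spark.createDataFrame(data).toDF("id", "features")
        val scaler = new StandardScaler()
          .setInputCol("features")
          .setOutputCol("scaledFeatures")
          .setWithStd(true)   // scale to unit standard deviation (the default)
          .setWithMean(false) // do not center; centering densifies sparse vectors
        val scalerModel = scaler.fit(df)       // StandardScaler is an Estimator
        scalerModel.transform(df).show(false)  // each column of scaledFeatures has unit std
      }
    }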

15. MinMaxScaler (Min-Max Scaling)

        MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (commonly [0, 1]). It takes the following parameters:
        min: 0.0 by default; the lower bound after transformation.
        max: 1.0 by default; the upper bound after transformation.
        MinMaxScaler computes summary statistics on the dataset and produces a MinMaxScalerModel. The model can then transform each feature individually so that it lies within the given range.
        The rescaled value of a feature E is computed as:
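
        Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min

        (This is the standard rescaling rule documented for Spark ML's MinMaxScaler; for the degenerate case E_max == E_min, Rescaled(e_i) = 0.5 * (max + min). Since zero values may be mapped to non-zero values, the output can be dense even when the input is sparse.)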

Example:

    package sparkml

    import org.apache.spark.ml.feature.MinMaxScaler
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object MinMaxScaler {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("MinMaxScaler")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (0, Vectors.dense(0.0, 1.0, -2.0)),
          (1, Vectors.dense(2.0, 0.0, 3.0)),
          (2, Vectors.dense(4.0, 10.0, 2.0))
        )
        val df = spark.createDataFrame(data).toDF("id", "features")
        val scaler = new MinMaxScaler()
          .setInputCol("features")
          .setOutputCol("scaledFeatures")
        val scalerModel = scaler.fit(df)
        val scaledData = scalerModel.transform(df)
        println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
        scaledData.select("features", "scaledFeatures").show()
      }
    }

The output is as follows:

    Features scaled to range: [0.0, 1.0]
    +--------------+--------------+
    | features|scaledFeatures|
    +--------------+--------------+
    |[0.0,1.0,-2.0]| [0.0,0.1,0.0]|
    | [2.0,0.0,3.0]| [0.5,0.0,1.0]|
    |[4.0,10.0,2.0]| [1.0,1.0,0.8]|
    +--------------+--------------+

16. MaxAbsScaler (Max-Absolute Scaling)

        MaxAbsScaler transforms a dataset of Vector rows, rescaling each feature to the range [-1, 1] by dividing through by the maximum absolute value of that feature. It does not shift or center the data, and therefore does not destroy any sparsity. MaxAbsScaler computes summary statistics on the dataset and produces a MaxAbsScalerModel; the model can then transform each feature individually to the range [-1, 1].

Example:

    package sparkml

    import org.apache.spark.ml.feature.MaxAbsScaler
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object MaxAbsScaler {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("MaxAbsScaler")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (0, Vectors.dense(0.0, 1.0, -2.0)),
          (1, Vectors.dense(2.0, 0.0, 3.0)),
          (2, Vectors.dense(4.0, 10.0, 2.0))
        )
        val df = spark.createDataFrame(data).toDF("id", "features")
        val scaler = new MaxAbsScaler()
          .setInputCol("features")
          .setOutputCol("scaledFeatures")
        val scalerModel = scaler.fit(df)
        val scaledData = scalerModel.transform(df)
        scaledData.select("features", "scaledFeatures").show()
      }
    }

The output is as follows:

    +--------------+--------------------+
    | features| scaledFeatures|
    +--------------+--------------------+
    |[0.0,1.0,-2.0]|[0.0,0.1,-0.66666...|
    | [2.0,0.0,3.0]| [0.5,0.0,1.0]|
    |[4.0,10.0,2.0]|[1.0,1.0,0.666666...|
    +--------------+--------------------+

17. VectorAssembler (Feature Vector Assembly)

        VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector in order to train ML models such as logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. The values of the input columns are appended to a vector in the specified order.

Example:

    package sparkml

    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object VectorAssembler {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("VectorAssembler")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0)
        )
        val df = spark.createDataFrame(data).toDF("id", "hour", "mobile", "userFeatures", "clicked")
        val assembler = new VectorAssembler()
          .setInputCols(Array("hour", "mobile", "userFeatures"))
          .setOutputCol("features")
        val output = assembler.transform(df)
        println(output.select("features", "clicked").first())
      }
    }

The output is as follows:

[[18.0,1.0,0.0,10.0,0.5],1.0]

18. QuantileDiscretizer (Quantile Discretization)

        QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. The number of buckets actually used may be smaller than this value, for example if there are too few distinct input values to create enough distinct quantiles.

        NaN values: NaN values are removed from the column during QuantileDiscretizer fitting, which produces a Bucketizer model for making predictions. During transformation, the Bucketizer raises an error when it finds NaN values in the dataset, but the user can choose to keep or remove NaN values by setting handleInvalid. If the user chooses to keep them, NaN values are handled specially and placed into their own bucket: for example, with 4 buckets, non-NaN data go into buckets[0-3] and NaNs are counted in a special bucket[4].

        Algorithm: the bin ranges are chosen using an approximate algorithm. The precision of the approximation can be controlled with the relativeError parameter; when set to zero, exact quantiles are computed.

Example:

    package sparkml

    import org.apache.spark.ml.feature.QuantileDiscretizer
    import org.apache.spark.sql.SparkSession

    object QuantileDiscretizer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("QuantileDiscretizer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
        val df = spark.createDataFrame(data).toDF("id", "hour")
        val discretizer = new QuantileDiscretizer()
          .setInputCol("hour")
          .setOutputCol("result")
          .setNumBuckets(3)
        val result = discretizer.fit(df)
          .transform(df)
        result.show()
      }
    }

The output is as follows:

    +---+----+------+
    | id|hour|result|
    +---+----+------+
    | 0|18.0| 2.0|
    | 1|19.0| 2.0|
    | 2| 8.0| 1.0|
    | 3| 5.0| 1.0|
    | 4| 2.2| 0.0|
    +---+----+------+

19. A Few Other Transformations

Example 1: SQLTransformer

    package sparkml

    import org.apache.spark.ml.feature.SQLTransformer
    import org.apache.spark.sql.SparkSession

    object SQLTransformer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SQLTransformer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val df = spark.createDataFrame(
          Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
        val sqlTrans = new SQLTransformer()
          .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
        sqlTrans.transform(df).show(false)
      }
    }

The output is as follows:

    +---+---+---+---+----+
    |id |v1 |v2 |v3 |v4 |
    +---+---+---+---+----+
    |0 |1.0|3.0|4.0|3.0 |
    |2 |2.0|5.0|7.0|10.0|
    +---+---+---+---+----+

Example 2: Bucketizer

    package sparkml

    import org.apache.spark.ml.feature.Bucketizer
    import org.apache.spark.sql.SparkSession

    object Bucketizer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Bucketizer")
          .master("local[*]")
          .getOrCreate()
        val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
        val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
        val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
        val bucketizer = new Bucketizer()
          .setInputCol("features")
          .setOutputCol("bucketedFeatures")
          .setSplits(splits)
        val bucketedData = bucketizer.transform(dataFrame)
        println(s"Bucketizer output with ${bucketizer.getSplits.length - 1} buckets")
        bucketedData.show()
      }
    }

The output is as follows:

    Bucketizer output with 4 buckets
    +--------+----------------+
    |features|bucketedFeatures|
    +--------+----------------+
    | -999.9| 0.0|
    | -0.5| 1.0|
    | -0.3| 1.0|
    | 0.0| 2.0|
    | 0.2| 2.0|
    | 999.9| 3.0|
    +--------+----------------+

Example 3: ElementwiseProduct

    package sparkml

    import org.apache.spark.ml.feature.ElementwiseProduct
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object ElementwiseProduct {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Bucketizer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val dataFrame = spark.createDataFrame(Seq(
          ("a", Vectors.dense(1.0, 2.0, 3.0)),
          ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
        val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
        val transformer = new ElementwiseProduct()
          .setScalingVec(transformingVector)
          .setInputCol("vector")
          .setOutputCol("transformedVector")
        transformer.transform(dataFrame).show(false)
      }
    }

The output is as follows:

    +---+-------------+-----------------+
    |id |vector |transformedVector|
    +---+-------------+-----------------+
    |a |[1.0,2.0,3.0]|[0.0,2.0,6.0] |
    |b |[4.0,5.0,6.0]|[0.0,5.0,12.0] |
    +---+-------------+-----------------+

III. Feature Selection

1. VectorSlicer

        VectorSlicer is a transformer that takes a feature vector and outputs a new feature vector containing a sub-array of the original features; it is useful for extracting features from a vector column. VectorSlicer accepts a vector column with specified indices and outputs a new vector column whose values are selected via those indices. There are two types of indices: integer indices representing positions in the vector, set with setIndices(); and string indices representing the names of features in the vector, set with setNames(). The latter requires the vector column to have an AttributeGroup, because the implementation matches on the name field of Attribute.

        Both integer and string specifications are acceptable, and integer indices and string names may be used at the same time. At least one feature must be selected, and duplicate features are not allowed, so there must be no overlap between the selected indices and names. Note that if feature names are selected, an exception is thrown when an empty input attribute is encountered.

Example:

    package sparkml

    import java.util

    import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
    import org.apache.spark.ml.feature.VectorSlicer
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{Row, SparkSession}

    object VectorSlicer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("VectorSlicer")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = util.Arrays.asList(
          Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
          Row(Vectors.dense(-2.0, 2.3, 0.0))
        )
        val defaultAttr = NumericAttribute.defaultAttr
        val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
        val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
        val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
        val slicer = new VectorSlicer()
          .setInputCol("userFeatures")
          .setOutputCol("features")
        // Select the feature at index 1 ("f2") and the feature named "f3"
        slicer.setIndices(Array(1)).setNames(Array("f3"))
        val output = slicer.transform(dataset)
        output.show(false)
      }
    }

The output is as follows:

    +--------------------+-------------+
    |userFeatures |features |
    +--------------------+-------------+
    |(3,[0,1],[-2.0,2.3])|(2,[0],[2.3])|
    |[-2.0,2.3,0.0] |[2.3,0.0] |
    +--------------------+-------------+

2. RFormula (R Model Formula)

        RFormula selects columns specified by an R model formula. Currently only a limited subset of the R operators is supported, including '~', '.', ':', '+' and '-'. The basic operators are: ~ separates the target and the terms; + concatenates terms, and "+ 0" removes the intercept; - removes a term, and "- 1" removes the intercept; : interaction (multiplication for numeric values, or binarized categorical values); . all columns except the target.

Example:

    package sparkml

    import org.apache.spark.ml.feature.RFormula
    import org.apache.spark.sql.SparkSession

    object RFormula {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("RFormula")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val dataset = spark.createDataFrame(Seq(
          (7, "US", 18, 1.0),
          (8, "CA", 12, 0.0),
          (9, "NZ", 15, 0.0)
        )).toDF("id", "country", "hour", "clicked")
        val formula = new RFormula()
          .setFormula("clicked ~ country + hour")
          .setFeaturesCol("features")
          .setLabelCol("label")
        val output = formula.fit(dataset).transform(dataset)
        output.select("features", "label").show()
      }
    }

The output is as follows:

    +--------------+-----+
    | features|label|
    +--------------+-----+
    |[0.0,0.0,18.0]| 1.0|
    |[1.0,0.0,12.0]| 0.0|
    |[0.0,1.0,15.0]| 0.0|
    +--------------+-----+

3. ChiSqSelector (Chi-Squared Feature Selection)

        ChiSqSelector stands for chi-squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the chi-squared test of independence to decide which features to select. It supports three selection methods: numTopFeatures, percentile and fpr.
        numTopFeatures chooses a fixed number of top features according to the chi-squared test; this is akin to yielding the features with the most predictive power.
        percentile is similar to numTopFeatures but chooses a fraction of all features instead of a fixed number.
        fpr chooses all features whose p-value is below a threshold, thus controlling the false positive rate of the selection.
        By default the selection method is numTopFeatures, with the number of top features set to 50. The user can choose a selection method using setSelectorType.

Example:

    package sparkml

    import org.apache.spark.ml.feature.ChiSqSelector
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object ChiSqSelector {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("ChiSqSelector")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val data = Seq(
          (7, Vectors.dense(0.0, 1.0, -2.0, 1.0), 1.0),
          (8, Vectors.dense(2.0, 0.0, 3.0, 0.0), 0.0),
          (9, Vectors.dense(4.0, 10.0, 2.0, 0.1), 0.0)
        )
        val df = spark.createDataFrame(data).toDF("id", "features", "clicked")
        val selector = new ChiSqSelector()
          .setNumTopFeatures(1)
          .setFeaturesCol("features")
          .setLabelCol("clicked")
          .setOutputCol("selectedFeatures")
        val result = selector.fit(df)
          .transform(df)
        println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
        result.show()
      }
    }

The output is as follows:

    ChiSqSelector output with top 1 features selected
    +---+------------------+-------+----------------+
    | id| features|clicked|selectedFeatures|
    +---+------------------+-------+----------------+
    | 7|[0.0,1.0,-2.0,1.0]| 1.0| [0.0]|
    | 8| [2.0,0.0,3.0,0.0]| 0.0| [2.0]|
    | 9|[4.0,10.0,2.0,0.1]| 0.0| [4.0]|
    +---+------------------+-------+----------------+

 


IV. Matrix and Vector Computation

        Spark MLlib's underlying vector and matrix operations use the Breeze library, which provides Vector/Matrix implementations and the corresponding computation interfaces (linalg). MLlib itself also provides implementations of Vector, linalg and related classes. (A minimal Breeze sketch is given at the end of this section.)

1. Breeze creation functions

2. Breeze element access

3. Breeze element operations

4. Breeze numerical functions

5. Breeze sum functions

6. Breeze boolean functions

7. Breeze linear-algebra functions

8. Breeze rounding functions

9. Breeze other functions

Breeze trigonometric functions:
sin, sinh, asin, asinh, cos, cosh, acos, acosh, tan, tanh, atan, atanh, atan2, sinc(x) (i.e. sin(x)/x), sincpi(x) (i.e. sinc(x*pi))

Breeze logarithmic and exponential functions:
log, exp, log10, log1p, expm1, sqrt, cbrt, pow
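
As a small illustration of these Breeze facilities, here is a minimal sketch covering creation, element access, and a few numeric and linear-algebra functions. It assumes only that Breeze is on the classpath (as it is for any Spark application); the object name BreezeSketch and the sample values are illustrative:

    import breeze.linalg.{DenseMatrix, DenseVector, sum}
    import breeze.numerics.{exp, log, sqrt}

    object BreezeSketch {
      def main(args: Array[String]): Unit = {
        // Creation functions
        val v = DenseVector(1.0, 2.0, 3.0)          // dense vector from values
        val zeros = DenseVector.zeros[Double](3)    // all-zero vector
        val m = DenseMatrix((1.0, 2.0), (3.0, 4.0)) // 2 x 2 dense matrix

        // Element access
        println(v(0))    // first element of the vector
        println(m(1, 0)) // element at row 1, column 0

        // Element-wise operations and numeric functions
        println(v + zeros) // element-wise addition
        println(v * 2.0)   // multiplication by a scalar
        println(sqrt(v))   // element-wise square root
        println(exp(v))    // element-wise exponential
        println(log(v))    // element-wise natural logarithm

        // Sum and linear algebra
        println(sum(v)) // sum of all elements
        println(m * m)  // matrix multiplication
        println(m.t)    // transpose
      }
    }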

V. Classification Evaluation Metrics

Example code (this assumes a predictions DataFrame produced by a fitted classification model, with label column "indexedLabel" and prediction column "prediction"):

    import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}

    // Accuracy
    val evaluator1 = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator1.evaluate(predictions)
    println(accuracy)
    // F1
    val evaluator2 = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("f1")
    val f1 = evaluator2.evaluate(predictions)
    println(f1)
    // Weighted precision
    val evaluator3 = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("weightedPrecision")
    val precision = evaluator3.evaluate(predictions)
    println(precision)
    // Weighted recall
    val evaluator4 = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("weightedRecall")
    val recall = evaluator4.evaluate(predictions)
    println(recall)
    // AUC (area under the ROC curve)
    val evaluator5 = new BinaryClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setRawPredictionCol("prediction")
      .setMetricName("areaUnderROC")
    val auc = evaluator5.evaluate(predictions)
    println(auc)
    // AUPR (area under the precision-recall curve)
    val evaluator6 = new BinaryClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setRawPredictionCol("prediction")
      .setMetricName("areaUnderPR")
    val aupr = evaluator6.evaluate(predictions)
    println(aupr)

VI. Cross-Validation

        Cross-validation first partitions the dataset D into k mutually exclusive subsets of similar size, i.e. D = D1 ∪ D2 ∪ ... ∪ Dk, with no overlap between the subsets. In each round, the union of k-1 subsets is used as the training set and the remaining subset as the test set, giving k training/test splits; k rounds of training and testing are performed, and the final result is the mean of the k results. Different random partitions can also be repeated several times, e.g. ten runs of 10-fold cross-validation. This procedure is therefore usually called "k-fold cross-validation"; the most common value of k is 10, giving 10-fold cross-validation.

Example: cross-validation

    package sparkml

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.Row
    import org.apache.spark.ml.linalg.Vector

    object JiaoChaYanZheng {
      def main(args: Array[String]): Unit = {
        // Set the log output level
        Logger.getLogger("org").setLevel(Level.WARN)
        // Create the SparkSession
        val spark = SparkSession.builder()
          .appName("jcyz")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._
        // Training data in the format (id, text, label)
        val training = spark.createDataFrame(Seq(
          (0L, "a b c d e spark", 1.0),
          (1L, "b d", 0.0),
          (2L, "spark f g h", 0.0),
          (3L, "hadoop mapreduce", 0.0),
          (4L, "b spark who", 1.0),
          (5L, "g d a y", 0.0),
          (6L, "spark fly", 1.0),
          (7L, "was mapreduce", 0.0),
          (8L, "e spark program", 1.0),
          (9L, "a e c l", 0.0),
          (10L, "spark compile", 1.0),
          (11L, "hadoop software", 0.0)
        )).toDF("id", "text", "label")
        // Build the ML pipeline: tokenizer, hashingTF, lr
        val tokenizer = new Tokenizer()
          .setInputCol("text")
          .setOutputCol("words")
        val hashingTF = new HashingTF()
          .setInputCol(tokenizer.getOutputCol)
          .setOutputCol("features")
        val lr = new LogisticRegression()
          .setMaxIter(10)
        val pipeline = new Pipeline()
          .setStages(Array(tokenizer, hashingTF, lr))
        // Use ParamGridBuilder to build a grid of parameters to search over:
        // 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
        // i.e. 3 * 2 = 6 combinations in total; cross-validation chooses the best one
        val paramGrid = new ParamGridBuilder()
          .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
          .addGrid(lr.regParam, Array(0.1, 0.01))
          .build()
        // Build a cross-validation evaluator and set its parameters
        val cv = new CrossValidator()
          .setEstimator(pipeline)
          .setEvaluator(new BinaryClassificationEvaluator())
          .setEstimatorParamMaps(paramGrid)
          .setNumFolds(2)
        // Run cross-validation to obtain the model with the best parameter set
        val cvModel = cv.fit(training)
        // Test data
        val test = spark.createDataFrame(Seq(
          (4L, "spark i j k"),
          (5L, "l m n"),
          (6L, "mapreduce spark"),
          (7L, "apache hadoop")
        )).toDF("id", "text")
        // Predict: cvModel uses the best model found to make predictions
        val result = cvModel.transform(test)
        result.select("id", "text", "probability", "prediction")
          .collect()
          .foreach {
            case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
              println(s"($id, $text) --> prob = $prob, prediction = $prediction")
          }
      }
    }

The output is as follows:

    (4, spark i j k) --> prob = [0.38806038783805663,0.6119396121619434], prediction = 1.0
    (5, l m n) --> prob = [0.9395446853416303,0.06045531465836969], prediction = 0.0
    (6, mapreduce spark) --> prob = [0.557958097564678,0.4420419024353221], prediction = 0.0
    (7, apache hadoop) --> prob = [0.885348428830688,0.11465157116931203], prediction = 0.0

Reference:

1. 《Spark 2.0机器学习》

