Skip to content

Preprocessing and Feature Engineering🔗

Formatting Models According to your Use Case🔗

  • general structure for each advanced analytics task in MLlib.
    • In most cases of classification and regression algorithms, Organise data into column of type Double to represent the label and a column of type Vector (dense/sparse) to represent features
    • In case of recommendation, get data into a columns of users, a column of items and a column of ratings
    • In case of unsupervised learning a column of type Vector is needed to represent features.
    • In the case of graph analytics, DataFrame of vertices and a DataFrame of edges.
  • NOTE: Following are all synthetic datasets we are going to use
sales = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/data/retail-data/by-day/*.csv")\
  .coalesce(5)\
  .where("Description IS NOT NULL") # NOTE we filtered nulls here.
fakeIntDF = spark.read.parquet("/data/simple-ml-integers")
simpleDF = spark.read.json("/data/simple-ml")
scaleDF = spark.read.parquet("/data/simple-ml-scaling")
  • best way to achieve the required structure is to use transformer.

Transformers🔗

  • Transformers are functions that convert raw data in some way. This might be to create a new interaction variable (from two other variables), to normalize a column, or to simply turn it into a Double to be input into a model.
  • The Tokenizer is an example of a transformer. It tokenizes a string, splitting on a given character, and has nothing to learn from our data; it simply applies a function. Will discuss in upcoming chapter in detail.
import org.apache.spark.ml.feature.Tokenizer
val tkn = new Tokenizer().setInputCol("Description")
tkn.transform(sales.select("Description")).show(false)

+-----------------------------------+------------------------------------------+
|Description                        |tok_7de4dfc81ab7__output                  |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
...
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline, bag, vintage, world, champion]  |
|AIRLINE BAG VINTAGE JET SET BROWN  |[airline, bag, vintage, jet, set, brown]  |
+-----------------------------------+------------------------------------------+

Estimators for Preprocessing🔗

  • An estimator is necessary when a transformation you would like to perform must be initialized with data or information about the input column (often derived by doing a pass over the input column itself)
  • Examples, scaling values in column to have mean zero and unit variance
  • In effect, an estimator can be a transformer configured according to your particular input data. In simplest terms, you can either blindly apply a transformation (a “regular” transformer type) or perform a transformation based on your data (an estimator type).
import org.apache.spark.ml.feature.StandardScaler
val ss = new StandardScaler().setInputCol("features")
ss.fit(scaleDF).transform(scaleDF).show(false)

+---+--------------+------------------------------------------------------------+
|id |features      |stdScal_d66fbeac10ea__output                                |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.1,-1.0]|[1.1952286093343936,0.02337622911060922,-0.5976143046671968]|
...
|1  |[3.0,10.1,3.0]|[3.5856858280031805,2.3609991401715313,1.7928429140015902]  |
+---+--------------+------------------------------------------------------------+

Transformer Properties🔗

  • All transformers require you to specify, at a minimum, the inputCol and the outputCol
  • all transformers have different parameters that you can tune

High-Level Transformers🔗

RFormula🔗

  • RFormula is easiest transformer to use when we have conventionally formatted data. It is borrowed from R to make transformations easier.
  • With this transformer, values can be either numerical or categorical and you do not need to extract values from strings or manipulate them in any way. The RFormula will automatically handle categorical inputs (specified as strings) by performing something called one-hot encoding.
  • Basic RFormula operators are:
    • ~ : Separate target and terms
    • + : Concat terms; “+ 0” means removing the intercept (this means that the y-intercept of the line that we will fit will be 0)
    • - : Remove a term; “- 1” means removing the intercept (this means that the y-intercept of the line that we will fit will be 0—yes, this does the same thing as “+ 0”
    • : : Interaction (multiplication for numeric values, or binarized categorical values)
    • . : All columns except the target/dependent variable
  • RFormula also uses default columns of label and features to label
from pyspark.ml.feature import RFormula

supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
supervised.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------------------+-----+
|color| lab|value1|            value2|            features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good|     1|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
| blue| bad|     8|14.386294994851129|(10,[2,3,6,9],[8....|  0.0|
...
|  red| bad|     1| 38.97187133755819|(10,[0,2,3,4,7],[...|  0.0|
|  red| bad|     2|14.386294994851129|(10,[0,2,3,4,7],[...|  0.0|
+-----+----+------+------------------+--------------------+-----+

SQL Transformers🔗

  • Any SELECT statement from SQL is a valid transformation, only change is that instead of table we will use THIS keyword.
from pyspark.ml.feature import SQLTransformer

basicTransformation = SQLTransformer()\
  .setStatement("""
    SELECT sum(Quantity), count(*), CustomerID
    FROM __THIS__
    GROUP BY CustomerID
  """)

basicTransformation.transform(sales).show()

-------------+--------+----------+
|sum(Quantity)|count(1)|CustomerID|
+-------------+--------+----------+
|          119|      62|   14452.0|
...
|          138|      18|   15776.0|
+-------------+--------+----------+

Vector Assembler🔗

  • Very important tool, will be used in all pipelines
  • Helps concatenate all features into one big vector that we can pass into an estimator
  • Typically last step of a machine learning pipeline and takes as input a number of columns of Boolean, Double, or Vector
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
va.transform(fakeIntDF).show()

+----+----+----+--------------------------------------------+
|int1|int2|int3|VectorAssembler_403ab93eacd5585ddd2d__output|
+----+----+----+--------------------------------------------+
|   1|   2|   3|                               [1.0,2.0,3.0]|
|   4|   5|   6|                               [4.0,5.0,6.0]|
|   7|   8|   9|                               [7.0,8.0,9.0]|
+----+----+----+--------------------------------------------+

Working with Continuous Features🔗

Continous features are just values on a number line. There are two common transformers for continuous features.

First convert continuous features into categorical features via bucketing or we can scale and normalise features according to several different requirements.

NOTE: these transformers will work on only Double type.

contDF = spark.range(20).selectExpr("cast(id as double)")

Bucketing🔗

  • We can bin a continous range by binning (bucketing) using Bucketizer
  • We can specify bucket creation via an array or list of Double values. E.g. Bucketing weight ranges using buckets like overweight, average or underweight.
  • To specify a bucket, define its border like setting splits to 5.00, 10.00, 250.0 on our contDF will fail because it doesn’t cover all possible input ranges.
    • Min value in your splits must be less than minimum in your DF
    • Max value in your splits must be greater than maximum in your DF
    • Specify at a minimum three values in the splits array, which creates 2 buckets
  • Another option to cover all ranges can be scala.Double.NegativeInfinity or scala.Double.PositiveInfinity, while in python float('inf') or float('-inf')
  • To handle null or NaN values, you must specify handleInvalid parameter as certain value.
from pyspark.ml.feature import Bucketizer
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
bucketer.transform(contDF).show()

+----+---------------------------------------+
|  id|Bucketizer_4cb1be19f4179cc2545d__output|
+----+---------------------------------------+
| 0.0|                                    0.0|
...
|10.0|                                    2.0|
|11.0|                                    2.0|
...
+----+---------------------------------------+
  • Another way to split based on percentiles in your data using QuantileDiscretizer
  • For instance, the 90th quantile is the point in your data at which 90% of the data is below that value. You can control how finely the buckets should be split by setting the relative error for the approximate quantiles calculation using setRelativeError.
from pyspark.ml.feature import QuantileDiscretizer
bucketer = QuantileDiscretizer().setNumBuckets(5).setInputCol("id")
fittedBucketer = bucketer.fit(contDF)
fittedBucketer.transform(contDF).show()

+----+----------------------------------------+
|  id|quantileDiscretizer_cd87d1a1fb8e__output|
+----+----------------------------------------+
| 0.0|                                     0.0|
...
| 6.0|                                     1.0|
| 7.0|                                     2.0|
...
|14.0|                                     3.0|
|15.0|                                     4.0|
...
+----+----------------------------------------+

More advanced bucketing techniques like LHS(locality sensitive hashing) are also available in MLlib.

Scaling and Normalizing🔗

  • Usually preffered when our data has multiple columns but based on different scale. E.g. Weight(in kgs) and Height(in feet). If we don’t scale or normalise, algorithm will becomes less sensitive to variations in heights because height values are much lower than weight values.
  • In MLlib we always apply normalisation to columns of type Vector. MLlib looks across all rows in a given column (of type Vector) and then treat every dimension in those vectors as its own particular column. Applying scaling or normalization function on each dimension separately.

Standard Scaler🔗

  • Standardizes a set of features to have zero mean and a standard deviation of 1.
  • flag withStd will scale the data to unit standard deviation, withMean (false default) will center data prior to scaling it.
  • NOTE: Centering can be very expensive on sparse vectors because it generally turns them into dense vectors, so be careful before centering your data.
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+------------------------------------------------------------+
|id |features      |StandardScaler_41aaa6044e7c3467adc3__output                 |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.1,-1.0]|[1.1952286093343936,0.02337622911060922,-0.5976143046671968]|
...
|1  |[3.0,10.1,3.0]|[3.5856858280031805,2.3609991401715313,1.7928429140015902]  |
+---+--------------+------------------------------------------------------------+

MinMaxScaler🔗

  • scale values in a vector (component wise) to a proportional values on a scale from a given min value to max value.
  • If you specify the minimum value to be 0 and the maximum value to be 1, then all the values will fall in between 0 and 1
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+-----------------------------------------+
| id|      features|MinMaxScaler_460cbafafbe6b9ab7c62__output|
+---+--------------+-----------------------------------------+
|  0|[1.0,0.1,-1.0]|                            [5.0,5.0,5.0]|
...
|  1|[3.0,10.1,3.0]|                         [10.0,10.0,10.0]|
+---+--------------+-----------------------------------------+

MaxAbsScaler🔗

  • scales data by dividing each value by maximum absolute value in this feature.
  • All values end up between -1 and 1. NOTE: this transformer doesn’t shift or center data at all.
from pyspark.ml.feature import MaxAbsScaler
maScaler = MaxAbsScaler().setInputCol("features")
fittedmaScaler = maScaler.fit(scaleDF)
fittedmaScaler.transform(scaleDF).show()

+---+--------------+----------------------------------------------------------+
|id |features      |MaxAbsScaler_402587e1d9b6f268b927__output                 |
+---+--------------+----------------------------------------------------------+
|0  |[1.0,0.1,-1.0]|[0.3333333333333333,0.009900990099009901,-0.3333333333333]|
...
|1  |[3.0,10.1,3.0]|[1.0,1.0,1.0]                                             |
+---+--------------+----------------------------------------------------------+

ElementwiseProduct🔗

  • scale each value in a vector by an arbitrary value.
  • Naturally the dimensions of the scaling vector must match the dimensions of the vector inside the relevant column
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
scalingUp = ElementwiseProduct()\
  .setScalingVec(scaleUpVec)\
  .setInputCol("features")
scalingUp.transform(scaleDF).show()

+---+--------------+-----------------------------------------------+
| id|      features|ElementwiseProduct_42b29ea5a55903e9fea6__output|
+---+--------------+-----------------------------------------------+
|  0|[1.0,0.1,-1.0]|                               [10.0,1.5,-20.0]|
...
|  1|[3.0,10.1,3.0]|                              [30.0,151.5,60.0]|
+---+--------------+-----------------------------------------------+

Normalizer🔗

  • The normalizer allows us to scale multidimensional vectors using one of several power norms, set through the parameter “p”
  • For example, we can use the Manhattan norm (or Manhattan distance) with p = 1, Euclidean norm with p = 2, and so on
from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|normalizer_1bf2cd17ed33__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+

Working with Categorical Features🔗

  • most common categorical task is indexing
  • Indexing converts a categorical variable in a column to a numerical one
  • In general, you should re-index every categorical variable when pre-processing just for the sake of consistency

StringIndexer🔗

  • maps strings to different numerical IDs, it also creates metadata attached to DataFrame that specify what inputs correspond to what outputs.
# in Python
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
...
|  red| bad|     2|14.386294994851129|     0.0|
+-----+----+------+------------------+--------+
  • we can apply StringIndexer to columns that are not strings, in that case, they will be converted to strings before being indexed
  • String Indexer is an estimator that must be fit on the input data meaning it must see all inputs to select a mapping of inputs to IDs.
  • If StringIndexer is trained on a, b, c but you feed d as input, it will throw error, a way to handle this is set setHandleInvalid as skip.
valIndexer.setHandleInvalid("skip")
valIndexer.fit(simpleDF).setHandleInvalid("skip")

Converting Indexed Values Back to Text🔗

  • Use IndexToString to convert indexed values back to Text.
from pyspark.ml.feature import IndexToString
labelReverse = IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show()

+-----+----+------+------------------+--------+--------------------------------+
|color| lab|value1|            value2|labelInd|IndexToString_415...2a0d__output|
+-----+----+------+------------------+--------+--------------------------------+
|green|good|     1|14.386294994851129|     1.0|                            good|
...
|  red| bad|     2|14.386294994851129|     0.0|                             bad|
+-----+----+------+------------------+--------+--------------------------------+

Idexing in Vectors🔗

  • VectorIndexer is a helpful tool for working with categorical variables that are already found inside of vectors in your dataset.
  • It will automaticallyy find categorical features inside of input vectors and convert them to categorical features with zero-based category indices.
  • By setting maxCategories to 2 in our VectorIndexer, we are instructing Spark to take any column in our vector with two or less distinct values and convert it to a categorical variable.
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vectors
idxIn = spark.createDataFrame([
  (Vectors.dense(1, 2, 3),1),
  (Vectors.dense(2, 5, 6),2),
  (Vectors.dense(1, 8, 9),3)
]).toDF("features", "label")
indxr = VectorIndexer()\
  .setInputCol("features")\
  .setOutputCol("idxed")\
  .setMaxCategories(2)
indxr.fit(idxIn).transform(idxIn).show()

+-------------+-----+-------------+
|     features|label|        idxed|
+-------------+-----+-------------+
|[1.0,2.0,3.0]|    1|[0.0,2.0,3.0]|
|[2.0,5.0,6.0]|    2|[1.0,5.0,6.0]|
|[1.0,8.0,9.0]|    3|[0.0,8.0,9.0]|
+-------------+-----+-------------+

One-Hot Encoding🔗

  • common transformation after indexing categorical variables, as after encoding colors lets say we have blue for 1 and green for 2, then it appears to have greater value than other color, but for us these might be independent.
  • To avoid this, we use OneHotEncoder, which will convert each distinct value to a Boolean flag (1 or 0) as a component in a vector.
from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()

+-----+--------+------------------------------------------+
|color|colorInd|OneHotEncoder_46b5ad1ef147bb355612__output|
+-----+--------+------------------------------------------+
|green|     1.0|                             (2,[1],[1.0])|
| blue|     2.0|                                 (2,[],[])|
...
|  red|     0.0|                             (2,[0],[1.0])|
|  red|     0.0|                             (2,[0],[1.0])|
+-----+--------+------------------------------------------+

Text Data Transformer🔗

  • Text is always a tricky input because it often requires lots of manipulation to map to a format easier to train ML models.
  • There are two types of text : free-from text and string categorical variables

Tokenizing Text🔗

  • Tokenization is process of converting free-form text into lits of tokens or individual words. Easiest way to do this is using Tokenizer class.
from pyspark.ml.feature import Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        DescOut                                    |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
...
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline, bag, vintage, world, champion]  |
|AIRLINE BAG VINTAGE JET SET BROWN  |[airline, bag, vintage, jet, set, brown]  |
+-----------------------------------+------------------------------------------+
  • regular expression based Tokenizer
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
  .setInputCol("Description")\
  .setOutputCol("DescOut")\
  .setPattern(" ")\
  .setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        DescOut                                    |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
...
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline, bag, vintage, world, champion]  |
|AIRLINE BAG VINTAGE JET SET BROWN  |[airline, bag, vintage, jet, set, brown]  |
+-----------------------------------+------------------------------------------+
  • Another way using is to matching output values with the provided pattern instead of using for split(gaps)
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
  .setInputCol("Description")\
  .setOutputCol("DescOut")\
  .setPattern(" ")\
  .setGaps(False)\
  .setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

Removing common Words🔗

  • Common operation after tokenization is to filter stop words, common words which do not have any relevance in many kinds of analysis and should be removed. Like the, and, and but. Spark already has this list.
# in Python
from pyspark.ml.feature import StopWordsRemover
# supported languages for stopwords are “danish,” “dutch,” “english,” “finnish,” “french,” 
# “german,” “hungarian,” “italian,” “norwegian,” “portuguese,” “russian,” “spanish,” 
# “swedish,” and “turkish”
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
stops = StopWordsRemover()\
  .setStopWords(englishStopWords)\
  .setInputCol("DescOut")
stops.transform(tokenized).show()

+--------------------+--------------------+------------------------------------+
|         Description|             DescOut|StopWordsRemover_4ab18...6ed__output|
+--------------------+--------------------+------------------------------------+
...
|SET OF 4 KNICK KN...|[set, of, 4, knic...|                [set, 4, knick, k...|
...
+--------------------+--------------------+------------------------------------+

Creating Word Combinations🔗

  • After tokenization and filtering stop words, It is usually of interest to look few words together. Word Combinations are techincally called as n-grams
  • An n-gram of length 1 is called a unigrams; those of length 2 are called bigrams, and those of length 3 are called trigrams and so-on
  • Order of n-gram matters. The goal when creating n-grams is to better capture sentence structure and more information than can be gleaned by simply looking at all words individually.

Example:

  • The bigrams of “Big Data Processing Made Simple” are:
    • “Big Data”
    • “Data Processing”
    • “Processing Made”
    • “Made Simple”
  • While the trigrams are:
    • “Big Data Processing”
    • “Data Processing Made”
    • “Procesing Made Simple”
from pyspark.ml.feature import NGram
unigram = NGram().setInputCol("DescOut").setN(1)
bigram = NGram().setInputCol("DescOut").setN(2)
unigram.transform(tokenized.select("DescOut")).show(False)
bigram.transform(tokenized.select("DescOut")).show(False)

Bigrams Output
+------------------------------------------+------------------------------------
DescOut                                    |ngram_6e68fb3a642a__output       ...
+------------------------------------------+------------------------------------
|[rabbit, night, light]                    |[rabbit night, night light]      ...
|[doughnut, lip, gloss]                    |[doughnut lip, lip gloss]        ...
...
|[airline, bag, vintage, world, champion]  |[airline bag, bag vintage, vintag...
|[airline, bag, vintage, jet, set, brown]  |[airline bag, bag vintage, vintag...
+------------------------------------------+------------------------------------

Converting Words into Numerical Representation🔗

  • next task is to count instances of words and word combinaiton for use in our models, we include binary counts of a word in a given document.
  • Essentially, we’re measuring whether or not each row contains a given word. This is a simple way to normalize for document sizes and occurrence counts and get numerical features that allow us to classify documents based on content.
  • We can count words using a CountVectorizer or reweigh them according to prevalence of a given word in all the documents using TF-IDF transformation.
  • A CountrVectorizer operates on our tokenized data and does two operation
    • During the fit process, it finds the set of words in all the documents and then counts the occurrences of those words in those documents.
    • It then counts the occurrences of a given word in each row of the DataFrame column during the transformation process and outputs a vector with the terms that occur in that row.
  • Conceptually this tranformer treats every row as a document and every word as a term and the total collection of all terms as the vocabulary. These are all tunable parameters, meaning we can set the minimum term frequency (minTF) for the term to be included in the vocabulary (effectively removing rare words from the vocabulary); minimum number of documents a term must appear in (minDF) before being included in the vocabulary (another way to remove rare words from the vocabulary); and finally, the total maximum vocabulary size (vocabSize).
  • By default the CountVectorizer will output the counts of a term in a document. To just return whether or not a word exists in a document, we can use setBinary(true).
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer()\
  .setInputCol("DescOut")\
  .setOutputCol("countVec")\
  .setVocabSize(500)\
  .setMinTF(1)\
  .setMinDF(2)
fittedCV = cv.fit(tokenized)
fittedCV.transform(tokenized).show(False)

+---------------------------------+--------------------------------------------+
DescOut                           |countVec                                    |
+---------------------------------+--------------------------------------------+
|[rabbit, night, light]           |(500,[150,185,212],[1.0,1.0,1.0])           |
|[doughnut, lip, gloss]           |(500,[462,463,492],[1.0,1.0,1.0])           |
...
|[airline, bag, vintage, world,...|(500,[2,6,328],[1.0,1.0,1.0])               |
|[airline, bag, vintage, jet, s...|(500,[0,2,6,328,405],[1.0,1.0,1.0,1.0,1.0]) |
+---------------------------------+--------------------------------------------+

Term frequency-inverse document frequency🔗

  • Another way to approach the problem of converting text into a numerical representation is to use term frequency–inverse document frequency
  • TF–IDF measures how often a word occurs in each document, weighted according to how many documents that word occurs in. The result is that words that occur in a few documents are given more weight than words that occur in many documents. E.g. words like a, an, the are given less weight as compared to word like streaming
tfIdfIn = tokenized\
  .where("array_contains(DescOut, 'red')")\
  .select("DescOut")\
  .limit(10)
tfIdfIn.show(10, False)

+---------------------------------------+
DescOut                                 |
+---------------------------------------+
|[gingham, heart, , doorstop, red]      |
...
|[red, retrospot, oven, glove]          |
|[red, retrospot, plate]                |
+---------------------------------------+

# applying TF-IDF on above Df which has red on every line
from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF()\
  .setInputCol("DescOut")\
  .setOutputCol("TFOut")\
  .setNumFeatures(10000)
idf = IDF()\
  .setInputCol("TFOut")\
  .setOutputCol("IDFOut")\
  .setMinDocFreq(2)

idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)

# ~ Output ~ Notice red is given very low weight.
(10000,[2591,4291,4456],[1.0116009116784799,0.0,0.0])

Word2Vec🔗

  • Word2Vec is a deep learning–based tool for computing a vector representation of a set of words. The goal is to have similar words close to one another in this vector space, so we can then make generalizations about the words themselves.
  • This model is easy to train and use, and has been shown to be useful in a number of natural language processing applications, including entity recognition, disambiguation, parsing, tagging, and machine translation.
  • Word2Vec is notable for capturing relationships between words based on their semantics. For example, if v~king, v~queen, v~man, and v~women represent the vectors for those four words, then we will often get a representation where v~king − v~man + v~woman ~= v~queen. To do this, Word2Vec uses a technique called “skip-grams” to convert a sentence of words into a vector representation (optionally of a specific size). It does this by building a vocabulary, and then for every sentence, it removes a token and trains the model to predict the missing token in the "n-gram” representation. Word2Vec works best with continuous, free-form text in the form of tokens.
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",
  outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] =>
Vector: [-0.008142343163490296,0.02051363289356232,0.03255096450448036]

Text: [I, wish, Java, could, use, case, classes] =>
Vector: [0.043090314205203734,0.035048123182994974,0.023512658663094044]

Text: [Logistic, regression, models, are, neat] =>
Vector: [0.038572299480438235,-0.03250147425569594,-0.01552378609776497]

Feature Manipulation🔗

  • following algorithms and tools are automated means to either expand feature vector or reduce dimensionality of features

PCA🔗

  • Principal Components Analysis (PCA) is a mathematical technique for finding the most important aspects of our data (the principal components). It changes the feature representation of our data by creating a new set of features (“aspects”). Each new feature is a combination of the original features.
  • https://en.wikipedia.org/wiki/Principal_component_analysis
  • Using PCA, we can find the most important combinations of features and only include those in our machine learning model. PCA takes a parameter 𝘬, specifying the number of output features to create. Generally, this should be much smaller than your input vectors’ dimension.
from pyspark.ml.feature import PCA
pca = PCA().setInputCol("features").setK(2)
pca.fit(scaleDF).transform(scaleDF).show(20, False)

+---+--------------+------------------------------------------+
|id |features      |pca_7c5c4aa7674e__output                  |
+---+--------------+------------------------------------------+
|0  |[1.0,0.1,-1.0]|[0.0713719499248418,-0.4526654888147822]  |
...
|1  |[3.0,10.1,3.0]|[-10.872398139848944,0.030962697060150646]|
+---+--------------+------------------------------------------+

Interaction🔗

  • In some cases, you might have domain knowledge about specific variables in your dataset.
  • For example, you might know that a certain interaction between the two variables is an important variable to include in a downstream estimator.
  • The feature transformer Interaction allows you to create an interaction between two variables manually. It just multiplies the two features together—something that a typical linear model would not do for every possible pair of features in your data.

Polynomial Expansion🔗

  • Polynomial expansion is used to generate interaction variables of all the input columns. With polynomial expansion, we specify to what degree we would like to see various interactions. For example, for a degree-2 polynomial, Spark takes every value in our feature vector, multiplies it by every other value in the feature vector, and then stores the results as features.
  • For instance, if we have two input features, we’ll get four output features if we use a second degree polynomial (2x2). If we have three input features, we’ll get nine output features (3x3).
from pyspark.ml.feature import PolynomialExpansion
pe = PolynomialExpansion().setInputCol("features").setDegree(2)
pe.transform(scaleDF).show()

+---+--------------+-----------------------------------------------------------+
|id |features      |poly_9b2e603812cb__output                                  |
+---+--------------+-----------------------------------------------------------+
|0  |[1.0,0.1,-1.0]|[1.0,1.0,0.1,0.1,0.010000000000000002,-1.0,-1.0,-0.1,1.0]  |
...
|1  |[3.0,10.1,3.0]|[3.0,9.0,10.1,30.299999999999997,102.00999999999999,3.0... |
+---+--------------+-----------------------------------------------------------+

Feature Selection🔗

  • Often, you will have a large range of possible features and want to select a smaller subset to use for training.
  • This process is called feature selection. There are a number of ways to evaluate feature importance once you’ve trained a model but another option is to do some rough filtering beforehand. Spark has some simple options for doing that, such as ChiSqSelector

ChiSqSelector🔗

  • ChiSqSelector leverages a statistical test to identify features that are not independent from the label we are trying to predict, and drop the uncorrelated features. It’s often used with categorical data in order to reduce the number of features you will input into your model, as well as to reduce the dimensionality of text data (in the form of frequencies or counts)
  • Since this method is based on the Chi-Square test, there are several different ways we can pick the “best” features. The methods are numTopFeatures, which is ordered by p-value; percentile, which takes a proportion of the input features (instead of just the top N features); and fpr, which sets a cut off p-value.
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
  .transform(sales.select("Description", "CustomerId"))\
  .where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\
  .where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\
  .setFeaturesCol("countVec")\
  .setLabelCol("CustomerId")\
  .setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
  .drop("customerId", "Description", "DescOut").show()

Advanced Topics🔗

Persisting Transformer🔗

  • once we have used an estimator to configure a transformer, it can be helpful to write it to disk and simply load when needed.
# save
fittedPCA = pca.fit(scaleDF)
fittedPCA.write().overwrite().save("/tmp/fittedPCA")

# load
from pyspark.ml.feature import PCAModel
loadedPCA = PCAModel.load("/tmp/fittedPCA")
loadedPCA.transform(scaleDF).show()

Writing a Custom Transformer🔗

Writing a custom transformer can be valuable when you want to encode some of your own business logic in a form that you can fit into an ML Pipeline, pass on to hyperparameter search, and so on. In general you should try to use the built-in modules (e.g., SQLTransformer) as much as possible because they are optimized to run efficiently

  • A simple tokenizer example
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable,
  Identifiable}
import org.apache.spark.sql.types.{ArrayType, StringType, DataType}
import org.apache.spark.ml.param.{IntParam, ParamValidators}

class MyTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String],
    MyTokenizer] with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("myTokenizer"))

  val maxWords: IntParam = new IntParam(this, "maxWords",
    "The max number of words to return.",
  ParamValidators.gtEq(0))

  def setMaxWords(value: Int): this.type = set(maxWords, value)

  def getMaxWords: Integer = $(maxWords)

  override protected def createTransformFunc: String => Seq[String] = (
    inputString: String) => {
      inputString.split("\\s").take($(maxWords))
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(
      inputType == StringType, s"Bad input type: $inputType. Requires String.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType,
    true)
}

// this will allow you to read it back in by using this object.
object MyTokenizer extends DefaultParamsReadable[MyTokenizer]
val myT = new MyTokenizer().setInputCol("someCol").setMaxWords(2)
myT.transform(Seq("hello world. This text won't show.").toDF("someCol")).show()