CHAPTER 6 - Supervised Learning with Spark

Liner Regression with Single Variable

In [1]:
# prepare for installation of pyspark by findspark 
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
In [2]:
from pyspark.sql import SparkSession
In [3]:
spark = SparkSession.builder.appName("SingleVaribleLinearReg").getOrCreate()
In [4]:
from pyspark.ml.regression import LinearRegression
In [5]:
data = spark.read.csv('/home/yoshi-1/ダウンロード/spark-with-python-master/single_variable_regression.csv',
                     header=True,
                     inferSchema=True)
In [6]:
print("Initial Data")
data.show()
Initial Data
+----------+----------+
|house_size|price_sold|
+----------+----------+
|      1490|        60|
|      2500|        95|
|      1200|        55|
|       900|        45|
|      1300|        56|
|      1000|        50|
|       850|        43|
|       750|        40|
|      2000|        80|
|      1600|        70|
+----------+----------+

In [7]:
# importing the VectorAssembler to convert the features into spark accepted format
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
In [8]:
# Step2 - Data pre-processing and converting the data to spark accepted format

# converting the feature(s) into spark accepted data format
assembler_object = VectorAssembler(
                            inputCols=['house_size'],
                            outputCol='house_size_feature'
                            )

feature_vector_dataframe = assembler_object.transform(data)
In [9]:
print("Data after adding house_size columns as a spark accepted feature")
feature_vector_dataframe.show()
Data after adding house_size columns as a spark accepted feature
+----------+----------+------------------+
|house_size|price_sold|house_size_feature|
+----------+----------+------------------+
|      1490|        60|          [1490.0]|
|      2500|        95|          [2500.0]|
|      1200|        55|          [1200.0]|
|       900|        45|           [900.0]|
|      1300|        56|          [1300.0]|
|      1000|        50|          [1000.0]|
|       850|        43|           [850.0]|
|       750|        40|           [750.0]|
|      2000|        80|          [2000.0]|
|      1600|        70|          [1600.0]|
+----------+----------+------------------+

In [10]:
feature_vector_dataframe.printSchema()
root
 |-- house_size: integer (nullable = true)
 |-- price_sold: integer (nullable = true)
 |-- house_size_feature: vector (nullable = true)

In [11]:
formatted_data = feature_vector_dataframe.select('house_size_feature', 'price_sold')
In [12]:
print("Consolidated Data with accepted features and labels")
formatted_data.show()
Consolidated Data with accepted features and labels
+------------------+----------+
|house_size_feature|price_sold|
+------------------+----------+
|          [1490.0]|        60|
|          [2500.0]|        95|
|          [1200.0]|        55|
|           [900.0]|        45|
|          [1300.0]|        56|
|          [1000.0]|        50|
|           [850.0]|        43|
|           [750.0]|        40|
|          [2000.0]|        80|
|          [1600.0]|        70|
+------------------+----------+

In [13]:
# Step 3 - Training our Linear Regression model with single variable

# splitting the data into 70 and 30 percent
train_data, test_data = formatted_data.randomSplit([0.7, 0.3])
In [14]:
# Defining our Linear Regression
lireg = LinearRegression(featuresCol='house_size_feature', labelCol='price_sold')
In [15]:
# Training our model with training data
lireg_model = lireg.fit(train_data)
In [16]:
# Step 4 - Evaluating of Trained Model

# Evaluating our model with testing data
test_result = lireg_model.evaluate(test_data)
In [17]:
print("Residuals info - distance between data points and fitted regression line")
test_result.residuals.show()
Residuals info - distance between data points and fitted regression line
+------------------+
|         residuals|
+------------------+
|0.6660785214189673|
+------------------+

In [18]:
print("Root Mean Square Error {}".format(test_result.rootMeanSquaredError))
Root Mean Square Error 0.6660785214189673
In [19]:
print("R square value {}".format(test_result.r2))
R square value -inf
In [20]:
# Step 5 - Peforming Predictions with novel data

# Creating unlabeld data from test data by removing the label in order to get predictions
unlabeled_data = test_data.select('house_size_feature')

predictions = lireg_model.transform(unlabeled_data)
In [21]:
print("\nPredictions for Novel Data")
predictions.show()
Predictions for Novel Data
+------------------+-----------------+
|house_size_feature|       prediction|
+------------------+-----------------+
|          [1200.0]|54.33392147858103|
+------------------+-----------------+

In [22]:
# Checking our model with new value manually
house_size_coeff = lireg_model.coefficients[0] # 係数
intercept = lireg_model.intercept # 切片

print("Cofficient is {}".format(house_size_coeff))
print("Intercept is {}".format(intercept))
Cofficient is 0.03241379310344824
Intercept is 15.982758620689705
In [23]:
lireg_model.coefficients
Out[23]:
DenseVector([0.0324])
In [24]:
new_house_size = 950

# Mimicking the hypothesis function to get a prediction
price = (intercept) + (house_size_coeff) * new_house_size

print("Predicted house price for house size {} is {}".format(new_house_size, price))
Predicted house price for house size 950 is 46.77586206896553
In [ ]:
 

Linear Regression with Multiple Variables

In [27]:
from pyspark.sql import SparkSession
In [28]:
spark = SparkSession.builder.appName('MultiVariableLinerReg').getOrCreate()
In [30]:
from pyspark.ml.regression import LinearRegression
In [25]:
data = spark.read.csv('/home/yoshi-1/ダウンロード/spark-with-python-master/multi_variable_regression.csv',
                     header=True,
                     inferSchema=True)
In [26]:
print("Initial Data")
data.show()
Initial Data
+----------+--------+------+---------+--------------+----------+
|house_size|bedrooms|floors|house_age|          area|price_sold|
+----------+--------+------+---------+--------------+----------+
|      1490|       2|     2|       10|    Ave Avenue|        60|
|      2500|       3|     2|       20|    Ave Avenue|        95|
|      1200|       2|     1|        5|       MG Road|        55|
|       900|       2|     2|       15|       MG Road|        45|
|      1350|       2|     2|        5|Dollors Colony|        65|
|      1000|       2|     1|        2|Dollors Colony|        60|
|       850|       1|     1|        5|   Wall Street|        40|
|       750|       1|     1|        1|   Wall Street|        40|
|      2000|       3|     1|        2|    Ave Avenue|        85|
|      1600|       2|     1|       20|       MG Road|        65|
+----------+--------+------+---------+--------------+----------+

In [33]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
In [27]:
# Step2 - Data pre-processing and converting any string data to spark accepted format

# importing the StringIndexer to convert the locality feature into spark accepted format
from pyspark.ml.feature import StringIndexer
In [28]:
# convert the locality feature of string type into spark accepted data format
string_indexer_object = StringIndexer(inputCol='area', outputCol='area_feature')

string_indexed_df_object = string_indexer_object.fit(data)

final_data = string_indexed_df_object.transform(data)

print("Data after converting the string column locality into spark accepted feature")
final_data.show()
Data after converting the string column locality into spark accepted feature
+----------+--------+------+---------+--------------+----------+------------+
|house_size|bedrooms|floors|house_age|          area|price_sold|area_feature|
+----------+--------+------+---------+--------------+----------+------------+
|      1490|       2|     2|       10|    Ave Avenue|        60|         0.0|
|      2500|       3|     2|       20|    Ave Avenue|        95|         0.0|
|      1200|       2|     1|        5|       MG Road|        55|         1.0|
|       900|       2|     2|       15|       MG Road|        45|         1.0|
|      1350|       2|     2|        5|Dollors Colony|        65|         2.0|
|      1000|       2|     1|        2|Dollors Colony|        60|         2.0|
|       850|       1|     1|        5|   Wall Street|        40|         3.0|
|       750|       1|     1|        1|   Wall Street|        40|         3.0|
|      2000|       3|     1|        2|    Ave Avenue|        85|         0.0|
|      1600|       2|     1|       20|       MG Road|        65|         1.0|
+----------+--------+------+---------+--------------+----------+------------+

In [29]:
print(final_data.columns)
['house_size', 'bedrooms', 'floors', 'house_age', 'area', 'price_sold', 'area_feature']
In [30]:
# Step3 - Data pre-processing and converting the numeric data to spark accepted format

# converting the feature(s) into spark accepted data format
# Passing multiple columns as the input columns
assember_object = VectorAssembler(
                    inputCols=['house_size', 'bedrooms', 'floors', 'house_age', 'area_feature'],
                    outputCol='house_features')

feature_vector_dataframe = assember_object.transform(final_data)

print(feature_vector_dataframe.show())
+----------+--------+------+---------+--------------+----------+------------+--------------------+
|house_size|bedrooms|floors|house_age|          area|price_sold|area_feature|      house_features|
+----------+--------+------+---------+--------------+----------+------------+--------------------+
|      1490|       2|     2|       10|    Ave Avenue|        60|         0.0|[1490.0,2.0,2.0,1...|
|      2500|       3|     2|       20|    Ave Avenue|        95|         0.0|[2500.0,3.0,2.0,2...|
|      1200|       2|     1|        5|       MG Road|        55|         1.0|[1200.0,2.0,1.0,5...|
|       900|       2|     2|       15|       MG Road|        45|         1.0|[900.0,2.0,2.0,15...|
|      1350|       2|     2|        5|Dollors Colony|        65|         2.0|[1350.0,2.0,2.0,5...|
|      1000|       2|     1|        2|Dollors Colony|        60|         2.0|[1000.0,2.0,1.0,2...|
|       850|       1|     1|        5|   Wall Street|        40|         3.0|[850.0,1.0,1.0,5....|
|       750|       1|     1|        1|   Wall Street|        40|         3.0|[750.0,1.0,1.0,1....|
|      2000|       3|     1|        2|    Ave Avenue|        85|         0.0|[2000.0,3.0,1.0,2...|
|      1600|       2|     1|       20|       MG Road|        65|         1.0|[1600.0,2.0,1.0,2...|
+----------+--------+------+---------+--------------+----------+------------+--------------------+

None
In [31]:
feature_vector_dataframe.printSchema()
root
 |-- house_size: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- house_age: integer (nullable = true)
 |-- area: string (nullable = true)
 |-- price_sold: integer (nullable = true)
 |-- area_feature: double (nullable = false)
 |-- house_features: vector (nullable = true)

In [32]:
# choose formatted feature col and target variable
formatted_data = feature_vector_dataframe.select('house_features', 'price_sold')
In [33]:
formatted_data.show()
+--------------------+----------+
|      house_features|price_sold|
+--------------------+----------+
|[1490.0,2.0,2.0,1...|        60|
|[2500.0,3.0,2.0,2...|        95|
|[1200.0,2.0,1.0,5...|        55|
|[900.0,2.0,2.0,15...|        45|
|[1350.0,2.0,2.0,5...|        65|
|[1000.0,2.0,1.0,2...|        60|
|[850.0,1.0,1.0,5....|        40|
|[750.0,1.0,1.0,1....|        40|
|[2000.0,3.0,1.0,2...|        85|
|[1600.0,2.0,1.0,2...|        65|
+--------------------+----------+

In [34]:
# Step4 - Training our Linear Regression model with multiple variables

# Splitting the data into 60 and 40 percent
train_data, test_data = formatted_data.randomSplit([0.6, 0.4])
In [35]:
# Defining our Linear regression
lireg = LinearRegression(featuresCol='house_features', labelCol='price_sold')
In [36]:
# Training our model with training data
lireg_model = lireg.fit(train_data)
In [37]:
# Step5 - Evaluating of Trained data

# Evaluating our model with testing data
test_results = lireg_model.evaluate(test_data)
In [38]:
print("Residuals info - distance between data points and fitted regression line\n")
test_results.residuals.show()
Residuals info - distance between data points and fitted regression line

+-------------------+
|          residuals|
+-------------------+
| -5.406360424028264|
| 3.9752650176684767|
| -11.48409893992914|
|-0.9187279151935854|
+-------------------+

In [40]:
print("Root Mean Square Error {}".format(test_results.rootMeanSquaredError))
Root Mean Square Error 6.666334345789959
In [41]:
print("R square value {}".format(test_results.r2))
R square value 0.8222399455605647
In [42]:
# Step6 - Performing Predictions with novel data

# Creating unlabeled data from test data by removing the label in order to get predictions
unlabeled_data = test_data.select('house_features')
predictions = lireg_model.transform(unlabeled_data)
In [43]:
print('Predictions for Novel Data')
predictions.show()
Predictions for Novel Data
+--------------------+------------------+
|      house_features|        prediction|
+--------------------+------------------+
|[1200.0,2.0,1.0,5...|60.406360424028264|
|[1600.0,2.0,1.0,2...| 61.02473498233152|
|[2000.0,3.0,1.0,2...| 96.48409893992914|
|[2500.0,3.0,2.0,2...| 95.91872791519359|
+--------------------+------------------+

In [44]:
test_data.show()
+--------------------+----------+
|      house_features|price_sold|
+--------------------+----------+
|[1200.0,2.0,1.0,5...|        55|
|[1600.0,2.0,1.0,2...|        65|
|[2000.0,3.0,1.0,2...|        85|
|[2500.0,3.0,2.0,2...|        95|
+--------------------+----------+

In [46]:
# Checking our model with new value manually

print('Coeffecients are {}'.format(lireg_model.coefficients))
Coeffecients are [0.024734982332155053,17.120141342756117,-1.802120141342574,-0.6183745583039176,2.6855123674910257]
In [47]:
print('Intercept is {}'.format(lireg_model.intercept))
Intercept is -1.307420494698906
In [ ]:
 

Logistic Regression with Spark

In [3]:
spark = SparkSession.builder.appName('SparkLogReg').getOrCreate()

Step1: Importing data

In [4]:
data = spark.read.csv('brain_tumor_dataset.csv', header=True, inferSchema=True)
data.show(3)
+------+---+----+----------+---------+
|  name|age| sex|tumor_size|cancerous|
+------+---+----+----------+---------+
|Roland| 58|Male|       7.0|        1|
| Adolf| 65|Male|       9.0|        1|
| Klaus| 50|Male|       3.0|        0|
+------+---+----+----------+---------+
only showing top 3 rows

Step2: Data pre-processing and converting any categorical data to spark accepted format

In [5]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder

Formatting the categorical column -sex

In [6]:
# Creating a String Indexer - To convert every string into a unique number
sex_string_indexer_direct = StringIndexer(inputCol='sex', outputCol='sexIndexer')

indexed_data = sex_string_indexer_direct.fit(data)

final_string_indexed_data = indexed_data.transform(data)
In [7]:
final_string_indexed_data.show(3)
# Male - 1 and Female - 0
+------+---+----+----------+---------+----------+
|  name|age| sex|tumor_size|cancerous|sexIndexer|
+------+---+----+----------+---------+----------+
|Roland| 58|Male|       7.0|        1|       0.0|
| Adolf| 65|Male|       9.0|        1|       0.0|
| Klaus| 50|Male|       3.0|        0|       0.0|
+------+---+----+----------+---------+----------+
only showing top 3 rows

In [8]:
# Performing OneHotEncoding - convert this value(SexIndexer) into an array form
sex_encoder_direct = OneHotEncoder(inputCol='sexIndexer', outputCol='sexVector')

fit_encoded_data = sex_encoder_direct.fit(final_string_indexed_data)

encoded_data = fit_encoded_data.transform(final_string_indexed_data)

encoded_data.show(3)
+------+---+----+----------+---------+----------+-------------+
|  name|age| sex|tumor_size|cancerous|sexIndexer|    sexVector|
+------+---+----+----------+---------+----------+-------------+
|Roland| 58|Male|       7.0|        1|       0.0|(1,[0],[1.0])|
| Adolf| 65|Male|       9.0|        1|       0.0|(1,[0],[1.0])|
| Klaus| 50|Male|       3.0|        0|       0.0|(1,[0],[1.0])|
+------+---+----+----------+---------+----------+-------------+
only showing top 3 rows

In [9]:
assember_direct = VectorAssembler(inputCols=['age', 'sexVector', 'tumor_size'], outputCol='features')

assembler_data = assember_direct.transform(encoded_data)

final_data_direct = assembler_data.select('features', 'cancerous')

print("Conslidated Data with accepted features and labels")
final_data_direct.show(3)
Conslidated Data with accepted features and labels
+--------------+---------+
|      features|cancerous|
+--------------+---------+
|[58.0,1.0,7.0]|        1|
|[65.0,1.0,9.0]|        1|
|[50.0,1.0,3.0]|        0|
+--------------+---------+
only showing top 3 rows

Step3: Training our Logistic Regression model

In [10]:
from pyspark.ml.classification import LogisticRegression
In [11]:
logreg_direct = LogisticRegression(featuresCol='features', labelCol='cancerous')

train_data_direct, test_data_direct = final_data_direct.randomSplit([0.6, 0.4])

logreg_model_direct = logreg_direct.fit(train_data_direct)

Step4: Evaluating and performing Predictions on our model

In [12]:
# Evaluating our model with testing data

# Direct Evaluation using Trivial method
predictions_labels = logreg_model_direct.evaluate(test_data_direct)

print("Prediction Data")
predictions_labels.predictions.select(['features', 'cancerous', 'prediction']).show(3)
Prediction Data
+--------------+---------+----------+
|      features|cancerous|prediction|
+--------------+---------+----------+
|[26.0,0.0,2.0]|        0|       1.0|
|[40.0,0.0,6.6]|        1|       1.0|
|[52.0,1.0,8.7]|        1|       1.0|
+--------------+---------+----------+
only showing top 3 rows

In [13]:
# Evaluation using BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
In [14]:
direct_evaluation = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='cancerous')

AUC_direct = direct_evaluation.evaluate(predictions_labels.predictions)

print("Area Under the Curve value is {}".format(AUC_direct))

print("\nCoeffecients are {}".format(logreg_model_direct.coefficients))

print("\nIntercept is {}".format(logreg_model_direct.intercept))
Area Under the Curve value is 0.5

Coeffecients are [-0.7583473809034436,-73.10537929426606,24.186552233457554]

Intercept is -24.21681066070073
In [ ]:
 

Pipelines

  • A Spark Pipeline is specified as a sequence of stages, & each stage is either a Transformer or an Estimator. These stages are run in order, and the input Data Frame is transformed as it passes through each stage.
In [18]:
from pyspark.ml import Pipeline

from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
In [19]:
spark = SparkSession.builder.appName('SparkLogReg').getOrCreate()
In [20]:
data = spark.read.csv('brain_tumor_dataset.csv', header=True, inferSchema=True)
data.show(4)
+------+---+------+----------+---------+
|  name|age|   sex|tumor_size|cancerous|
+------+---+------+----------+---------+
|Roland| 58|  Male|       7.0|        1|
| Adolf| 65|  Male|       9.0|        1|
| Klaus| 50|  Male|       3.0|        0|
|  Rosh| 26|Female|       2.0|        0|
+------+---+------+----------+---------+
only showing top 4 rows

In [21]:
# Stage 1
sex_string_indexer = StringIndexer(inputCol='sex', outputCol='sexIndexer')

# Stage 2
sex_encoder = OneHotEncoder(inputCol='sexIndexer', outputCol='sexVector')

# Stage 3
assembler = VectorAssembler(inputCols=['age', 'sexVector', 'tumor_size'], outputCol='features')

# Stage 4
logreg = LogisticRegression(featuresCol='features', labelCol='cancerous')
In [22]:
# passing the 4 stages directly into a pipeline object
pipeline_object = Pipeline(stages=[sex_string_indexer, sex_encoder, assembler, logreg])

train_data, test_data = data.randomSplit([0.6, 0.4])

logreg_model = pipeline_object.fit(train_data)

model_results = logreg_model.transform(test_data)

print("Pridiction Data")
model_results.select('cancerous', 'prediction').show(4)
Pridiction Data
+---------+----------+
|cancerous|prediction|
+---------+----------+
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       0.0|
+---------+----------+
only showing top 4 rows

In [23]:
evaluation_object = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='cancerous')

AUC = evaluation_object.evaluate(model_results)

print("Area Under the Curve value is {}".format(AUC))
Area Under the Curve value is 0.875
In [ ]:
 
In [ ]: