CHAPTER 7 - Tree Methods with Spark

In [1]:
# prepare for installation of pyspark by findspark 
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession

Decision Trees

Decision Tree Classifier

Step1: Importing the data

In [2]:
spark = SparkSession.builder.appName('SparkTrees').getOrCreate()

data = spark.read.csv('breast-cancer.csv', header=True, inferSchema=True)

data.show(4)
+---------+---------+----------+-----+
|thickness|cell_size|cell_shape|label|
+---------+---------+----------+-----+
|        5|        1|         1|    2|
|        5|        4|         4|    2|
|        3|        1|         1|    2|
|        6|        8|         8|    2|
+---------+---------+----------+-----+
only showing top 4 rows

Step2: Data pre-procissing and convering data to spark accepted format

In [3]:
from pyspark.ml.feature import VectorAssembler
In [4]:
assembler = VectorAssembler(inputCols=['thickness', 'cell_size', 'cell_shape'], outputCol='features')

assembler_data = assembler.transform(data)

final_data = assembler_data.select('features', 'label')

print("Consolidated Data with features and labels")
final_data.show(4)
Consolidated Data with features and labels
+-------------+-----+
|     features|label|
+-------------+-----+
|[5.0,1.0,1.0]|    2|
|[5.0,4.0,4.0]|    2|
|[3.0,1.0,1.0]|    2|
|[6.0,8.0,8.0]|    2|
+-------------+-----+
only showing top 4 rows

Step3: Training our Decision model

In [5]:
# Splitting the data into 80 and 20 percent
train_data, test_data = final_data.randomSplit([0.8, 0.2])
In [6]:
from pyspark.ml.classification import DecisionTreeClassifier
In [7]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')

dt_model = dt.fit(train_data)

dt_predictions = dt_model.transform(test_data)

Step4: Evaluating our Trained Model

In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
In [12]:
eval_obj = BinaryClassificationEvaluator(labelCol='label',rawPredictionCol='prediction')

print("Area Under the Curve value is {}".format(eval_obj.evaluate(dt_predictions)))

mul_eval_obj = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')

print("\nAccuracy of Decision Tree is {}".format(mul_eval_obj.evaluate(dt_predictions)))

print("\nPrediction Data")
dt_predictions.show(4)

print("\nDetemining which feature played a major role in Decision Making\n")
print(dt_model.featureImportances)
Area Under the Curve value is 1.0

Accuracy of Decision Tree is 0.9605263157894737

Prediction Data
+-------------+-----+--------------------+--------------------+----------+
|     features|label|       rawPrediction|         probability|prediction|
+-------------+-----+--------------------+--------------------+----------+
|[1.0,1.0,1.0]|    2|[0.0,0.0,327.0,0....|[0.0,0.0,0.984939...|       2.0|
|[1.0,1.0,1.0]|    2|[0.0,0.0,327.0,0....|[0.0,0.0,0.984939...|       2.0|
|[1.0,1.0,1.0]|    2|[0.0,0.0,327.0,0....|[0.0,0.0,0.984939...|       2.0|
|[1.0,1.0,1.0]|    2|[0.0,0.0,327.0,0....|[0.0,0.0,0.984939...|       2.0|
+-------------+-----+--------------------+--------------------+----------+
only showing top 4 rows


Detemining which feature played a major role in Decision Making

(3,[0,1,2],[0.08473047573894874,0.8549365052500262,0.060333019011025016])

Decision Tree Regressor

Step1: Importing the data

In [14]:
spark = SparkSession.builder.appName('SparkDTRegression').getOrCreate() 
In [26]:
data = spark.read.csv('car-dimension-price.csv', header=True, inferSchema=True)
data.show(4)
+----------+------+-----+------+-----+
|wheel-base|length|width|height|price|
+----------+------+-----+------+-----+
|      88.6| 168.8| 64.1|  48.8|13495|
|      88.6| 168.8| 64.1|  48.8|16500|
|      94.5| 171.2| 65.5|  52.4|16500|
|      99.8| 176.6| 66.2|  54.3|13950|
+----------+------+-----+------+-----+
only showing top 4 rows

Step2: Data pre-processing and converting data to spark accepted format

In [27]:
data.columns
Out[27]:
['wheel-base', 'length', 'width', 'height', 'price']
In [28]:
# null チェック

df = data.filter((data["wheel-base"].isNull() | 
                  data["length"].isNull() | 
                  data["width"].isNull() | 
                  data["height"].isNull() | 
                  data["price"].isNull()))
df.show()
df.count()
+----------+------+-----+------+-----+
|wheel-base|length|width|height|price|
+----------+------+-----+------+-----+
|      99.5| 178.2| 67.9|  52.0| null|
|      94.5| 155.9| 63.6|  52.0| null|
|      94.5| 155.9| 63.6|  52.0| null|
|      98.4| 175.7| 72.3|  50.5| null|
+----------+------+-----+------+-----+

Out[28]:
4
In [29]:
# null削除
data = data.na.drop()

# null 再チェック
df = data.filter((data["wheel-base"].isNull() | 
                  data["length"].isNull() | 
                  data["width"].isNull() | 
                  data["height"].isNull() | 
                  data["price"].isNull()))
df.show()
df.count()
+----------+------+-----+------+-----+
|wheel-base|length|width|height|price|
+----------+------+-----+------+-----+
+----------+------+-----+------+-----+

Out[29]:
0
In [30]:
from pyspark.ml.feature import VectorAssembler
In [31]:
assembler = VectorAssembler(inputCols=['wheel-base', 'length', 'width', 'height'], outputCol='features')

assembler_data = assembler.transform(data)
final_data = assembler_data.select('features', 'price')

print("Consolidated Data with features and labels")
final_data.show(4)
Consolidated Data with features and labels
+--------------------+-----+
|            features|price|
+--------------------+-----+
|[88.6,168.8,64.1,...|13495|
|[88.6,168.8,64.1,...|16500|
|[94.5,171.2,65.5,...|16500|
|[99.8,176.6,66.2,...|13950|
+--------------------+-----+
only showing top 4 rows

Step3: Training our Decision model

In [32]:
# Splitting the data into 80 and 20 percent
train_data, test_data = final_data.randomSplit([0.8, 0.2])
In [33]:
from pyspark.ml.regression import DecisionTreeRegressor
In [34]:
dt = DecisionTreeRegressor(labelCol='price', featuresCol='features')

dt_model = dt.fit(train_data)

dt_predictions = dt_model.transform(test_data)

Step4: Evaluationg our Trained Model

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator
In [36]:
regression_evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName='r2')
In [37]:
R2 = regression_evaluator_r2.evaluate(dt_predictions)

print("The R square value is {}".format(R2))

print("\nDetermining which feature played a major role in Decision Making")
print(dt_model.featureImportances)
The R square value is 0.60802348832559

Determining which feature played a major role in Decision Making
(4,[0,1,2,3],[0.19394495493151057,0.5608651116371716,0.17431973817604318,0.07087019525527469])
In [ ]:
 

Random Forest

Step1: Importing the data

In [38]:
spark = SparkSession.builder.appName('SparkRFRegression').getOrCreate()
In [39]:
data = spark.read.csv('car-performance-price.csv', header=True, inferSchema=True)

data.show(4)
+----------+--------+------------+---------------+-----+
|horsepower|peak-rpm|city-mileage|highway-mileage|price|
+----------+--------+------------+---------------+-----+
|       111|    5000|          21|             27|13495|
|       111|    5000|          21|             27|16500|
|       154|    5000|          19|             26|16500|
|       102|    5500|          24|             30|13950|
+----------+--------+------------+---------------+-----+
only showing top 4 rows

Step2: Data pre-processing and converting data to spark accepted format

In [40]:
data.columns
Out[40]:
['horsepower', 'peak-rpm', 'city-mileage', 'highway-mileage', 'price']
In [41]:
# null チェック

df = data.filter((data["horsepower"].isNull() | 
                  data["peak-rpm"].isNull() | 
                  data["city-mileage"].isNull() | 
                  data["highway-mileage"].isNull() | 
                  data["price"].isNull()))
df.show()
df.count()
+----------+--------+------------+---------------+-----+
|horsepower|peak-rpm|city-mileage|highway-mileage|price|
+----------+--------+------------+---------------+-----+
|       160|    5500|          16|             22| null|
|        70|    5400|          38|             43| null|
|        70|    5400|          38|             43| null|
|       288|    5750|          17|             28| null|
|      null|    null|          23|             31| 9295|
|      null|    null|          23|             31| 9895|
+----------+--------+------------+---------------+-----+

Out[41]:
6
In [42]:
# null削除
data = data.na.drop()

# null 再チェック
df = data.filter((data["horsepower"].isNull() | 
                  data["peak-rpm"].isNull() | 
                  data["city-mileage"].isNull() | 
                  data["highway-mileage"].isNull() | 
                  data["price"].isNull()))
df.show()
df.count()
+----------+--------+------------+---------------+-----+
|horsepower|peak-rpm|city-mileage|highway-mileage|price|
+----------+--------+------------+---------------+-----+
+----------+--------+------------+---------------+-----+

Out[42]:
0
In [43]:
from pyspark.ml.feature import VectorAssembler
In [44]:
assembler = VectorAssembler(inputCols=['horsepower', 'peak-rpm', 'city-mileage', 'highway-mileage'], outputCol='features')

assembler_data = assembler.transform(data)

final_data = assembler_data.select('features', 'price')

print("Consolidated Data with feature and labels")
final_data.show(4)
Consolidated Data with feature and labels
+--------------------+-----+
|            features|price|
+--------------------+-----+
|[111.0,5000.0,21....|13495|
|[111.0,5000.0,21....|16500|
|[154.0,5000.0,19....|16500|
|[102.0,5500.0,24....|13950|
+--------------------+-----+
only showing top 4 rows

Step3: Training our Decision model

In [45]:
# Splitting the data into 80 and 20 percent
train_data, test_data = final_data.randomSplit([0.8, 0.2])
In [46]:
from pyspark.ml.regression import RandomForestRegressor
In [47]:
rf = RandomForestRegressor(labelCol='price', featuresCol='features', numTrees=120)
In [48]:
rf_model = rf.fit(train_data)

rf_predictions = rf_model.transform(test_data)

Step4: EValuating our trained Model

In [49]:
from pyspark.ml.evaluation import RegressionEvaluator
In [50]:
regression_evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName="r2")
In [52]:
R2 = regression_evaluator_r2.evaluate(rf_predictions)

print("The R square value is {}".format(R2))

print("\nDetemining which feature played a major role in Decision Making")
print(rf_model.featureImportances)
The R square value is 0.9341537255917971

Detemining which feature played a major role in Decision Making
(4,[0,1,2,3],[0.41311072771572743,0.06997676096121414,0.29503022630096093,0.22188228502209742])
In [ ]:
 

Step1: Importing the data

In [80]:
from pyspark.ml.feature import VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
In [53]:
spark = SparkSession.builder.appName("SparkTreeComparisions").getOrCreate()
In [74]:
data = spark.read.format('libsvm').load('sample_libsvm_data.txt')
# IBSVMは国立台湾大学のグループによって作られたサポートベクターマシンのライブラリで、専用のデータ形式がある

print("Libsvm format Data - Fully formatted and ready to use data")
data.show(4)
Libsvm format Data - Fully formatted and ready to use data
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
+-----+--------------------+
only showing top 4 rows

In [77]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

Step2: Training our Tree models

In [78]:
# Splitting the data into 70 and 30 percent
train_data, test_data = data.randomSplit([0.7, 0.3])
In [79]:
gbt = GBTClassifier(featuresCol="indexedFeatures", maxIter=10) # Gradient Boosted Trees

rf = RandomForestClassifier(featuresCol="indexedFeatures", numTrees=150) # Random Forest with 150 Trees

dt = DecisionTreeClassifier(featuresCol="indexedFeatures") # Decision Trees
In [81]:
# Chain indexer and GBT in a Pipeline
pipeline_gbt = Pipeline(stages=[featureIndexer, gbt])

pipeline_rf = Pipeline(stages=[featureIndexer, rf])

pipeline_dt = Pipeline(stages=[featureIndexer, dt])
In [82]:
# Train model.  This also runs the indexer.
gbt_model = pipeline_gbt.fit(train_data)

rf_model = pipeline_rf.fit(train_data)

dt_model = pipeline_dt.fit(train_data)
In [83]:
# Make predictions.
gbt_predictions = gbt_model.transform(test_data)

rf_predictions = rf_model.transform(test_data)

dt_predictions = dt_model.transform(test_data)
In [84]:
# Select example rows to display.
print("Gradient Boosting's predictions")
gbt_predictions.select("prediction", "label", "features").show(5)
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[95,96,97,12...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[126,127,128...|
+----------+-----+--------------------+
only showing top 5 rows

In [85]:
# Select example rows to display.
print("Random Forest's predictions")
rf_predictions.select("prediction", "label", "features").show(5)
Random Forest's predictions
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[95,96,97,12...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[126,127,128...|
+----------+-----+--------------------+
only showing top 5 rows

In [86]:
# Select example rows to display.
print("Decision Tree's predictions")
dt_predictions.select("prediction", "label", "features").show(5)
Decision Tree's predictions
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[95,96,97,12...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[126,127,128...|
+----------+-----+--------------------+
only showing top 5 rows

In [87]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
gbt_rmse = evaluator.evaluate(gbt_predictions)
rf_rmse = evaluator.evaluate(rf_predictions)
dt_rmse = evaluator.evaluate(dt_predictions)
print("Gradient Boosting's RMSE on test data = %g" % gbt_rmse)
print("Random Forest's RMSE on test data = %g" % rf_rmse)
print("Decision Tree's RMSE on test data = %g" % dt_rmse)
Gradient Boosting's RMSE on test data = 0.258199
Random Forest's RMSE on test data = 0
Decision Tree's RMSE on test data = 0.258199
In [88]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
gbt_acc = evaluator.evaluate(gbt_predictions)
rf_acc = evaluator.evaluate(rf_predictions)
dt_acc = evaluator.evaluate(dt_predictions)
print("Gradient Boosting's Accuracy on test data = %g" % gbt_acc)
print("Random Forest's Accuracy on test data = %g" % rf_acc)
print("Decision Tree's Accuracy on test data = %g" % dt_acc)
Gradient Boosting's Accuracy on test data = 0.933333
Random Forest's Accuracy on test data = 1
Decision Tree's Accuracy on test data = 0.933333
In [89]:
print("Feature Importance")
print("Gradient Boosting's feature importance: {}".format(gbt_model.featureImportances))
# pipeline使ったときのfeatureimportanceの取り出し方調べる
Feature Importance
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-89-2b760219e830> in <module>
      1 print("Feature Importance")
----> 2 print("Gradient Boosting's feature importance: {}".format(gbt_model.featureImportances))

AttributeError: 'PipelineModel' object has no attribute 'featureImportances'
In [ ]:
 
In [ ]: