CHAPTER 8: Un-Supervised Learning with Spark

In [1]:
# prepare for installation of pyspark by findspark 
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession

K-Means Clustering Algorithm

Step 1: Importing the Data

In [2]:
spark = SparkSession.builder.appName('K-MeansClustring').getOrCreate()
In [3]:
data = spark.read.csv('latitude_longitude.csv', header=True, inferSchema=True)
data.show(4)
+-------------+--------------+
|     latitude|     longitude|
+-------------+--------------+
|          0.0|           0.0|
|32.8247811394|-116.870394352|
| 45.326414382|-117.807811103|
|39.4708861702|-119.659926097|
+-------------+--------------+
only showing top 4 rows

In [11]:
data.count()
Out[11]:
459540

Step 2: Data pre-procissing and convering data to spark accepted format

In [4]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
In [5]:
assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features')
final_assembled_data = assembler.transform(data)
print("Consolidated Data with features")
final_assembled_data.show(4)
Consolidated Data with features
+-------------+--------------+--------------------+
|     latitude|     longitude|            features|
+-------------+--------------+--------------------+
|          0.0|           0.0|           (2,[],[])|
|32.8247811394|-116.870394352|[32.8247811394,-1...|
| 45.326414382|-117.807811103|[45.326414382,-11...|
|39.4708861702|-119.659926097|[39.4708861702,-1...|
+-------------+--------------+--------------------+
only showing top 4 rows

Step 3: Training our K-Means Model

In [7]:
# Since Initial Data is well scaled, we can pass it directly to our K-Means
kmeans = KMeans(featuresCol='features', k=3)
kmeans_model = kmeans.fit(final_assembled_data)

Step 4: Displaying the predictions

In [8]:
predictions = kmeans_model.transform(final_assembled_data)

predictions.show(4)
+-------------+--------------+--------------------+----------+
|     latitude|     longitude|            features|prediction|
+-------------+--------------+--------------------+----------+
|          0.0|           0.0|           (2,[],[])|         1|
|32.8247811394|-116.870394352|[32.8247811394,-1...|         0|
| 45.326414382|-117.807811103|[45.326414382,-11...|         2|
|39.4708861702|-119.659926097|[39.4708861702,-1...|         2|
+-------------+--------------+--------------------+----------+
only showing top 4 rows

In [10]:
print("Prediction Data")
# Determining the centroids of the cluster
centres = kmeans_model.clusterCenters()

print("The company can setup 3 of their towers at these locations- latitudes and longitudes for optimal network coverage")

cluster_list = []
i = 1
for centre in centres:
    print("{} - {}".format(i, centre))
    i = i + 1
    
print("\nDetermining tghe number of users that belongs to each clusters")
predictions.groupBy('prediction').count().show()
Prediction Data
The company can setup 3 of their towers at these locations- latitudes and longitudes for optimal network coverage
1 - [  34.5288658  -116.34531612]
2 - [0. 0.]
3 - [  39.57392652 -121.24864484]

Determining tghe number of users that belongs to each clusters
+----------+------+
|prediction| count|
+----------+------+
|         1| 27683|
|         2|197942|
|         0|233915|
+----------+------+

Step 4: Evaluating our model

In [12]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator_object = ClusteringEvaluator(predictionCol='prediction', featuresCol='features')

以下は、シルエット分析でクラスタリングを評価

クラスタリングの評価で、computeCost()メソッドはver3から無くなり、代わりにClusterEvaluator関数を使って評価を行う

  • ClusterEvaluator()関数は、シルエット分析で評価するための関数
In [16]:
Silhouette_Score = evaluator_object.evaluate(predictions)

print("The Silhouette Score when k=3 is {}".format(Silhouette_Score))

# print("\nWithin set Sum of Square Error {}\n".format(kmeans_model.computeCost(final_assembled_data)))
The Silhouette Score when k=3 is 0.7182719651203601

(Optional) Peforming K-Means with Scaled Features

Example of Scaling the Data and performing K-Means

In [17]:
from pyspark.ml.feature import StandardScaler
In [19]:
# transform to [mean:0, variance:1]

scalar_object = StandardScaler(inputCol='features', outputCol='ScaledFeatures')

scalar_model = scalar_object.fit(final_assembled_data)

final_scaled_data = scalar_model.transform(final_assembled_data)
In [20]:
print("Consolidated Data with Scaled Features")
final_scaled_data.show(4)
Consolidated Data with Scaled Features
+-------------+--------------+--------------------+--------------------+
|     latitude|     longitude|            features|      ScaledFeatures|
+-------------+--------------+--------------------+--------------------+
|          0.0|           0.0|           (2,[],[])|           (2,[],[])|
|32.8247811394|-116.870394352|[32.8247811394,-1...|[3.52019771373825...|
| 45.326414382|-117.807811103|[45.326414382,-11...|[4.86089883133903...|
|39.4708861702|-119.659926097|[39.4708861702,-1...|[4.23293982267510...|
+-------------+--------------+--------------------+--------------------+
only showing top 4 rows

In [21]:
scaled_kmeans = KMeans(featuresCol='ScaledFeatures', k=5)

scaled_kmeans_model = scaled_kmeans.fit(final_scaled_data)

scaled_predictions = scaled_kmeans_model.transform(final_scaled_data)
In [22]:
print("Prediction Data")
scaled_predictions.select('latitude', 'longitude', 'ScaledFeatures', 'prediction').show(4)
Prediction Data
+-------------+--------------+--------------------+----------+
|     latitude|     longitude|      ScaledFeatures|prediction|
+-------------+--------------+--------------------+----------+
|          0.0|           0.0|           (2,[],[])|         1|
|32.8247811394|-116.870394352|[3.52019771373825...|         0|
| 45.326414382|-117.807811103|[4.86089883133903...|         3|
|39.4708861702|-119.659926097|[4.23293982267510...|         2|
+-------------+--------------+--------------------+----------+
only showing top 4 rows

In [24]:
scaled_centres = scaled_kmeans_model.clusterCenters()

print("Scaled Tower Locations \n{}".format(scaled_centres))
Scaled Tower Locations 
[array([ 3.68023323, -4.09666765]), array([0., 0.]), array([ 4.2372102 , -4.21931911]), array([ 4.74379864, -4.28920569]), array([ 4.02062823, -4.26712098])]
In [25]:
Scaled_Silhouette_Score = evaluator_object.evaluate(scaled_predictions)

print("\nThe Silhouette Score when k=5 is {}".format(Scaled_Silhouette_Score))
The Silhouette Score when k=5 is 0.5431986306079193
In [26]:
print("Determining the number of users that belongs to each clusters")
scaled_predictions.groupBy('prediction').count().show()
Determining the number of users that belongs to each clusters
+----------+------+
|prediction| count|
+----------+------+
|         1| 27683|
|         3| 44817|
|         4|109730|
|         2| 57334|
|         0|219976|
+----------+------+

In [ ]:
 
In [ ]:
 
In [ ]: