PCAを用いて、特徴量の利用数を決める

In [1]:
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

訓練データ読み込み

In [2]:
# sparksessionのインスタンス化
ss = SparkSession \
            .builder \
            .appName("PCA") \
            .enableHiveSupport() \
            .getOrCreate()
In [6]:
# 読み込むcsvファイルのスキーマを定義
struct = StructType([
            StructField('Year', StringType(), False),
            StructField('FarmID', DoubleType(), False),
            StructField('MeanHighestTemperature', DoubleType(), False),
            StructField('MeanMinimumtemperature', DoubleType(), False),
            StructField('MeanWhc', DoubleType(), False),
            StructField('MeanDaylightHours', DoubleType(), False),
            StructField('MeanDayOfSoilAcidityRange', DoubleType(), False),
            StructField('TotalYield', DoubleType(), False),
            StructField('Area', DoubleType(), False),
            StructField('YieldPerA', DoubleType(), False),
            StructField('label', DoubleType(), False)
        ])

# または、下の方法でも良い
# struct = StructType().add('Year', StringType())\
#                     .add('FarmID', DoubleType())\
#                     .add('MeanHighestTemperature', DoubleType())\
#                     .add('MeanMinimumtemperature', DoubleType())\
#                     .add('MeanWhc', DoubleType())\
#                     .add('MeanDaylightHours', DoubleType())\
#                     .add('MeanDayOfSoilAcidityRange', DoubleType())\
#                     .add('TotalYield', DoubleType())\
#                     .add('Area', DoubleType())\
#                     .add('YieldPerA', DoubleType())\
#                     .add('label', DoubleType())
In [9]:
# csv読み込み
df5 = ss.read.csv('./batchAnalysticsData_train_5.csv', 
                    header=True,
                     encoding='UTF-8',
                     schema=struct)

df5.show(5, truncate=False)
+----+------+----------------------+----------------------+-------+-----------------+-------------------------+----------+----+---------+-----+
|Year|FarmID|MeanHighestTemperature|MeanMinimumtemperature|MeanWhc|MeanDaylightHours|MeanDayOfSoilAcidityRange|TotalYield|Area|YieldPerA|label|
+----+------+----------------------+----------------------+-------+-----------------+-------------------------+----------+----+---------+-----+
|2007|1.0   |6.93                  |-1.3                  |14.17  |171.12           |18.0                     |1423222.21|4.5 |3162.72  |0.0  |
|2007|2.0   |7.77                  |-0.63                 |15.83  |172.62           |18.0                     |1457585.51|5.0 |2915.17  |0.0  |
|2007|3.0   |7.77                  |-1.13                 |14.5   |169.28           |18.0                     |1150258.61|3.0 |3834.2   |1.0  |
|2007|4.0   |6.77                  |0.03                  |16.67  |170.12           |19.0                     |2327859.58|6.0 |3879.77  |1.0  |
|2007|5.0   |6.93                  |-1.47                 |17.5   |173.78           |18.0                     |1448612.55|4.0 |3621.53  |1.0  |
+----+------+----------------------+----------------------+-------+-----------------+-------------------------+----------+----+---------+-----+
only showing top 5 rows

Pipelineの生成

In [12]:
# pipelineの各ステージの生成

# 特徴量のベクトル化用ステージ
assemblerForPCA = VectorAssembler(inputCols=df5.columns[1:6], outputCol="feature")

# 標準化のステージ(PCAは特徴量の標準化が必要)
scalerForPCA = StandardScaler(inputCol="feature", outputCol="standardedFeature",
                             withStd=True, withMean=True)

# PCAをステージに指定する準備
pca = PCA(k=5, inputCol="standardedFeature", outputCol="pcaScore")

# pipelineの生成
pipelineForPCA = Pipeline(
                    stages=[
                        assemblerForPCA,
                        scalerForPCA,
                        pca
                    ])

Pipelineの実行とPCAの結果確認

In [13]:
# モデルの生成
modelForPCA = pipelineForPCA.fit(df5)

# モデルの実行
resultFromPCA = modelForPCA.transform(df5)

# 寄与率の出力
print(modelForPCA.stages[2].explainedVariance)
[0.31398994868731134,0.25726992728560183,0.20181501063487386,0.12373536931107512,0.10318974408113785]
In [15]:
print("上位3特徴量の寄与率:", 0.31 + 0.26 + 0.20)
上位3特徴量の寄与率: 0.77

↑上記より、上位3つの特徴量で寄与率が0.7を超えたので、今回のモデルは3つの特徴量で表現する!

In [ ]:
 
In [ ]:
 
In [ ]: