import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
# sparksessionのインスタンス化
ss = SparkSession \
.builder \
.appName("PCA") \
.enableHiveSupport() \
.getOrCreate()
# 読み込むcsvファイルのスキーマを定義
struct = StructType([
StructField('Year', StringType(), False),
StructField('FarmID', DoubleType(), False),
StructField('MeanHighestTemperature', DoubleType(), False),
StructField('MeanMinimumtemperature', DoubleType(), False),
StructField('MeanWhc', DoubleType(), False),
StructField('MeanDaylightHours', DoubleType(), False),
StructField('MeanDayOfSoilAcidityRange', DoubleType(), False),
StructField('TotalYield', DoubleType(), False),
StructField('Area', DoubleType(), False),
StructField('YieldPerA', DoubleType(), False),
StructField('label', DoubleType(), False)
])
# または、下の方法でも良い
# struct = StructType().add('Year', StringType())\
# .add('FarmID', DoubleType())\
# .add('MeanHighestTemperature', DoubleType())\
# .add('MeanMinimumtemperature', DoubleType())\
# .add('MeanWhc', DoubleType())\
# .add('MeanDaylightHours', DoubleType())\
# .add('MeanDayOfSoilAcidityRange', DoubleType())\
# .add('TotalYield', DoubleType())\
# .add('Area', DoubleType())\
# .add('YieldPerA', DoubleType())\
# .add('label', DoubleType())
# csv読み込み
df5 = ss.read.csv('./batchAnalysticsData_train_5.csv',
header=True,
encoding='UTF-8',
schema=struct)
df5.show(5, truncate=False)
# pipelineの各ステージの生成
# 特徴量のベクトル化用ステージ
assemblerForPCA = VectorAssembler(inputCols=df5.columns[1:6], outputCol="feature")
# 標準化のステージ(PCAは特徴量の標準化が必要)
scalerForPCA = StandardScaler(inputCol="feature", outputCol="standardedFeature",
withStd=True, withMean=True)
# PCAをステージに指定する準備
pca = PCA(k=5, inputCol="standardedFeature", outputCol="pcaScore")
# pipelineの生成
pipelineForPCA = Pipeline(
stages=[
assemblerForPCA,
scalerForPCA,
pca
])
# モデルの生成
modelForPCA = pipelineForPCA.fit(df5)
# モデルの実行
resultFromPCA = modelForPCA.transform(df5)
# 寄与率の出力
print(modelForPCA.stages[2].explainedVariance)
print("上位3特徴量の寄与率:", 0.31 + 0.26 + 0.20)