* pysparkによるグリッドサーチ
* 交差検証
* pipeline
* ロジスティクス回帰
* 決定木
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
# sparksessionのインスタンス化
ss = SparkSession \
.builder \
.appName("Classsification") \
.enableHiveSupport() \
.getOrCreate()
# 読み込むcsvファイルのスキーマを定義
struct = StructType([
StructField('Year', StringType(), False),
StructField('FarmID', DoubleType(), False),
StructField('MeanHighestTemperature', DoubleType(), False),
StructField('MeanMinimumtemperature', DoubleType(), False),
StructField('MeanWhc', DoubleType(), False),
StructField('MeanDaylightHours', DoubleType(), False),
StructField('MeanDayOfSoilAcidityRange', DoubleType(), False),
StructField('TotalYield', DoubleType(), False),
StructField('Area', DoubleType(), False),
StructField('YieldPerA', DoubleType(), False),
StructField('label', DoubleType(), False)
])
# csv読み込み
df5 = ss.read.csv('./batchAnalysticsData_train_5.csv',
header=True,
encoding='UTF-8',
schema=struct)
df5.show(5, truncate=False)
df5TrainData, df5TestData = df5.randomSplit([0.7, 0.3], 50)
# 特徴量のベクトル化ステージ
assemblerForDTC = VectorAssembler(inputCols=[
"MeanHighestTemperature",
"MeanMinimumtemperature",
"MeanWhc",
"MeanDaylightHours",
"MeanDayOfSoilAcidityRange",
], outputCol="features")
# 決定木のステージ
classifierByDT = DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("features")
# pipeline作成
pipelineForDTC = Pipeline(stages=[assemblerForDTC, classifierByDT])
# グリッドサーチ用インスタンスの生成
# サーチ対象
# ・maxBins:連続値を不連続値にビン分けするときのビンの数
# ・maxDepth:木の深さ
paramGridForDTC = ParamGridBuilder()\
.addGrid(
classifierByDT.maxBins,
[10, 20, 30, 40, 50])\
.addGrid(
classifierByDT.maxDepth,
[2, 3, 4]).build()
# Evaluator(モデル評価用インスタンス)の生成
evaluatorForDTC = BinaryClassificationEvaluator()\
.setLabelCol("label")\
.setRawPredictionCol(classifierByDT.getRawPredictionCol())\
.setMetricName("areaUnderROC")
# クロスバリデーション用インスタンスの生成
crossValForDTC = CrossValidator()\
.setEstimator(pipelineForDTC)\
.setEvaluator(evaluatorForDTC)\
.setEstimatorParamMaps(paramGridForDTC)\
.setNumFolds(10)
# クロスバリデーションの実施
crossValForDTCModel = crossValForDTC.fit(df5TrainData)
# 訓練データで予測を行い、AUCを出力
predictionByDTC = crossValForDTCModel.transform(df5TrainData)
aucByDTC = evaluatorForDTC.evaluate(predictionByDTC)
print(" AUC-TrainData(DecisionTree): ", aucByDTC)
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
# 特徴量選択のため、候補となる組み合わせ分のVectorAssemblerを定義する
# 候補
# 1. 畑×土壌酸度範囲内日数×平均最高気温
# 2. 畑×土壌酸度範囲内日数×平均最低気温
# 3. 畑×土壌酸度範囲内日数×平均含水分量
# 4. 畑×土壌酸度範囲内日数×日照合計時間
assemblerForLC = []
# 1
assemblerForLC.append(
VectorAssembler(inputCols=[
"FarmID",
"MeanDayOfSoilAcidityRange",
"MeanHighestTemperature",
],
outputCol="features")
)
# 2
assemblerForLC.append(
VectorAssembler(inputCols=[
"FarmID",
"MeanDayOfSoilAcidityRange",
"MeanMinimumtemperature",
],
outputCol="features")
)
# 3
assemblerForLC.append(
VectorAssembler(inputCols=[
"FarmID",
"MeanDayOfSoilAcidityRange",
"MeanWhc",
],
outputCol="features")
)
# 4
assemblerForLC.append(
VectorAssembler(inputCols=[
"FarmID",
"MeanDayOfSoilAcidityRange",
"MeanDaylightHours",
],
outputCol="features")
)
# Pipelineの定義
# 標準化ステージ
scalerForLC = StandardScaler(
inputCol="features",
outputCol="standardedFeature",
withStd=True, withMean=True)
# ロジスティクス回帰ステージ
logisticClassification = LogisticRegression().setLabelCol("label")\
.setFeaturesCol("standardedFeature")\
.setStandardization(True)
# 特徴量組み合わせごとのpipeline入れるリスト
pipelineForLC = []
# 特徴量組み合わせごとにpipelineを生成し、リストに入れる
for assembler in assemblerForLC:
pipelineForLC.append(
Pipeline(
stages=[
assembler,
scalerForLC,
logisticClassification
]
)
)
# グリッドサーチ、クロスバリデーション
# グリッドサーチ生成
# 最適化するパラメータの種類と、検証対象の値をセット
paramGridForLC = ParamGridBuilder()\
.addGrid(
logisticClassification.regParam,
[0.001, 0.01, 0.1, 1.0, 10.0, 100.0])\
.addGrid(
logisticClassification.maxIter,
[10, 100, 1000])\
.build()
# Evaluatorの生成
evaluatorForLC = BinaryClassificationEvaluator().setLabelCol("label").setMetricName("areaUnderROC")
# クロスバリデーションの生成
crossValidatorForLC = []
for pipeline in pipelineForLC:
crossValidatorForLC.append(
CrossValidator().setEstimator(pipeline).setEvaluator(evaluatorForLC)\
.setEstimatorParamMaps(paramGridForLC).setNumFolds(10))
%%time
# モデルを作成し、訓練データをインプットに予測を行う
# クロスバリデーションモデルの生成
modelForLC = []
for crossValidator in crossValidatorForLC:
modelForLC.append(crossValidator.fit(df5TrainData))
# 訓練データで予測を行い、AUCを取得し、出力
print(" -- df5TrainData --")
df5TrainData.show()
print(" -- AUC-TrainData(Logistic Regression) --")
for i, model in enumerate(modelForLC):
prediction = model.transform(df5TrainData)
auc = evaluatorForLC.evaluate(prediction)
print(i, auc)
print("")
# 上記結果より、1の組み合わせ(畑×土壌酸度範囲内日数×平均最高気温)を選択する
# 決定木モデルにテストデータ渡して、AUCを取得
predictionTestDataByDTC = crossValForDTCModel.transform(df5TestData)
aucTestDataByDTC = evaluatorForDTC.evaluate(predictionTestDataByDTC)
print("-- AUC-TestData(Decision Tree) --")
print(aucTestDataByDTC, "\n")
# ロジスティクス回帰モデルにテストデータ渡して、AUCを取得
predictionTestDataByLC = modelForLC[0].transform(df5TestData)
aucTestDataByLC = evaluatorForLC.evaluate(predictionTestDataByLC)
print("-- AUC-TestData(Logistic Regression) --")
print(aucTestDataByLC, "\n")
# 上記より、ロジスティクス回帰のほうが精度が高いので、ロジスティクス回帰を選択する
# 未知データを用いてロジスティクス回帰で予測してみる
# 未知データよりDataFrame生成
df5Predict = ss.read.csv('./batchAnalysticsData_predict_5.csv',
header=True, encoding="UTF-8", schema=struct)
df5Predict.show(10)
# 予測
print("-- AUC-FutureData(Logistic Regression) --")
predictionFutureDataByLC = modelForLC[0].transform(df5Predict)
predictionFutureDataByLC.select("FarmID", "probability", "prediction").show()