# Section 1: decision-tree classification on the breast-cancer data set.
# findspark locates the local Spark installation and makes pyspark importable.
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkTrees').getOrCreate()

# inferSchema lets Spark derive numeric column types from the CSV contents.
data = spark.read.csv('breast-cancer.csv', header=True, inferSchema=True)
data.show(4)

from pyspark.ml.feature import VectorAssembler
# Spark ML estimators expect all predictors packed into one vector column.
assembler = VectorAssembler(inputCols=['thickness', 'cell_size', 'cell_shape'], outputCol='features')
assembler_data = assembler.transform(data)
final_data = assembler_data.select('features', 'label')
print("Consolidated Data with features and labels")
final_data.show(4)

# Splitting the data into 80 and 20 percent
train_data, test_data = final_data.randomSplit([0.8, 0.2])

from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# FIX: AUC must be computed from the model's raw scores in the
# 'rawPrediction' column, not from the hard 0/1 'prediction' column —
# using hard labels collapses the ROC curve to a single threshold and
# the reported AUC is misleading.
eval_obj = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction')
print("Area Under the Curve value is {}".format(eval_obj.evaluate(dt_predictions)))

# Accuracy uses the default predictionCol='prediction', which is correct here.
mul_eval_obj = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
print("\nAccuracy of Decision Tree is {}".format(mul_eval_obj.evaluate(dt_predictions)))
print("\nPrediction Data")
dt_predictions.show(4)

# FIX: typo in the user-facing message ("Detemining" -> "Determining").
print("\nDetermining which feature played a major role in Decision Making\n")
print(dt_model.featureImportances)
# Section 2: decision-tree regression — predict car price from body dimensions.
spark = SparkSession.builder.appName('SparkDTRegression').getOrCreate()
data = spark.read.csv('car-dimension-price.csv', header=True, inferSchema=True)
data.show(4)
data.columns

# Check for rows where any column of interest is null.
watched_cols = ['wheel-base', 'length', 'width', 'height', 'price']
null_cond = data[watched_cols[0]].isNull()
for col_name in watched_cols[1:]:
    null_cond = null_cond | data[col_name].isNull()
df = data.filter(null_cond)
df.show()
df.count()

# Remove every row that contains a null.
data = data.na.drop()

# Re-check: the same filter rebuilt against the cleaned DataFrame
# should now match nothing.
null_cond = data[watched_cols[0]].isNull()
for col_name in watched_cols[1:]:
    null_cond = null_cond | data[col_name].isNull()
df = data.filter(null_cond)
df.show()
df.count()

from pyspark.ml.feature import VectorAssembler
# Pack the four dimension columns into the single vector column Spark ML needs.
assembler = VectorAssembler(inputCols=['wheel-base', 'length', 'width', 'height'], outputCol='features')
assembler_data = assembler.transform(data)
final_data = assembler_data.select('features', 'price')
print("Consolidated Data with features and labels")
final_data.show(4)

# 80/20 train-test split.
train_data, test_data = final_data.randomSplit([0.8, 0.2])

from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol='price', featuresCol='features')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

from pyspark.ml.evaluation import RegressionEvaluator
# R^2 on the held-out test set.
regression_evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName='r2')
R2 = regression_evaluator_r2.evaluate(dt_predictions)
print("The R square value is {}".format(R2))
print("\nDetermining which feature played a major role in Decision Making")
print(dt_model.featureImportances)
# Section 3: random-forest regression — predict car price from engine
# performance figures.
spark = SparkSession.builder.appName('SparkRFRegression').getOrCreate()
data = spark.read.csv('car-performance-price.csv', header=True, inferSchema=True)
data.show(4)
data.columns

# Check for nulls in the columns used below.
df = data.filter((data["horsepower"].isNull() |
data["peak-rpm"].isNull() |
data["city-mileage"].isNull() |
data["highway-mileage"].isNull() |
data["price"].isNull()))
df.show()
df.count()

# Drop rows containing nulls.
data = data.na.drop()

# Re-check for nulls after dropping — should match nothing now.
df = data.filter((data["horsepower"].isNull() |
data["peak-rpm"].isNull() |
data["city-mileage"].isNull() |
data["highway-mileage"].isNull() |
data["price"].isNull()))
df.show()
df.count()

from pyspark.ml.feature import VectorAssembler
# Pack the four performance columns into one vector column for Spark ML.
assembler = VectorAssembler(inputCols=['horsepower', 'peak-rpm', 'city-mileage', 'highway-mileage'], outputCol='features')
assembler_data = assembler.transform(data)
final_data = assembler_data.select('features', 'price')
print("Consolidated Data with feature and labels")
final_data.show(4)

# Splitting the data into 80 and 20 percent
train_data, test_data = final_data.randomSplit([0.8, 0.2])

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol='price', featuresCol='features', numTrees=120)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

from pyspark.ml.evaluation import RegressionEvaluator
regression_evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName="r2")
R2 = regression_evaluator_r2.evaluate(rf_predictions)
print("The R square value is {}".format(R2))
# FIX: typo in the user-facing message ("Detemining" -> "Determining").
print("\nDetermining which feature played a major role in Decision Making")
print(rf_model.featureImportances)
# Section 4: compare three tree-based classifiers (gradient-boosted trees,
# random forest, decision tree) on the same libsvm-format data via Pipelines.
from pyspark.ml.feature import VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
# Import evaluators locally so this section does not depend on the imports
# made much earlier in the file.
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("SparkTreeComparisions").getOrCreate()
data = spark.read.format('libsvm').load('sample_libsvm_data.txt')
# LIBSVM is a support-vector-machine library built by a National Taiwan
# University group; it defines its own sparse data format, which Spark
# can read directly.
print("Libsvm format Data - Fully formatted and ready to use data")
data.show(4)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Splitting the data into 70 and 30 percent
train_data, test_data = data.randomSplit([0.7, 0.3])

gbt = GBTClassifier(featuresCol="indexedFeatures", maxIter=10) # Gradient Boosted Trees
rf = RandomForestClassifier(featuresCol="indexedFeatures", numTrees=150) # Random Forest with 150 Trees
dt = DecisionTreeClassifier(featuresCol="indexedFeatures") # Decision Trees

# Chain indexer and each classifier in a Pipeline.
pipeline_gbt = Pipeline(stages=[featureIndexer, gbt])
pipeline_rf = Pipeline(stages=[featureIndexer, rf])
pipeline_dt = Pipeline(stages=[featureIndexer, dt])

# Train each model. This also runs the indexer.
gbt_model = pipeline_gbt.fit(train_data)
rf_model = pipeline_rf.fit(train_data)
dt_model = pipeline_dt.fit(train_data)

# Make predictions.
gbt_predictions = gbt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)
dt_predictions = dt_model.transform(test_data)

# Select example rows to display.
print("Gradient Boosting's predictions")
gbt_predictions.select("prediction", "label", "features").show(5)
# Select example rows to display.
print("Random Forest's predictions")
rf_predictions.select("prediction", "label", "features").show(5)
# Select example rows to display.
print("Decision Tree's predictions")
dt_predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error as RMSE.
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
gbt_rmse = evaluator.evaluate(gbt_predictions)
rf_rmse = evaluator.evaluate(rf_predictions)
dt_rmse = evaluator.evaluate(dt_predictions)
print("Gradient Boosting's RMSE on test data = %g" % gbt_rmse)
print("Random Forest's RMSE on test data = %g" % rf_rmse)
print("Decision Tree's RMSE on test data = %g" % dt_rmse)

# Select (prediction, true label) and compute test accuracy.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
gbt_acc = evaluator.evaluate(gbt_predictions)
rf_acc = evaluator.evaluate(rf_predictions)
dt_acc = evaluator.evaluate(dt_predictions)
print("Gradient Boosting's Accuracy on test data = %g" % gbt_acc)
print("Random Forest's Accuracy on test data = %g" % rf_acc)
print("Decision Tree's Accuracy on test data = %g" % dt_acc)

print("Feature Importance")
# FIX: Pipeline.fit returns a PipelineModel, which has no
# featureImportances attribute — the original line raised AttributeError.
# The trained classifier is the pipeline's last stage, so the importances
# live on stages[-1].
print("Gradient Boosting's feature importance: {}".format(gbt_model.stages[-1].featureImportances))