# locate the local Spark installation with findspark before importing pyspark
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SingleVaribleLinearReg").getOrCreate()
from pyspark.ml.regression import LinearRegression
data = spark.read.csv('/home/yoshi-1/ダウンロード/spark-with-python-master/single_variable_regression.csv',
                      header=True,
                      inferSchema=True)
print("Initial Data")
data.show()
# import VectorAssembler, which packs feature columns into the single vector column Spark ML expects
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Step 2 - Data pre-processing: convert the feature(s) into the vector format Spark ML expects
assembler_object = VectorAssembler(
    inputCols=['house_size'],
    outputCol='house_size_feature'
)
feature_vector_dataframe = assembler_object.transform(data)
print("Data after adding house_size columns as a spark accepted feature")
feature_vector_dataframe.show()
feature_vector_dataframe.printSchema()
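# Optional sanity check (a sketch): VectorAssembler wraps each house_size value
# in a length-1 dense vector, so a row with house_size=1200 becomes [1200.0].
feature_vector_dataframe.select('house_size', 'house_size_feature').show(3)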
formatted_data = feature_vector_dataframe.select('house_size_feature', 'price_sold')
print("Consolidated Data with accepted features and labels")
formatted_data.show()
# Step 3 - Training our Linear Regression model with a single variable
# splitting the data into 70 and 30 percent
train_data, test_data = formatted_data.randomSplit([0.7, 0.3])
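# Note: randomSplit shuffles non-deterministically between runs; pass a seed
# for a reproducible split, e.g.:
# train_data, test_data = formatted_data.randomSplit([0.7, 0.3], seed=42)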
# Defining our Linear Regression
lireg = LinearRegression(featuresCol='house_size_feature', labelCol='price_sold')
# Training our model with training data
lireg_model = lireg.fit(train_data)
# Step 4 - Evaluating the trained model
# Evaluating our model with testing data
test_result = lireg_model.evaluate(test_data)
print("Residuals info - distance between data points and fitted regression line")
test_result.residuals.show()
print("Root Mean Square Error {}".format(test_result.rootMeanSquaredError))
print("R square value {}".format(test_result.r2))
# Step 5 - Performing Predictions with novel data
# Creating unlabeled data from the test data by removing the label in order to get predictions
unlabeled_data = test_data.select('house_size_feature')
predictions = lireg_model.transform(unlabeled_data)
print("\nPredictions for Novel Data")
predictions.show()
# Checking our model with new value manually
house_size_coeff = lireg_model.coefficients[0]  # coefficient (slope)
intercept = lireg_model.intercept  # intercept
print("Coefficient is {}".format(house_size_coeff))
print("Intercept is {}".format(intercept))
new_house_size = 950
# Mimicking the hypothesis function to get a prediction
price = (intercept) + (house_size_coeff) * new_house_size
print("Predicted house price for house size {} is {}".format(new_house_size, price))
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MultiVariableLinearReg').getOrCreate()
from pyspark.ml.regression import LinearRegression
data = spark.read.csv('/home/yoshi-1/ダウンロード/spark-with-python-master/multi_variable_regression.csv',
                      header=True,
                      inferSchema=True)
print("Initial Data")
data.show()
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Step 2 - Data pre-processing: convert string-typed data into a format Spark ML accepts
# import StringIndexer to turn the string 'area' feature into numeric indices
from pyspark.ml.feature import StringIndexer
# convert the string 'area' column into a numeric index column
string_indexer_object = StringIndexer(inputCol='area', outputCol='area_feature')
string_indexer_model = string_indexer_object.fit(data)  # fit() returns a StringIndexerModel
final_data = string_indexer_model.transform(data)
print("Data after converting the string column locality into spark accepted feature")
final_data.show()
print(final_data.columns)
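# The fitted StringIndexerModel exposes the label order it learned (index 0 is
# assigned to the most frequent area value):
print(string_indexer_model.labels)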
# Step 3 - Data pre-processing: assemble the numeric features into a single vector column
# Passing multiple columns as the input columns this time
assembler_object = VectorAssembler(
    inputCols=['house_size', 'bedrooms', 'floors', 'house_age', 'area_feature'],
    outputCol='house_features')
feature_vector_dataframe = assembler_object.transform(final_data)
print("Data after assembling the feature columns into a single vector")
feature_vector_dataframe.show()
feature_vector_dataframe.printSchema()
# choose formatted feature col and target variable
formatted_data = feature_vector_dataframe.select('house_features', 'price_sold')
formatted_data.show()
# Step 4 - Training our Linear Regression model with multiple variables
# Splitting the data into 60 and 40 percent
train_data, test_data = formatted_data.randomSplit([0.6, 0.4])
# Defining our Linear regression
lireg = LinearRegression(featuresCol='house_features', labelCol='price_sold')
# Training our model with training data
lireg_model = lireg.fit(train_data)
# Step 5 - Evaluating the trained model
# Evaluating our model with testing data
test_results = lireg_model.evaluate(test_data)
print("Residuals info - distance between data points and fitted regression line\n")
test_results.residuals.show()
print("Root Mean Square Error {}".format(test_results.rootMeanSquaredError))
print("R square value {}".format(test_results.r2))
# Step 6 - Performing Predictions with novel data
# Creating unlabeled data from test data by removing the label in order to get predictions
unlabeled_data = test_data.select('house_features')
predictions = lireg_model.transform(unlabeled_data)
print('Predictions for Novel Data')
predictions.show()
# show the labeled test data for comparison with the predictions above
test_data.show()
# Checking our model with new value manually
print('Coefficients are {}'.format(lireg_model.coefficients))
print('Intercept is {}'.format(lireg_model.intercept))
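# Pairing each input column with its learned weight (a sketch; the coefficient
# order matches the inputCols list passed to the VectorAssembler):
feature_names = ['house_size', 'bedrooms', 'floors', 'house_age', 'area_feature']
for name, coeff in zip(feature_names, lireg_model.coefficients.toArray()):
    print("{}: {}".format(name, coeff))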
spark = SparkSession.builder.appName('SparkLogReg').getOrCreate()
data = spark.read.csv('brain_tumor_dataset.csv', header=True, inferSchema=True)
data.show(3)
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
# Creating a StringIndexer - converts every distinct string value into a numeric index
sex_string_indexer_direct = StringIndexer(inputCol='sex', outputCol='sexIndexer')
sex_indexer_model = sex_string_indexer_direct.fit(data)  # fit() returns a StringIndexerModel
final_string_indexed_data = sex_indexer_model.transform(data)
final_string_indexed_data.show(3)
# here Male was indexed as 1 and Female as 0 (StringIndexer gives index 0 to the most frequent label)
# Performing OneHotEncoding - convert the sexIndexer column into a one-hot vector
sex_encoder_direct = OneHotEncoder(inputCol='sexIndexer', outputCol='sexVector')
fit_encoded_data = sex_encoder_direct.fit(final_string_indexed_data)
encoded_data = fit_encoded_data.transform(final_string_indexed_data)
encoded_data.show(3)
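# Note: OneHotEncoder drops the last category by default (dropLast=True), so the
# two-level sex column becomes a length-1 sparse vector; pass dropLast=False to
# keep an explicit slot per category, e.g.:
# OneHotEncoder(inputCol='sexIndexer', outputCol='sexVector', dropLast=False)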
assembler_direct = VectorAssembler(inputCols=['age', 'sexVector', 'tumor_size'], outputCol='features')
assembler_data = assembler_direct.transform(encoded_data)
final_data_direct = assembler_data.select('features', 'cancerous')
print("Conslidated Data with accepted features and labels")
final_data_direct.show(3)
from pyspark.ml.classification import LogisticRegression
logreg_direct = LogisticRegression(featuresCol='features', labelCol='cancerous')
train_data_direct, test_data_direct = final_data_direct.randomSplit([0.6, 0.4])
logreg_model_direct = logreg_direct.fit(train_data_direct)
# Evaluating our model with testing data
# Direct evaluation using the model's built-in evaluate() method
predictions_labels = logreg_model_direct.evaluate(test_data_direct)
print("Prediction Data")
predictions_labels.predictions.select(['features', 'cancerous', 'prediction']).show(3)
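# Each prediction row also carries 'rawPrediction' (the classifier margins) and
# 'probability' (the class probabilities behind each 0/1 decision):
predictions_labels.predictions.select('probability', 'prediction').show(3)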
# Evaluation using BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# rank examples by the 'rawPrediction' scores rather than the hard 0/1
# 'prediction' column, so the AUC reflects a genuine ranking
direct_evaluation = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='cancerous')
AUC_direct = direct_evaluation.evaluate(predictions_labels.predictions)
print("Area Under the Curve value is {}".format(AUC_direct))
print("\nCoeffecients are {}".format(logreg_model_direct.coefficients))
print("\nIntercept is {}".format(logreg_model_direct.intercept))
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
spark = SparkSession.builder.appName('SparkLogReg').getOrCreate()
data = spark.read.csv('brain_tumor_dataset.csv', header=True, inferSchema=True)
data.show(4)
# Stage 1
sex_string_indexer = StringIndexer(inputCol='sex', outputCol='sexIndexer')
# Stage 2
sex_encoder = OneHotEncoder(inputCol='sexIndexer', outputCol='sexVector')
# Stage 3
assembler = VectorAssembler(inputCols=['age', 'sexVector', 'tumor_size'], outputCol='features')
# Stage 4
logreg = LogisticRegression(featuresCol='features', labelCol='cancerous')
# passing the 4 stages directly into a pipeline object
pipeline_object = Pipeline(stages=[sex_string_indexer, sex_encoder, assembler, logreg])
train_data, test_data = data.randomSplit([0.6, 0.4])
logreg_model = pipeline_object.fit(train_data)
model_results = logreg_model.transform(test_data)
print("Pridiction Data")
model_results.select('cancerous', 'prediction').show(4)
evaluation_object = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='cancerous')
AUC = evaluation_object.evaluate(model_results)
print("Area Under the Curve value is {}".format(AUC))