# use findspark to locate the local Spark installation before importing pyspark
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
# create spark session
spark = SparkSession.builder.appName('myFirstSparkSession').getOrCreate()
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/employee.csv',
header=True,
inferSchema=True)
df.show()
df.columns
df.printSchema()
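# Sketch (assumption): instead of inferSchema, the schema can be declared explicitly
# with StructType, which avoids an extra pass over the file. The column order and
# types below are assumptions based on how the columns are used in this script.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
employee_schema = StructType([
    StructField('employee_id', IntegerType(), True),
    StructField('employee_name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('location', StringType(), True),
    StructField('hours', IntegerType(), True),
])
df_typed = spark.read.csv(
    '/home/yoshi-1/ダウンロード/spark-with-python-master/employee.csv',
    header=True,
    schema=employee_schema)
df_typed.printSchema()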
df.select(['employee_name', 'age', 'location']).show()
df_new = df.withColumn('overtime_time', df.hours * 3)
df_new.show()
df_rename = df.withColumnRenamed('hours', 'working_hours')
df_rename.show()
df_new = df_rename.drop("working_hours")  # drop the renamed column (the original df has no 'working_hours' column)
df.head(3)
df.describe().show()
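# Sketch: summary() (Spark 2.3+) is a flexible alternative to describe()
# that also accepts percentiles.
df.summary('count', 'mean', 'min', '50%', 'max').show()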
# create a Spark session
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
# load the data
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/employee.csv',
header=True,
inferSchema=True)
# register df as a temporary view named "associates" so it can be queried with SQL
df.createOrReplaceTempView("associates")
# query the view with SQL
sql_result_1 = spark.sql("SELECT * FROM associates")
sql_result_1.show()
# SQL query 2: filter rows with a WHERE clause
sql_result_2 = spark.sql("SELECT * FROM associates WHERE age BETWEEN 45 AND 60 AND location='California'")
sql_result_2.show()
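# Sketch: the same filter expressed with the DataFrame API instead of SQL.
df.filter(df['age'].between(45, 60) & (df['location'] == 'California')).show()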
df.show()
# add a derived column with SQL
sql = '''
SELECT
employee_id,
employee_name,
age,
hours,
hours + 100 as overtime_work
FROM
associates
'''
sql_result_3 = spark.sql(sql)
sql_result_3.show()
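# Sketch: the equivalent column addition with withColumn() instead of SQL.
df.withColumn('overtime_work', df['hours'] + 100).show()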
spark = SparkSession.builder.appName("SparkFilter").getOrCreate()
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/items_bought.csv',
header=True,
inferSchema=True)
df.show(4)
df.filter("total_amount > 1500").show()
df.filter((df["item_price"]>1000) & (df['tax_amount']>500)).show()
result_data = df.filter((df['total_amount'] == 1924.74)).collect()
type(result_data)
result_data
len(result_data)
result_data[0].asDict()
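# Sketch: collect() returns a list of Row objects, which support both attribute
# and dictionary-style access; toPandas() converts a whole result to a pandas
# DataFrame (assumes pandas is installed).
first_row = result_data[0]
print(first_row['item_name'], first_row['total_amount'])
pandas_df = df.filter(df['total_amount'] > 1500).toPandas()
print(pandas_df.head())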
spark = SparkSession.builder.appName('sparkGroupBy&Agg').getOrCreate()
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/company_product_revenue.csv',
header=True,
inferSchema=True)
df.show()
df.groupBy('company_name').sum().show()
df.agg({'revenue_sales': 'sum'}).show()
df.groupBy('company_name').max().show()
df.groupBy('company_name').agg({'revenue_sales': 'max'}).show()
df.orderBy('revenue_sales').show()
df.orderBy(df['revenue_sales'].desc()).show()
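# Sketch: several aggregations can be combined in one agg() call using
# pyspark.sql.functions, with readable column aliases.
from pyspark.sql import functions as F
df.groupBy('company_name').agg(
    F.sum('revenue_sales').alias('total_revenue'),
    F.max('revenue_sales').alias('top_product_revenue')
).orderBy(F.col('total_revenue').desc()).show()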
from pyspark.sql.functions import mean, avg, format_number
spark = SparkSession.builder.appName("SparkInbuildFunctions").getOrCreate()
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/company_product_revenue.csv',
header=True,
inferSchema=True
)
df.select(mean("revenue_sales").alias('Mean Revenue Sales')).show()
result_avg = df.select(avg("revenue_sales").alias("Average Revenue Sales"))
print("Average Revenue Sales value is {0}".format(result_avg.head()[0]))
result_avg.show()
result_avg.select(format_number("Average Revenue Sales", 2).alias("Formatted Average")).show()
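# Sketch: the aggregation and the formatting can also be done in a single select.
df.select(format_number(avg('revenue_sales'), 2).alias('Average (2 dp)')).show()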
spark = SparkSession.builder.appName("SparkMisingData").getOrCreate()
df = spark.read.csv(
'/home/yoshi-1/ダウンロード/spark-with-python-master/employee_data.csv',
header=True,
inferSchema=True)
df.show()
print("Data after dropping the rows having null values")
df.na.drop().show()
print('Data after dropping the rows with fewer than 4 non-null values (thresh=4)')
df.na.drop(thresh=4).show()
print('Data after dropping the rows having null values in hours column')
df.na.drop(subset='hours').show()
print('Data after filling the null values in the hours column')
df.na.fill(12, subset='hours').show()
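# Sketch: na.fill() also accepts a dict, so different columns can be filled with
# different defaults in one call (the column/value pairs here are assumptions).
df.na.fill({'hours': 12, 'employee_name': 'unknown'}).show()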
from pyspark.sql.functions import mean
mean_value = df.select(mean('hours'))
mean_value.collect()[0]
mean_value.collect()[0][0]
mean_value = mean_value.collect()[0][0]
df.na.fill(mean_value, subset='hours').show()
df.na.replace("Pichai", "Sundar", subset="employee_name").show()
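# Sketch (assumption): pyspark.ml.feature.Imputer performs the mean-fill above in
# one step; it requires a double/float input column, hence the cast first.
from pyspark.ml.feature import Imputer
df_double = df.withColumn('hours', df['hours'].cast('double'))
imputer = Imputer(inputCols=['hours'], outputCols=['hours_filled'], strategy='mean')
imputer.fit(df_double).transform(df_double).show()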
spark = SparkSession.builder.appName("SparkDateTime").getOrCreate()
df = spark.read.csv('/home/yoshi-1/ダウンロード/spark-with-python-master/items_bought.csv',
header=True,
inferSchema=True)
df.show()
df.printSchema()
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date
# use the cast() method to convert the column type (string -> timestamp -> date)
updated_df = df.withColumn('formatted_date', to_date(unix_timestamp(df['date'], 'dd-MM-yyyy').cast('timestamp')))
print("Schema with date column string datetype converted to date datatype")
updated_df.show()
updated_df.printSchema()
updated_df = updated_df.drop('date')
updated_df.show(3)
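# Sketch: since Spark 2.2, to_date() accepts the format string directly,
# which avoids the round trip through unix_timestamp().
df.withColumn('formatted_date', to_date(df['date'], 'dd-MM-yyyy')).printSchema()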
from pyspark.sql.functions import weekofyear, dayofmonth, month, year, date_format
print("Data Extraction from dates")
final_df = updated_df.select(updated_df["item_name"],
updated_df["total_amount"],
weekofyear(updated_df["formatted_date"]).alias("week_number"),
dayofmonth(updated_df["formatted_date"]).alias("day_number"),
month(updated_df["formatted_date"]).alias("month"),
year(updated_df["formatted_date"]).alias("year"))
final_df.show()
# convert the date column back to a string in a different date format
date_string_value = updated_df.select(updated_df["item_name"], date_format(updated_df["formatted_date"], 'MM/dd/yyyy'))
date_string_value.show()
date_string_value.printSchema()
final_df.show()
final_df.groupBy("year").sum().show()
print("Usecae - Total amount of items purchased in that particular year")
final_format = final_df.groupBy("year").sum().select(["year", "sum(total_amount)"])
final_format.withColumnRenamed("sum(total_amount)", "Total Expenditure").show()
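# Sketch: persisting the yearly summary as a single CSV file; the output path
# '/tmp/yearly_expenditure' is only an example.
final_format.withColumnRenamed("sum(total_amount)", "total_expenditure") \
    .coalesce(1) \
    .write.mode('overwrite').csv('/tmp/yearly_expenditure', header=True)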