# prepare for installation of pyspark by findspark
# findspark locates the local Spark installation and prepends it to sys.path,
# so `import pyspark` works outside a spark-submit / pyspark shell.
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, concat, lit
# Single entry point for the DataFrame API; getOrCreate() reuses a running
# session if one already exists.
# NOTE(review): app name "checschema" looks like a typo for "checkschema" — confirm.
spark = SparkSession.builder.appName("checschema").getOrCreate()
# Load the SF fire-calls CSV; Spark infers column types by sampling the data.
df = spark.read.csv('./LearningSparkV2-master/chapter3/data/sf-fire-calls.csv',
                    header=True, inferSchema=True)
# Quick structural inspection of the inferred schema and a sample row.
df.printSchema()
df.head(3)[0]
df.columns
# Three columns for every call that is NOT a medical incident.
non_medical = col("CallType") != "Medical Incident"
temp_df = df.select("IncidentNumber", "AvailableDtTm", "CallType").where(non_medical)
temp_df.show(5, truncate=False)
# Number of distinct (non-null) CallType values.
# BUG FIX: the original chained .alias(...) onto the DataFrame returned by
# .agg(), which sets a DataFrame alias, not a column label — the displayed
# header stayed "count(DISTINCT CallType)". The alias belongs on the
# aggregate expression itself. Also fixed the "Distict" typo in the label.
df.select("CallType")\
.where(col("CallType").isNotNull())\
.agg(countDistinct("CallType").alias("DistinctCallTypes"))\
.show()
# Display 10 of the distinct non-null CallType values.
distinct_call_types = (
    df.select("CallType")
      .where(col("CallType").isNotNull())
      .distinct()
)
distinct_call_types.show(10, False)
# Give "Delay" a more descriptive name, then show a few calls whose
# response delay exceeded five minutes.
new_df = df.withColumnRenamed("Delay", "ResponseDelaydinMins")
slow_calls = new_df.select("ResponseDelaydinMins").where(col("ResponseDelaydinMins") > 5)
slow_calls.show(5, False)
from pyspark.sql.functions import to_timestamp
df.printSchema()
# Project down to just the city and station-area columns.
# NOTE(review): this rebinds new_df, discarding the renamed frame built above.
new_df = df.select("City", "StationArea")
new_df.show(3)
# Create a new column "newcol" by joining the City and StationArea values
# with " - ", using the concat() function.
combined = concat(col('City'), lit(' - '), col('StationArea'))
new_col_df = new_df.withColumn("newcol", combined)
new_col_df.show(3)
# Parse the string date/time columns into real timestamps and drop the raw
# string versions.
# BUG FIX: the original built fire_ts_df from new_df, which at this point
# holds only the City/StationArea projection (rebound above), so
# col("CallDate") / col("WatchDate") / col("AvailableDtTm") would raise an
# AnalysisException. Build from the full DataFrame instead, carrying over
# the "Delay" rename that the downstream aggregations rely on.
fire_ts_df = df.withColumnRenamed("Delay", "ResponseDelaydinMins")\
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))\
.drop("CallDate")\
.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))\
.drop("WatchDate")\
.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))\
.drop("AvailableDtTm")
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)
from pyspark.sql.functions import year
# List the distinct incident years in ascending order.
years_seen = fire_ts_df.select(year("IncidentDate")).distinct()
years_seen.orderBy(year("IncidentDate")).show()
# Ten most frequent non-null call types.
calltype_counts = (
    fire_ts_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
)
calltype_counts.orderBy("count", ascending=False).show(n=10, truncate=False)
import pyspark.sql.functions as F
# Aggregate stats: total alarms plus avg/min/max response delay.
delay_aggs = [
    F.sum("NumAlarms"),
    F.avg("ResponseDelaydinMins"),
    F.min("ResponseDelaydinMins"),
    F.max("ResponseDelaydinMins"),
]
fire_ts_df.select(*delay_aggs).show()
# describe() adds count/stddev on top of the same summary numbers.
fire_ts_df.select("ResponseDelaydinMins").describe().show()
fire_ts_df.columns
# Call-type frequencies for incidents in 2018, most common first.
in_2018 = fire_ts_df.where(year("IncidentDate") == 2018)
in_2018.groupBy("CallType").count().orderBy("count", ascending=False).show()
from pyspark.sql.functions import month, year
# Which month of 2018 had the most calls?
# fire_2018 is reused below: the 2018 subset plus a derived "month" column.
fire_2018 = (
    fire_ts_df
    .where(year("IncidentDate") == 2018)
    .withColumn("month", month(col("IncidentDate")))
)
fire_2018.groupBy("month").count().orderBy("count", ascending=False).show()
# Which neighborhood in San Francisco placed the most calls in 2018?
# (Spark's default column resolution is case-insensitive, so "city" matches "City".)
fire_2018.where(col("city") == "San Francisco").groupBy("Neighborhood").count().orderBy("count", ascending=False).show(5)
# Per-neighborhood call counts for San Francisco.
neighbor_df = fire_2018\
.where(col("city") == "San Francisco")\
.groupBy("Neighborhood")\
.count()\
.select("Neighborhood", "count")
# BUG FIX: the original computed the max twice and then evaluated `test[0]`,
# but first()[0] already yields the scalar count (an int), so indexing it
# raises TypeError. Compute the maximum once and reuse it.
test = neighbor_df.select(F.max("count")).first()[0]
neighbor_df\
.where(col("count") == test)\
.show()
test
# Which SF neighborhood had the slowest ambulance response in 2018?
sf_2018 = fire_2018.where(col("city") == "San Francisco")
# Attempt 1: rank neighborhoods by their mean response delay.
sf_2018.groupBy("Neighborhood")\
.mean()\
.select("Neighborhood", "avg(ResponseDelaydinMins)")\
.orderBy("avg(ResponseDelaydinMins)", ascending=False)\
.show(5)
# Book's answer: rank the individual calls by their raw response delay.
sf_2018.select("Neighborhood", "ResponseDelaydinMins")\
.orderBy("ResponseDelaydinMins", ascending=False)\
.show(10, False)
fire_2018.select("ResponseDelaydinMins").show(5)