In [1]:
# prepare for installation of pyspark by findspark 
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession
In [13]:
from pyspark.sql.functions import col, countDistinct, concat, lit
In [3]:
spark = SparkSession.builder.appName("checschema").getOrCreate()
In [4]:
df = spark.read.csv(
    './LearningSparkV2-master/chapter3/data/sf-fire-calls.csv',header=True,inferSchema=True)
In [5]:
df.printSchema()
root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)
 |-- Delay: double (nullable = true)

In [6]:
df.head(3)[0]
Out[6]:
Row(CallNumber=20110016, UnitID='T13', IncidentNumber=2003235, CallType='Structure Fire', CallDate='01/11/2002', WatchDate='01/10/2002', CallFinalDisposition='Other', AvailableDtTm='01/11/2002 01:51:44 AM', Address='2000 Block of CALIFORNIA ST', City='SF', Zipcode=94109, Battalion='B04', StationArea='38', Box='3362', OriginalPriority='3', Priority='3', FinalPriority=3, ALSUnit=False, CallTypeGroup=None, NumAlarms=1, UnitType='TRUCK', UnitSequenceInCallDispatch=2, FirePreventionDistrict='4', SupervisorDistrict='5', Neighborhood='Pacific Heights', Location='(37.7895840679362, -122.428071912459)', RowID='020110016-T13', Delay=2.95)
In [7]:
df.columns
Out[7]:
['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay']
In [8]:
temp_df = df.select("IncidentNumber", "AvailableDtTm", "CallType").where(col("CallType") != "Medical Incident")
temp_df.show(5, truncate=False)
+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows

In [9]:
# calltypeの種類数
df.select("CallType")\
    .where(col("CallType").isNotNull())\
    .agg(countDistinct("CallType")).alias("DistictCallTypes")\
    .show()
+---------------+
|count(CallType)|
+---------------+
|             30|
+---------------+

In [10]:
# calltypeの上位10種表示
df.select("CallType")\
    .where(col("CallType").isNotNull())\
    .distinct()\
    .show(10, False)
+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows

In [11]:
new_df = df.withColumnRenamed("Delay", "ResponseDelaydinMins")
In [12]:
new_df.select("ResponseDelaydinMins")\
    .where(col("ResponseDelaydinMins") > 5)\
    .show(5, False)
+--------------------+
|ResponseDelaydinMins|
+--------------------+
|5.35                |
|6.25                |
|5.2                 |
|5.6                 |
|7.25                |
+--------------------+
only showing top 5 rows

日付操作

  • to_timestamp()
  • dayofmonth()
  • dayofyear()
  • dayofweek()
  • year()
  • month()
In [13]:
from pyspark.sql.functions import to_timestamp
In [9]:
df.printSchema()
root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)
 |-- Delay: double (nullable = true)

In [10]:
new_df = df.select("City","StationArea")
new_df.show(3)
+----+-----------+
|City|StationArea|
+----+-----------+
|  SF|         38|
|  SF|         42|
|  SF|         01|
+----+-----------+
only showing top 3 rows

In [15]:
# 新しい列の作成

# col1とcol2の値を足し合わせて新しい"newcol"を作成
# concat関数を利用する
new_col_df = new_df.withColumn("newcol", concat(col('City'), lit(' - '), col('StationArea')))
new_col_df.show(3)
+----+-----------+-------+
|City|StationArea| newcol|
+----+-----------+-------+
|  SF|         38|SF - 38|
|  SF|         42|SF - 42|
|  SF|         01|SF - 01|
+----+-----------+-------+
only showing top 3 rows

In [14]:
fire_ts_df = new_df.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))\
                    .drop("CallDate")\
                    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))\
                    .drop("WatchDate")\
                    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))\
                    .drop("AvailableDtTm")
In [15]:
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)
+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows

In [17]:
from pyspark.sql.functions import year
In [18]:
fire_ts_df.select(year("IncidentDate"))\
            .distinct()\
            .orderBy(year("IncidentDate"))\
            .show()
+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+

In [20]:
fire_ts_df\
    .select("CallType")\
    .where(col("CallType").isNotNull())\
    .groupBy("CallType")\
    .count()\
    .orderBy("count", ascending=False)\
    .show(n=10, truncate=False)
+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows

最大、最小などの関数呼び出し

importでpysparkのmax()を呼び出すと、pythonのmax()と衝突するので、以下のように呼び出したほうがよい

In [21]:
import pyspark.sql.functions as F
In [23]:
fire_ts_df\
    .select(F.sum("NumAlarms"), F.avg("ResponseDelaydinMins"),
               F.min("ResponseDelaydinMins"), F.max("ResponseDelaydinMins"))\
    .show()
+--------------+-------------------------+-------------------------+-------------------------+
|sum(NumAlarms)|avg(ResponseDelaydinMins)|min(ResponseDelaydinMins)|max(ResponseDelaydinMins)|
+--------------+-------------------------+-------------------------+-------------------------+
|        176170|       3.8923641541750413|              0.016666668|                  1844.55|
+--------------+-------------------------+-------------------------+-------------------------+

In [25]:
fire_ts_df.select("ResponseDelaydinMins").describe().show()
+-------+--------------------+
|summary|ResponseDelaydinMins|
+-------+--------------------+
|  count|              175296|
|   mean|  3.8923641541750413|
| stddev|   9.378286170882717|
|    min|         0.016666668|
|    max|             1844.55|
+-------+--------------------+

In [26]:
fire_ts_df.columns
Out[26]:
['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallFinalDisposition',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'ResponseDelaydinMins',
 'IncidentDate',
 'OnWatchDate',
 'AvailableDtTS']
In [33]:
# 2018年のCallType
fire_ts_df\
    .where(year("IncidentDate") == 2018)\
    .groupBy("CallType")\
    .count()\
    .orderBy("count", ascending=False)\
    .show()
+--------------------+-----+
|            CallType|count|
+--------------------+-----+
|    Medical Incident| 7004|
|              Alarms| 1144|
|      Structure Fire|  906|
|   Traffic Collision|  433|
|        Outside Fire|  153|
|               Other|  114|
|Citizen Assist / ...|  113|
|Gas Leak (Natural...|   69|
|        Water Rescue|   43|
|Elevator / Escala...|   36|
|   Electrical Hazard|   30|
|        Vehicle Fire|   28|
|Smoke Investigati...|   28|
|Odor (Strange / U...|   10|
|          Fuel Spill|   10|
|Train / Rail Inci...|    5|
|              HazMat|    5|
|  Suspicious Package|    3|
|           Explosion|    1|
|       Assist Police|    1|
+--------------------+-----+

In [ ]:
 
In [110]:
from pyspark.sql.functions import month, year
In [37]:
# 2018年で最も呼ばれた月

fire_2018 = fire_ts_df.where(year("IncidentDate") == 2018)

fire_2018 = fire_2018.withColumn("month", month(col("IncidentDate")))
fire_2018.groupBy("month").count().orderBy("count", ascending=False).show()
+-----+-----+
|month|count|
+-----+-----+
|   10| 1068|
|    5| 1047|
|    3| 1029|
|    8| 1021|
|    1| 1007|
|    7|  974|
|    6|  974|
|    9|  951|
|    4|  947|
|    2|  919|
|   11|  199|
+-----+-----+

In [44]:
# 2018年にサンフランシスコで、最も電話したneighborhoodは?
fire_2018.where(col("city") == "San Francisco").groupBy("Neighborhood").count().orderBy("count", ascending=False).show(5)
+--------------------+-----+
|        Neighborhood|count|
+--------------------+-----+
|          Tenderloin| 1393|
|     South of Market| 1052|
|             Mission|  911|
|Financial Distric...|  764|
|Bayview Hunters P...|  513|
+--------------------+-----+
only showing top 5 rows

In [65]:
neighbor_df = fire_2018\
        .where(col("city") == "San Francisco")\
        .groupBy("Neighborhood")\
        .count()\
        .select("Neighborhood", "count")
   
In [96]:
neighbor_df\
    .where(col("count") == neighbor_df.select(F.max("count")).first()[0])\
    .show()
+------------+-----+
|Neighborhood|count|
+------------+-----+
|  Tenderloin| 1393|
+------------+-----+

In [93]:
test = neighbor_df.select(F.max("count")).first()[0]
In [95]:
test[0]
Out[95]:
1393
In [106]:
# 2018年のサンフランシスコで最も救急車が来るのが遅かったneighbor
fire_2018\
    .where(col("city") == "San Francisco")\
    .groupBy("Neighborhood")\
    .mean()\
    .select("Neighborhood", "avg(ResponseDelaydinMins)")\
    .orderBy("avg(ResponseDelaydinMins)", ascending=False).show(5)
+--------------------+-------------------------+
|        Neighborhood|avg(ResponseDelaydinMins)|
+--------------------+-------------------------+
|     Treasure Island|            11.3208333375|
|            Presidio|       6.2481480977777775|
|           Chinatown|         6.15881830687831|
|        McLaren Park|        4.744047642857143|
|Bayview Hunters P...|        4.629759600955165|
+--------------------+-------------------------+
only showing top 5 rows

In [116]:
# こっちが解答
fire_2018\
    .where(col("city") == "San Francisco")\
    .select("Neighborhood", "ResponseDelaydinMins")\
    .orderBy("ResponseDelaydinMins", ascending=False)\
    .show(10, False)
+------------------------------+--------------------+
|Neighborhood                  |ResponseDelaydinMins|
+------------------------------+--------------------+
|Chinatown                     |491.26666           |
|Financial District/South Beach|406.63333           |
|Tenderloin                    |340.48334           |
|Haight Ashbury                |175.86667           |
|Bayview Hunters Point         |155.8               |
|Financial District/South Beach|135.51666           |
|Pacific Heights               |129.01666           |
|Potrero Hill                  |109.8               |
|Inner Sunset                  |106.13333           |
|South of Market               |94.71667            |
+------------------------------+--------------------+
only showing top 10 rows

In [103]:
fire_2018.select("ResponseDelaydinMins").show(5)
+--------------------+
|ResponseDelaydinMins|
+--------------------+
|           2.8833334|
|           6.3333335|
|                2.65|
|           3.5333333|
|                 1.1|
+--------------------+
only showing top 5 rows

In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]: