In [1]:
# locate the local Spark installation with findspark so that pyspark can be imported
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')

from pyspark.sql import SparkSession
In [14]:
from pyspark.sql.functions import col, desc
In [7]:
# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkSQLExampleApp")\
        .getOrCreate()
In [8]:
# path to data set
path = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
In [9]:
df = spark.read.format("csv")\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load(path)
In [5]:
df.printSchema()
root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
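
Instead of inferring the schema, an explicit DDL schema string can be passed to the reader, avoiding the extra pass over the CSV file that inferSchema costs. A minimal sketch using the columns shown above:

In [ ]:
# sketch: supply the schema explicitly instead of using inferSchema
schema = "date INT, delay INT, distance INT, origin STRING, destination STRING"
df = spark.read.format("csv")\
        .option("header", "true")\
        .schema(schema)\
        .load(path)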

In [8]:
df.show(n=5)
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows

In [9]:
# create a temp view so the data can be queried with SQL
df.createOrReplaceTempView("us_delay_flights_tbl")
In [10]:
# ex1
spark.sql("""
SELECT distance, origin, destination
FROM us_delay_flights_tbl
WHERE distance > 1000
ORDER BY distance DESC""").show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

In [17]:
# ex1 in Python (DataFrame API)
df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(desc("distance")).show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

In [18]:
# alternatively,
df.select("distance", "origin", "destination").where("distance > 1000").orderBy("distance", ascending=False).show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

In [11]:
# ex2)
spark.sql("""
SELECT date, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND origin = 'SFO' AND destination = 'ORD'
ORDER BY delay DESC""").show(10)
+-------+------+-----------+
|   date|origin|destination|
+-------+------+-----------+
|2190925|   SFO|        ORD|
|1031755|   SFO|        ORD|
|1022330|   SFO|        ORD|
|1051205|   SFO|        ORD|
|1190925|   SFO|        ORD|
|2171115|   SFO|        ORD|
|1071040|   SFO|        ORD|
|1051550|   SFO|        ORD|
|3120730|   SFO|        ORD|
|1261104|   SFO|        ORD|
+-------+------+-----------+
only showing top 10 rows

In [23]:
df.select("date", "origin", "destination")\
    .where((col("delay")>120) & (col("origin")=="SFO") & (col("destination")=="ORD"))\
    .orderBy("delay", ascending=False).show(10)
+-------+------+-----------+
|   date|origin|destination|
+-------+------+-----------+
|2190925|   SFO|        ORD|
|1031755|   SFO|        ORD|
|1022330|   SFO|        ORD|
|1051205|   SFO|        ORD|
|1190925|   SFO|        ORD|
|2171115|   SFO|        ORD|
|1071040|   SFO|        ORD|
|1051550|   SFO|        ORD|
|3120730|   SFO|        ORD|
|1261104|   SFO|        ORD|
+-------+------+-----------+
only showing top 10 rows

In [12]:
# ex3)
spark.sql("""
SELECT delay, origin, destination,
    CASE
        WHEN delay > 360 THEN 'Very Long Delays'
        WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
        WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
        WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
        WHEN delay = 0 THEN 'No Delays'
        ELSE 'Early'
    END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10) # <- sorted first by origin ascending, then by delay descending
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows

In [37]:
# python
from pyspark.sql.functions import when

new_df = df.withColumn("Flight_Delays",
                           when(col("delay") > 360, "Very Long Delays")\
                           .when((col("delay") >= 120) & (col("delay") <= 360), "Long Delays")\
                           .when((col("delay") >= 60) & (col("delay") < 120), "Short Delays")\
                           .when((col("delay") > 0) & (col("delay") < 60), "Tolerable Delays")\
                           .when(col("delay") == 0, "No Delays")\
                           .otherwise("Early"))\
            .orderBy(col("origin").asc(), col("delay").desc())
In [38]:
new_df.select("delay", "origin", "destination", "Flight_Delays").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows

In [36]:
new_df.orderBy(col("origin").asc(), col("delay").desc()).show(10)
+-------+-----+--------+------+-----------+-------------+
|   date|delay|distance|origin|destination|Flight_Delays|
+-------+-----+--------+------+-----------+-------------+
|1220625|  333|     602|   ABE|        ATL|  Long Delays|
|2120625|  305|     602|   ABE|        ATL|  Long Delays|
|3021725|  275|     602|   ABE|        ATL|  Long Delays|
|2150625|  257|     602|   ABE|        ATL|  Long Delays|
|2211245|  247|     369|   ABE|        DTW|  Long Delays|
|2211215|  247|     602|   ABE|        ATL|  Long Delays|
|1220607|  219|     569|   ABE|        ORD|  Long Delays|
|3201725|  211|     602|   ABE|        ATL|  Long Delays|
|3121245|  197|     369|   ABE|        DTW|  Long Delays|
|2141628|  192|     569|   ABE|        ORD|  Long Delays|
+-------+-----+--------+------+-----------+-------------+
only showing top 10 rows

In [ ]:
 

Creating SQL Database and Table

In [2]:
# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkSQLExampleApp")\
        .getOrCreate()

Creating managed table

In [7]:
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
Out[7]:
DataFrame[]
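
The catalog API can confirm that the new database exists and is now the current one; a quick sketch:

In [ ]:
# sketch: check the current database and those registered in the catalog
spark.catalog.currentDatabase()
spark.catalog.listDatabases()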
In [8]:
# path to data set
path = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
In [9]:
df = spark.read.format("csv")\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load(path)
In [10]:
# create a temp view so the data can be queried with SQL
df.createOrReplaceTempView("us_delay_flights_tbl")
In [13]:
# spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING);")
# 以下、エラーメッセージ
# AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
# 'CreateTable `learn_spark_db`.`managed_us_delay_flights_tbl`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists



# 上記ではエラーになるので、 temp_viewを使って作成
# spark.sql("CREATE TABLE managed_us_delay_flights_tbl (SELECT * FROM us_delay_flights_tbl)")

# 上記でもエラーになる
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-13-1fae7367c658> in <module>
      7 
      8 # 上記ではエラーになるので、 temp_viewを使って作成
----> 9 spark.sql("CREATE TABLE managed_us_delay_flights_tbl (SELECT * FROM us_delay_flights_tbl)")

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/session.py in sql(self, sqlQuery)
    721         [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
    722         """
--> 723         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    724 
    725     def table(self, tableName):

~/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
'CreateTable `learn_spark_db`.`managed_us_delay_flights_tbl`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
+- Project [date#16, delay#17, distance#18, origin#19, destination#20]
   +- SubqueryAlias us_delay_flights_tbl
      +- Relation[date#16,delay#17,distance#18,origin#19,destination#20] csv
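
The failure above occurs because a plain CREATE TABLE (or CREATE TABLE ... AS SELECT) defaults to a Hive table, and this session was built without Hive support. Two possible workarounds, shown as sketches rather than executed cells: enable Hive support when building the session, or declare a non-Hive data source with a USING clause.

In [ ]:
# sketch 1: build the session with Hive support (requires the Hive classes on the classpath)
# spark = SparkSession.builder.appName("SparkSQLExampleApp").enableHiveSupport().getOrCreate()

# sketch 2: declare the data source explicitly so no Hive SerDe is involved
# spark.sql("""CREATE TABLE managed_us_delay_flights_tbl
#     (date STRING, delay INT, distance INT, origin STRING, destination STRING)
#     USING csv""")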
In [11]:
# another way to create the table
df.write.saveAsTable("managed_us_delay_flights_tbl")
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-11-b0480773e7c0> in <module>
      1 # 別の作り方
----> 2 df.write.saveAsTable("managed_us_delay_flights_tbl")

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
   1156         if format is not None:
   1157             self.format(format)
-> 1158         self._jwrite.saveAsTable(name)
   1159 
   1160     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,

~/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Can not create the managed table('`managed_us_delay_flights_tbl`'). The associated location('file:/home/yoshi-1/ドキュメント/spark-warehouse/learn_spark_db.db/managed_us_delay_flights_tbl') already exists.
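
This error comes from files left in the warehouse directory (spark-warehouse/learn_spark_db.db/...) by the earlier failed attempt. One way around it, assuming the leftover data may be overwritten, is to write in overwrite mode, as the Parquet section later in this notebook also does:

In [ ]:
# sketch: overwrite whatever already sits at the table's warehouse location
df.write.mode("overwrite").saveAsTable("managed_us_delay_flights_tbl")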
In [12]:
# check table
spark.catalog.listTables()
Out[12]:
[Table(name='us_delay_flights_tbl', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Creating unmanaged table

In [16]:
# save the DataFrame's data to the path given in option(), creating an unmanaged table (see also the SQL sketch below)
df.write.option("path", "tmp/data/us_flight_delay").saveAsTable("us_delay_flights_tbl")
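
An unmanaged (external) table can also be declared in SQL by pointing a USING clause at the original CSV file; a sketch over the same source path (an alternative to the df.write call above, so only one of the two would normally be run):

In [ ]:
# sketch: create an unmanaged table directly over the source CSV via SQL
spark.sql("""CREATE TABLE us_delay_flights_tbl
    (date STRING, delay INT, distance INT, origin STRING, destination STRING)
    USING csv
    OPTIONS (path './LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv', header 'true')""")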

Creating Views

In [7]:
# temporary views can also be created
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin='SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin='JFK'")

df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
In [12]:
spark.sql("SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view").show()
+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|1011250|   55|   SFO|        JFK|
|1012230|    0|   SFO|        JFK|
|1010705|   -7|   SFO|        JFK|
|1010620|   -3|   SFO|        MIA|
|1010915|   -3|   SFO|        LAX|
|1011005|   -8|   SFO|        DFW|
|1011800|    0|   SFO|        ORD|
|1011740|   -7|   SFO|        LAX|
|1012015|   -7|   SFO|        LAX|
|1012110|   -1|   SFO|        MIA|
|1011610|  134|   SFO|        DFW|
|1011240|   -6|   SFO|        MIA|
|1010755|   -3|   SFO|        DFW|
|1010020|    0|   SFO|        DFW|
|1010705|   -6|   SFO|        LAX|
|1010925|   -3|   SFO|        ORD|
|1010555|   -6|   SFO|        ORD|
|1011105|   -8|   SFO|        DFW|
|1012330|   32|   SFO|        ORD|
|1011330|    3|   SFO|        DFW|
+-------+-----+------+-----------+
only showing top 20 rows
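
The session-scoped JFK view is queried the same way, just without the global_temp prefix; a quick sketch:

In [ ]:
# sketch: session-scoped temp views need no global_temp prefix
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view").show(5)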

In [13]:
# drop the views
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
In [ ]:
 

Reading Tables into DataFrames

In [13]:
spark.catalog.listTables()
Out[13]:
[Table(name='us_delay_flights_tbl', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
In [14]:
spark.catalog.listDatabases()
Out[14]:
[Database(name='default', description='default database', locationUri='file:/home/yoshi-1/ドキュメント/spark-warehouse'),
 Database(name='learn_spark_db', description='', locationUri='file:/home/yoshi-1/ドキュメント/spark-warehouse/learn_spark_db.db')]
In [15]:
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
In [16]:
us_flights_df.show(5)
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows

In [ ]:
 

Data Sources for DataFrames and SQL Tables

In [17]:
# DataFrameReader.format(args).option("key", "value").schema(args).load()
# a DataFrameReader instance is obtained from
#   SparkSession.read
# or, for streaming,
#   SparkSession.readStream
In [ ]:
# DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)

# DataFrameWriter.format(args)
#                .option(args)
#                .bucketBy(args)
#                .partitionBy(args)
#                .save(path)
In [ ]:
# a DataFrameWriter instance is obtained from
#   DataFrame.write
# or, for streaming,
#   DataFrame.writeStream
In [ ]:
 

Parquet

In [19]:
# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkParkquetApp")\
        .getOrCreate()
In [21]:
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
df = spark.read.format("parquet").load(file)
In [22]:
df.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

Reading Parquet files into a Spark table

In [24]:
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
        path "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/")
  File "<ipython-input-24-eb971e722f67>", line 2
    CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
           ^
SyntaxError: invalid syntax
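
%sql is a notebook magic (available in Databricks, for example) and is not defined in this plain Jupyter session, hence the SyntaxError. The same temporary view can be created through spark.sql; a sketch using the same path:

In [ ]:
# sketch: equivalent of the %sql cell above, issued through spark.sql
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
        path "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/")
""")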

Writing DataFrame to Parquet files

In [25]:
df.write.format("parquet").mode("overwrite").option("compression", "snappy")\
    .save("/tmp/data/parquet/df_parquet")

Writing DataFrame to Spark SQL tables

In [26]:
df.write.mode("overwrite").saveAsTable("us_delay_flights_tbl")
In [ ]:
 

JSON

In [27]:
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file) # jsonフォルダの全jsonファイルをまとめて読み込む
In [30]:
print((df.count(), len(df.columns)))
(1502, 3)

Writing DataFrame to JSON files

In [ ]:
df.write.format("json")\
        .mode("overwrite")\
        .option("compression", "snappy")\
        .save("/tmp/data/json/df_json")
In [ ]:
 

CSV

In [42]:
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"

schema = "DEST_COUNTRY_NAME string, ORIGIN_COUNTRY_NAME string, count int"

df = spark.read.format("csv").option("header", "true").schema(schema).option("mode", "FAILFAST").option("nullValue", "").load(file)
In [43]:
df.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows

In [44]:
df.printSchema()
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

In [45]:
df.createOrReplaceTempView("us_delay_flights_tbl")
In [46]:
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrame to CSV files

In [47]:
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
In [ ]:
 

Avro and ORC

In [49]:
# skipped in this notebook; see the read sketch below
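
The read pattern for ORC and Avro mirrors Parquet and JSON. A sketch, assuming the summary-data folder also contains orc/ and avro/ subdirectories (ORC support is built into Spark, while format("avro") needs the external spark-avro package on the classpath):

In [ ]:
# sketch: ORC is a built-in data source; the path below is assumed to exist
orc_df = spark.read.format("orc")\
    .load("./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*")

# sketch: Avro requires the org.apache.spark:spark-avro package; the path below is assumed to exist
avro_df = spark.read.format("avro")\
    .load("./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")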

Images

In [50]:
from pyspark.ml import image
In [51]:
image_dir = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/cctvVideos/train_images/label=1/"
images_df = spark.read.format("image").load(image_dir) # reads every image file in the directory
In [52]:
images_df.printSchema()
root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)

In [55]:
images_df.show(3)
+--------------------+
|               image|
+--------------------+
|{file:///home/yos...|
|{file:///home/yos...|
|{file:///home/yos...|
+--------------------+
only showing top 3 rows

In [59]:
images_df.select("image.height", "image.width", "image.nChannels", "image.mode").show(5, truncate=False)
+------+-----+---------+----+
|height|width|nChannels|mode|
+------+-----+---------+----+
|288   |384  |3        |16  |
|288   |384  |3        |16  |
|288   |384  |3        |16  |
|288   |384  |3        |16  |
|288   |384  |3        |16  |
+------+-----+---------+----+
only showing top 5 rows

In [ ]:
 