# prepare the PySpark environment with findspark
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
# create spark session
spark = SparkSession\
    .builder\
    .appName("SparkSQLExampleApp")\
    .getOrCreate()
# path to data set
path = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
df = spark.read.format("csv")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .load(path)
df.printSchema()
df.show(n=5)
# create a temp view for SQL queries
df.createOrReplaceTempView("us_delay_flights_tbl")
# ex1
spark.sql("""
SELECT distance, origin, destination
FROM us_delay_flights_tbl
WHERE distance > 1000
ORDER BY distance DESC""").show(10)
# ex1 in Python (DataFrame API)
df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(desc("distance")).show(10)
# or, equivalently:
df.select("distance", "origin", "destination").where("distance > 1000").orderBy("distance", ascending=False).show(10)
# ex2
spark.sql("""
SELECT date, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND origin = 'SFO' AND destination = 'ORD'
ORDER BY delay DESC""").show(10)
df.select("date", "origin", "destination")\
.where((col("delay")>120) & (col("origin")=="SFO") & (col("destination")=="ORD"))\
.orderBy("delay", ascending=False).show(10)
# ex3
spark.sql("""
SELECT delay, origin, destination,
CASE
WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10)  # sorts by origin ascending first, then by delay descending within each origin
# python
from pyspark.sql.functions import when
new_df = df.withColumn("Flight_Delays",
    when(col("delay") > 360, "Very Long Delays")
    .when((col("delay") >= 120) & (col("delay") <= 360), "Long Delays")
    .when((col("delay") >= 60) & (col("delay") < 120), "Short Delays")
    .when((col("delay") > 0) & (col("delay") < 60), "Tolerable Delays")
    .when(col("delay") == 0, "No Delays")
    .otherwise("Early"))\
    .orderBy(col("origin").asc(), col("delay").desc())
new_df.select("delay", "origin", "destination", "Flight_Delays").show(10)
new_df.show(10)  # new_df is already sorted by origin/delay, so no need to order again
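# An alternative sketch using expr(): the SQL CASE expression from ex3 can be
# embedded directly in the DataFrame API.
from pyspark.sql.functions import expr
df.withColumn("Flight_Delays", expr("""
    CASE WHEN delay > 360 THEN 'Very Long Delays'
         WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
         WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
         WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
         WHEN delay = 0 THEN 'No Delays'
         ELSE 'Early' END""")).show(5)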
# get (or create) the Spark session
spark = SparkSession\
    .builder\
    .appName("SparkSQLExampleApp")\
    .getOrCreate()
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
# path to data set
path = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
df = spark.read.format("csv")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .load(path)
# create a temp view for SQL queries
df.createOrReplaceTempView("us_delay_flights_tbl")
# spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING);")
# 以下、エラーメッセージ
# AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
# 'CreateTable `learn_spark_db`.`managed_us_delay_flights_tbl`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
# 上記ではエラーになるので、 temp_viewを使って作成
# spark.sql("CREATE TABLE managed_us_delay_flights_tbl (SELECT * FROM us_delay_flights_tbl)")
# 上記でもエラーになる
# 別の作り方
df.write.saveAsTable("managed_us_delay_flights_tbl")
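# The managed table can now be queried by name (a minimal sketch):
spark.sql("SELECT * FROM managed_us_delay_flights_tbl").show(5)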
# check table
spark.catalog.listTables()
# save df's data to the path given via option (this creates an unmanaged/external table)
df.write.option("path", "tmp/data/us_flight_delay").saveAsTable("us_delay_flights_tbl")
# temporary and global temporary views can also be created
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin='SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin='JFK'")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
spark.sql("SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view").show()
# drop the views
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
spark.catalog.listTables()
spark.catalog.listDatabases()
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df.show(5)
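# Equivalent shortcut (a sketch): spark.table() returns a table or view as a DataFrame.
spark.table("us_delay_flights_tbl").show(5)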
# DataFrameReader usage pattern:
#   DataFrameReader.format(args).option("key", "value").schema(args).load()
# A DataFrameReader instance is obtained from:
#   SparkSession.read        (batch)
#   SparkSession.readStream  (structured streaming)
# DataFrameWriter usage patterns:
#   DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
#   DataFrameWriter.format(args)
#       .option(args)
#       .bucketBy(args)
#       .partitionBy(args)
#       .save(path)
# A DataFrameWriter instance is obtained from:
#   DataFrame.write          (batch)
#   DataFrame.writeStream    (structured streaming)
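# The streaming variants follow the same pattern (a sketch, left commented out
# because a streaming query runs until stopped; "rate" and "console" are
# built-in test source/sink formats):
# stream_df = spark.readStream.format("rate").load()
# query = stream_df.writeStream.format("console").start()
# query.stop()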
# create spark session
spark = SparkSession\
    .builder\
    .appName("SparkParquetApp")\
    .getOrCreate()
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
df = spark.read.format("parquet").load(file)
df.show(5)
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
        path "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
    )
df.write.format("parquet").mode("overwrite").option("compression", "snappy")\
.save("/tmp/data/parquet/df_parquet")
df.write.mode("overwrite").saveAsTable("us_delay_flights_tbl")
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file) # jsonフォルダの全jsonファイルをまとめて読み込む
print((df.count(), len(df.columns)))
df.write.format("json")\
.mode("overwrite")\
.option("compression", "snappy")\
.save("/tmp/data/json/df_json")
file = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME string, ORIGIN_COUNTRY_NAME string, count int"
df = spark.read.format("csv").option("header", "true").schema(schema).option("mode", "FAILFAST").option("nullValue", "").load(file)
df.show(3)
df.printSchema()
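# Alternative sketch: the DDL schema string above can also be expressed
# programmatically with StructType/StructField.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema_struct = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True)])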
df.createOrReplaceTempView("us_delay_flights_tbl")
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
# (some formats omitted)
from pyspark.ml import image
image_dir = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/cctvVideos/train_images/label=1/"
images_df = spark.read.format("image").load(image_dir)  # load every image file in the folder
images_df.printSchema()
images_df.show(3)
images_df.select("image.height", "image.width", "image.nChannels", "image.mode").show(5, truncate=False)
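# Alternative sketch: the "binaryFile" source reads the raw files
# (path, modificationTime, length, content) without decoding them;
# the *.jpg glob is an assumption about the file extensions here.
binary_df = spark.read.format("binaryFile")\
    .option("pathGlobFilter", "*.jpg")\
    .load(image_dir)
binary_df.select("path", "length").show(3, truncate=False)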