# Bootstrap: point findspark at the local Spark install so that
# `import pyspark` resolves without a preconfigured SPARK_HOME.
import findspark

findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType # 64bit-intger型
# Entry point for the DataFrame/SQL APIs; getOrCreate() reuses an
# already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("SparkUDFExampleApp")
    .getOrCreate()
)
# Plain Python cube function; registered below as a Spark SQL UDF.
def cubed(s):
    """Return the cube of s."""
    return s ** 3
# Register the Python function as a SQL UDF: register(name, fn, return type).
# LongType() makes the SQL-side result a 64-bit integer column.
spark.udf.register("cubed", cubed, LongType())

# Temporary view holding a single id column with the values 1..8.
spark.range(1, 9).createOrReplaceTempView("udf_test")

show_all_query = "SELECT * FROM udf_test"
apply_udf_query = "SELECT id, cubed(id) AS id_cubed FROM udf_test"
spark.sql(show_all_query).show()
# The registered cubed() UDF is now callable from SQL.
spark.sql(apply_udf_query).show()
import pandas as pd
# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
# Vectorized cube: operates element-wise on a whole pandas Series at once.
def cubed(a: pd.Series) -> pd.Series:
    """Return the element-wise cube of the Series a."""
    return a ** 3
# Wrap the Series-based cubed() as a vectorized (pandas) UDF whose
# Spark-side return type is a 64-bit integer column.
cubed_udf = pandas_udf(cubed, returnType=LongType())
cubed_udf  # notebook-style echo; no effect when run as a script

# Sanity check: the wrapped function still works on plain local pandas data.
x = pd.Series([1, 2, 3])
y = cubed(x)
for item in (y, type(y)):
    print(item)
# (Re-)obtain a Spark session; getOrCreate() hands back the session
# built earlier rather than starting a new one.
spark = (
    SparkSession.builder
    .appName("SparkExampleApp")
    .getOrCreate()
)

# Three-row DataFrame (ids 1..3), then the vectorized UDF applied per id.
df = spark.range(1, 4)
df.show()
df.select("id", cubed_udf(col("id"))).show()
# Read option 1: generic JDBC source via format("jdbc") + load().
jdbc_read_opts = {
    "url": "jdbc:postgresql://[DBSERVER]",
    "dbtable": "[SCHEMA].[TABLENAME]",
    "user": "[USERNAME]",
    "password": "[PASSWORD]",
}
jdbcDF1 = spark.read.format("jdbc").options(**jdbc_read_opts).load()

# Read option 2: the dedicated DataFrameReader.jdbc() helper.
jdbcDF2 = spark.read.jdbc(
    "jdbc:postgresql://[DBSERVER]",
    "[SCHEMA].[TABLENAME]",
    properties={"user": "[USERNAME]", "password": "[PASSWORD]"},
)
# Write option 1: generic JDBC sink via format("jdbc") + save().
(
    jdbcDF1.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://[DBSERVER]")
    .option("dbtable", "[SCHEMA].[TABLENAME]")
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .save()
)
# Write option 2: Saving data to a JDBC source using the jdbc method.
# FIX: the original closed the `properties` dict with `)` instead of `}`,
# a SyntaxError that prevented the whole file from parsing.
jdbcDF2\
    .write\
    .jdbc("jdbc:postgresql://[DBSERVER]",
          "[SCHEMA].[TABLENAME]",
          properties={"user": "[USERNAME]", "password": "[PASSWORD]"})
# MySQL over JDBC: read the table into a DataFrame using load() ...
jdbcDF = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "[TABLENAME]")
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .load()
)

# ... and write it back through the same connector with save().
(
    jdbcDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "[TABLENAME]")
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .save()
)
# (section omitted)
# In this nested SQL statement, we first explode(values), which creates a new row (with the id) for each element (value) within values:
-- In SQL
SELECT
id,
collect_list(value + 1) AS values
FROM
(SELECT
id,
EXPLODE(values) AS values
FROM
table) x
GROUP BY id
# while collect_list() returns a list of objects with duplicates,
# the GROUP BY statement requires shuffle operations,
# meaning the order of the re-collected array isn't
# necessarily the same as that of the original array.
# As values could be any number of dimensions (a really wide and/or really long array)
# and we're doing a GROUP BY, this approach could be very expensive.
# To perform the same task (adding 1 to each element in values), we can alse create a UDF that users map()
# to iterate through each element (values) and perform the addition operation:
// In Scala
// Add 1 to every element of the input sequence (UDF-style equivalent of
// the explode/collect_list SQL approach shown above).
def addOne(values: Seq[Int]): Seq[Int] =
  values.map(_ + 1)
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
-- In SQL
transform(values, value -> lambda expression)
The transform() function takes an array (values) and an anonymous function (lambda expression) as input.
The function transparently creates a new array by applying the anonymous function to each element, and then
assigns the result to the output array (similar to the UDF approach, but more efficient).
from pyspark.sql.types import *
# Single-column schema: "celsius" holds an array of integers
# (StructType lists the columns; ArrayType marks the list-valued field).
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

# Two rows, each carrying one temperature array.
t_list = ([[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]])
t_c = spark.createDataFrame(t_list, schema)
t_c.show()
t_c.printSchema()
t_c.createOrReplaceTempView("tC")
# transform(): map every Celsius reading in the array to Fahrenheit.
to_fahrenheit_query = """
SELECT
celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM
tC
"""
spark.sql(to_fahrenheit_query).show()

# filter(): keep only readings above 38C.
high_temps_query = """
SELECT
celsius,
filter(celsius, t -> t > 38) AS high
FROM
tC
"""
spark.sql(high_temps_query).show()

# exists(): flag whether any reading equals exactly 38C.
has_38_query = """
SELECT
celsius,
exists(celsius, t -> t = 38) AS threshold
FROM
tC
"""
spark.sql(has_38_query).show()
# Calculate the average temperature per row and convert it to Fahrenheit.
# FIX: Spark 3.1 (the version installed at the top of this file) has no
# SQL function named `reduce` -- hence the original "reduce() not found?"
# note. The higher-order fold function is `aggregate`, which takes the
# same (array, zero, merge, finish) arguments.
spark.sql("""
SELECT
celsius,
aggregate(
celsius,
0,
(t, acc) -> t + acc,
acc -> (acc div size(celsius) * 9 div 5) + 32
) AS avgFahrenheit
FROM
tC
""").show()
from pyspark.sql.functions import expr # 文字列をカラムに変換する
# Spark session (getOrCreate() reuses the one already running).
spark = (
    SparkSession.builder
    .appName("SparkExampleApp")
    .getOrCreate()
)

# Input files: departure delays (CSV) and North-American airport codes (TSV).
tripdelaysFilePath = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airportsnafilePath = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

# Airports file is tab-separated with a header row; infer the column types.
airpotsna = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true", sep="\t")
    .load(airportsnafilePath)
)
airpotsna.show(5)
# Expose the airports data to SQL queries below.
airpotsna.createOrReplaceTempView("airports_na")
# Departure delays: CSV with a header row only, so every column loads as
# a string (no inferSchema here).
departureDelays = (
    spark.read
    .format("csv")
    .options(header="true")
    .load(tripdelaysFilePath)
)
departureDelays.show(5)
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.printSchema()

# Recast delay and distance from string to INT.
departureDelays = (
    departureDelays
    .withColumn("delay", expr("CAST(delay as INT) as delay"))
    .withColumn("distance", expr("CAST(distance as INT) as distance"))
)
departureDelays.show(5)
departureDelays.printSchema()
# Small slice: SEA->SFO flights in early January with a positive delay.
foo = departureDelays.filter(expr("""
origin == 'SEA' and destination == 'SFO' and
date like '01010%' and delay > 0
"""))
foo.show(5)
foo.createOrReplaceTempView("foo")

spark.sql("SELECT * FROM airports_na LIMIT 10").show()
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
spark.sql("SELECT * FROM foo").show()

# Union the full delays table with the slice, then filter the slice back
# out of the union (each foo row now appears twice).
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date like '01010%' AND delay > 0 """)).show()
# DataFrame-API join of the delay slice with airport metadata, keyed on
# the airport's IATA code matching the flight's origin.
joined = foo.join(airpotsna, airpotsna.IATA == foo.origin)
joined.select("City", "State", "date", "delay", "distance", "destination").show()

# The same join expressed in SQL.
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
FROM foo f
JOIN airports_na a
ON a.IATA = f.origin
""").show()
CREATE TABLE departureDelaysWindow AS
SELECT
origin,
destination,
SUM(delay) AS TotalDelays
FROM
departureDelays
WHERE
origin IN ('SEA', 'SFO', 'JFK')
AND
destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY
origin,
destination;
# SQL
SELECT
origin,
destination,
SUM(TotalDelays) AS TotalDelays
FROM
departureDelaysWindow
WHERE
origin = '[origin]'
GROUP BY
origin, destination
ORDER BY
SUM(TotalDelays) DESC
# Top-3 most-delayed destinations per origin, ranked with dense_rank()
# over a per-origin window.
# FIXES vs. the original query (which could not run):
#  * the subquery had no FROM clause -> added FROM departureDelaysWindow
#  * the outer SELECT misspelled TotalDelays as "TotakDelays"
spark.sql("""
SELECT
origin, destination, TotalDelays, rank
FROM (
SELECT
origin, destination, TotalDelays, dense_rank()
OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
FROM departureDelaysWindow
) t
WHERE
rank <= 3
""").show()
foo.show()
from pyspark.sql.functions import expr  # re-import; already in scope above

# Derive a status column: a delay of 10 minutes or less counts as on time.
foo2 = foo.withColumn(
    "status",
    expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"),
)
foo2.show()

# Drop the raw delay column, then rename status -> flight_status.
foo3 = foo2.drop("delay")
foo3.show()
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()
# SQL
SELECT
destination,
CAST(SUBSTRING(date, 0, 2) AS int) AS month,
delay
FROM
departureDelays
WHERE
origin = 'SEA'
# Result of upper SQL
destination | month | delay
ORD | 1 | 92
JFK | 1 | -7
DFW | 1 | -5
MIA | 1 | -3
......
Pivoting allows you to place names in the month column (instead of 1 and 2 you can show Jan and Feb, respectively) as well as perform aggregate calculations (in this case average and max) on the delays by destination and month:
# SQL
SELECT
*
FROM (
SELECT
destination,
CAST(SUBSTRING(date, 0, 2) AS int) AS month,
delay
FROM
departureDelays
WHERE
origin = 'SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4,2)) AS AvgDelay,
MAX(delay) AS MaxDelay
FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
# Result
destination | JAN_AvgDelay | JAN_MaxDelay | FEB_AvgDelay | FEB_MaxDelay
ABQ | 19.86 | 316 | 11.42 | 69
ANC | 4.44 | 149 | 7.90 | 141
ATL | 11.98 | 397 | 7.73 | 145
......