User-Defined Functions (UDF)

In [1]:
# prepare for installation of pyspark by findspark 
import findspark
findspark.init('/home/yoshi-1/spark-3.1.1-bin-hadoop2.7')
In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType # 64-bit integer type
In [3]:
# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkUDFExampleApp")\
        .getOrCreate()
In [4]:
# Create a cubed function
def cubed(s):
    return s * s * s
In [5]:
# Register UDF
spark.udf.register("cubed", cubed, LongType()) # register(registered name, function, return type)
Out[5]:
<function __main__.cubed(s)>
In [6]:
# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test") # creates a table of [1, 2, ..., 8]
In [7]:
spark.sql("SELECT * FROM udf_test").show()
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
+---+

In [8]:
# Use the cubed() function in a SQL query
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+
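As a quick sanity check outside Spark, the same cubes can be verified in plain Python (this checks only the arithmetic, not the UDF registration):

```python
# Cube each id the same way the registered UDF does
cubes = [i * i * i for i in range(1, 9)]
print(cubes)  # [1, 8, 27, 64, 125, 216, 343, 512]
```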

Speeding Up and Distributing PySpark UDFs with Pandas UDFs

In [9]:
import pandas as pd
In [10]:
# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
In [11]:
# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a
In [12]:
# Create the pandas UDF for the cubed function
# pandas_udf builds a vectorized UDF from a function that takes a pandas Series and returns a Series
cubed_udf = pandas_udf(cubed, returnType=LongType())
In [13]:
cubed_udf
Out[13]:
<function __main__.cubed(a: pandas.core.series.Series) -> pandas.core.series.Series>
In [14]:
x = pd.Series([1,2,3])
In [15]:
# The function behind the pandas UDF can also be executed directly on local pandas data
y = cubed(x)
In [16]:
print(y)
0     1
1     8
2    27
dtype: int64
In [17]:
print(type(y))
<class 'pandas.core.series.Series'>
In [18]:
# Create a Spark DataFrame

# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkExampleApp")\
        .getOrCreate()

df = spark.range(1,4)
df.show()
+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+

In [19]:
# Execute the function as a Spark vectorized UDF
df.select("id", cubed_udf(col("id"))).show()
+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+


External Data Sources

PostgreSQL

  • To connect to a PostgreSQL database, build or download the JDBC jar from Maven and add it to your classpath. Then start a Spark shell (spark-shell or pyspark), specifying that jar:
    • bin/spark-shell --jars postgresql-42.2.6.jar
In [ ]:
# Read Option 1: Loading data from a JDBC source using load method
jdbcDF1 = spark.read\
                .format("jdbc")\
                .option("url", "jdbc:postgresql://[DBSERVER]")\
                .option("dbtable", "[SCHEMA].[TABLENAME]")\
                .option("user", "[USERNAME]")\
                .option("password", "[PASSWORD]")\
                .load()
In [ ]:
# Read Option 2: Loading data from a JDBC source using jdbc method
jdbcDF2 = spark.read\
                .jdbc("jdbc:postgresql://[DBSERVER]", 
                      "[SCHEMA].[TABLENAME]",
                      properties={"user": "[USERNAME]", "password": "[PASSWORD]"})
In [ ]:
# Write option 1: Saving data to a JDBC source using save method
jdbcDF1\
    .write\
    .format("jdbc")\
    .option("url", "jdbc:postgresql://[DBSERVER]")\
    .option("dbtable", "[SCHEMA].[TABLENAME]")\
    .option("user", "[USERNAME]")\
    .option("password", "[PASSWORD]")\
    .save()
In [ ]:
# Write option 2: Saving data to a JDBC source using jdbc method
jdbcDF2\
    .write\
    .jdbc("jdbc:postgresql://[DBSERVER]", 
          "[SCHEMA].[TABLENAME]",
          properties={"user": "[USERNAME]", "password": "[PASSWORD]"})

MySQL

  • To connect to a MySQL database, build or download the JDBC jar from Maven or MySQL and add it to your classpath. Then start a Spark shell (spark-shell or pyspark), specifying that jar:
    • bin/spark-shell --jars mysql-connector-java_8.0.16-bin.jar
In [ ]:
# Loading data from a JDBC source using load method
jdbcDF = spark.read\
              .format("jdbc")\
              .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")\
              .option("driver", "com.mysql.jdbc.Driver")\
              .option("dbtable", "[TABLENAME]")\
              .option("user", "[USERNAME]")\
              .option("password", "[PASSWORD]")\
              .load()
In [ ]:
# Saving data to a JDBC source using save method
jdbcDF.write\
      .format("jdbc")\
      .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")\
      .option("driver", "com.mysql.jdbc.Driver")\
      .option("dbtable", "[TABLENAME]")\
      .option("user", "[USERNAME]")\
      .option("password", "[PASSWORD]")\
      .save()

Azure Cosmos DB

In [ ]:
# omitted

Higher-Order Functions in DataFrames and Spark SQL

Because complex data types are amalgamations of simple data types, it is tempting to manipulate them directly. There are two typical solutions for manipulating complex data types:
  • Exploding the nested structure into individual rows, applying some function, and then re-creating the nested structure
  • Building a user-defined function
These approaches have the benefit of allowing you to think of the problem in tabular format. They typically involve (but are not limited to) using utility functions such as get_json_object(), from_json(), to_json(), explode(), and selectExpr().

Option 1: Explode and Collect

In [ ]:
# In this nested SQL statement, we first explode(values), which creates a new row (with the id) for each element (value) within values:
-- In SQL
SELECT
    id, 
    collect_list(value + 1) AS values
FROM
    (SELECT
            id,
            EXPLODE(values) AS values
     FROM
            table) x
GROUP BY id

# while collect_list() returns a list of objects with duplicates,
# the GROUP BY statement requires shuffle operations,
# meaning the order of the re-collected array isn't 
# necessarily the same as that of the original array.
# As values could be any number of dimensions (a really wide and/or really long array)
# and we're doing a GROUP BY, this approach could be very expensive.
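The explode-and-collect pattern can be sketched in plain Python (the rows here are hypothetical (id, values) pairs, not the Spark execution):

```python
# Hypothetical input: one row per id, each holding an array of values
rows = [(1, [10, 20]), (2, [30])]

# "Explode": one intermediate row per (id, element)
exploded = [(i, v) for i, values in rows for v in values]

# Apply the function, then "collect" back into one list per id
collected = {}
for i, v in exploded:
    collected.setdefault(i, []).append(v + 1)

print(collected)  # {1: [11, 21], 2: [31]}
```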

Option 2: User-Defined Function

In [ ]:
# To perform the same task (adding 1 to each element in values), we can also create a UDF that uses map()
# to iterate through each element (value) and perform the addition operation:
// In Scala
def addOne(values: Seq[Int]): Seq[Int] = {
    values.map(value => value + 1)
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
In [ ]:
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
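The same per-element increment, sketched in plain Python for comparison (no Spark involved):

```python
def add_one(values):
    # Mirror the Scala addOne: add 1 to every element of the array
    return [v + 1 for v in values]

print(add_one([7, 8, 9]))  # [8, 9, 10]
```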

Built-in Functions for Complex Data Types

Instead of these approaches, we can use the built-in functions below:

  • array_contains
  • array_distinct
  • array_except
  • array_intersect
  • array_join
  • array_max
  • array_min
  • array_position
  • array_remove
  • array_repeat
  • array_sort
  • array_union
  • arrays_overlap
  • arrays_zip
  • see the documentation for the rest

Lambda Functions

In [ ]:
-- In SQL
transform(values, value -> lambda expression)

The transform() function takes an array (values) and an anonymous function (lambda expression) as input.
The function transparently creates a new array by applying the anonymous function to each element, and then
assigning the result to the output array (similar to the UDF approach, but more efficiently).
In [20]:
from pyspark.sql.types import *

schema = StructType([StructField("celsius", ArrayType(IntegerType()))]) 
# StructType() defines the columns (each StructField: name and type)
# ArrayType() declares an array column
In [22]:
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] # a tuple of two rows, each wrapping one array
t_c = spark.createDataFrame(t_list, schema)
In [23]:
t_c.show()
+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+

In [24]:
t_c.printSchema()
root
 |-- celsius: array (nullable = true)
 |    |-- element: integer (containsNull = true)

In [25]:
t_c.createOrReplaceTempView("tC")

transform()

transform(array, function<T, U>): array

  • The transform() function produces an array by applying a function to each element of the input array (similar to a map() function):
In [26]:
# Calculate Fahrenheit from Celsius for an array of temperatures
spark.sql("""
    SELECT
        celsius,
        transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
    FROM
        tC
""").show()
+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
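The lambda's arithmetic can be verified outside Spark; SQL's div is integer division, i.e. // in Python:

```python
# Same conversion as the transform() lambda, on the first row
celsius = [35, 36, 32, 30, 40, 42, 38]
fahrenheit = [((t * 9) // 5) + 32 for t in celsius]
print(fahrenheit)  # [95, 96, 89, 86, 104, 107, 100]
```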

filter()

filter(array, function<T, Boolean>): array

  • The filter() function produces an array consisting of only the elements of the input array for which the Boolean function is true:
In [27]:
# Filter temperatures > 38C for an array of temperatures
spark.sql("""
    SELECT
        celsius,
        filter(celsius, t -> t > 38) AS high
    FROM
        tC
""").show()
+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+
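The same predicate in plain Python, applied to the first row:

```python
# Keep only temperatures above 38C, like the filter() lambda
celsius = [35, 36, 32, 30, 40, 42, 38]
high = [t for t in celsius if t > 38]
print(high)  # [40, 42]
```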

exists()

exists(array, function<T, Boolean>): Boolean

  • The exists() function returns true if the Boolean function holds for any element in the input array:
In [28]:
# Is there a temperature of 38C in the array of temperatures?
spark.sql("""
    SELECT
        celsius,
        exists(celsius, t -> t = 38) AS threshold
    FROM
        tC
""").show()
+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+
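exists() maps to Python's any() over the same predicate:

```python
# One Boolean per row: does any element equal 38?
rows = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]
threshold = [any(t == 38 for t in celsius) for celsius in rows]
print(threshold)  # [True, False]
```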

reduce()

reduce(array, B, function<B, T, B>, function<B, R>)

  • The reduce() function reduces the elements of the array to a single value by merging the elements into a buffer B using function<B, T, B> and applying a finishing function<B, R> on the final buffer:
In [13]:
# Calculate average temperature and convert to F
spark.sql("""
    SELECT
        celsius,
        reduce(
            celsius,
            0,
            (t, acc) -> t + acc,
            acc -> (acc div size(celsius) * 9 div 5) + 32
            ) AS avgFahrenheit
    FROM
        tC
    """).show()
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-13-398883864370> in <module>
      1 # Calkculate avarage temperature and convert to F
----> 2 spark.sql("""
      3     SELECT
      4         celsius,
      5         reduce(

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/session.py in sql(self, sqlQuery)
    721         [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
    722         """
--> 723         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    724 
    725     def table(self, tableName):

~/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Undefined function: 'reduce'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 4 pos 8
In [ ]:
# reduce() is not a SQL built-in in Spark 3.1; the built-in aggregate() provides the same behavior
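The buffer-merge-then-finish semantics shared by reduce()/aggregate() can be sketched in plain Python (a sketch of the semantics only, not the Spark implementation; the `aggregate` helper name is ours):

```python
def aggregate(values, start, merge, finish):
    # Fold the array into a buffer, then apply the finishing function
    acc = start
    for v in values:
        acc = merge(acc, v)
    return finish(acc)

# Same computation as the failed query, for the first row (SQL div == //)
celsius = [35, 36, 32, 30, 40, 42, 38]
avg_f = aggregate(
    celsius,
    0,
    lambda acc, t: acc + t,
    lambda acc: ((acc // len(celsius)) * 9) // 5 + 32,
)
print(avg_f)  # 96
```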

Common DataFrames and Spark SQL Operations

  • Unions and joins
  • Windowing
  • Modifications
In [4]:
from pyspark.sql.functions import expr # converts an expression string into a Column
In [5]:
# create spark session
spark = SparkSession\
        .builder\
        .appName("SparkExampleApp")\
        .getOrCreate()
In [6]:
# Set file paths
tripdelaysFilePath = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airportsnafilePath = "./LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
In [7]:
# Obtain airports data set
airpotsna = spark.read\
                .format("csv")\
                .options(header="true", inferSchema="true", sep="\t")\
                .load(airportsnafilePath)
In [8]:
airpotsna.show(5)
+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows

In [9]:
# Create a temporary view
airpotsna.createOrReplaceTempView("airports_na")
In [10]:
# Obtain departure delays data set
departureDelays = spark.read\
                    .format("csv")\
                    .options(header="true")\
                    .load(tripdelaysFilePath)
In [11]:
departureDelays.show(5)
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows

In [12]:
departureDelays.createOrReplaceTempView("departureDelays")
In [13]:
departureDelays.printSchema()
root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

In [14]:
# Convert the delay and distance columns from string to integer
departureDelays = departureDelays\
                    .withColumn("delay", expr("CAST(delay as INT) as delay"))\
                    .withColumn("distance", expr("CAST(distance as INT) as distance"))
In [15]:
departureDelays.show(5)
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows

In [16]:
departureDelays.printSchema()
root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

In [17]:
# Create temporary small table
foo = departureDelays\
        .filter(expr("""
                    origin == 'SEA' and destination == 'SFO' and
                    date like '01010%' and delay > 0
                    """))
In [18]:
foo.show(5)
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

In [19]:
foo.createOrReplaceTempView("foo")
In [45]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+

In [46]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+

In [47]:
spark.sql("SELECT * FROM foo").show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

We will execute union, join, and windowing examples with this data.

Unions

  • union two different DataFrames with the same schema together (like a vertical concat in pandas)
In [48]:
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")
In [49]:
# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
                    AND date like '01010%' AND delay > 0 """)).show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+
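Note that DataFrame.union() behaves like SQL UNION ALL: duplicates are kept, which is why each foo row appears twice above. In plain Python terms:

```python
# Toy stand-ins for the two DataFrames, as (date, delay) tuples
departures = [("01010710", 31), ("01010955", 104)]
foo = [("01010710", 31)]  # a subset of the same rows

bar = departures + foo  # union keeps duplicates, like UNION ALL
print(bar.count(("01010710", 31)))  # 2
```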

Joins

  • By default, inner join
  • options: cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti
In [53]:
# Join departure delays data (foo) with airport info
foo.join(
    airpotsna,
    airpotsna.IATA == foo.origin
    ).select("City", "State", "date", "delay", "distance", "destination").show()
+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

In [56]:
# In SQL
spark.sql("""
    SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
    FROM foo f
    JOIN airports_na a
        ON a.IATA = f.origin
    """).show()
+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+
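The inner-join logic can be sketched with a plain-Python lookup (hypothetical toy rows standing in for airports_na and foo):

```python
# Hypothetical airport lookup keyed by IATA code
airports = {"SEA": ("Seattle", "WA"), "SFO": ("San Francisco", "CA")}
flights = [("01010710", "SEA", "SFO"), ("01011245", "ABE", "ATL")]

# Inner join: keep only flights whose origin has a matching airport row
joined = [
    airports[origin] + (date, dest)
    for date, origin, dest in flights
    if origin in airports
]
print(joined)  # [('Seattle', 'WA', '01010710', 'SFO')]
```

The ABE flight is dropped because it has no match in the lookup, just as an inner join drops unmatched rows.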

Windowing

In [ ]:
CREATE TABLE departureDelaysWindow AS
SELECT
    origin,
    destination,
    SUM(delay) AS TotalDelays
FROM
    departureDelays
WHERE
    origin IN ('SEA', 'SFO', 'JFK')
    AND
    destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY
    origin,
    destination;
  • What if for each of these origin airports you wanted to find the three destinations that experienced the most delays? You could achieve this by running three different queries for each origin and then unioning the results together, like this:
In [ ]:
# SQL
SELECT
    origin,
    destination,
    SUM(TotalDelays) AS TotalDelays
FROM
    departureDelaysWindow
WHERE
    origin = '[origin]'
GROUP BY
    origin, destination
ORDER BY
    SUM(TotalDelays) DESC
  • where [origin] is replaced by each of the three origin values: JFK, SEA, and SFO.
  • But a better approach would be to use a window function like dense_rank() to perform the following calculation:
In [ ]:
# SQL
spark.sql("""
    SELECT
        origin, destination, TotalDelays, rank
    FROM (
        SELECT
            origin, destination, TotalDelays, dense_rank()
            OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
        FROM
            departureDelaysWindow
        ) t
    WHERE
        rank <= 3
        """).show()
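dense_rank() partitioned by origin can be sketched in plain Python: within each origin, rank rows by TotalDelays descending, with ties sharing a rank (the delay totals below are hypothetical):

```python
from itertools import groupby

# Hypothetical (origin, destination, TotalDelays) rows
rows = [
    ("SEA", "SFO", 22293), ("SEA", "DEN", 13645), ("SEA", "ORD", 10041),
    ("SEA", "LAX", 9359),
    ("SFO", "LAX", 40798), ("SFO", "ORD", 27412), ("SFO", "JFK", 24100),
]

top3 = []
for origin, group in groupby(sorted(rows, key=lambda r: (r[0], -r[2])),
                             key=lambda r: r[0]):
    # dense_rank: equal TotalDelays share a rank, with no gaps after ties
    rank, prev = 0, None
    for _, dest, total in group:
        if total != prev:
            rank, prev = rank + 1, total
        if rank <= 3:
            top3.append((origin, dest, total, rank))

print(top3)
```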

Modifications

  • Spark DataFrames are immutable, so to modify one we need to create a new DataFrame.
In [20]:
foo.show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

Adding new columns

  • withColumn()
In [60]:
from pyspark.sql.functions import expr
In [21]:
foo2 = foo.withColumn(
            "status",
            expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
            )
In [22]:
foo2.show()
+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+
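The CASE expression is just a conditional per row; in plain Python:

```python
def status(delay):
    # Same rule as the CASE WHEN expression above
    return "On-time" if delay <= 10 else "Delayed"

print([status(d) for d in (31, 104, 5)])  # ['Delayed', 'Delayed', 'On-time']
```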

Dropping columns

  • drop()
In [23]:
foo3 = foo2.drop("delay")
foo3.show()
+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+

Renaming columns

  • withColumnRenamed()
In [24]:
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()
+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+

Pivoting

In [ ]:
# SQL
SELECT
    destination,
    CAST(SUBSTRING(date, 0, 2) AS int) AS month,
    delay
FROM
    departureDelays
WHERE
    origin = 'SEA'
In [ ]:
# Result of the SQL above
destination | month | delay
ORD | 1 | 92
JFK | 1 | -7
DFW | 1 | -5
MIA | 1 | -3
......

Pivoting allows you to place names in the month column (instead of 1 and 2 you can show Jan and Feb, respectively) as well as perform aggregate calculations (in this case average and max) on the delays by destination and month:

In [ ]:
# SQL
SELECT
    *
FROM (
    SELECT
        destination,
        CAST(SUBSTRING(date, 0, 2) AS int) AS month,
        delay
    FROM
        departureDelays
    WHERE
        origin = 'SEA'
)
PIVOT (
    CAST(AVG(delay) AS DECIMAL(4,2)) AS AvgDelay,
    MAX(delay) AS MaxDelay
    FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
In [ ]:
# Result
destination | JAN_AvgDelay | JAN_MaxDelay | FEB_AvgDelay | FEB_MaxDelay
ABQ | 19.86 | 316 | 11.42 | 69
ANC | 4.44  | 149 | 7.90  | 141
ATL | 11.98 | 397 | 7.73  | 145
......
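The pivot's aggregation logic (one row per destination, one AvgDelay/MaxDelay pair per month) can be sketched in plain Python on a hypothetical handful of rows:

```python
# Hypothetical (destination, month, delay) rows standing in for the SEA subset
rows = [("ABQ", 1, 10), ("ABQ", 1, 30), ("ABQ", 2, 12),
        ("ANC", 1, 5), ("ANC", 2, 9)]

# Group delays by destination, then by month
grouped = {}
for dest, month, delay in rows:
    grouped.setdefault(dest, {}).setdefault(month, []).append(delay)

# One output row per destination; one (AvgDelay, MaxDelay) pair per month
result = {
    dest: {m: (sum(ds) / len(ds), max(ds)) for m, ds in months.items()}
    for dest, months in grouped.items()
}
print(result)  # {'ABQ': {1: (20.0, 30), 2: (12.0, 12)}, 'ANC': {1: (5.0, 5), 2: (9.0, 9)}}
```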