Using Parquet on Athena to Save Money on AWS

Francois LAGIER - Apr 15 '19 - Dev Community

Originally published at cloudforecast.io/blog

To show you how to optimize your Athena queries and save money, we will use the ‘2018 Flight On-Time Performance’ dataset from the Bureau of Transportation Statistics (bts.gov). Along the way, we will also share a few interesting facts about US airports ✈️ queried from the dataset using Athena.

All datasets and queries used in this post can be found in our GitHub repo.

Let's start with some terminology

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Pricing is based on the amount of data scanned by each query.

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

Since pricing is based on the amount of data scanned, you should always optimize your dataset so each query processes the least data possible, using one of the following techniques: compression, partitioning, and a columnar file format. We will demonstrate the benefits of compression and of a columnar format.

Using compression will reduce the amount of data scanned by Athena and also reduce your S3 storage costs. It's a win-win for your AWS bill. Supported formats: GZIP, LZO, SNAPPY (Parquet), and ZLIB.

Instead of using a row-level approach, a columnar format stores data by column. This allows Athena to read and process only the columns a query needs and ignore the rest. If you want to learn more about columnar storage, check out Wikipedia and/or "The beauty of column-oriented data" by Maxim Zaks.
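
Partitioning deserves a quick sketch too, even though we won't benchmark it here. If the files are laid out in S3 by a key such as month, Athena skips every prefix the query filters out. A minimal sketch, assuming the CSVs were reorganized under a hypothetical s3://INSERT_BUCKET_NAME/raw_partitioned/month=1/ ... month=12/ layout (this layout is not part of the dataset in our repo):

CREATE EXTERNAL TABLE IF NOT EXISTS flights.raw_data_by_month (
  day_of_month SMALLINT,
  flight_date STRING,
  origin STRING,
  destination STRING,
  dep_delay DOUBLE
  -- ...remaining columns as in the CREATE statements below...
)
PARTITIONED BY (month SMALLINT)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
LOCATION 's3://INSERT_BUCKET_NAME/raw_partitioned'
TBLPROPERTIES ('skip.header.line.count'='1');

-- Register every month=N/ prefix under LOCATION as a partition.
MSCK REPAIR TABLE flights.raw_data_by_month;

-- Filtering on the partition key now scans roughly 1/12 of the data.
SELECT count(*) FROM flights.raw_data_by_month WHERE month = 6;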

Getting familiar with the data

While using the ‘2018 Flight On-Time Performance’ dataset, we will compare query performance and data scanned across three versions of the files: raw CSV, gzipped CSV, and Parquet.

The files downloaded from the Bureau of Transportation Statistics are simple CSV files with 23 columns (such as FlightDate, Airline, Flight #, Origin, Destination, Delay, Cancelled, ...).

Same data, 3 versions:

  • Raw CSV:
    • Description: No compression, just a plain set of CSV files.
    • Files: 12 ~55MB files (one for each month).
    • Total dataset size: ~666 MB
  • GZip CSV:
    • Description: The same CSV files, compressed with GZip.
    • Files: 12 ~10MB gzipped CSV files (one for each month).
    • Total dataset size: ~126 MB
  • Compressed Parquet:
    • Description: We converted the CSV files to Parquet using Spark. The same process could also be done with AWS Glue, or with Athena itself (see the CTAS sketch below).
    • Files: 12 ~8MB Parquet files using the default compression (Snappy).
    • Total dataset size: ~84 MB

Find the three dataset versions in our GitHub repo.
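
By the way, Spark and Glue aren't the only ways to convert: Athena itself can rewrite the data with a CREATE TABLE AS SELECT (CTAS) statement. A minimal sketch, assuming the flights.raw_data table defined below already exists and the target prefix is empty (the table name and prefix here are just examples):

CREATE TABLE flights.parquet_snappy_ctas
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  external_location = 's3://INSERT_BUCKET_NAME/parquet-ctas'
) AS
SELECT * FROM flights.raw_data;

CTAS both writes the Parquet files to S3 and registers the new table, so no separate CREATE EXTERNAL TABLE is needed for the converted copy.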

Creating the various tables

Since the formats and compressions differ, each CREATE statement needs to tell Athena which format/compression it should use.

Raw CSVs
CREATE EXTERNAL TABLE IF NOT EXISTS flights.raw_data (
  year SMALLINT,
  month SMALLINT,
  day_of_month SMALLINT,
  flight_date STRING,
  op_unique_carrier STRING,
  flight_num STRING,
  origin STRING,
  destination STRING,
  crs_dep_time STRING,
  dep_time STRING,
  dep_delay DOUBLE,
  taxi_out DOUBLE,
  wheels_off STRING,
  arr_delay DOUBLE,
  cancelled DOUBLE,
  cancellation_code STRING,
  diverted DOUBLE,
  air_time DOUBLE,
  carrier_delay DOUBLE,
  weather_delay DOUBLE,
  nas_delay DOUBLE,
  security_delay DOUBLE,
  late_aircraft_delay DOUBLE
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  ESCAPED BY '\"'
  LINES TERMINATED BY '\n'
LOCATION 's3://INSERT_BUCKET_NAME/raw'
TBLPROPERTIES (
  'skip.header.line.count'='1',
  'serialization.null.format'=''
);
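
Once the table exists, a quick sanity check is worth running (a minimal sketch; since this is uncompressed CSV, even a simple count scans the full ~666 MB):

-- Expect 12 months of 2018 data in the result.
SELECT year, count(*) AS total_flights
FROM flights.raw_data
GROUP BY year;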

Gzipped CSVs

Athena automatically detects the gzip format (based on the ".gz" suffix), so we can reuse the CREATE statement from above with LOCATION pointing at the gzipped files. See the full query here.

Parquet Files using Snappy

Using both STORED AS PARQUET and the "parquet.compression"="SNAPPY" table property, Athena will be able to process our data flawlessly.

CREATE EXTERNAL TABLE IF NOT EXISTS flights.parquet_snappy_data (
  year SMALLINT,
  month SMALLINT,
  day_of_month SMALLINT,
  flight_date STRING,
  op_unique_carrier STRING,
  flight_num STRING,
  origin STRING,
  destination STRING,
  crs_dep_time STRING,
  dep_time STRING,
  dep_delay DOUBLE,
  taxi_out DOUBLE,
  wheels_off STRING,
  arr_delay DOUBLE,
  cancelled DOUBLE,
  cancellation_code STRING,
  diverted DOUBLE,
  air_time DOUBLE,
  carrier_delay DOUBLE,
  weather_delay DOUBLE,
  nas_delay DOUBLE,
  security_delay DOUBLE,
  late_aircraft_delay DOUBLE
)
STORED AS PARQUET
LOCATION 's3://INSERT_BUCKET_NAME/parquet'
TBLPROPERTIES ("parquet.compression"="SNAPPY");

Picking a couple of queries to play with

Query #1: The top route in the U.S. in 2018, along with the number of airlines serving it.

Spoiler Alert: It's Chicago O'Hare International to New York LaGuardia, with 6 unique airlines offering the route.

SELECT
  origin,
  destination,
  count(*) AS total_flights,
  count(DISTINCT op_unique_carrier) AS uniq_airlines
FROM flights.parquet_snappy_data
GROUP BY origin, destination
ORDER BY total_flights DESC
LIMIT 1;

Query #2: The best airport to fly out of to avoid a major delay (> 30 minutes)

Spoiler Alert: Honolulu International Airport has fewer than 4% of flights delayed, giving you one more reason to visit Hawaii. At the other end, Newark Airport had 16.2% of flights delayed by more than 30 minutes.

SELECT
  origin,
  percent_delayed AS "% Delayed (> 30)",
  total_flights AS "Total Flights"
FROM (
  SELECT
    origin,
    DENSE_RANK() OVER (ORDER BY percent_delayed DESC) AS worst_rank,
    DENSE_RANK() OVER (ORDER BY percent_delayed ASC) AS top_rank,
    percent_delayed,
    total_flights
  FROM (
    SELECT
      origin,
      sum(CAST(dep_delay > 30 AS DOUBLE)) / count(*) AS percent_delayed,
      count(*) AS total_flights
    FROM flights.parquet_snappy_data
    GROUP BY 1
    HAVING count(*) > 10000
  ) AS t1
) AS t2
WHERE top_rank <= 1
  OR worst_rank <= 3;
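
A quick note on the inner query: CAST(dep_delay > 30 AS DOUBLE) maps true to 1.0 and false to 0.0, so summing it counts the delayed flights, and dividing by count(*) yields the delayed fraction per origin. The HAVING clause keeps only airports with more than 10,000 flights in 2018.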

Comparing performance

For this comparison, I ran each query 10 times and averaged the numbers. We will use the raw CSV dataset as our baseline. (The arithmetic behind the per-query costs is sketched after the list.)

  • Query #1
    • Raw: 665.71 MB scanned in 2.60 seconds. $0.00333 per query.
    • GZip: 125.54 MB scanned in 2.08 seconds. $0.00063 per query (81% savings).
    • Parquet: 8.29 MB scanned in 1.26 seconds. $0.000050 per query (98% savings).
  • Query #2
    • Raw: 665.71 MB scanned in 2.60 seconds. $0.00333 per query.
    • GZip: 125.54 MB scanned in 2.08 seconds. $0.00063 per query (81% savings).
    • Parquet: 8.29 MB scanned in 0.81 seconds. $0.000062 per query (98% savings).
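
These prices follow directly from Athena's rate of $5 per TB scanned (treating 1 TB as 1,000,000 MB, as the numbers above do): 665.71 MB works out to 665.71 / 1,000,000 × $5 ≈ $0.0033, and 125.54 MB to ≈ $0.00063. For the Parquet runs, note that Athena bills a minimum of 10 MB per query, which is why scanning 8.29 MB still comes out to about $0.00005.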

Conclusions

Thanks to Parquet’s columnar format, Athena reads only the columns the query needs. This cuts query time by more than 50% and query cost by 98%.

If you have any questions about Athena, Parquet or CloudForecast, feel free to ping me via email at francois@cloudforecast.io or on Twitter: @francoislagier.
