Using PySpark on AWS Glue Studio to load data from RDS MySQL to Data Lake

Vuong Bach Doan - Jan 17 - Dev Community

This post explores the powerful combination of PySpark and AWS Glue for streamlining data ETL (Extract, Transform, Load) processes. We'll delve into:

  • PySpark: Harnessing Python's flexibility for large-scale data analysis and transformations within the familiar Python environment.

  • AWS Glue: Simplifying and scaling ETL workflows with a fully managed, serverless service on AWS.

The Challenge:

Efficiently transferring data from an RDS MySQL database to an S3 data lake.

The Solution:

  1. Defining the ETL Job: Moving data from stats, ability, and info tables in MySQL to S3.

  2. Setting Up Glue Studio: Selecting "Author code with a script editor", establishing IAM roles, and uploading the MySQL JDBC driver to S3.

  3. Coding with PySpark: Utilizing the provided code template for a smooth workflow:

    • Creating a SparkSession.
    • Adding the JDBC driver.
    • Defining a function to extract data from tables.
    • Reading data from multiple tables.
    • Transforming the "capture_rate" in the "info" table.
    • Writing each table's output to its own timestamp-based subfolder.
    • Writing data to S3 in Parquet format.

I have prepared a code template to help you get started:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
from datetime import datetime

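spark = SparkSession.builder.getOrCreate()  # Create SparkSession
# The MySQL JDBC driver JAR must be on the JVM classpath. In Glue, attach it through the job's
# "Dependent JARs path" setting (job parameter --extra-jars), e.g. s3://<your-bucket>/mysql-connector-j-8.3.0.jar
jdbc_url = "jdbc:mysql://<your-host>:3306/<your-database>"
# "driver" names the JDBC driver class shipped in MySQL Connector/J 8.x
connection_properties = {"user": "admin", "password": "********", "driver": "com.mysql.cj.jdbc.Driver"}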

def extract_table_df(spark_session, jdbc_url, connection_properties, table_name):
    """Read one MySQL table over JDBC and return it as a DataFrame."""
    df = spark_session.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    return df

# Read data from multiple tables
df_info = extract_table_df(spark, jdbc_url, connection_properties, "info")
df_ability = extract_table_df(spark, jdbc_url, connection_properties, "ability")
df_stats = extract_table_df(spark, jdbc_url, connection_properties, "stats")

# Transform capture_rate in the info table and cast to int
df_info = df_info.withColumn("capture_rate", regexp_extract("capture_rate", r"^\D*(\d+).*$", 1)) \
                 .withColumn("capture_rate", col("capture_rate").cast("int"))

# Generate timestamp subfolders for each DataFrame
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
info_subfolder = f"pokemon_info_processed/{timestamp_str}"
ability_subfolder = f"pokemon_ability_processed/{timestamp_str}"
stats_subfolder = f"pokemon_stat_processed/{timestamp_str}"

# Write DataFrames to separate folders
df_info.write.parquet(f"s3://<your-bucket>-datalake/{info_subfolder}", mode="overwrite")
df_ability.write.parquet(f"s3://<your-bucket>-datalake/{ability_subfolder}", mode="overwrite")
df_stats.write.parquet(f"s3://<your-bucket>-datalake/{stats_subfolder}", mode="overwrite")
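
To see what the capture_rate pattern in the template does before running the job, here is a minimal sketch in plain Python using the standard `re` module. The helper name `first_int` and the sample strings are hypothetical, made up for illustration and not taken from the real table. One difference to keep in mind: Spark's `regexp_extract` returns an empty string when nothing matches, while this sketch returns None.

```python
import re

# Same pattern as in the template: skip leading non-digits,
# then capture the first run of digits.
CAPTURE_RATE_PATTERN = re.compile(r"^\D*(\d+).*$")


def first_int(raw):
    """Return the first integer found in raw, or None when there is none."""
    match = CAPTURE_RATE_PATTERN.match(raw)
    return int(match.group(1)) if match else None


# Hypothetical sample values, not taken from the real "info" table.
samples = ["45", "rate: 120", "30 (form A) 255 (form B)", "unknown"]
results = [first_int(s) for s in samples]
print(results)  # [45, 120, 30, None]
```

Only the first number in a value survives, so a string that lists several rates is reduced to its first one.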


Now you can run the job and check that the results appear in the S3 Data Lake bucket. ✨
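
If you prefer to check the output from code rather than the console, one possible approach is to list the Parquet objects under a prefix. The sketch below is an assumption on my side, not part of the Glue job: the helper `list_parquet_keys` is hypothetical, and it expects a boto3 S3 client to be passed in.

```python
def list_parquet_keys(s3_client, bucket, prefix):
    """Return the keys of all .parquet objects under prefix in bucket."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix.
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".parquet"):
                keys.append(obj["Key"])
    return keys


# Usage (requires boto3 and AWS credentials with read access to the bucket):
# import boto3
# keys = list_parquet_keys(boto3.client("s3"), "<your-bucket>-datalake", "pokemon_info_processed/")
# print(len(keys), "Parquet files found")
```

An empty list means the job has not written any Parquet files under that prefix yet.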
