Enhancing Data Security with Spark: A Guide to Column-Level Encryption - Part 2

Mostefa Brougui - Dec 22 - - Dev Community

This post describes how you can build an AWS Glue ingestion job with PySpark aes_encrypt() function to encrypt sensitive columns. It is part of a series that shows how column-level encryption can be deployed at scale using AWS Glue, AWS KMS and Amazon Athena or Amazon Redshift.

Introduction

In this post, I demonstrate setting up an AWS Glue ingestion job to encrypt sensitive columns using AWS KMS. We will also explore key management approaches and their impact on organizational security practices.

In the previous post, I introduced column-level encryption using a Jupyter Notebook and a static AWS KMS-generated data key. While useful for learning, hardcoding encryption keys is insecure and impractical for production. Instead, encryption keys should be dynamically managed and accessed securely at runtime.

AWS Glue is a "scalable, serverless data integration service that simplifies data discovery, preparation, and combination for analytics, machine learning, and application development." Glue's ability to attach IAM roles to jobs allows seamless interaction with other AWS services like S3 and KMS, enabling tasks such as data ingestion, manipulation (e.g., encryption/decryption), and storage.

We will also address key management questions: How should your Glue job encrypt sensitive columns? Where should encryption material be stored? Who should have access to it?

Let’s dive in!

Getting Started

Prepare Your Environment

To set up your AWS Glue for the first time, from the AWS Management Console:

  • Open the AWS Glue console and select Prepare your account for AWS Glue.
  • You can ignore Choose IAM users and roles for AWS Glue and choose Next if you'll be perfoming the next steps using your current role. Otherwise, select the IAM roles or users (I don't recommend using IAM users) that need to have access.
  • Under Grant Amazon S3 access, choose Next unless you want to edit the options selected by default. For the sake of this article, I granted access to all my S3 buckets. You will not do the same in a Production setting.
  • Under Choose a default service role, keep the default settings and choose Next, unless you have an existing IAM role for Glue.
  • Review and confirm your changes by choosing Apply changes.

To start building your ingestion job, create a Glue notebook by following the steps below.

  • From the AWS Glue console, on the left pane, under ETL jobs, choose Notebooks.
  • On the page Create job, select Notebook under Author using an interactive code notebook.
  • On the Notebook pop-up, keep the default settings and choose Create notebook.

AWS Glue will spin-up a Glue Studio notebook for you. Start by running the cell that initializes the Glue job and wait for Waiting for session <GUID> to get into ready status.... Your notebook is ready !

Encrypting Sensitive Columns with aes_encrypt()

Now that your Glue job is ready, let’s encrypt specific columns in your dataset using the PySpark function aes_encrypt(). We’ll build a reusable Python module, ColEncrypt, to handle both encryption and decryption, simplifying column-level encryption management in your Glue jobs.

Key Components

  • KeyManager: Manages the creation and decryption of Data Encryption Keys (DEKs) using AWS KMS.
  • ColEncrypt: Handles column encryption and decryption, leveraging PySpark's built-in AES functions.

These two components work together to provide a flexible, an almost production-ready solution for column-level encryption with better error handling and monitoring.

The source code is here.

Setting Up Key Management

Hardcoding encryption keys is a bad practice. Instead, use AWS KMS to generate and decrypt DEKs securely at runtime. Here’s how KeyManager handles this:

Here's how you can accomplish this in the class KeyManager.

class KeyManager:
    def __init__(self, kms_client):
        self.kms_client = kms_client

    def generate_data_key(self, key_id: str) -> bytes:
        try:
            response = self.kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
            return response["CiphertextBlob"]
        except ClientError as err:
            logger.error(f"Failed to generate data key: {err}")
            raise

    def decrypt_data_key(self, encrypted_key: bytes) -> bytes:
        try:
            response = self.kms_client.decrypt(CiphertextBlob=encrypted_key)
            return response["Plaintext"]
        except ClientError as err:
            logger.error(f"Failed to decrypt data key: {err}")
            raise
Enter fullscreen mode Exit fullscreen mode

The generate_data_key() method fetches an encrypted DEK from KMS, while decrypt_data_key() decrypts it for use in encryption tasks. This ensures a secure, scalable, and auditable approach to key management.

Encrypting Columns with ColEncrypt

The ColEncrypt class applies AES encryption to specified columns, using DEKs managed by KeyManager.

encrypt() Method

This method handles the encryption process:

class ColEncrypt:
    def encrypt(self):
        key_manager = KeyManager(self.kms_client)
        for column in self.columns:
            dek = key_manager.generate_data_key(self.key_id)
            decrypted_dek = key_manager.decrypt_data_key(dek)
            dek_b64 = b64.b64encode(dek).decode("utf-8")

            self.df = self.df.withColumn(
                "key", lit(decrypted_dek)
            ).withColumn(
                column,
                concat(lit(dek_b64 + "::"), base64(expr(f"aes_encrypt({column}, key)")))
            ).drop("key")
        return self.df
Enter fullscreen mode Exit fullscreen mode

Key Points

  1. Base64 Encode Encrypted DEK:
    • The encrypted DEK is encoded using Base64 for storage as a string.
    • The prefix format is encrypted_dek_b64::encrypted_column_value.
  2. Concatenation with Separator:
    • The concat function adds the encrypted_dek_b64 followed by a separator (::) to the encrypted column value.
  3. Temporary Key Column:
    • A temporary key column is used to store the decrypted DEK during encryption, and it's dropped afterward.

Example Output

For a column named sensitive_column:

  • Original Value: 12345
  • Encrypted Column: ENCRYPTED_DEK_BASE64::ENCRYPTED_VALUE

Usage Example

Here’s how you can use ColEncrypt in your Glue job:

from ColEncrypt import ColEncrypt

# Load the data frame
df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://your-bucket/your-data.csv"]},
    format="csv",
    format_options={"withHeader": True}
).toDF()

# Specify columns to encrypt
kms_client = boto3.client("kms", region_name="us-east-2")
columns_to_encrypt = ["column_1",..., "column_n"]

# Initialize and encrypt
encryptor = ColEncrypt(df, columns_to_encrypt, "alias/your-kms-key", kms_client, "arn:aws:iam::123456789012:role/YourRole")
encrypted_df = encryptor.encrypt()
Enter fullscreen mode Exit fullscreen mode
  1. Initialize the ColEncrypt class with your DataFrame, sensitive columns, KMS Key ID, and KMS client or IAM Role if you want Glue to use different credentials to interact with the KMS key.
  2. Call the encrypt() method with or without specifying additional columns.

This approach ensures the encrypted DEK is stored alongside the encrypted value, enabling efficient decryption during data processing. You can generate and/or fetch the DEKs from a centralized Key Store as well, which will not be covered in this post.

Decrypting Columns with ColEncrypt

To decrypt, extract the DEK from the first row of the encrypted column, decrypt it using KMS, and apply aes_decrypt().

decrypt() Method

class ColEncrypt:
    def decrypt(self):
        key_manager = KeyManager(self.kms_client)
        for column in self.columns:
            first_row = self.df.select(column).first()
            dek_b64, _ = first_row[column].split("::", 1)
            encrypted_dek = b64.b64decode(dek_b64)
            decrypted_dek = key_manager.decrypt_data_key(encrypted_dek)
            decrypted_dek_b64 = b64.b64encode(decrypted_dek).decode("utf-8")

            self.df = self.df.withColumn(
                column,
                expr(f"aes_decrypt(unbase64(split({column}, '::')[1]), unbase64('{decrypted_dek_b64}'))").cast("string")
            )
        return self.df
Enter fullscreen mode Exit fullscreen mode

Usage Example

Here is an example of how you can decrypt a previously encrypted data frame using the same KMS key.

decryptor = ColEncrypt(encrypted_df, columns_to_encrypt, "alias/your-kms-key", kms_client)
decrypted_df = decryptor.decrypt()
Enter fullscreen mode Exit fullscreen mode

With these methods, you can easily decrypt sensitive data columns for downstream processing or analysis.

Validating Data Integrity

After decryption, validate that the original and decrypted data match:

if original_df.subtract(decrypted_df).isEmpty() and decrypted_df.subtract(original_df).isEmpty():
    print("Original and decrypted files are identical!")
else:
    print("Original and decrypted files differ!")
Enter fullscreen mode Exit fullscreen mode

This step ensures your encryption and decryption workflows are functioning correctly and that no data is lost or altered.

Conclusion

Column-level encryption with AWS Glue, PySpark, and AWS KMS ensures secure data handling. By implementing ColEncrypt, you can streamline encryption and decryption while adhering to best practices for key management. This approach not only secures sensitive data but also provides a scalable, auditable framework for enterprise-grade security.

Stay tuned for the last part of this series, where I'll explore secure analytics on encrypted data with Amazon Redshift.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .