How to be Test Driven with Spark: Chapter 3 - First Spark test

Nicoda-27 - Mar 1 - Dev Community

The goal of this tutorial is to provide a way to easily be test driven with Spark on your local setup, without using cloud resources.

This is a series of tutorials and the initial chapters can be found in:

Chapter 3: Implement a first test with Spark

This chapter will focus on implementing a first Spark data manipulation with an associated test. It will go through the issues that will be encountered and how to solve them.

The data

A dummy use case is used to demonstrate the workflow.

The scenario is that the production data is made of two tables, persons and employments, with the following schemas and data types. Here is a sample of the data.

Persons

| id: int | PersonalityName: str | PersonalitySurname: str | birth: datetime(str) |
| --- | --- | --- | --- |
| 1 | George | Washington | 1732-02-22 |
| 2 | Henry | Ford | 1863-06-30 |
| 3 | Benjamin | Franklin | 1706-01-17 |
| 4 | Martin | Luther King Jr. | 1929-01-15 |

Employments

| id: int | person_fk: int | Employment |
| --- | --- | --- |
| 1 | 1 | president |
| 2 | 2 | industrialist |
| 3 | 3 | inventor |
| 4 | 4 | minister |

The goal is to rename the columns and to join the two tables. The data here is just a sample; it would be overkill to use Spark on a dataset this small. In a big data context, however, you have to expect many more rows and more complex joins. The sample is only here as a demonstration.
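To make the target concrete, here is roughly what the joined output should look like once the transformation (implemented later in this chapter) is in place; the exact column order does not matter:

| name | surname | date_of_birth | employment |
| --- | --- | --- | --- |
| George | Washington | 1732-02-22 | president |
| Henry | Ford | 1863-06-30 | industrialist |
| Benjamin | Franklin | 1706-01-17 | inventor |
| Martin | Luther King Jr. | 1929-01-15 | minister |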

The dummy test

First, you need to add the Spark dependency:

uv add pyspark

Before diving into the implementation, you need to make sure you can reproduce a very simple use case. It's not worth diving into complex data manipulation if you are not able to reproduce a simple documentation snippet.

You will write your first test, tests/test_minimal_transfo.py. First, you will try to use PySpark to do a simple data frame creation.

from pyspark.sql import SparkSession


def test_minimal_transfo():
    spark: SparkSession = (
        SparkSession.builder.master("local")
        .appName("Testing PySpark Example")
        .getOrCreate()
    )

    df = spark.createDataFrame([(3, 4), (1, 2)], ["col1", "col2"])
    df.show()

The first part creates or fetches a local Spark session; the second part leverages the session to create a data frame.
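As an aside, the session builder accepts extra configuration, and for a test session it can be worth trimming Spark down a bit. The following is only a sketch with optional settings, not something this chapter depends on; the keys are standard Spark configuration options:

from pyspark.sql import SparkSession

# Optional, test-oriented session tuning (a sketch, not required by this chapter)
spark: SparkSession = (
    SparkSession.builder.master("local[*]")          # use all local cores
    .appName("Testing PySpark Example")
    .config("spark.ui.enabled", "false")             # no web UI needed in tests
    .config("spark.sql.shuffle.partitions", "1")     # tiny data, keep shuffles tiny
    .getOrCreate()
)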

Then you can launch:

pytest -k test_minimal_transfo -s

If you have a minimal developer setup, this should not work: PySpark is trying to use Java, which you might be missing, and the following error will be displayed:

FAILED tests/test_minimal_transfo.py::test_minimal_transfo - pyspark.errors.exceptions.base.PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number

It's a bit annoying, because you need to have Java installed on your dev setup, on the CI setup, and on every collaborator's setup. In future chapters, a better alternative will be described.

There are different flavors of Java; you can simply install the OpenJDK one. It will require elevated privileges:

apt-get install openjdk-8-jre
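You can then check that the runtime is visible from your shell (and therefore from PySpark):

java -version

Note that depending on the PySpark version you installed, a more recent JDK (11 or 17) may be required; if the gateway error persists, check the Java compatibility notes in the Spark documentation.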

You can now relaunch:

pytest -k test_minimal_transfo -s

and it should display

+----+----+                                                                     
|col1|col2|
+----+----+
|   3|   4|
|   1|   2|
+----+----+

This is a small victory, but you can now use a local Spark session to manage data frames, yay!

The real test case - version 0

The previous sample shows that the Spark session plays a pivotal role: it will be instantiated differently in the test context than in the production context.

This means you can leverage a pytest fixture that will be reused by all tests later on; it can be created at the session scope so there is only one Spark session for the whole test suite. Concretely, you can create a tests/conftest.py to factor out this common behavior. If you are not familiar with pytest and fixtures, it's advised to have a look at the pytest documentation.

In tests/conftest.py:

from typing import Any, Generator

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session() -> Generator[SparkSession, Any, Any]:
    yield (
        SparkSession.builder.master("local")
        .appName("Testing PySpark Example")
        .getOrCreate()
    )
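As a side note, if you prefer the session to be shut down explicitly at the end of the test suite, the fixture can stop it after the yield. This is an optional variant, a sketch rather than something the rest of the chapter relies on:

@pytest.fixture(scope="session")
def spark_session() -> Generator[SparkSession, Any, Any]:
    spark = (
        SparkSession.builder.master("local")
        .appName("Testing PySpark Example")
        .getOrCreate()
    )
    yield spark
    # Teardown: release the JVM and its resources once the whole test session is done.
    spark.stop()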

Then, it can be reused in tests/test_minimal_transfo.py:

from pyspark.sql import SparkSession

def test_minimal_transfo(spark_session: SparkSession):

    df = spark_session.createDataFrame([(3, 4), (1, 2)], ["col1", "col2"])
    df.show()

You can again run pytest -k test_minimal_transfo -s to check that the behavior has not changed. In a test driven approach, it's important to keep launching the tests after each code modification to ensure nothing was broken.

To be closer to the business context, you can implement a data transformation object, with a clear separation between data generation and data transformation. You can do so in src/pyspark_tdd/data_processor.py:

from pyspark.sql import DataFrame, SparkSession


class DataProcessor:
    def __init__(self, spark_session: SparkSession):
        self.spark_session = spark_session

    def run(self, persons: DataFrame, employments: DataFrame) -> DataFrame:
        raise NotImplementedError

Now that there is a prototype for DataProcessor, the test can be improved to actually assert on the output, like so in tests/test_minimal_transfo.py:

from pyspark.sql import DataFrame, SparkSession

from pyspark_tdd.data_processor import DataProcessor


def test_minimal_transfo(spark_session: SparkSession):
    persons = spark_session.createDataFrame(
        [
            (1, "George", "Washington", "1732-02-22"),
            (2, "Henry", "Ford", "1863-06-30"),
            (3, "Benjamin", "Franklin", "1706-01-17"),
            (4, "Martin", "Luther King Jr.", "1929-01-15"),
        ],
        ["id", "PersonalityName", "PersonalitySurname", "birth"],
    )
    employments = spark_session.createDataFrame(
        [
            (1, 1, "president"),
            (2, 2, "industrialist"),
            (3, 3, "inventor"),
            (4, 4, "minister"),
        ],
        ["id", "person_fk", "Employment"],
    )
    processor = DataProcessor(spark_session)
    df_out: DataFrame = processor.run(persons, employments)

    assert not df_out.isEmpty()
    assert set(df_out.columns) == set(
        ["name", "surname", "date_of_birth", "employment"]
    )


The example above ensures that the output data frame fits some criteria, but for now it raises a NotImplementedError since the actual data processing still has to be implemented. This is intended: the processing code can be written once the testing is properly set up.
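If you want this red phase to be visible as an explicit expectation rather than an error, you could temporarily assert on the exception with pytest.raises. This is only a sketch of a throwaway test, meant to be deleted once run is implemented:

import pytest
from pyspark.sql import SparkSession

from pyspark_tdd.data_processor import DataProcessor


def test_minimal_transfo_is_not_implemented_yet(spark_session: SparkSession):
    # Throwaway red-phase test: documents that run() is still a stub.
    persons = spark_session.createDataFrame(
        [(1, "George", "Washington", "1732-02-22")],
        ["id", "PersonalityName", "PersonalitySurname", "birth"],
    )
    employments = spark_session.createDataFrame(
        [(1, 1, "president")], ["id", "person_fk", "Employment"]
    )
    with pytest.raises(NotImplementedError):
        DataProcessor(spark_session).run(persons, employments)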

The actual test is still not ideal, as test data generation is part of the test itself. Pytest parametrization can be leveraged:

import pytest
from pyspark.sql import DataFrame, SparkSession

from pyspark_tdd.data_processor import DataProcessor

@pytest.mark.parametrize(
    "persons,employments",
    [
        (
            (
                [
                    (1, "George", "Washington", "1732-02-22"),
                    (2, "Henry", "Ford", "1863-06-30"),
                    (3, "Benjamin", "Franklin", "1706-01-17"),
                    (4, "Martin", "Luther King Jr.", "1929-01-15"),
                ],
                ["id", "PersonalityName", "PersonalitySurname", "birth"],
            ),
            (
                [
                    (1, 1, "president"),
                    (2, 2, "industrialist"),
                    (3, 3, "inventor"),
                    (4, 4, "minister"),
                ],
                ["id", "person_fk", "Employment"],
            ),
        )
    ],
)
def test_minimal_transfo(spark_session: SparkSession, persons, employments):
    persons = spark_session.createDataFrame(*persons)
    employments = spark_session.createDataFrame(*employments)

    processor = DataProcessor(spark_session)
    df_out: DataFrame = processor.run(persons, employments)

    assert not df_out.isEmpty()
    assert set(df_out.columns) == set(
        ["name", "surname", "date_of_birth", "employment"]
    )

The above example shows how test case generation can be separated from the test run. It lets you see at first glance what the test is about, without the noise of the test data. Since the test data frames could most likely be reused in other tests, one more refactoring is worthwhile. The test part becomes:

from pyspark.sql import DataFrame, SparkSession

from pyspark_tdd.data_processor import DataProcessor


def test_minimal_transfo(spark_session: SparkSession, persons: DataFrame, employments: DataFrame):
    processor = DataProcessor(spark_session)
    df_out: DataFrame = processor.run(persons, employments)

    assert not df_out.isEmpty()
    assert set(df_out.columns) == set(
        ["name", "surname", "date_of_birth", "employment"]
    )

and the two fixtures persons and employments are created in tests/conftest.py (note that DataFrame now needs to be imported from pyspark.sql there as well):

@pytest.fixture(scope="session")
def persons(spark_session: SparkSession) -> Generator[DataFrame, Any, Any]:
    yield spark_session.createDataFrame(
        [
            (1, "George", "Washington", "1732-02-22"),
            (2, "Henry", "Ford", "1863-06-30"),
            (3, "Benjamin", "Franklin", "1706-01-17"),
            (4, "Martin", "Luther King Jr.", "1929-01-15"),
        ],
        ["id", "PersonalityName", "PersonalitySurname", "birth"],
    )


@pytest.fixture(scope="session")
def employments(spark_session: SparkSession) -> Generator[DataFrame, Any, Any]:
    yield spark_session.createDataFrame(
        [
            (1, 1, "president"),
            (2, 2, "industrialist"),
            (3, 3, "inventor"),
            (4, 4, "minister"),
        ],
        ["id", "person_fk", "Employment"],
    )


You can now relaunch pytest -k test_minimal_transfo -s and notice the NotImplementedError still being raised, which is a good thing. The code has changed three times, yet the behavior remains the same, and the tests confirm it.

The real test case - version 1

Now that proper testing is in place, the source code can be implemented. There could be variations of this; the intent here is not to provide the best source code, but the best way to test it. In src/pyspark_tdd/data_processor.py:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import to_date


class DataProcessor:
    def __init__(self, spark_session: SparkSession):
        self.spark_session = spark_session
        self.persons_rename = {
            "PersonalityName": "name",
            "PersonalitySurname": "surname",
            "birth": "date_of_birth",
        }
        self.employments_rename = {"Employment": "employment"}

    def run(self, persons: DataFrame, employments: DataFrame) -> DataFrame:
        persons = persons.withColumn(
            "birth", to_date(persons.birth)
        ).withColumnsRenamed(colsMap=self.persons_rename)

        employments = employments.withColumnsRenamed(colsMap=self.employments_rename)
        joined = persons.join(
            employments, persons.id == employments.person_fk, how="left"
        )
        joined = joined.drop("id", "person_fk")
        return joined

If you rerun pytest -k test_minimal_transfo -s, then the test is successful.
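If you want to tighten the test further, you could also assert on the joined values themselves rather than only on the column names; a minimal sketch of extra assertions to append to test_minimal_transfo, based on the sample data from the fixtures:

    # Optional extra assertions: check one joined row end to end.
    george = df_out.filter(df_out.name == "George").collect()[0]
    assert george.surname == "Washington"
    assert george.employment == "president"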

What about CI?

A strong dependency on Java is now in place: running the tests in CI will depend on whether the CI runner has Java installed. This is an issue because it requires the developer to have a defined dev setup outside of the Python ecosystem, and it adds extra steps for anyone who wants to launch the tests.

Keep in mind that there is limited control over the developer setup: what if the Java version already installed is not Spark compliant? It will then be frustrating for the developer to investigate and most likely reinstall another Java version, which might impact other projects. See the mess.

Luckily, the CI runner on GitHub has Java preinstalled, so the CI should run.

Clean up

You can now also clean up the repository to start from a clean slate. For instance, src/pyspark_tdd/multiply.py and tests/test_dummy.py can be removed.

What's next

You now have a comfortable setup to modify and tweak the code: you can run the tests and be sure the results are reproducible.

In the next chapter, a more data driven approach to test case generation will be explored.

You can find the original materials in spark_tdd. This repository exposes the expected repository layout at the end of each chapter, with one branch per chapter:

. . .