Building Resilient Data Pipelines with PySpark and Azure Data Factory: Lessons from Real-World Deployments

Insights, Ideas and Best Practices from Successful PySpark and Azure Data Factory Implementations

Kuharan Bhowmik
Towards Dev
Published in
10 min readMar 8, 2023

--

Scenario1 — Fix for parallel processing and logging in Databricks

If you’re working on a project that involves sending data to AWS, you may run into some challenges along the way. One of the most common issues is the slow transfer process, which can be exacerbated by sending data in the form of payloads to request URLs. In my own project, I found that each request created a transaction ID, and the payload was read from each file and sent to AWS as the body of the request. However, the payload had a size limit, and as a result, the data had to be divided into chunks to be sent in batches. This led to the generation of thousands of files, which caused delays in the transfer process.

To speed up the data transfer, we came up with the idea of parallelism. By sending all the chunks parallelly, we were able to use thread pools and overcome the transfer delay. However, while implementing this solution, we discovered that we had inadvertently disrupted the existing logging framework.

What are Thread pools?
In Databricks, thread pools refer to a group of threads that are created and managed by the Databricks runtime environment to execute user code. The threads in the thread pool are pre-allocated and shared among different tasks to improve the overall performance of the Databricks cluster.

Thread pools are commonly used in Databricks to manage the execution of user-defined functions (UDFs) that are applied to large datasets. By using thread pools, Databricks can allocate threads to execute different parts of the UDF in parallel, which can result in faster execution times.

In Databricks, thread pools are managed automatically by the runtime environment, so users do not need to manually configure or manage them. However, users can control the size of the thread pool used by the Databricks cluster by adjusting the configuration settings of the cluster.

The logging framework was designed to capture each step of the data transfer process. However, our implementation of parallelism led to some steps being skipped or executed out of order, which caused gaps in the logging. To fix this issue, we had to modify the logging framework to capture each step of the parallel process accurately.

The older code looked something like this:

# initial entries into the logging table
if status == 1:
insert into the logging table with initial values and status = 1
if status == 2 or status == 3:
update the log table with status 1 to 3

The new code:

# initial entries into the logging table
if status == 1:
insert into the logging table with initial values and status = 1
if status == 2 or status == 3:
if the step name == "the name of the parallelism step":
update the log table with status 1 to 3 for the stepname and msg like '%fileaname%'

Overall, our experience with implementing parallelism for sending data to AWS taught us the importance of considering the implications of any changes we make to our systems. By taking the time to carefully evaluate our solutions and test them thoroughly, we can avoid potential issues like the ones we faced with our logging framework.

Scenario2 — Fix for concurrency control with Azure Data Factory

Concurrency control can be a tricky issue to handle when it comes to data processing. When working with Azure Data Factory, you may encounter situations where multiple pipelines are running simultaneously, and the data is being processed concurrently. In such cases, it’s essential to have a robust concurrency control mechanism in place to avoid any data inconsistencies or conflicts. In this blog post, we’ll explore some fixes for concurrency control issues with Azure Data Factory.

What is Concurrency Control?

Concurrency control is the process of managing the simultaneous execution of multiple data processing operations, ensuring that they don’t conflict with each other. It’s a crucial aspect of data processing, as data inconsistencies can occur when multiple operations try to modify the same data at the same time.

In our project, we encountered a concurrency control issue that was causing multiple pipelines to generate the same session IDs. We discovered that this was happening because the step that generated the session ID from a Databricks notebook did not have any locking mechanism, which allowed multiple pipelines to call it simultaneously.

To address this issue, we implemented a solution that involved placing the notebook activity inside an inner pipeline and configuring it to only allow a single concurrent execution. By doing so, we were able to effectively lock the process and prevent multiple pipelines from generating the same session IDs.

Implementing this solution required a few steps. First, we created a new pipeline that contained the notebook activity responsible for generating the session ID. We then configured this pipeline to only allow a single concurrent execution at a time. This ensured that only one pipeline could access the notebook activity and generate a session ID at any given time.

Next, we integrated this new pipeline into our existing workflows by invoking it from the main pipeline. This allowed us to maintain the overall structure of our workflows while still addressing the concurrency control issue.

Overall, implementing this solution helped us avoid potential data inconsistencies caused by multiple pipelines generating the same session IDs. By introducing a locking mechanism and limiting concurrent executions, we were able to ensure that our data processing operations ran smoothly and efficiently without any conflicts.

ADF pipeline settings

Scenario3- Fix for parallel pipelines logging table

When we first encountered the parallelism issue for sending data to AWS in scenario 1, we attempted to address it by introducing parallel processing. However, we soon realized that this approach was causing heavy computations in the table and was not an optimal solution. To further optimize our process, we decided to replace all the updates with inserts and only keep the final status values.

To achieve this, we created a main table where all entries from the framework were inserted. We then grabbed all the maximum status values from the main table for the current session ID and removed the intermediate status values, replacing the main table again. By doing so, we were able to eliminate unnecessary updates and improve the efficiency of our process.

However, we encountered another challenge with the log table, which contained entries for all sessions. To address this, we created a temporary table to store the final status values. We then cleaned all entries from the main table and pushed the cleaned data back into the main table. This allowed us to maintain the integrity of our data while still optimizing our process.

# initial entries into the logging table
if status == 1:
insert into the logging table with initial values and status = 1
if status == 2 or status == 3:
if the step name == "the name of the parallelism step":
insert into the log table with status 1 to 3 for the stepname and msg like '%fileaname%'
logs_with_final_status = sql("select a.* from log table a, (select cols, max(status) from log table where session = session_id) group by session id,\
stepname") b where a.session=b.session and a.stepname=b.stepname and a.status=b.max(status)")

write the above into a persistent table.
delete logs from log table where sessionid=sessionid

append the logs_with_final_status into log table

In summary, we were able to significantly improve the efficiency of our data processing operations by replacing updates with inserts and only keeping the final status values. By doing so, we were able to eliminate unnecessary computations and streamline our process. Despite encountering challenges with the log table, we were able to successfully address these issues and ensure the accuracy of our data.

Scenario 4- Converting recursive CTE to Pyspark

A recursive Common Table Expression (CTE) is a powerful tool in Spark SQL that allows you to traverse and process hierarchical or graph-like data structures. With a recursive CTE, you can define a query that recursively references itself until a certain termination condition is met.

The basic syntax for a recursive CTE in Spark SQL is as follows:

WITH RECURSIVE cte_name (col1, col2, ...) AS (
-- base case query
SELECT col1, col2, ...
FROM base_table
WHERE <termination_condition>
UNION ALL
-- recursive case query
SELECT col1, col2, ...
FROM cte_name
JOIN recursive_table
ON cte_name.col1 = recursive_table.col1
WHERE <termination_condition>
)
SELECT * FROM cte_name;

In this syntax, cte_name is the name of the CTE, col1, col2, etc. are the columns being selected, and base_table is the table being selected from.

The WHERE clause in the base case query specifies the termination condition for the recursion, i.e., the condition that must be met for the recursion to stop.

In the recursive case query, the UNION ALL clause is used to combine the results of the previous iteration of the CTE with the current iteration. The JOIN clause joins the CTE with the recursive table, and the WHERE clause specifies the termination condition for the recursion.

Once the CTE is defined, you can use it as a subquery in the final SELECT statement to retrieve the results.

Recursive CTEs are useful in many scenarios, such as calculating hierarchical or graph-like data structures, generating sequences, or traversing relationships in data. However, they can also be computationally expensive, so it’s important to use them with care and optimize them for performance.

Spark SQL does not support recursive CTE when using Dataframe operations

Recursive Common Table Expressions (CTEs) in SQL can be translated to PySpark using the following steps:

  1. Write a base query that retrieves the first set of data and assigns it to a data frame.
  2. Use PySpark’s union() method to join the base query with a subsequent query that retrieves additional data based on the results of the first query. Repeat this step as needed to build up the complete result set.

Here is an example of a CTE in SQL:

CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(50),
manager_id INT
);

-- Insert data into the table
INSERT INTO employee (id, name, manager_id)
VALUES
(1, 'John', NULL),
(2, 'Jane', 1),
(3, 'Jim', 1),
(4, 'Jake', 2),
(5, 'Jill', 4);

-- Use a recursive CTE to find the hierarchy of employees
WITH cte (id, name, manager_id, level) AS (
SELECT id, name, manager_id, 1
FROM employee
WHERE manager_id IS NULL
UNION ALL
SELECT e.id, e.name, e.manager_id, c.level + 1
FROM employee e
JOIN cte c ON e.manager_id = c.id
)
SELECT * FROM cte;

-- this code will not work and result in an error in databricks

And here is the equivalent code in PySpark:

from pyspark.sql.functions import lit, col

# Create a dataframe to store data
data = [(1, 'John', None), (2, 'Jane', 1), (3, 'Jim', 1), (4, 'Jake', 2), (5, 'Jill', 4)]
df = spark.createDataFrame(data, ['id', 'name', 'manager_id'])

# Use a union operation to build a hierarchy of employees
base_df = df.filter(col("manager_id").isNull()).withColumn("level", lit(1))

i = 1
while True:
new_df = df.join(base_df, df.manager_id == base_df.id) \
.select(df.id, df.name, df.manager_id, base_df.level + 1).alias("new_df")

if new_df.count()==0:
break
else:
i += 1
base_df = base_df.union(new_df)

base_df.show()

Scenario 5 — Sending data to AWS chronologically

When sending records to AWS, it is crucial to ensure that the records are in chronological order. This is because the end-dating logic used in AWS relies heavily on the order of the data. If the records are not in chronological order, the end-dating results may be incorrect, leading to missed critical insights. AWS algorithms assume that the data is ordered chronologically, and any deviations from this order can cause errors.

Additionally, the number of dates in each run may vary, making it challenging to define interfaces based on them. To address this, you can use the row number function and partition by the key columns, and order by date from and name as RNUM. You can then extract all the RNUMs into a list and loop over them to generate JSONs in parallel for all chunks of data. Add the RNUM to the file name to identify it in the next step.

In the next interface that sends data to AWS, read the list of RNUMs again from the saved table. Call the existing function with an extra parameter of RNUM in a loop of order by RNUM asc so that you can chronologically send the data. You can filter the file list based on the RNUM parameter and send only the particular list of RNUMs. By following these steps, you can ensure that your records are in chronological order, enabling you to obtain accurate insights from your data.

Scenario 6 — Building auto waiting for pipelines in adf

  1. In your Databricks notebook, create a database table with two columns — “pipeline_name” and “is_running”. The “pipeline_name” column should store the name of the ADF pipeline, and the “is_running” column should store a boolean value indicating whether the pipeline is currently running or not.
  2. When an ADF pipeline starts, insert a new row into the semaphore table with the pipeline name and set “is_running” to true.
  3. Before an ADF pipeline starts, check the semaphore table to see if the pipeline is already running. If it is, wait for a specified amount of time and check again. Repeat this process until the semaphore table indicates that the pipeline is not running.
  4. When an ADF pipeline finishes, update the corresponding row in the semaphore table to set “is_running” to false.
  5. Optionally, you can add a timestamp column to the semaphore table to track when each pipeline started and finished, which can be useful for tracking performance.

By using a semaphore table, you can prevent multiple instances of the same ADF pipeline from running concurrently, which can avoid conflicts and improve performance.

Thank you for taking the time to read my blog. I hope you found it informative and insightful.

If you enjoyed this content and would like to stay updated on my latest articles, I invite you to follow me here.

To follow me, simply visit my Medium profile and click the “Follow” button. You’ll receive notifications when I publish new content, and you’ll be able to engage with me and other readers in the comments section.

Once again, thank you for reading. I look forward to connecting with you soon!

A short video on the above —

--

--