Simplified, Efficient Incremental Data Load: Loading, Updating and Deleting Records

Pawan Kumar Ganjhu
Published in Towards Dev · 3 min read · May 29, 2023


Data Load

This post walks through incremental data load techniques with PySpark, using a PostgreSQL table as the source and a Redshift table as the target.

To perform incremental data loads from a PostgreSQL table to a Redshift table using PySpark, you can follow the steps outlined below:

  1. Establish the connection to PostgreSQL and Redshift:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IncrementalDataLoad") \
    .getOrCreate()

jdbc_url_pg = "jdbc:postgresql://<host>:<port>/<database>"
properties_pg = {
    "user": "<username>",
    "password": "<password>",
    "driver": "org.postgresql.Driver"
}

jdbc_url_rs = "jdbc:redshift://<host>:<port>/<database>"
properties_rs = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.amazon.redshift.jdbc.Driver"
}
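
One practical note before the reads and writes below: both JDBC drivers must be on Spark's classpath. A minimal sketch, assuming you pull them from Maven Central when building the session (the coordinates and versions shown are illustrative, and the driver class names in the properties above must match the driver versions you actually ship):

# Assumption: driver jars are resolved from Maven Central at startup.
# Local jars passed via spark.jars or spark-submit --jars work just as well.
spark = SparkSession.builder \
    .appName("IncrementalDataLoad") \
    .config("spark.jars.packages",
            "org.postgresql:postgresql:42.6.0,com.amazon.redshift:redshift-jdbc42:2.1.0.9") \
    .getOrCreate()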

2. Load the initial full dataset from PostgreSQL:

full_data = spark.read \
    .jdbc(url=jdbc_url_pg, table="<source_table>", properties=properties_pg)

3. Load the previously processed data from Redshift (if any):

processed_data = spark.read \
    .jdbc(url=jdbc_url_rs, table="<target_table>", properties=properties_rs)

4. Determine the incremental data based on a suitable condition, such as a timestamp column:

from pyspark.sql.functions import col

# Highest timestamp already loaded into Redshift; None when the target table is still empty
max_ts = processed_data.selectExpr("MAX(timestamp_column)").collect()[0][0]

incremental_data = full_data if max_ts is None \
    else full_data.filter(col("timestamp_column") > max_ts)
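
Note that step 2 reads the full source table before this filter is applied. If the source is large, one variation (a sketch; the subquery alias src is a placeholder and it assumes max_ts is not None) is to push the filter down to PostgreSQL by passing a subquery instead of a table name:

# Only rows newer than what Redshift already holds cross the network
incremental_query = f"(SELECT * FROM <source_table> WHERE timestamp_column > '{max_ts}') AS src"

incremental_data = spark.read \
    .jdbc(url=jdbc_url_pg, table=incremental_query, properties=properties_pg)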

5. Append the incremental data to the existing Redshift table:

incremental_data.write \
    .jdbc(url=jdbc_url_rs, table="<target_table>", mode="append", properties=properties_rs)

6. Update the processed_data variable to include the newly appended data:

processed_data = spark.read \
    .jdbc(url=jdbc_url_rs, table="<target_table>", properties=properties_rs)

7. Optionally, you can perform any necessary transformations or aggregations on the processed_data before further processing.

That’s it! You have successfully performed an incremental data load from PostgreSQL to Redshift using PySpark. Remember to replace <host>, <port>, <database>, <username>, <password>, <source_table>, and <target_table> with the appropriate values for your setup.
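
For reference, steps 2–6 can be strung together into one small, reusable function. This is just a sketch: the function name run_incremental_load and the hard-coded timestamp_column are illustrative, and it reuses the connection settings defined in step 1.

from pyspark.sql.functions import col

def run_incremental_load(spark, source_table, target_table):
    """Append rows from the PostgreSQL source that are newer than anything already in Redshift."""
    full_data = spark.read.jdbc(url=jdbc_url_pg, table=source_table, properties=properties_pg)
    processed_data = spark.read.jdbc(url=jdbc_url_rs, table=target_table, properties=properties_rs)

    # None when the target is still empty, in which case every source row is new
    max_ts = processed_data.selectExpr("MAX(timestamp_column)").collect()[0][0]
    incremental_data = full_data if max_ts is None \
        else full_data.filter(col("timestamp_column") > max_ts)

    incremental_data.write.jdbc(url=jdbc_url_rs, table=target_table, mode="append", properties=properties_rs)

run_incremental_load(spark, "<source_table>", "<target_table>")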

Update

To propagate modified records from the source table in PostgreSQL to the corresponding records in the target table in Redshift, you can follow the steps below:

  1. Establish the connection to PostgreSQL and Redshift (similar to the previous example).
  2. Load the previously processed data from Redshift (if any) and the current data from the source table in PostgreSQL:
processed_data = spark.read \
    .jdbc(url=jdbc_url_rs, table="<target_table>", properties=properties_rs)

source_data = spark.read \
    .jdbc(url=jdbc_url_pg, table="<source_table>", properties=properties_pg)

3. Identify the modified data based on a suitable condition, such as comparing the timestamp or any other relevant column:

join_condition = processed_data["id_column"] == source_data["id_column"]

# Rows whose source timestamp is newer than the copy already in Redshift;
# keep only the source-side columns so the result matches the target schema
modified_data = source_data.join(processed_data, join_condition, "inner") \
    .where(processed_data["timestamp_column"] < source_data["timestamp_column"]) \
    .select(source_data["*"])

4. Update the modified records in the target Redshift table. Spark's JDBC writer cannot update rows in place, and overwriting the table with only modified_data would drop every unchanged row, so rebuild the full contents and overwrite (a staging-table alternative that avoids rewriting the whole table is sketched at the end of this section):

# Unchanged rows already in Redshift plus the refreshed rows from PostgreSQL
# (assumes the source and target tables share the same column names)
unchanged_data = processed_data.join(modified_data, "id_column", "left_anti")
refreshed_data = unchanged_data.unionByName(modified_data)

refreshed_data.write \
    .jdbc(url=jdbc_url_rs, table="<target_table>", mode="overwrite", properties=properties_rs)

5. Optionally, you can perform any necessary transformations or aggregations on the processed_data before further processing.

That’s it! You have now updated the modified data from the source table in PostgreSQL to the target table in Redshift using PySpark. Make sure to replace <host>, <port>, <database>, <username>, <password>, <source_table>, <target_table>, <id_column>, and <timestamp_column> with the appropriate values for your setup.
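
If rewriting the whole target table is too heavy, a common alternative is to stage the modified rows in Redshift and apply them in place with UPDATE ... FROM. The sketch below assumes you can open a direct SQL connection to Redshift (psycopg2 works because Redshift speaks the PostgreSQL wire protocol); the staging table name stg_modified_rows and the column list are placeholders.

import psycopg2  # assumption: used only to run SQL directly against Redshift

# 1. Stage the modified rows next to the target table
modified_data.write \
    .jdbc(url=jdbc_url_rs, table="stg_modified_rows", mode="overwrite", properties=properties_rs)

# 2. Apply them with one set-based UPDATE inside Redshift
update_sql = """
UPDATE <target_table> AS t
SET    timestamp_column = s.timestamp_column   -- list every column you want refreshed
FROM   stg_modified_rows AS s
WHERE  t.id_column = s.id_column;
"""

conn = psycopg2.connect(host="<host>", port="<port>", dbname="<database>",
                        user="<username>", password="<password>")
with conn, conn.cursor() as cur:
    cur.execute(update_sql)
conn.close()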

Delete

To delete data from the target table in Redshift that is not present in the source table in PostgreSQL, you can follow these steps:

  1. Establish the connection to PostgreSQL and Redshift (similar to the previous examples).
  2. Load the previously processed data from Redshift (if any) and the current data from the source table in PostgreSQL:
processed_data = spark.read \
    .jdbc(url=jdbc_url_rs, table="<target_table>", properties=properties_rs)

source_data = spark.read \
    .jdbc(url=jdbc_url_pg, table="<source_table>", properties=properties_pg)

3. Identify the records present in the target table but not in the source table:

# Assuming there is a unique identifier column named "id_column" in both source and target tables
deleted_data = processed_data \
    .join(source_data, processed_data["id_column"] == source_data["id_column"], "left_anti")

4. Remove those records from the target Redshift table. Spark's JDBC writer cannot delete individual rows, and overwriting the table with deleted_data would keep exactly the rows you meant to drop, so overwrite the target with the rows that should remain instead (a staging-table DELETE alternative is sketched at the end of this section):

# Rows in Redshift whose id still exists in the PostgreSQL source
remaining_data = processed_data \
    .join(source_data, processed_data["id_column"] == source_data["id_column"], "left_semi")

remaining_data.write \
    .jdbc(url=jdbc_url_rs, table="<target_table>", mode="overwrite", properties=properties_rs)

5. Optionally, you can perform any necessary transformations or aggregations on the processed_data before further processing.

That’s it! You have now deleted the data from the target table in Redshift that is not present in the source table in PostgreSQL using PySpark. Remember to replace <host>, <port>, <database>, <username>, <password>, <source_table>, and <target_table> with the appropriate values for your setup, and ensure that both tables have a common unique identifier column.
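
As with updates, a large target table may be better served by staging just the ids to delete and letting Redshift remove the rows in place with DELETE ... USING. The same assumptions apply as in the update sketch (a direct psycopg2 connection to Redshift; stg_deleted_ids is a placeholder name):

import psycopg2  # assumption: used only to run SQL directly against Redshift

# 1. Stage the ids of rows that have disappeared from the source
deleted_data.select("id_column").write \
    .jdbc(url=jdbc_url_rs, table="stg_deleted_ids", mode="overwrite", properties=properties_rs)

# 2. Delete the matching rows in place
delete_sql = """
DELETE FROM <target_table>
USING  stg_deleted_ids
WHERE  <target_table>.id_column = stg_deleted_ids.id_column;
"""

conn = psycopg2.connect(host="<host>", port="<port>", dbname="<database>",
                        user="<username>", password="<password>")
with conn, conn.cursor() as cur:
    cur.execute(delete_sql)
conn.close()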

There are many other, even better ways to do this; here I have tried to keep the process simple.
