One of the ways we migrate ETL jobs from tools like Informatica/DataStage to PySpark is by converting every stage/transformation in the ETL job into a dataframe in the PySpark code. While every stage/transformation has its own unique logic, most of them retain the schema of their input (unless it is explicitly changed). Replicating this in PySpark becomes tedious, since manual intervention is required to keep datatypes, scales and precisions consistent as we derive subsequent dataframes from the source dataframes. The notebook below addresses this by providing a simple function that takes the source dataframe and casts the columns of the target dataframe to the same datatypes.
Download the notebook with comments from the blog here.
OR
View this notebook on GitHub here.
This notebook has been tested on Databricks Community Edition.
Since this is a simple example, I have made a couple of simplifying assumptions: the source and target dataframes share the same column names, and every value in the target can be cast to the corresponding source datatype.
Let's set up two dataframes: init_df_with_schema, which is created with an explicit schema (init_schema), and new_df_without_schema, which is not. Because new_df_without_schema has no specific schema, its columns all come through as strings and need to be cast to match the column datatypes in init_df_with_schema.
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, TimestampType
from datetime import datetime
data1 = [
    [1, "John1", "Doe1", datetime.now(), 31.5],
    [2, "John2", "Doe2", datetime.now(), 37.5],
    [3, "John3", "Doe3", datetime.now(), 62.5],
    [4, "John4", "Doe4", datetime.now(), 74.5]
]
data2 = [
    ['1', "Jane1", "Doe1", '2024-01-01 10:05:00.00', '31.5'],
    ['2', "Jane2", "Doe2", '2024-01-01 13:10:00.12', '37.5'],
    ['3', "Jane3", "Doe3", '2024-01-01 18:30:00.30', '62.5'],
    ['4', "Jane4", "Doe4", '2024-01-01 21:45:00.44', '74.5']
]
init_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("SnapshotTime", TimestampType(), True),
    StructField("Metric", FloatType(), True),
])
init_df_with_schema = spark.createDataFrame(data=data1, schema=init_schema)
init_df_with_schema.schema
new_df_without_schema = spark.createDataFrame(data2, ["id", "FirstName", "LastName", "SnapshotTime", "Metric"])
new_df_without_schema.schema
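At this point every column of new_df_without_schema has been inferred as a string, since the sample data was supplied as Python strings. A quick look at dtypes makes this visible, and dtypes is also what we will iterate over in the next step:
# Every column comes back as ('column_name', 'string') before the cast
print(new_df_without_schema.dtypes)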
We use a simple for loop to iterate over the initial dataframe's column names and datatypes (via dtypes) and cast the corresponding columns of the new/target dataframe to the appropriate datatypes.
from pyspark.sql.functions import col
# dtypes yields (column name, datatype string) pairs for the source dataframe
for col_name, col_type in init_df_with_schema.dtypes:
    new_df_without_schema = new_df_without_schema.withColumn(col_name, col(col_name).cast(col_type))
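If this needs to be reused across many stages, the same loop can be wrapped in a small helper. The sketch below is one way to do it; the name align_with_source_schema and its signature are illustrative rather than part of the original notebook:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def align_with_source_schema(source_df: DataFrame, target_df: DataFrame) -> DataFrame:
    # Illustrative helper: cast each target column to the datatype of the
    # source column with the same name, and return the aligned dataframe
    for col_name, col_type in source_df.dtypes:
        target_df = target_df.withColumn(col_name, col(col_name).cast(col_type))
    return target_df

# Example usage, equivalent to the loop above:
# new_df_without_schema = align_with_source_schema(init_df_with_schema, new_df_without_schema)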
Compare the schemas using dataframe.schema.
if new_df_without_schema.schema == init_df_with_schema.schema:
    print("Schema matches!")
else:
    print("Schema does not match")
    print(new_df_without_schema.schema)
    print('***************************')
    print(init_df_with_schema.schema)
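Note that schema equality also compares nullability and field metadata, so two dataframes whose column types line up can still compare as unequal. If that is too strict for your use case, comparing dtypes checks only the column names and datatypes:
# dtypes ignores nullability and metadata, comparing only (name, datatype) pairs
if new_df_without_schema.dtypes == init_df_with_schema.dtypes:
    print("Column names and datatypes match!")
else:
    print("Column names and datatypes do not match")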