### Data Source

The dataset is derived from Fannie Mae's [Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html) with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae. For the full raw dataset, visit [Fannie Mae](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html) to register for an account and download it.

Instructions are available on the NVIDIA [RAPIDS demo site](https://rapidsai.github.io/demos/datasets/mortgage-data).

### Prerequisite

This notebook runs in a Dataproc cluster with GPU nodes, with [Spark RAPIDS](https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/rapids) set up.

### Define ETL Process

Define data schema and steps to do the ETL process:

In [1]:
import time
from pyspark import broadcast
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

def _get_quarter_from_csv_file_name():
    """Build a column expression that pulls the quarter tag out of the input file name.

    The expression keeps the part of ``input_file_name()`` that precedes the
    first ``.`` and, from that, whatever follows the last ``_``.
    """
    # NOTE(review): presumably file names look like ``<prefix>_2016Q1.txt``; a '.'
    # earlier in the full path would cut the stem short -- confirm against the data layout.
    stem = substring_index(input_file_name(), '.', 1)
    return substring_index(stem, '_', -1)

# Explicit schema for the headerless performance files; fields are listed in
# the order the columns are read.
_csv_perf_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('loan_id', LongType()),
        ('monthly_reporting_period', StringType()),
        ('servicer', StringType()),
        ('interest_rate', DoubleType()),
        ('current_actual_upb', DoubleType()),
        ('loan_age', DoubleType()),
        ('remaining_months_to_legal_maturity', DoubleType()),
        ('adj_remaining_months_to_maturity', DoubleType()),
        ('maturity_date', StringType()),
        ('msa', DoubleType()),
        ('current_loan_delinquency_status', IntegerType()),
        ('mod_flag', StringType()),
        ('zero_balance_code', StringType()),
        ('zero_balance_effective_date', StringType()),
        ('last_paid_installment_date', StringType()),
        ('foreclosed_after', StringType()),
        ('disposition_date', StringType()),
        ('foreclosure_costs', DoubleType()),
        ('prop_preservation_and_repair_costs', DoubleType()),
        ('asset_recovery_costs', DoubleType()),
        ('misc_holding_expenses', DoubleType()),
        ('holding_taxes', DoubleType()),
        ('net_sale_proceeds', DoubleType()),
        ('credit_enhancement_proceeds', DoubleType()),
        ('repurchase_make_whole_proceeds', StringType()),
        ('other_foreclosure_proceeds', DoubleType()),
        ('non_interest_bearing_upb', DoubleType()),
        ('principal_forgiveness_upb', StringType()),
        ('repurchase_make_whole_proceeds_flag', StringType()),
        ('foreclosure_principal_write_off_amount', StringType()),
        ('servicing_activity_indicator', StringType()),
    )
])
# Explicit schema for the headerless acquisition files; fields are listed in
# the order the columns are read.
_csv_acq_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('loan_id', LongType()),
        ('orig_channel', StringType()),
        ('seller_name', StringType()),
        ('orig_interest_rate', DoubleType()),
        ('orig_upb', IntegerType()),
        ('orig_loan_term', IntegerType()),
        ('orig_date', StringType()),
        ('first_pay_date', StringType()),
        ('orig_ltv', DoubleType()),
        ('orig_cltv', DoubleType()),
        ('num_borrowers', DoubleType()),
        ('dti', DoubleType()),
        ('borrower_credit_score', DoubleType()),
        ('first_home_buyer', StringType()),
        ('loan_purpose', StringType()),
        ('property_type', StringType()),
        ('num_units', IntegerType()),
        ('occupancy_status', StringType()),
        ('property_state', StringType()),
        ('zip', IntegerType()),
        ('mortgage_insurance_percent', DoubleType()),
        ('product_type', StringType()),
        ('coborrow_credit_score', DoubleType()),
        ('mortgage_insurance_type', DoubleType()),
        ('relocation_mortgage_indicator', StringType()),
    )
])
# (raw value, canonical value) pairs. `_create_acquisition` loads this list
# into a two-column DataFrame (from_seller_name, to_seller_name) and left-joins
# it to the acquisition data on `seller_name`; values without an entry here are
# kept unchanged by the coalesce in that function.
_name_mapping = [
        ("WITMER FUNDING, LLC", "Witmer"),
        ("WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015", "Wells Fargo"),
        ("WELLS FARGO BANK,  NA" , "Wells Fargo"),
        ("WELLS FARGO BANK, N.A." , "Wells Fargo"),
        ("WELLS FARGO BANK, NA" , "Wells Fargo"),
        ("USAA FEDERAL SAVINGS BANK" , "USAA"),
        # NOTE(review): the key below contains literal backslashes (D\/B\/A) and the
        # canonical value "United Seq(e" looks garbled -- confirm both against the raw data.
        ("UNITED SHORE FINANCIAL SERVICES, LLC D\\/B\\/A UNITED WHOLESALE MORTGAGE" , "United Seq(e"),
        ("U.S. BANK N.A." , "US Bank"),
        ("SUNTRUST MORTGAGE INC." , "Suntrust"),
        ("STONEGATE MORTGAGE CORPORATION" , "Stonegate Mortgage"),
        ("STEARNS LENDING, LLC" , "Stearns Lending"),
        ("STEARNS LENDING, INC." , "Stearns Lending"),
        ("SIERRA PACIFIC MORTGAGE COMPANY, INC." , "Sierra Pacific Mortgage"),
        ("REGIONS BANK" , "Regions"),
        ("RBC MORTGAGE COMPANY" , "RBC"),
        ("QUICKEN LOANS INC." , "Quicken Loans"),
        ("PULTE MORTGAGE, L.L.C." , "Pulte Mortgage"),
        ("PROVIDENT FUNDING ASSOCIATES, L.P." , "Provident Funding"),
        ("PROSPECT MORTGAGE, LLC" , "Prospect Mortgage"),
        ("PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC" , "Principal Residential"),
        ("PNC BANK, N.A." , "PNC"),
        ("PMT CREDIT RISK TRANSFER TRUST 2015-2" , "PennyMac"),
        ("PHH MORTGAGE CORPORATION" , "PHH Mortgage"),
        ("PENNYMAC CORP." , "PennyMac"),
        ("PACIFIC UNION FINANCIAL, LLC" , "Other"),
        ("OTHER" , "Other"),
        ("NYCB MORTGAGE COMPANY, LLC" , "NYCB"),
        ("NEW YORK COMMUNITY BANK" , "NYCB"),
        ("NETBANK FUNDING SERVICES" , "Netbank"),
        ("NATIONSTAR MORTGAGE, LLC" , "Nationstar Mortgage"),
        ("METLIFE BANK, NA" , "Metlife"),
        ("LOANDEPOT.COM, LLC" , "LoanDepot.com"),
        ("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1" , "JP Morgan Chase"),
        ("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1" , "JP Morgan Chase"),
        ("JPMORGAN CHASE BANK, NATIONAL ASSOCIATION" , "JP Morgan Chase"),
        ("JPMORGAN CHASE BANK, NA" , "JP Morgan Chase"),
        ("JP MORGAN CHASE BANK, NA" , "JP Morgan Chase"),
        ("IRWIN MORTGAGE, CORPORATION" , "Irwin Mortgage"),
        ("IMPAC MORTGAGE CORP." , "Impac Mortgage"),
        ("HSBC BANK USA, NATIONAL ASSOCIATION" , "HSBC"),
        ("HOMEWARD RESIDENTIAL, INC." , "Homeward Mortgage"),
        ("HOMESTREET BANK" , "Other"),
        ("HOMEBRIDGE FINANCIAL SERVICES, INC." , "HomeBridge"),
        ("HARWOOD STREET FUNDING I, LLC" , "Harwood Mortgage"),
        ("GUILD MORTGAGE COMPANY" , "Guild Mortgage"),
        ("GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)" , "GMAC"),
        ("GMAC MORTGAGE, LLC" , "GMAC"),
        ("GMAC (USAA)" , "GMAC"),
        ("FREMONT BANK" , "Fremont Bank"),
        ("FREEDOM MORTGAGE CORP." , "Freedom Mortgage"),
        ("FRANKLIN AMERICAN MORTGAGE COMPANY" , "Franklin America"),
        ("FLEET NATIONAL BANK" , "Fleet National"),
        ("FLAGSTAR CAPITAL MARKETS CORPORATION" , "Flagstar Bank"),
        ("FLAGSTAR BANK, FSB" , "Flagstar Bank"),
        ("FIRST TENNESSEE BANK NATIONAL ASSOCIATION" , "Other"),
        ("FIFTH THIRD BANK" , "Fifth Third Bank"),
        # NOTE(review): "Fedral" looks like a typo for "Federal" -- confirm before changing,
        # since this string is the runtime value written into `seller_name`.
        ("FEDERAL HOME LOAN BANK OF CHICAGO" , "Fedral Home of Chicago"),
        ("FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB" , "FDIC"),
        ("DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A." , "Downey Mortgage"),
        ("DITECH FINANCIAL LLC" , "Ditech"),
        ("CITIMORTGAGE, INC." , "Citi"),
        ("CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY" , "Chicago Mortgage"),
        ("CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY" , "Chicago Mortgage"),
        ("CHASE HOME FINANCE, LLC" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE FRANKLIN AMERICAN MORTGAGE COMPANY" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE (CIE 1)" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE" , "JP Morgan Chase"),
        ("CASHCALL, INC." , "CashCall"),
        ("CAPITAL ONE, NATIONAL ASSOCIATION" , "Capital One"),
        ("CALIBER HOME LOANS, INC." , "Caliber Funding"),
        ("BISHOPS GATE RESIDENTIAL MORTGAGE TRUST" , "Bishops Gate Mortgage"),
        ("BANK OF AMERICA, N.A." , "Bank of America"),
        ("AMTRUST BANK" , "AmTrust"),
        ("AMERISAVE MORTGAGE CORPORATION" , "Amerisave"),
        ("AMERIHOME MORTGAGE COMPANY, LLC" , "AmeriHome Mortgage"),
        ("ALLY BANK" , "Ally Bank"),
        ("ACADEMY MORTGAGE CORPORATION" , "Academy Mortgage"),
        # NOTE(review): the remaining entries look like loan-purpose values rather than
        # seller names, yet this table is only joined on `seller_name` -- confirm they belong here.
        ("NO CASH-OUT REFINANCE" , "OTHER REFINANCE"),
        ("REFINANCE - NOT SPECIFIED" , "OTHER REFINANCE"),
        ("Other REFINANCE" , "OTHER REFINANCE")]

# Categorical (string-valued) columns; these are the columns that
# `_cast_string_columns_to_numeric` replaces with integer ids.
cate_col_names = [
    "orig_channel", "first_home_buyer", "loan_purpose",
    "property_type", "occupancy_status", "property_state",
    "relocation_mortgage_indicator", "seller_name", "mod_flag",
]

# Name of the label column; it is kept as the last entry of the numeric columns.
label_col_name = "delinquency_12"

# Numeric columns (label last).
numeric_col_names = [
    "orig_interest_rate", "orig_upb", "orig_loan_term",
    "orig_ltv", "orig_cltv", "num_borrowers",
    "dti", "borrower_credit_score", "num_units",
    "zip", "mortgage_insurance_percent", "current_loan_delinquency_status",
    "current_actual_upb", "interest_rate", "loan_age",
    "msa", "non_interest_bearing_upb",
    label_col_name,
]

# Final output column order: categorical columns first, then numeric ones.
all_col_names = [*cate_col_names, *numeric_col_names]

def read_perf_csv(spark, path):
    """Load the performance CSV files at ``path`` into a DataFrame.

    The files are read as headerless, pipe-delimited text with empty fields
    treated as null and ``_csv_perf_schema`` applied. A ``quarter`` column
    derived from each row's input file name is appended.
    """
    reader = spark.read.format('csv').options(nullValue='', header='false', delimiter='|')
    perf = reader.schema(_csv_perf_schema).load(path)
    return perf.withColumn('quarter', _get_quarter_from_csv_file_name())

def read_acq_csv(spark, path):
    """Load the acquisition CSV files at ``path`` into a DataFrame.

    The files are read as headerless, pipe-delimited text with empty fields
    treated as null and ``_csv_acq_schema`` applied. A ``quarter`` column
    derived from each row's input file name is appended.
    """
    reader = spark.read.format('csv').options(nullValue='', header='false', delimiter='|')
    acq = reader.schema(_csv_acq_schema).load(path)
    return acq.withColumn('quarter', _get_quarter_from_csv_file_name())

def _parse_dates(perf):
    """Convert the string date columns of the performance data to dates.

    ``monthly_reporting_period`` is parsed first (``MM/dd/yyyy``) and its
    month, year and day-of-month are appended as separate columns. The other
    date columns are then parsed in place with the format listed for each.
    """
    parsed = perf.withColumn(
        'monthly_reporting_period',
        to_date(col('monthly_reporting_period'), 'MM/dd/yyyy'))

    # Derived parts of the (now parsed) reporting period, appended in this order.
    reporting_parts = (
        ('monthly_reporting_period_month', month),
        ('monthly_reporting_period_year', year),
        ('monthly_reporting_period_day', dayofmonth),
    )
    for part_name, extract in reporting_parts:
        parsed = parsed.withColumn(part_name, extract(col('monthly_reporting_period')))

    # Remaining date columns, each replaced in place by its parsed value.
    date_formats = (
        ('last_paid_installment_date', 'MM/dd/yyyy'),
        ('foreclosed_after', 'MM/dd/yyyy'),
        ('disposition_date', 'MM/dd/yyyy'),
        ('maturity_date', 'MM/yyyy'),
        ('zero_balance_effective_date', 'MM/yyyy'),
    )
    for date_col, date_format in date_formats:
        parsed = parsed.withColumn(date_col, to_date(col(date_col), date_format))
    return parsed

def _create_perf_deliquency(spark, perf):
    """Attach delinquency features to the performance rows.

    ``perf`` must already contain ``monthly_reporting_period_month`` and
    ``monthly_reporting_period_year`` (they are renamed and used as join keys
    below). ``spark`` is not used by this function.

    Columns added to the returned DataFrame (via a left join, so they may be null):

    * ``ever_30`` / ``ever_90`` / ``ever_180`` -- whether the maximum
      ``current_loan_delinquency_status`` of the (quarter, loan_id) group is
      >= 1 / 3 / 6.
    * ``delinquency_30`` / ``delinquency_90`` / ``delinquency_180`` -- earliest
      ``monthly_reporting_period`` at which the status was >= 1 / 3 / 6.
    * ``delinquency_12`` -- for a row's reporting month, computed over the
      12-month window starting at that month as
      ``int(max status > 3) + int(min upb == 0)``.
    * ``upb_12`` -- minimum ``current_actual_upb`` over the same window.

    The month/year helper columns are dropped from the result.
    """
    # Per (quarter, loan_id): the worst status ever seen and the first date each
    # delinquency threshold was reached.
    aggDF = perf.select(
            col("quarter"),
            col("loan_id"),
            col("current_loan_delinquency_status"),
            # `when` without `otherwise` yields null when the condition is false,
            # so the min() below only sees dates on which the threshold was met.
            when(col("current_loan_delinquency_status") >= 1, col("monthly_reporting_period")).alias("delinquency_30"),
            when(col("current_loan_delinquency_status") >= 3, col("monthly_reporting_period")).alias("delinquency_90"),
            when(col("current_loan_delinquency_status") >= 6, col("monthly_reporting_period")).alias("delinquency_180")) \
                    .groupBy("quarter", "loan_id") \
                    .agg(
                            max("current_loan_delinquency_status").alias("delinquency_12"),
                            min("delinquency_30").alias("delinquency_30"),
                            min("delinquency_90").alias("delinquency_90"),
                            min("delinquency_180").alias("delinquency_180")) \
                                    .select(
                                            col("quarter"),
                                            col("loan_id"),
                                            (col("delinquency_12") >= 1).alias("ever_30"),
                                            (col("delinquency_12") >= 3).alias("ever_90"),
                                            (col("delinquency_12") >= 6).alias("ever_180"),
                                            col("delinquency_30"),
                                            col("delinquency_90"),
                                            col("delinquency_180"))
    # Slim per-month view of perf (renamed columns) with the per-loan aggregates attached.
    joinedDf = perf \
            .withColumnRenamed("monthly_reporting_period", "timestamp") \
            .withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
            .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
            .withColumnRenamed("current_loan_delinquency_status", "delinquency_12") \
            .withColumnRenamed("current_actual_upb", "upb_12") \
            .select("quarter", "loan_id", "timestamp", "delinquency_12", "upb_12", "timestamp_month", "timestamp_year") \
            .join(aggDF, ["loan_id", "quarter"], "left_outer")

    # calculate the 12 month delinquency and upb values
    months = 12
    # NOTE(review): the range bound is a literal 12 rather than `months`; the two
    # must stay equal for the window arithmetic below to hold.
    monthArray = [lit(x) for x in range(0, 12)]
    # explode on a small amount of data is actually slightly more efficient than a cross join
    #
    # Each row is replicated once per offset month_y in 0..11. With
    # A = timestamp_year * 12 + timestamp_month - 24000, rows sharing
    # josh_mody_n = floor((A - month_y) / 12) fall in the same 12-month window
    # [12 * n + month_y, 12 * n + month_y + 11]. After aggregating, the window is
    # stamped with the year/month of its first month, so joining back on
    # (timestamp_year, timestamp_month) gives each reporting month the aggregate
    # over itself and the following 11 months.
    testDf = joinedDf \
            .withColumn("month_y", explode(array(monthArray))) \
            .select(
                    col("quarter"),
                    # NOTE(review): josh_mody is selected but is not a grouping key or an
                    # aggregate below, so it does not reach the output.
                    floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000) / months).alias("josh_mody"),
                    floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000 - col("month_y")) / months).alias("josh_mody_n"),
                    col("ever_30"),
                    col("ever_90"),
                    col("ever_180"),
                    col("delinquency_30"),
                    col("delinquency_90"),
                    col("delinquency_180"),
                    col("loan_id"),
                    col("month_y"),
                    col("delinquency_12"),
                    col("upb_12")) \
                            .groupBy("quarter", "loan_id", "josh_mody_n", "ever_30", "ever_90", "ever_180", "delinquency_30", "delinquency_90", "delinquency_180", "month_y") \
                            .agg(max("delinquency_12").alias("delinquency_12"), min("upb_12").alias("upb_12")) \
                            .withColumn("timestamp_year", floor((lit(24000) + (col("josh_mody_n") * lit(months)) + (col("month_y") - 1)) / lit(12))) \
                            .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \
                            .withColumn("timestamp_month", when(col("timestamp_month_tmp") == lit(0), lit(12)).otherwise(col("timestamp_month_tmp"))) \
                            .withColumn("delinquency_12", ((col("delinquency_12") > 3).cast("int") + (col("upb_12") == 0).cast("int")).alias("delinquency_12")) \
                            .drop("timestamp_month_tmp", "josh_mody_n", "month_y")

    # Attach the window features to the original rows by reporting year/month,
    # then drop the helper key columns.
    return perf.withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
            .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
            .join(testDf, ["quarter", "loan_id", "timestamp_year", "timestamp_month"], "left") \
            .drop("timestamp_year", "timestamp_month")

def _create_acquisition(spark, acq):
    """Normalise seller names and parse the date columns of the acquisition data.

    Parameters:
        spark: session used to build the lookup DataFrame from ``_name_mapping``.
        acq: acquisition DataFrame with ``seller_name``, ``orig_date`` and
            ``first_pay_date`` columns.

    Returns:
        ``acq`` with
        * ``old_name`` -- the original ``seller_name`` value,
        * ``seller_name`` -- the mapped name when ``_name_mapping`` has an entry
          for it, otherwise the original value,
        * ``orig_date`` / ``first_pay_date`` -- parsed with the ``MM/yyyy`` format.
    """
    nameMapping = spark.createDataFrame(_name_mapping, ["from_seller_name", "to_seller_name"])
    # Parenthesised chain: the previous version ended its last line with a
    # dangling backslash, which only parsed because a blank line followed it.
    return (
        acq.join(nameMapping, col("seller_name") == col("from_seller_name"), "left")
        .drop("from_seller_name")
        .withColumn("old_name", col("seller_name"))
        # Rows without a mapping entry have a null to_seller_name after the left join.
        .withColumn("seller_name", coalesce(col("to_seller_name"), col("seller_name")))
        .drop("to_seller_name")
        .withColumn("orig_date", to_date(col("orig_date"), "MM/yyyy"))
        .withColumn("first_pay_date", to_date(col("first_pay_date"), "MM/yyyy"))
    )

def _gen_dictionary(etl_df, col_names):
    """Build a value -> integer id dictionary for the given columns.

    Parameters:
        etl_df: DataFrame containing every column in ``col_names``.
        col_names: columns to encode; their position in this list becomes
            ``column_id``.

    Returns:
        DataFrame with columns ``column_id``, ``data`` (a distinct non-null
        value of that column) and ``id``. Within each column, ids start at 1
        and are assigned by descending occurrence count; values with equal
        counts are ordered by the value itself so that the assignment is
        deterministic.
    """
    cnt_table = (
        etl_df.select(posexplode(array([col(i) for i in col_names])))
        .withColumnRenamed("pos", "column_id")
        .withColumnRenamed("col", "data")
        .filter("data is not null")
        .groupBy("column_id", "data")
        .count()
    )
    # Tie-break on `data`: ordering by count alone leaves the ids of equally
    # frequent values up to partition/shuffle order, so they could change between runs.
    windowed = Window.partitionBy("column_id").orderBy(desc("count"), col("data"))
    return cnt_table.withColumn("id", row_number().over(windowed)).drop("count")


def _cast_string_columns_to_numeric(spark, input_df):
    """Replace every column listed in ``cate_col_names`` by its integer id.

    A dictionary is generated from ``input_df`` with ``_gen_dictionary`` and
    cached; each categorical column is then swapped for the matching ``id``
    through a broadcast left join. Values without a dictionary entry (nulls)
    end up as null. ``spark`` is not used.
    """
    dictionary = _gen_dictionary(input_df, cate_col_names).cache()
    encoded = input_df
    for position in range(len(cate_col_names)):
        name = cate_col_names[position]
        # Slice of the dictionary for this column, keyed by the column's own name
        # so it can be joined with a using-column join.
        lookup = dictionary.where(col("column_id") == position) \
                           .drop("column_id") \
                           .withColumnRenamed("data", name)
        joined = encoded.join(broadcast(lookup), name, "left")
        # Drop the string column and let the id take over its name.
        encoded = joined.drop(name).withColumnRenamed("id", name)
    return encoded

def run_mortgage(spark, perf, acq):
    """Run the full ETL and return the train and test feature DataFrames.

    Parameters:
        spark: active SparkSession.
        perf: raw performance DataFrame (as produced by ``read_perf_csv``).
        acq: raw acquisition DataFrame (as produced by ``read_acq_csv``).

    Returns:
        ``(train_df, test_df)``. Both contain exactly ``all_col_names``, with the
        categorical columns encoded as integer ids, the label binarised to 0/1
        and remaining nulls in numeric columns filled with 0. Rows from the
        quarters in ``test_quarters`` go to the test set; all others to train.
    """
    parsed_perf = _parse_dates(perf)
    perf_deliqency = _create_perf_deliquency(spark, parsed_perf)
    cleaned_acq = _create_acquisition(spark, acq)
    df = perf_deliqency.join(cleaned_acq, ["loan_id", "quarter"], "inner")

    # Encode the categorical columns once, before splitting. Encoding train and
    # test separately builds two independent dictionaries, so the same category
    # could receive different ids in the two sets. `quarter` is not one of the
    # categorical columns, so it is still available for the split below.
    encoded_df = _cast_string_columns_to_numeric(spark, df)

    test_quarters = ['2016Q1', '2016Q2', '2016Q3', '2016Q4']
    in_test = col("quarter").isin(test_quarters)

    def _finalize(split_df):
        # Keep only the model columns (this also drops `quarter`), binarise the
        # label, then replace remaining nulls in numeric columns with 0.
        return split_df.select(all_col_names) \
                       .withColumn(label_col_name, when(col(label_col_name) > 0, 1).otherwise(0)) \
                       .fillna(float(0))

    casted_train_df = _finalize(encoded_df.filter(~in_test))
    casted_test_df = _finalize(encoded_df.filter(in_test))
    return casted_train_df, casted_test_df

### Define Spark conf and Create Spark Session
For a detailed explanation of each Spark configuration setting, see the Spark RAPIDS [config guide](https://nvidia.github.io/spark-rapids/docs/configs.html).

In [2]:
# Imported here so this cell does not depend on the kernel having pre-loaded SparkConf.
from pyspark import SparkConf

# Stop the context the kernel may have created at start-up; otherwise
# getOrCreate() below would return the existing session instead of one built from `conf`.
try:
    sc.stop()
except NameError:
    # No pre-existing `sc` (e.g. a plain Python kernel): nothing to stop.
    pass

conf = SparkConf().setAppName("MortgageETL")
conf.setAll([
    # Log which operators the RAPIDS plugin places on the GPU.
    ('spark.rapids.sql.explain', 'ALL'),
    # Executor sizing.
    ("spark.executor.instances", "20"),
    ("spark.executor.cores", "7"),
    ("spark.task.cpus", "1"),
    ("spark.rapids.sql.concurrentGpuTasks", "2"),
    ("spark.executor.memory", "4g"),
    ("spark.rapids.memory.pinnedPool.size", "2G"),
    ("spark.executor.memoryOverhead", "2G"),
    ("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true"),
    ("spark.locality.wait", "0s"),
    ("spark.sql.files.maxPartitionBytes", "512m"),
    # One GPU per executor; 0.142 is just under 1/7, so the 7 task slots of an
    # executor (spark.executor.cores above) can share that GPU.
    ("spark.executor.resource.gpu.amount", "1"),
    ("spark.task.resource.gpu.amount", "0.142"),
    # Enable the RAPIDS SQL plugin and tune it.
    ("spark.plugins", "com.nvidia.spark.SQLPlugin"),
    ("spark.rapids.sql.hasNans", "false"),
    ('spark.rapids.sql.batchSizeBytes', '512M'),
    ('spark.rapids.sql.reader.batchSizeBytes', '768M'),
    ('spark.rapids.sql.variableFloatAgg.enabled', 'true'),
])

spark = SparkSession.builder.config(conf=conf).getOrCreate()

sc = spark.sparkContext

### Define Data Input/Output location

In [3]:
# Every input and output location lives under the same GCS bucket.
_bucket_root = 'gs://dataproc-nv-demo'

# Raw CSV inputs.
orig_perf_path = _bucket_root + '/mortgage_full/perf/*'
orig_acq_path = _bucket_root + '/mortgage_full/acq/*'

# Final train / test outputs.
train_path = _bucket_root + '/mortgage_full/train/'
test_path = _bucket_root + '/mortgage_full/test/'
# Intermediate Parquet copies of the raw data.
tmp_perf_path = _bucket_root + '/mortgage_parquet_gpu/perf/'
tmp_acq_path = _bucket_root + '/mortgage_parquet_gpu/acq/'

### Read CSV data and Transcode to Parquet

In [4]:
# Transcode the raw CSV data to Parquet first; the elapsed seconds are printed at the end.
start = time.time()
# Use a very large split size: we want a few big files instead of lots of small files.
spark.conf.set('spark.sql.files.maxPartitionBytes', '200G')

acq = read_acq_csv(spark, orig_acq_path)
acq.repartition(20).write.mode('overwrite').parquet(tmp_acq_path)

perf = read_perf_csv(spark, orig_perf_path)
perf.coalesce(80).write.mode('overwrite').parquet(tmp_perf_path)

end = time.time()
print(end - start)

108.28529238700867


### Execute ETL Code Defined in 1st Cell

In [5]:
# Now let's actually process the data, reading back the Parquet copies written above.
start = time.time()
spark.conf.set('spark.sql.files.maxPartitionBytes', '1G')
spark.conf.set('spark.sql.shuffle.partitions', '160')

perf = spark.read.parquet(tmp_perf_path)
acq = spark.read.parquet(tmp_acq_path)
train_out, test_out = run_mortgage(spark, perf, acq)

# First figure printed: seconds until the training set has been written.
train_out.write.mode('overwrite').parquet(train_path)
end = time.time()
print(end - start)

# Second figure printed: cumulative seconds once the test set has been written too.
test_out.write.mode('overwrite').parquet(test_path)
end = time.time()
print(end - start)

137.99262690544128
171.97584056854248


### Print Physical Plan

In [8]:
# Print the physical plan Spark built for the training-set DataFrame.
train_out.explain()

== Physical Plan ==
*(2) GpuColumnarToRow false
+- GpuProject [gpucoalesce(orig_channel#1922, 0) AS orig_channel#3686, gpucoalesce(first_home_buyer#2124, 0) AS first_home_buyer#3687, gpucoalesce(loan_purpose#2326, 0) AS loan_purpose#3688, gpucoalesce(property_type#2528, 0) AS property_type#3689, gpucoalesce(occupancy_status#2730, 0) AS occupancy_status#3690, gpucoalesce(property_state#2932, 0) AS property_state#3691, gpucoalesce(relocation_mortgage_indicator#3134, 0) AS relocation_mortgage_indicator#3692, gpucoalesce(seller_name#3336, 0) AS seller_name#3693, gpucoalesce(id#1728, 0) AS mod_flag#3694, gpucoalesce(gpunanvl(orig_interest_rate#297, null), 0.0) AS orig_interest_rate#3695, gpucoalesce(orig_upb#298, 0) AS orig_upb#3696, gpucoalesce(orig_loan_term#299, 0) AS orig_loan_term#3697, gpucoalesce(gpunanvl(orig_ltv#302, null), 0.0) AS orig_ltv#3698, gpucoalesce(gpunanvl(orig_cltv#303, null), 0.0) AS orig_cltv#3699, gpucoalesce(gpunanvl(num_borrowers#304, null), 0.0) AS num_borrowers#3

## 