|
| 1 | +# Databricks notebook source |
| 2 | +# MAGIC %md |
| 3 | +# MAGIC # QueryForge - Delta Table Setup |
| 4 | +# MAGIC Run this notebook in Databricks Community Edition to create the financial data schemas as Delta tables. |
| 5 | + |
| 6 | +# COMMAND ---------- |
| 7 | + |
| 8 | +# MAGIC %md |
| 9 | +# MAGIC ## Create Database |
| 10 | + |
| 11 | +# COMMAND ---------- |
| 12 | + |
| 13 | +spark.sql("CREATE DATABASE IF NOT EXISTS queryforge") |
| 14 | +spark.sql("USE queryforge") |
| 15 | + |
| 16 | +# COMMAND ---------- |
| 17 | + |
| 18 | +# MAGIC %md |
| 19 | +# MAGIC ## Accounts Table |
| 20 | + |
| 21 | +# COMMAND ---------- |
| 22 | + |
| 23 | +from pyspark.sql.types import StructType, StructField, StringType, DecimalType, DateType |
| 24 | + |
| 25 | +accounts_data = [ |
| 26 | + ("ACC001", "Alice Chen", "checking", 125000.50, "2020-03-15", "NYC001", "active"), |
| 27 | + ("ACC002", "Bob Martinez", "savings", 89000.00, "2019-07-22", "LA002", "active"), |
| 28 | + ("ACC003", "Carol Williams", "loan", 250000.00, "2021-01-10", "CHI003", "active"), |
| 29 | + ("ACC004", "David Kim", "credit", 15000.75, "2018-11-05", "NYC001", "active"), |
| 30 | + ("ACC005", "Eva Patel", "checking", 340000.00, "2022-06-18", "SF004", "active"), |
| 31 | + ("ACC006", "Frank Johnson", "savings", 45000.00, "2017-02-28", "LA002", "closed"), |
| 32 | + ("ACC007", "Grace Lee", "checking", 178000.25, "2020-09-14", "CHI003", "active"), |
| 33 | + ("ACC008", "Henry Brown", "loan", 500000.00, "2023-03-01", "SF004", "active"), |
| 34 | + ("ACC009", "Irene Davis", "credit", 8500.00, "2021-08-20", "NYC001", "frozen"), |
| 35 | + ("ACC010", "Jack Wilson", "checking", 92000.00, "2019-12-03", "LA002", "active"), |
| 36 | +] |
| 37 | + |
| 38 | +accounts_df = spark.createDataFrame(accounts_data, ["account_id", "customer_name", "account_type", "balance", "open_date", "branch_code", "status"]) |
| 39 | +accounts_df = accounts_df.withColumn("balance", accounts_df.balance.cast(DecimalType(18, 2))) |
| 40 | +accounts_df = accounts_df.withColumn("open_date", accounts_df.open_date.cast(DateType())) |
| 41 | + |
| 42 | +accounts_df.write.format("delta").mode("overwrite").saveAsTable("queryforge.accounts") |
| 43 | +print(f"Accounts table created with {accounts_df.count()} rows") |
| 44 | + |
| 45 | +# COMMAND ---------- |
| 46 | + |
| 47 | +# MAGIC %md |
| 48 | +# MAGIC ## Transactions Table |
| 49 | + |
| 50 | +# COMMAND ---------- |
| 51 | + |
| 52 | +transactions_data = [ |
| 53 | + ("TXN001", "ACC001", "2024-01-15 09:30:00", 5000.00, "credit", "salary", "Monthly salary deposit"), |
| 54 | + ("TXN002", "ACC001", "2024-01-16 14:22:00", 150.00, "debit", "utilities", "Electric bill payment"), |
| 55 | + ("TXN003", "ACC002", "2024-01-15 10:00:00", 2000.00, "credit", "transfer", "Transfer from checking"), |
| 56 | + ("TXN004", "ACC003", "2024-01-20 08:00:00", 3500.00, "debit", "loan_payment", "Monthly loan payment"), |
| 57 | + ("TXN005", "ACC004", "2024-01-18 16:45:00", 250.00, "debit", "shopping", "Online purchase"), |
| 58 | + ("TXN006", "ACC005", "2024-01-15 09:00:00", 12000.00, "credit", "salary", "Monthly salary deposit"), |
| 59 | + ("TXN007", "ACC005", "2024-01-22 11:30:00", 800.00, "debit", "dining", "Restaurant payment"), |
| 60 | + ("TXN008", "ACC007", "2024-01-15 09:15:00", 8500.00, "credit", "salary", "Monthly salary deposit"), |
| 61 | + ("TXN009", "ACC008", "2024-01-25 08:00:00", 5000.00, "debit", "loan_payment", "Monthly loan payment"), |
| 62 | + ("TXN010", "ACC010", "2024-01-15 09:45:00", 6000.00, "credit", "salary", "Monthly salary deposit"), |
| 63 | + ("TXN011", "ACC001", "2024-02-15 09:30:00", 5000.00, "credit", "salary", "Monthly salary deposit"), |
| 64 | + ("TXN012", "ACC001", "2024-02-20 13:10:00", 2200.00, "debit", "rent", "Monthly rent payment"), |
| 65 | + ("TXN013", "ACC005", "2024-02-15 09:00:00", 12000.00, "credit", "salary", "Monthly salary deposit"), |
| 66 | + ("TXN014", "ACC007", "2024-02-15 09:15:00", 8500.00, "credit", "salary", "Monthly salary deposit"), |
| 67 | + ("TXN015", "ACC010", "2024-02-15 09:45:00", 6000.00, "credit", "salary", "Monthly salary deposit"), |
| 68 | +] |
| 69 | + |
| 70 | +from pyspark.sql.types import TimestampType |
| 71 | + |
| 72 | +transactions_df = spark.createDataFrame(transactions_data, ["txn_id", "account_id", "txn_date", "amount", "txn_type", "category", "description"]) |
| 73 | +transactions_df = transactions_df.withColumn("amount", transactions_df.amount.cast(DecimalType(18, 2))) |
| 74 | +transactions_df = transactions_df.withColumn("txn_date", transactions_df.txn_date.cast(TimestampType())) |
| 75 | + |
| 76 | +transactions_df.write.format("delta").mode("overwrite").saveAsTable("queryforge.transactions") |
| 77 | +print(f"Transactions table created with {transactions_df.count()} rows") |
| 78 | + |
| 79 | +# COMMAND ---------- |
| 80 | + |
| 81 | +# MAGIC %md |
| 82 | +# MAGIC ## Risk Metrics Table |
| 83 | + |
| 84 | +# COMMAND ---------- |
| 85 | + |
| 86 | +risk_data = [ |
| 87 | + ("ACC001", "2024-01-31", 750, "low", 0.0120, 0.3500), |
| 88 | + ("ACC002", "2024-01-31", 680, "medium", 0.0450, 0.4000), |
| 89 | + ("ACC003", "2024-01-31", 620, "high", 0.0890, 0.5500), |
| 90 | + ("ACC004", "2024-01-31", 580, "high", 0.1200, 0.6000), |
| 91 | + ("ACC005", "2024-01-31", 800, "low", 0.0050, 0.2500), |
| 92 | + ("ACC007", "2024-01-31", 720, "low", 0.0180, 0.3200), |
| 93 | + ("ACC008", "2024-01-31", 550, "critical", 0.1800, 0.7500), |
| 94 | + ("ACC009", "2024-01-31", 490, "critical", 0.2500, 0.8500), |
| 95 | + ("ACC010", "2024-01-31", 700, "medium", 0.0350, 0.3800), |
| 96 | +] |
| 97 | + |
| 98 | +from pyspark.sql.types import IntegerType |
| 99 | + |
| 100 | +risk_df = spark.createDataFrame(risk_data, ["account_id", "metric_date", "credit_score", "risk_rating", "probability_of_default", "loss_given_default"]) |
| 101 | +risk_df = risk_df.withColumn("metric_date", risk_df.metric_date.cast(DateType())) |
| 102 | +risk_df = risk_df.withColumn("credit_score", risk_df.credit_score.cast(IntegerType())) |
| 103 | +risk_df = risk_df.withColumn("probability_of_default", risk_df.probability_of_default.cast(DecimalType(5, 4))) |
| 104 | +risk_df = risk_df.withColumn("loss_given_default", risk_df.loss_given_default.cast(DecimalType(5, 4))) |
| 105 | + |
| 106 | +risk_df.write.format("delta").mode("overwrite").saveAsTable("queryforge.risk_metrics") |
| 107 | +print(f"Risk metrics table created with {risk_df.count()} rows") |
| 108 | + |
| 109 | +# COMMAND ---------- |
| 110 | + |
| 111 | +# MAGIC %md |
| 112 | +# MAGIC ## Model Inventory Table |
| 113 | + |
| 114 | +# COMMAND ---------- |
| 115 | + |
| 116 | +model_data = [ |
| 117 | + ("MDL001", "Credit Scoring v3", "classification", "2023-06-15", "Risk Team", "active", "2024-01-15"), |
| 118 | + ("MDL002", "Fraud Detection v2", "anomaly_detection", "2023-09-01", "Fraud Team", "active", "2024-01-20"), |
| 119 | + ("MDL003", "Churn Predictor", "classification", "2022-11-10", "Marketing", "retired", "2023-06-10"), |
| 120 | + ("MDL004", "LGD Model v1", "regression", "2024-01-05", "Risk Team", "validation", "2024-01-05"), |
| 121 | + ("MDL005", "Transaction Classifier", "classification", "2023-03-20", "Operations", "active", "2023-12-15"), |
| 122 | +] |
| 123 | + |
| 124 | +model_df = spark.createDataFrame(model_data, ["model_id", "model_name", "model_type", "deployment_date", "owner", "status", "last_validation_date"]) |
| 125 | +model_df = model_df.withColumn("deployment_date", model_df.deployment_date.cast(DateType())) |
| 126 | +model_df = model_df.withColumn("last_validation_date", model_df.last_validation_date.cast(DateType())) |
| 127 | + |
| 128 | +model_df.write.format("delta").mode("overwrite").saveAsTable("queryforge.model_inventory") |
| 129 | +print(f"Model inventory table created with {model_df.count()} rows") |
| 130 | + |
| 131 | +# COMMAND ---------- |
| 132 | + |
| 133 | +# MAGIC %md |
| 134 | +# MAGIC ## Verify All Tables |
| 135 | + |
| 136 | +# COMMAND ---------- |
| 137 | + |
| 138 | +for table in ["accounts", "transactions", "risk_metrics", "model_inventory"]: |
| 139 | + count = spark.sql(f"SELECT COUNT(*) FROM queryforge.{table}").collect()[0][0] |
| 140 | + print(f"queryforge.{table}: {count} rows") |
0 commit comments