-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path4.1 DLT.sql
More file actions
148 lines (122 loc) · 3.06 KB
/
4.1 DLT.sql
File metadata and controls
148 lines (122 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
-- Databricks notebook source
-- MAGIC %md
-- MAGIC
-- MAGIC
-- MAGIC # Delta Live Tables
-- COMMAND ----------
-- MAGIC %md-sandbox
-- MAGIC
-- MAGIC <div style="text-align: center; line-height: 0; padding-top: 9px;">
-- MAGIC <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Includes/images/bookstore_schema.png" alt="Bookstore schema" style="width: 600px">
-- MAGIC </div>
-- COMMAND ----------
-- Base location of the bookstore datasets; the table definitions below
-- reference it through ${datasets.path}.
SET datasets.path=dbfs:/mnt/demo-datasets/bookstore;
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ## Bronze Layer Tables
-- COMMAND ----------
-- MAGIC %md
-- MAGIC #### orders_raw
-- COMMAND ----------
-- Bronze table: streaming ingestion of the raw order files.
-- Reads JSON files from the orders-json-raw directory under ${datasets.path}
-- through cloud_files, with the cloudFiles.inferColumnTypes option enabled.
-- Every source column is passed through unchanged (SELECT *).
-- The table COMMENT names the actual source directory (orders-json-raw),
-- matching how the customers table describes its own source.
CREATE OR REFRESH STREAMING LIVE TABLE orders_raw
COMMENT "The raw books orders, ingested from orders-json-raw"
AS
SELECT
  *
FROM
  cloud_files(
    "${datasets.path}/orders-json-raw",
    "json",
    map("cloudFiles.inferColumnTypes", "true")
  )
-- COMMAND ----------
-- MAGIC %md
-- MAGIC #### customers
-- COMMAND ----------
-- Bronze lookup table: customer records read directly from the JSON files
-- in the customers-json directory under ${datasets.path}.
-- Declared as a (non-streaming) live table; all source columns are kept as-is.
CREATE OR REFRESH LIVE TABLE customers
COMMENT "The customers lookup table, ingested from customers-json"
AS
SELECT
  *
FROM
  json.`${datasets.path}/customers-json`
-- COMMAND ----------
-- MAGIC %md
-- MAGIC
-- MAGIC
-- MAGIC
-- MAGIC ## Silver Layer Tables
-- MAGIC
-- MAGIC #### orders_cleaned
-- COMMAND ----------
-- Silver table: orders enriched with fields taken from the customer profile.
--   * The valid_order_number expectation drops rows whose order_id is NULL.
--   * Input is streamed from LIVE.orders_raw and LEFT JOINed to the
--     LIVE.customers lookup on customer_id, so an order without a matching
--     customer is kept and its profile-derived columns come out NULL.
--   * f_name, l_name and country are extracted from c.profile with the
--     ":" path operator.
--   * order_timestamp is formatted with from_unixtime and then cast to TIMESTAMP.
-- NOTE(review): from_unixtime implies the incoming order_timestamp holds epoch
-- seconds -- confirm against the source data.
CREATE OR REFRESH STREAMING LIVE TABLE orders_cleaned (
  CONSTRAINT valid_order_number
    EXPECT (order_id IS NOT NULL)
    ON VIOLATION DROP ROW
)
COMMENT "The cleaned books orders with valid order_id"
AS
SELECT
  order_id,
  quantity,
  o.customer_id,
  c.profile:first_name AS f_name,
  c.profile:last_name AS l_name,
  CAST(
    from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP
  ) AS order_timestamp,
  o.books,
  c.profile:address:country AS country
FROM
  STREAM(LIVE.orders_raw) o
  LEFT JOIN LIVE.customers c
    ON o.customer_id = c.customer_id
-- COMMAND ----------
-- MAGIC %md
-- MAGIC >> Constraint violation
-- MAGIC
-- MAGIC | **`ON VIOLATION`** | Behavior |
-- MAGIC | --- | --- |
-- MAGIC | **`DROP ROW`** | Discard records that violate constraints |
-- MAGIC | **`FAIL UPDATE`** | Violated constraint causes the pipeline to fail |
-- MAGIC | Omitted | Records violating constraints will be kept, and reported in metrics |
-- COMMAND ----------
-- MAGIC %md
-- MAGIC
-- MAGIC
-- MAGIC ## Gold Tables
-- COMMAND ----------
-- Gold table: daily book totals per customer, restricted to rows of
-- LIVE.orders_cleaned whose country equals "China".
--   order_date   = order_timestamp truncated to the day ("DD")
--   books_counts = sum of quantity within each customer/day group
CREATE OR REFRESH LIVE TABLE cn_daily_customer_books
COMMENT "Daily number of books per customer in China"
AS
SELECT
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp) AS order_date,
  sum(quantity) AS books_counts
FROM
  LIVE.orders_cleaned
WHERE
  country = "China"
GROUP BY
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp)
-- COMMAND ----------
-- Gold table: daily book totals per customer, restricted to rows of
-- LIVE.orders_cleaned whose country equals "France".
--   order_date   = order_timestamp truncated to the day ("DD")
--   books_counts = sum of quantity within each customer/day group
CREATE OR REFRESH LIVE TABLE fr_daily_customer_books
COMMENT "Daily number of books per customer in France"
AS
SELECT
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp) AS order_date,
  sum(quantity) AS books_counts
FROM
  LIVE.orders_cleaned
WHERE
  country = "France"
GROUP BY
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp)