-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathsqlalchemy_example.py
More file actions
202 lines (173 loc) · 5.61 KB
/
sqlalchemy_example.py
File metadata and controls
202 lines (173 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
databricks-sql-connector includes a SQLAlchemy 2.0 dialect compatible with Databricks SQL. To install
its dependencies you can run `pip install databricks-sql-connector[sqlalchemy]`.
The expected connection string format which you can pass to create_engine() is:
databricks://token:dapi***@***.cloud.databricks.com?http_path=/sql/***&catalog=**&schema=**
Our dialect implements the majority of SQLAlchemy 2.0's API. Because of the extent of SQLAlchemy's
capabilities it isn't feasible to provide examples of every usage in a single script, so we only
provide a basic one here. Learn more about usage in README.sqlalchemy.md in this repo.
"""
# fmt: off
import os
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from uuid import UUID
import json
# By convention, backend-specific SQLA types are defined in uppercase
# This dialect exposes Databricks SQL's TIMESTAMP, TINYINT, and VARIANT types
# as these are not covered by the generic, camelcase types shown below
from databricks.sqlalchemy import TIMESTAMP, TINYINT, DatabricksVariant
# Beside the CamelCase types shown below, line comments reflect
# the underlying Databricks SQL / Delta table type
from sqlalchemy import (
BigInteger, # BIGINT
Boolean, # BOOLEAN
Column,
Date, # DATE
DateTime, # TIMESTAMP_NTZ
Integer, # INTEGER
Numeric, # DECIMAL
String, # STRING
Time, # STRING
Uuid, # STRING
create_engine,
select,
)
from sqlalchemy.orm import DeclarativeBase, Session
host = os.getenv("DATABRICKS_SERVER_HOSTNAME")
http_path = os.getenv("DATABRICKS_HTTP_PATH")
access_token = os.getenv("DATABRICKS_TOKEN")
catalog = os.getenv("DATABRICKS_CATALOG")
schema = os.getenv("DATABRICKS_SCHEMA")
# Extra arguments are passed untouched to databricks-sql-connector
# See src/databricks/sql/thrift_backend.py for complete list
extra_connect_args = {
"_tls_verify_hostname": True,
"_user_agent_entry": "PySQL Example Script",
}
engine = create_engine(
f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}",
connect_args=extra_connect_args, echo=True,
)
class Base(DeclarativeBase):
pass
# This object gives a usage example for each supported type
# for more details on these, see README.sqlalchemy.md
class SampleObject(Base):
__tablename__ = "pysql_sqlalchemy_example_table"
bigint_col = Column(BigInteger, primary_key=True)
string_col = Column(String)
tinyint_col = Column(TINYINT)
int_col = Column(Integer)
numeric_col = Column(Numeric(10, 2))
boolean_col = Column(Boolean)
date_col = Column(Date)
datetime_col = Column(TIMESTAMP)
datetime_col_ntz = Column(DateTime)
time_col = Column(Time)
uuid_col = Column(Uuid)
variant_col = Column(DatabricksVariant)
Base.metadata.drop_all(engine)
# Output SQL is:
# DROP TABLE pysql_sqlalchemy_example_table
# This generates a CREATE TABLE statement against the catalog and schema
# specified in the connection string
Base.metadata.create_all(engine)
# Output SQL is:
# CREATE TABLE pysql_sqlalchemy_example_table (
# bigint_col BIGINT NOT NULL,
# string_col STRING,
# tinyint_col SMALLINT,
# int_col INT,
# numeric_col DECIMAL(10, 2),
# boolean_col BOOLEAN,
# date_col DATE,
# datetime_col TIMESTAMP,
# datetime_col_ntz TIMESTAMP_NTZ,
# time_col STRING,
# uuid_col STRING,
# variant_col VARIANT,
# PRIMARY KEY (bigint_col)
# ) USING DELTA
# The code that follows will INSERT a record using SQLAlchemy ORM containing these values
# and then SELECT it back out. The output is compared to the input to demonstrate that
# all type information is preserved.
sample_object = {
"bigint_col": 1234567890123456789,
"string_col": "foo",
"tinyint_col": -100,
"int_col": 5280,
"numeric_col": Decimal("525600.01"),
"boolean_col": True,
"date_col": date(2020, 12, 25),
"datetime_col": datetime(
1991, 8, 3, 21, 30, 5, tzinfo=timezone(timedelta(hours=-8))
),
"datetime_col_ntz": datetime(1990, 12, 4, 6, 33, 41),
"time_col": time(23, 59, 59),
"uuid_col": UUID(int=255),
"variant_col": {
"name": "John Doe",
"age": 30,
"address": {
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip": "94105"
},
"hobbies": ["reading", "hiking", "cooking"],
"is_active": True,
"metadata": {
"created_at": "2024-01-15T10:30:00Z",
"version": 1.2,
"tags": ["premium", "verified"]
}
},
}
sa_obj = SampleObject(**sample_object)
session = Session(engine)
session.add(sa_obj)
session.commit()
# Output SQL is:
# INSERT INTO
# pysql_sqlalchemy_example_table (
# bigint_col,
# string_col,
# tinyint_col,
# int_col,
# numeric_col,
# boolean_col,
# date_col,
# datetime_col,
# datetime_col_ntz,
# time_col,
# uuid_col,
# variant_col
# )
# VALUES
# (
# :bigint_col,
# :string_col,
# :tinyint_col,
# :int_col,
# :numeric_col,
# :boolean_col,
# :date_col,
# :datetime_col,
# :datetime_col_ntz,
# :time_col,
# :uuid_col,
# PARSE_JSON(:variant_col)
# )
# Here we build a SELECT query using ORM
stmt = select(SampleObject).where(SampleObject.int_col == 5280)
# Then fetch one result with session.scalar()
result = session.scalar(stmt)
# Finally, we read out the input data and compare it to the output
compare = {key: getattr(result, key) for key in sample_object.keys()}
compare['variant_col'] = json.loads(compare['variant_col'])
assert compare == sample_object
# Then we drop the demonstration table
Base.metadata.drop_all(engine)
# Output SQL is:
# DROP TABLE pysql_sqlalchemy_example_table