-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathBigQuery.py
More file actions
84 lines (71 loc) · 2.72 KB
/
BigQuery.py
File metadata and controls
84 lines (71 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# Source code is taken from here: https://github.com/googleapis/python-bigquery/blob/master/samples/load_table_uri_json.py
import os
from google.cloud import bigquery
#setup environmental variable with GOOGLE_APPLICATION_CREDENTIALS to get access from the local PC to your project
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/.../Credentials.json"
client = bigquery.Client()
#Set table_id to the ID of the table to create.
table_id = "project.dataset.table_id"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("clicks", "INTEGER"),
bigquery.SchemaField("cost", "FLOAT"),
bigquery.SchemaField("countryCode", "STRING"),
bigquery.SchemaField("countryName", "STRING"),
bigquery.SchemaField("customConversions1", "INTEGER"),
bigquery.SchemaField("customConversions2", "INTEGER"),
bigquery.SchemaField("revenue", "FLOAT"),
bigquery.SchemaField("visits", "INTEGER"),
bigquery.SchemaField("date", "DATE"),
],
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
# set the path to the data in GCS
uri = "gs://.../sample.json"
load_job = client.load_table_from_uri(
uri,
table_id,
location="US", # Must match the destination dataset location.
job_config=job_config,
)
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
#
#
# To update data in the already existing BQ table use the code below
#
#
import os
from datetime import timedelta
from datetime import date
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "C:/.../Credentials.json"
from google.cloud.bigquery.client import Client
client = Client()
sql = """
DELETE FROM `dataset.table_id`
WHERE date >= '""" + str(start_date) + """'
"""
# Start the query, passing in the extra configuration.
query_job = client.query(sql) # Make an API request.
query_job.result() # Wait for the job to complete.
table_id = "project.dataset.table_id"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
],
)
# set the path to the data in GCS
uri = "gs://dataset/sample.json"
load_job = client.load_table_from_uri(
uri,
table_id,
location="US", # Must match the destination dataset location.
job_config=job_config,
)
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("\nLoaded data for {}-{}.{}.{}.".format(start_date.day, date_today.day, date_today.month, date_today.year))
print("\nTotally {} rows.".format(destination_table.num_rows))