Easily integrate data in BigQuery
from pygbq import Client
import requests
client = Client()
token = client.get_secret('secret_name')
headers = {'Authorization': f'Bearer {token}'}
url = ...
data = requests.get(url, headers=headers).json()
response = client.update_table_using_temp(data=data, table_id='mydataset.mytable', how=['id'])
This snippet gets some data from a URL and [merges](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement) ([upserts](https://en.wikipedia.org/wiki/Merge_(SQL))) it into the table mytable in the dataset mydataset, matching rows on the id column.
pip install pygbq
Set up the authentication.
PyGBQ generates one or more temporary tables that are merged into the target table. During the merge, all the columns of the target table are updated. Here's what it looks like:
- Split data into batches.
- For every batch, create mydataset.mytable_tmp_SOMERANDOMPOSTFIX, load the batch into it, and run
MERGE myproject.mydataset.mytable T
USING myproject.mydataset.mytable_tmp_SOMERANDOMPOSTFIX S
ON T.column1 = S.column1 AND T.column2 = S.column2
WHEN NOT MATCHED THEN
INSERT ROW
WHEN MATCHED THEN
UPDATE SET
column1 = S.column1,
column2 = S.column2,
column3 = S.column3,
column4 = S.column4
...
- Creates a table mydataset.mytable with a schema automatically generated by bigquery-schema-generator.
- Splits data into batches and inserts it into mydataset.mytable.
Identical to how='replace' except that it fails if mydataset.mytable exists.
Splits data into batches and inserts (appends) it to mydataset.mytable.
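The batching and merge steps described above can be sketched in plain Python. This is only an illustration, not PyGBQ's actual implementation: the helper names `split_into_batches` and `build_merge_sql` are hypothetical, and the sketch only builds the SQL text without talking to BigQuery.

```python
from typing import Iterator, List


def split_into_batches(rows: List[dict], max_rows: int = 4000) -> Iterator[List[dict]]:
    """Yield consecutive slices of `rows`, each holding at most `max_rows` items."""
    if max_rows < 1:
        raise ValueError("max_rows must be positive")
    for start in range(0, len(rows), max_rows):
        yield rows[start:start + max_rows]


def build_merge_sql(target: str, temp: str, keys: List[str], columns: List[str]) -> str:
    """Build a MERGE statement that upserts `temp` into `target` on `keys`.

    Hypothetical helper: identifiers are interpolated as-is, so this is
    only safe for trusted table and column names.
    """
    on_clause = " AND ".join(f"T.{k} = S.{k}" for k in keys)
    set_clause = ",\n".join(f"{c} = S.{c}" for c in columns)
    return (
        f"MERGE {target} T\n"
        f"USING {temp} S\n"
        f"ON {on_clause}\n"
        "WHEN NOT MATCHED THEN\n"
        "INSERT ROW\n"
        "WHEN MATCHED THEN\n"
        "UPDATE SET\n"
        f"{set_clause}"
    )
```

With `how=['id']`, the `keys` argument would be `['id']`, and `columns` would list every column of the target table, since all of them are updated during the merge.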
For more details, see the Documentation section.
Here's the documentation with default parameters.
from pygbq import Client
client = Client(default_dataset=None, path_to_key=None)
Initializes a client. You can specify:
- default_dataset - (str) default dataset that the client will use to reference tables
- path_to_key - (str) By default PyGBQ uses from google.auth import default to get credentials, but you can specify this parameter if you wish to use from google.auth import load_credentials_from_file instead.
client.update_table_using_temp(data, table_id, how, schema: Union[str, List[dict]] = None, expiration=1, max_insert_num_rows=4000)
Updates a table.
- data - list of dict
- table_id - (str) Table id, which can take one of the following forms:
  - table_name if default_dataset is set
  - dataset_name.table_name
  - project_id.dataset_name.table_name
- how - (str or List[dict]) See the How it works section
- expiration - (float) expiration time of the temporary tables, in hours
- max_insert_num_rows - (int) maximum number of rows inserted per temporary table
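To make the three accepted table_id forms concrete, here is a minimal sketch of how such an id could be resolved into its parts. The function `resolve_table_id` and its `default_project` parameter are assumptions made for illustration. They are not part of PyGBQ's documented API, and the library may resolve ids differently.

```python
from typing import Optional, Tuple


def resolve_table_id(
    table_id: str,
    default_dataset: Optional[str] = None,
    default_project: Optional[str] = None,
) -> Tuple[Optional[str], str, str]:
    """Return (project, dataset, table) for any of the three table_id forms."""
    parts = table_id.split(".")
    if not all(parts):
        raise ValueError(f"Malformed table id: {table_id!r}")
    if len(parts) == 3:
        # project_id.dataset_name.table_name
        project, dataset, table = parts
    elif len(parts) == 2:
        # dataset_name.table_name
        project = default_project
        dataset, table = parts
    elif len(parts) == 1:
        # table_name only: requires a default dataset
        if default_dataset is None:
            raise ValueError("table_name alone requires default_dataset to be set")
        project, dataset, table = default_project, default_dataset, parts[0]
    else:
        raise ValueError(f"Too many components in table id: {table_id!r}")
    return project, dataset, table
```

A project of `None` in the result means the sketch leaves the project to be filled in from the credentials.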
client.get_secret(self, secret_id, version="latest")
Gets a secret stored in Secret Manager.
- secret_id - (str) Secret name
- version - Secret version
client.get_secret(self, secret_id, version="latest")
Adds a new secret version in Secret Manager.
- secret_id - (str) Secret name
- data - (str) Secret value
from pygbq import read_jsonl
read_jsonl(name: str = "data.jsonl")
Reads a newline-delimited JSON file.
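For readers unfamiliar with the format, a newline-delimited JSON file holds one JSON value per line. The sketch below shows one way to read such a file with the standard library. The name `read_jsonl_sketch` is hypothetical, and returning a list of dicts is an assumption: it is not necessarily what pygbq's `read_jsonl` does.

```python
import json
from typing import List


def read_jsonl_sketch(name: str = "data.jsonl") -> List[dict]:
    """Parse each non-empty line of the file as one JSON value."""
    rows = []
    with open(name, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines, e.g. a trailing newline
                rows.append(json.loads(line))
    return rows
```

The resulting list has the same shape as the `data` argument (a list of dict) described for `update_table_using_temp`.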