From 550e334ba83951793a5cc38a1c1d5051b5d49496 Mon Sep 17 00:00:00 2001 From: satoshihirose Date: Tue, 24 Aug 2021 16:21:29 +0900 Subject: [PATCH 1/5] Add new import logs senario --- scenarios/import_td_logs/README.md | 36 ++++++++++++++++ scenarios/import_td_logs/config/query_log.yml | 41 ++++++++++++++++++ .../import_td_logs/config/workflow_log.yml | 40 ++++++++++++++++++ scenarios/import_td_logs/import_td_logs.dig | 42 +++++++++++++++++++ .../import_td_logs/script/import_td_users.py | 20 +++++++++ 5 files changed, 179 insertions(+) create mode 100644 scenarios/import_td_logs/README.md create mode 100644 scenarios/import_td_logs/config/query_log.yml create mode 100644 scenarios/import_td_logs/config/workflow_log.yml create mode 100644 scenarios/import_td_logs/import_td_logs.dig create mode 100644 scenarios/import_td_logs/script/import_td_users.py diff --git a/scenarios/import_td_logs/README.md b/scenarios/import_td_logs/README.md new file mode 100644 index 00000000..4164a10d --- /dev/null +++ b/scenarios/import_td_logs/README.md @@ -0,0 +1,36 @@ +# Workflow: Import Treasure Data Logs from Data Landing Area +This example shows how you can use workflow to ingest Treasure Data Logs From Data Landing Areas to your Treasure Data account. + +# How to Run +## Requirement +The workflow requires that Data Landing Areas feature is enabled in your Treasure Data account and thne you've got your User ID to access to it. + +## Steps +First, edit configurations. You can find the following settings in the `import_td_logs.dig` file. + +| Parameter | Description | +| ---- | ---- | +| api_endpoint | The endpoint of the Treasure Data API. See this [document]('https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints'). (e.g. https://api.treasuredata.com) | +| dla_host | The hostname of the Data Landing Area (e.g. 
dla1.treasuredata-co.jp) | +| user_id | Your user_id received from TD when you enabled Data Landing Areas feature | +| account_id | Your TD account_id | +| query_logs_table | The table name where query logs are stored (e.g. query_logs) | +| workflow_logs_table | The table name where workflow logs are stored (e.g. workflow_logs) | +| users_table | The table name where users data are stored (e.g. users) | + +Next, upload the workflow to Treasure Data. + + # Upload + $ td wf push import_td_logs + +Set secrets with your private key, i.e. the counterpart of the public key you gave to TD when you enabled the Data Landing Areas feature. + + $ td wf secrets --project import_td_logs --set sftp.dla_secret_key_file=@~/.ssh/id_rsa_dla + $ td wf secrets --project import_td_logs --set td.apikey + +You can trigger the session manually to watch it execute. + + # Run + $ td wf start import_td_logs import_td_logs --session now + +If you have any questions, contact support@treasuredata.com. diff --git a/scenarios/import_td_logs/config/query_log.yml b/scenarios/import_td_logs/config/query_log.yml new file mode 100644 index 00000000..05d90408 --- /dev/null +++ b/scenarios/import_td_logs/config/query_log.yml @@ -0,0 +1,41 @@ +in: + type: sftp + host: ${dla_host} + user: ${user_id} + secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} + path_prefix: "/treasure-data-logs/production/${account_id}/query_logs/v1/data.csv" + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: "," + quote: "\"" + escape: "\"" + trim_if_not_quoted: false + skip_header_lines: 1 + allow_extra_columns: false + allow_optional_columns: false + columns: + - {name: date, type: string} + - {name: account_id, type: string} + - {name: user_id, type: string} + - {name: project_name, type: string} + - {name: workflow_name, type: string} + - {name: task_id, type: string} + - {name: job_id, type: long} + - {name: query_id, type: string} + - {name: created_at, type: string} + - {name: scheduled_at, type: 
string} + - {name: start_at, type: string} + - {name: end_at, type: string} + - {name: queued_sec, type: long} + - {name: running_sec, type: long} + - {name: result_type, type: string} + - {name: load_type, type: string} + - {name: records, type: long} + - {name: type, type: string} + - {name: query_status, type: string} + - {name: result_size, type: long} + - {name: split_hours, type: double} + - {name: time, type: long} +out: {} \ No newline at end of file diff --git a/scenarios/import_td_logs/config/workflow_log.yml b/scenarios/import_td_logs/config/workflow_log.yml new file mode 100644 index 00000000..91efca9e --- /dev/null +++ b/scenarios/import_td_logs/config/workflow_log.yml @@ -0,0 +1,40 @@ +in: + type: sftp + host: ${dla_host} + user: ${user_id} + secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} + path_prefix: "/treasure-data-logs/production/${account_id}/workflow_logs/v1/data.csv" + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: "," + quote: "\"" + escape: "\"" + trim_if_not_quoted: false + skip_header_lines: 1 + allow_extra_columns: false + allow_optional_columns: false + columns: + - {name: account_id, type: string} + - {name: project_id, type: string} + - {name: workflow_id, type: string} + - {name: session_id, type: string} + - {name: attempt_id, type: string} + - {name: task_id, type: string} + - {name: user_id, type: string} + - {name: project_name, type: string} + - {name: workflow_name, type: string} + - {name: timezone, type: string} + - {name: session_time, type: string} + - {name: attempt_created_at, type: string} + - {name: attempt_finished_at, type: string} + - {name: task_name, type: string} + - {name: task_start_at, type: string} + - {name: task_end_at, type: string} + - {name: attempt_running_sec, type: string} + - {name: task_running_sec, type: string} + - {name: state, type: string} + - {name: date, type: string} + - {name: time, type: long} +out: {} \ No newline at end of file diff --git 
a/scenarios/import_td_logs/import_td_logs.dig b/scenarios/import_td_logs/import_td_logs.dig new file mode 100644 index 00000000..c85b6262 --- /dev/null +++ b/scenarios/import_td_logs/import_td_logs.dig @@ -0,0 +1,42 @@ +timezone: UTC + +schedule: + daily>: 03:00:00 + +_export: + td: + database: treaure-data-logs + api_endpoint: https://api.treasuredata.com + dla_host: dla1.treasuredata-co.jp + user_id: abcdefg012345 + account_id: 1 + query_logs_table: query_logs + workflow_logs_table: workflow_logs + users_table: users + ++create_databases: + td_ddl>: + create_databases: [${td.database}] + ++create_table: + td_ddl>: + create_tables: [${query_logs_table}, ${workflow_logs_table}, ${users_table}] + ++import: + +query_logs: + td_load>: config/query_log.yml + table: ${query_logs_table} + + +workflow_logs: + td_load>: config/workflow_log.yml + table: ${workflow_logs_table} + + +users: + _env: + TD_API_KEY: ${secret:td.apikey} + py>: script.import_td_users.import_users + database: ${td.database} + table: ${users_table} + api_endpoint: ${api_endpoint} + docker: + image: "digdag/digdag-python:3.9" \ No newline at end of file diff --git a/scenarios/import_td_logs/script/import_td_users.py b/scenarios/import_td_logs/script/import_td_users.py new file mode 100644 index 00000000..65639040 --- /dev/null +++ b/scenarios/import_td_logs/script/import_td_users.py @@ -0,0 +1,20 @@ +import os +import sys +os.system(f"{sys.executable} -m pip install -U pandas requests pytd==1.3.0") +import pandas as pd +import pytd +import requests + +td_apikey = os.getenv("TD_API_KEY") + + +def import_users(database, table, api_endpoint): + # get users data + headers = {'Authorization': 'TD1 {}'.format(td_apikey)} + r = requests.get('{}/v3/user/list'.format(api_endpoint), headers=headers) + + # write users data + df = pd.json_normalize(r.json(), record_path=['users']) + client = pytd.Client(apikey=td_apikey, database=database) + client.load_table_from_dataframe( + df, table, writer='bulk_import', 
if_exists='overwrite') From d255f005178c55a751596538b697a1562f6aed29 Mon Sep 17 00:00:00 2001 From: satoshihirose Date: Tue, 24 Aug 2021 18:57:13 +0900 Subject: [PATCH 2/5] Fix words and add a parameter --- scenarios/import_td_logs/README.md | 6 ++++-- scenarios/import_td_logs/config/query_log.yml | 2 +- scenarios/import_td_logs/config/workflow_log.yml | 2 +- scenarios/import_td_logs/import_td_logs.dig | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/scenarios/import_td_logs/README.md b/scenarios/import_td_logs/README.md index 4164a10d..8002fdd4 100644 --- a/scenarios/import_td_logs/README.md +++ b/scenarios/import_td_logs/README.md @@ -1,18 +1,20 @@ # Workflow: Import Treasure Data Logs from Data Landing Area This example shows how you can use workflow to ingest Treasure Data Logs From Data Landing Areas to your Treasure Data account. +This is an opt-in feature. Please contact your Customer Success rep or Technical Support if you have an interest in this feature. # How to Run ## Requirement -The workflow requires that Data Landing Areas feature is enabled in your Treasure Data account and thne you've got your User ID to access to it. +The workflow requires that the Data Landing Areas feature is enabled in your Treasure Data account and that you have received your User ID to access it. ## Steps First, edit configurations. You can find the following settings in the `import_td_logs.dig` file. | Parameter | Description | | ---- | ---- | -| api_endpoint | The endpoint of the Treasure Data API. See this [document]('https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints'). (e.g. https://api.treasuredata.com) | +| api_endpoint | The endpoint of the Treasure Data API. See this [document](https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints). (e.g. https://api.treasuredata.com) | | dla_host | The hostname of the Data Landing Area (e.g. 
dla1.treasuredata-co.jp) | | user_id | Your user_id received from TD when you enabled Data Landing Areas feature | +| site | The site of your account (e.g. aws, aws-tokyo, eu01, ap02) | | account_id | Your TD account_id | | query_logs_table | The table name where query logs are stored (e.g. query_logs) | | workflow_logs_table | The table name where workflow logs are stored (e.g. workflow_logs) | diff --git a/scenarios/import_td_logs/config/query_log.yml b/scenarios/import_td_logs/config/query_log.yml index 05d90408..bb6d681b 100644 --- a/scenarios/import_td_logs/config/query_log.yml +++ b/scenarios/import_td_logs/config/query_log.yml @@ -3,7 +3,7 @@ in: host: ${dla_host} user: ${user_id} secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} - path_prefix: "/treasure-data-logs/production/${account_id}/query_logs/v1/data.csv" + path_prefix: "/treasure-data-logs/${site}/${account_id}/query_logs/v1/data.csv" parser: charset: UTF-8 newline: CRLF diff --git a/scenarios/import_td_logs/config/workflow_log.yml b/scenarios/import_td_logs/config/workflow_log.yml index 91efca9e..a46671d3 100644 --- a/scenarios/import_td_logs/config/workflow_log.yml +++ b/scenarios/import_td_logs/config/workflow_log.yml @@ -3,7 +3,7 @@ in: host: ${dla_host} user: ${user_id} secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"} - path_prefix: "/treasure-data-logs/production/${account_id}/workflow_logs/v1/data.csv" + path_prefix: "/treasure-data-logs/${site}/${account_id}/workflow_logs/v1/data.csv" parser: charset: UTF-8 newline: CRLF diff --git a/scenarios/import_td_logs/import_td_logs.dig b/scenarios/import_td_logs/import_td_logs.dig index c85b6262..1c97fe20 100644 --- a/scenarios/import_td_logs/import_td_logs.dig +++ b/scenarios/import_td_logs/import_td_logs.dig @@ -9,6 +9,7 @@ _export: api_endpoint: https://api.treasuredata.com dla_host: dla1.treasuredata-co.jp user_id: abcdefg012345 + site: aws account_id: 1 query_logs_table: query_logs workflow_logs_table: 
workflow_logs From ccb03d444a6ce62ede7b4f041af14019deda5aa1 Mon Sep 17 00:00:00 2001 From: satoshihirose Date: Thu, 26 Aug 2021 14:56:08 +0900 Subject: [PATCH 3/5] Update output mode --- scenarios/import_td_logs/config/query_log.yml | 3 ++- scenarios/import_td_logs/config/workflow_log.yml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/scenarios/import_td_logs/config/query_log.yml b/scenarios/import_td_logs/config/query_log.yml index bb6d681b..01a44e5b 100644 --- a/scenarios/import_td_logs/config/query_log.yml +++ b/scenarios/import_td_logs/config/query_log.yml @@ -38,4 +38,5 @@ in: - {name: result_size, type: long} - {name: split_hours, type: double} - {name: time, type: long} -out: {} \ No newline at end of file +out: + mode: replace \ No newline at end of file diff --git a/scenarios/import_td_logs/config/workflow_log.yml b/scenarios/import_td_logs/config/workflow_log.yml index a46671d3..67df0d9b 100644 --- a/scenarios/import_td_logs/config/workflow_log.yml +++ b/scenarios/import_td_logs/config/workflow_log.yml @@ -37,4 +37,5 @@ in: - {name: state, type: string} - {name: date, type: string} - {name: time, type: long} -out: {} \ No newline at end of file +out: + mode: replace \ No newline at end of file From ec259a1b488566db9c8e2545d4439a546785d1be Mon Sep 17 00:00:00 2001 From: satoshihirose Date: Thu, 26 Aug 2021 17:15:18 +0900 Subject: [PATCH 4/5] Remove indent --- scenarios/import_td_logs/import_td_logs.dig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scenarios/import_td_logs/import_td_logs.dig b/scenarios/import_td_logs/import_td_logs.dig index 1c97fe20..53618992 100644 --- a/scenarios/import_td_logs/import_td_logs.dig +++ b/scenarios/import_td_logs/import_td_logs.dig @@ -34,10 +34,10 @@ _export: +users: _env: - TD_API_KEY: ${secret:td.apikey} + TD_API_KEY: ${secret:td.apikey} py>: script.import_td_users.import_users database: ${td.database} table: ${users_table} api_endpoint: ${api_endpoint} docker: - image: 
"digdag/digdag-python:3.9" \ No newline at end of file + image: "digdag/digdag-python:3.9" \ No newline at end of file From 1f1590e2f1d7bc21c9f0fcd10acc3abbe6963379 Mon Sep 17 00:00:00 2001 From: satoshihirose Date: Thu, 26 Aug 2021 17:25:25 +0900 Subject: [PATCH 5/5] Add new column for hive --- scenarios/import_td_logs/config/query_log.yml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/scenarios/import_td_logs/config/query_log.yml b/scenarios/import_td_logs/config/query_log.yml index 01a44e5b..ad6d2b19 100644 --- a/scenarios/import_td_logs/config/query_log.yml +++ b/scenarios/import_td_logs/config/query_log.yml @@ -19,11 +19,7 @@ in: - {name: date, type: string} - {name: account_id, type: string} - {name: user_id, type: string} - - {name: project_name, type: string} - - {name: workflow_name, type: string} - - {name: task_id, type: string} - {name: job_id, type: long} - - {name: query_id, type: string} - {name: created_at, type: string} - {name: scheduled_at, type: string} - {name: start_at, type: string} @@ -36,7 +32,12 @@ in: - {name: type, type: string} - {name: query_status, type: string} - {name: result_size, type: long} + - {name: query_id, type: string} - {name: split_hours, type: double} + - {name: average_hive_cores, type: double} + - {name: project_name, type: string} + - {name: workflow_name, type: string} + - {name: task_id, type: string} - {name: time, type: long} out: mode: replace \ No newline at end of file