diff --git a/machine-learning-box/automl/.ruby-version b/machine-learning-box/automl/.ruby-version new file mode 100644 index 00000000..ec1cf33c --- /dev/null +++ b/machine-learning-box/automl/.ruby-version @@ -0,0 +1 @@ +2.6.3 diff --git a/machine-learning-box/automl/README.md b/machine-learning-box/automl/README.md new file mode 100644 index 00000000..98025bc4 --- /dev/null +++ b/machine-learning-box/automl/README.md @@ -0,0 +1,15 @@ +## How to use + +Workflow example of AutoML operator. + +Note: this feature is still in Beta and available to limited customers. + + +```sh +# Push project +$ td -c ~/.td/td.conf wf push --project . + +# Setting td.apikey secret is required for automl operator. + +$ td -c ~/.td/td.conf wf secrets --project --set td.apikey +``` \ No newline at end of file diff --git a/machine-learning-box/automl/cltv.dig b/machine-learning-box/automl/cltv.dig new file mode 100644 index 00000000..7137a69a --- /dev/null +++ b/machine-learning-box/automl/cltv.dig @@ -0,0 +1,25 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ${input_database} + datasets: online_retail + ++run_cltv: + ipynb>: + notebook: CLTV + input_table: ${input_database}.online_retail_txn + output_table: ${output_database}.online_retail_cltv_result + user_column: customerid + tstamp_column: invoicedate + amount_column: purchaseamount + audience_name: online_retail_cltv diff --git a/machine-learning-box/automl/clustering.dig b/machine-learning-box/automl/clustering.dig new file mode 100644 index 00000000..b204cb5a --- /dev/null +++ b/machine-learning-box/automl/clustering.dig @@ -0,0 +1,23 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: dermatology + ++clustering_gluon_new_model: + ipynb>: + notebook: clustering + input_table: ml_datasets.dermatology + output_table: ${output_database}.dermatology_clusters_${session_id} + export_feature_importance: ${output_database}.feature_importance_${session_id} + export_shap_values: ${output_database}.shap_values_${session_id} diff --git a/machine-learning-box/automl/config/params.yaml b/machine-learning-box/automl/config/params.yaml new file mode 100644 index 00000000..8eb7d609 --- /dev/null +++ b/machine-learning-box/automl/config/params.yaml @@ -0,0 +1,10 @@ +input_database: ml_datasets +output_database: automl_test + +train_data_table: gluon_train +target_column: class +test_data_table: gluon_test + +fit_time_limit: 60 * 3 # fit timeout in sec. 3 min just for demo. Default: 60 * 60 (1hr). + +drift_auc_threshold: 0.93 diff --git a/machine-learning-box/automl/eda.dig b/machine-learning-box/automl/eda.dig new file mode 100644 index 00000000..d6edcf9d --- /dev/null +++ b/machine-learning-box/automl/eda.dig @@ -0,0 +1,27 @@ +timezone: Asia/Tokyo +#timezone: PST + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: all +# datasets: gluon, bank_marketing, vehicle_coupon, online_retail, telco_churn, boston_house + ++datasets: + for_each>: + table: [gluon_train, bank_marketing_train, vehicle_coupon_train, online_retail_ltv_train, telco_churn_train, boston_house_train] + _parallel: + limit: 3 + _do: + +run_eda: + ipynb>: + docker: + task_mem: 128g + notebook: EDA + input_table: ml_datasets.${table} + # The following options are optional ones + eda: all + # eda: pandas-profiling, sweetviz + # target_column: label + sampling_threshold: 1000000 diff --git a/machine-learning-box/automl/ml_datasets.dig b/machine-learning-box/automl/ml_datasets.dig new file mode 100644 index 00000000..f5c495be --- /dev/null +++ b/machine-learning-box/automl/ml_datasets.dig @@ -0,0 +1,13 @@ +timezone: Asia/Tokyo +#timezone: PST + +_export: + td: + engine: presto + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: all +# datasets: gluon, bank_marketing \ No newline at end of file diff --git a/machine-learning-box/automl/ml_experiment.dig b/machine-learning-box/automl/ml_experiment.dig new file mode 100644 index 00000000..abf9b3a8 --- /dev/null +++ b/machine-learning-box/automl/ml_experiment.dig @@ -0,0 +1,71 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + create_tables: ["automl_experiments", "automl_eval_results"] + ++train: + ml_train>: + docker: + task_mem: 128g # 64g/128g/256g/384g/512g + notebook: gluon_train + model_name: gluon_model_${session_id} + input_table: ${input_database}.${train_data_table} + target_column: ${target_column} + time_limit: ${fit_time_limit} + share_model: true + export_leaderboard: ${output_database}.leaderboard_${train_data_table} + export_feature_importance: ${output_database}.feature_importance_${train_data_table} + ++track_experiment: + td>: queries/track_experiment.sql + insert_into: ${output_database}.automl_experiments + last_executed_notebook: ${automl.last_executed_notebook} + user_id: ${automl.last_executed_user_id} + user_email: ${automl.last_executed_user_email} + model_name: gluon_model_${session_id} + shared_model: ${automl.shared_model} + task_attempt_id: ${attempt_id} + session_time: ${session_local_time} + engine: presto + +# Note: If input_table contains target labels, ml_predict shows evaluation results ++predict: + ml_predict>: + docker: + task_mem: 64g # 64g/128g/256g/384g/512g + notebook: gluon_predict + model_name: gluon_model_${session_id} + input_table: ${input_database}.${test_data_table} + output_table: ${output_database}.predicted_${test_data_table}_${session_id} + ++evaluation: + td>: queries/auc.sql + table: ${output_database}.predicted_${test_data_table}_${session_id} + target_column: ${target_column} + positive_class: ' >50K' + store_last_results: true + engine: hive + ++alert_if_drift_detected: + if>: ${td.last_results.auc < drift_auc_threshold} + _do: + mail>: + data: Detect drift in model performance. AUC was ${td.last_results.auc}. + subject: Drift detected + to: [me+alerts@example.com] + # bcc: [foo@example.com,bar@example.com] + ++record_evaluation: + td>: queries/record_evaluation.sql + insert_into: ${output_database}.automl_eval_results + engine: presto + model_name: gluon_model_${session_id} + test_table: ${input_database}.${test_data_table} + session_time: ${session_local_time} + auc: ${td.last_results.auc} diff --git a/machine-learning-box/automl/ml_experiment_demo.dig b/machine-learning-box/automl/ml_experiment_demo.dig new file mode 100644 index 00000000..d8fd4d18 --- /dev/null +++ b/machine-learning-box/automl/ml_experiment_demo.dig @@ -0,0 +1,67 @@ +timezone: Asia/Tokyo +#timezone: PST + +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + create_tables: ["${expr_tracking_table}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ${input_database} + input_table: ${input_database}.dummy +# datasets: gluon, bank_marketing + datasets: gluon + ++gluon_train: + ml_train>: + notebook: gluon_train + model_name: gluon_model_${session_id} + input_table: ${input_database}.gluon_train # expect database_name.table_name + target_column: class + # The following options are optional ones + #problem_type: binary # ‘binary’, ‘multiclass’, ‘regression’, or ‘quantile’. autolugon automatically detect problem types + #eval_metric: roc_auc # autolugon automatically select a right eval_metric for a given setting if not specified. + ignore_columns: time,rowid # Note time column is ignored by the default. + time_limit: 60 * 3 # fit timeout. 3 min just for training time. Default: 60 * 60 (1hr). 1hr or more is recommended for production purposes (Note 24 hours at max). Note this is a soft limit, not hard limit. + # timeout: 60 * 3 # timeout for notebook cell-level execution. This is a hard limit. Note it's cell-level timeout. No timeout if not specified. + export_leaderboard: ${output_database}.leaderboard_gluon_train + export_feature_importance: ${output_database}.feature_importance_gluon_train + # hide_table_contents: true + ++print_train_result: + echo>: "executed ${automl.last_executed_notebook}.ipynb" + ++track_experiment: + td>: queries/track_experiment.sql + insert_into: automl_experiments + last_executed_notebook: ${automl.last_executed_notebook} + user_id: ${automl.last_executed_user_id} + user_email: ${automl.last_executed_user_email} + model_name: gluon_model_${session_id} + task_attempt_id: ${attempt_id} + session_time: ${session_local_time} + engine: presto + ++gluon_predict: + ml_predict>: + notebook: gluon_predict + model_name: gluon_model_${session_id} + input_table: ${input_database}.gluon_test # expect database_name.table_name + output_table: ${output_database}.gluon_predicted # expect database_name.table_name. DB will be created if not exists. table is overwrite'd. + # optional + #rowid_column: rowid # Note when rowid_column is specified, only rowid column + prediction result columns are resulted in the output table + #ignore_columns: time # target column should not be in test data + export_leaderboard: ${output_database}.leaderboard_gluon_predict + export_feature_importance: ${output_database}.feature_importance_gluon_predict + # hide_table_contents: true + ++print_predict_result: + echo>: "executed ${automl.last_executed_notebook}.ipynb" \ No newline at end of file diff --git a/machine-learning-box/automl/mta.dig b/machine-learning-box/automl/mta.dig new file mode 100644 index 00000000..358b78fb --- /dev/null +++ b/machine-learning-box/automl/mta.dig @@ -0,0 +1,41 @@ +#timezone: Asia/Tokyo +#timezone: PST + +_export: + !include : config/params.yaml + td: + engine: presto + database: sample_datasets # dummy to avoid error on create_databases + output_db: ml_test + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["ml_datasets", "${output_db}"] + ++load_datasets: + ipynb>: + docker: + task_mem: 64g + notebook: ml_datasets + output_database: ml_datasets + datasets: mta + ++run_mta: + ipynb>: + docker: + task_mem: 128g # 64g/128g/256g/384g/512g + notebook: MTA + # required param + input_table: ml_datasets.mta + # optional param + tstamp_column: tstamp + user_column: user + channel_column: channel + conversion_column: conversion + # optional columns (usually not needed) + analyze_topk_channels: 50 + ignore_channels: Facebook + overwrite_channel: Direct + export_channel_interactions: ${output_db}.channel_interactions + export_shapley_attributions: ${output_db}.shapley_attributions + export_attributed_conversions: ${output_db}.attributed_conversions diff --git a/machine-learning-box/automl/nba.dig b/machine-learning-box/automl/nba.dig new file mode 100644 index 00000000..c8df7c14 --- /dev/null +++ b/machine-learning-box/automl/nba.dig @@ -0,0 +1,51 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: nba + ++nba_only_qtable: + ipynb>: + notebook: NBA + train_table: ml_datasets.nba_train + # optional + export_q_table: ${output_database}.rl_qtable_${session_id} + export_state_action: ${output_database}.rl_state_action_${session_id} + ++nba_with_eval: + ipynb>: + notebook: NBA + train_table: ml_datasets.nba_train + test_table: ml_datasets.nba_test + budget: 10000 + value_per_cv: 100 + # optional + # export_q_table: ${output_database}.rl_qtable_${session_id} + export_channel_ratio: ${output_database}.rl_channel_ratio_${session_id} + export_predictions: ${output_database}.rl_predictions_${session_id} + export_model_performance: ${output_database}.rl_model_performance_${session_id} + ignore_actions: client_domain_organic_visit, organic_search + action_cost: | + { + "display": 2, + "social-social": 1.4, + "social": 2, + "social-paid": 5, + "organic_search": 1, + "emai": 3.2, + "cpc": 3, + "referral": 2, + "linkedin": 3, + "search-paid": 2, + "twitter": 1 + } \ No newline at end of file diff --git a/machine-learning-box/automl/nba_cdp.dig b/machine-learning-box/automl/nba_cdp.dig new file mode 100644 index 00000000..40602deb --- /dev/null +++ b/machine-learning-box/automl/nba_cdp.dig @@ -0,0 +1,58 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: nba + ++create_master_segment: + py>: scripts.audience.create_master_segment + name: nba_${session_id} + description: NBA test audience + master: + database: ml_datasets + table: nba_test + run: false + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + TD_API_SERVER: "api.treasuredata.com" + ++nba_with_eval: + ipynb>: + notebook: NBA + train_table: ml_datasets.nba_train + test_table: ml_datasets.nba_test + budget: 10000 + value_per_cv: 100 + # optional + audience_name: nba_${session_id} + # export_q_table: ${output_database}.rl_qtable_${session_id} + export_channel_ratio: ${output_database}.rl_channel_ratio_${session_id} + export_predictions: ${output_database}.rl_predictions_${session_id} + export_model_performance: ${output_database}.rl_model_performance_${session_id} + ignore_actions: client_domain_organic_visit, organic_search + action_cost: | + { + "display": 2, + "social-social": 1.4, + "social": 2, + "social-paid": 5, + "organic_search": 1, + "emai": 3.2, + "cpc": 3, + "referral": 2, + "linkedin": 3, + "search-paid": 2, + "twitter": 1 + } \ No newline at end of file diff --git a/machine-learning-box/automl/network_analysis.dig b/machine-learning-box/automl/network_analysis.dig new file mode 100644 index 00000000..c214264f --- /dev/null +++ b/machine-learning-box/automl/network_analysis.dig @@ -0,0 +1,10 @@ ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ml_datasets + datasets: transition_matrix + ++network_analysis: + ipynb>: + notebook: network_analysis + input_table: ml_datasets.transition_matrix diff --git a/machine-learning-box/automl/queries/assign_rowid.sql b/machine-learning-box/automl/queries/assign_rowid.sql new file mode 100644 index 00000000..a07119b3 --- /dev/null +++ b/machine-learning-box/automl/queries/assign_rowid.sql @@ -0,0 +1,2 @@ +-- DIGDAG_INSERT_LINE +select rownum() as ${rowid_column}, * from ${table} diff --git a/machine-learning-box/automl/queries/auc.sql b/machine-learning-box/automl/queries/auc.sql new file mode 100644 index 00000000..06d35e75 --- /dev/null +++ b/machine-learning-box/automl/queries/auc.sql @@ -0,0 +1,7 @@ +select + auc(prob, label) as auc +from ( + select predicted_proba as prob, if(cast(${target_column} as string)=="${positive_class}", 1, 0) as label + from ${table} + ORDER BY prob DESC +) t diff --git a/machine-learning-box/automl/queries/record_evaluation.sql b/machine-learning-box/automl/queries/record_evaluation.sql new file mode 100644 index 00000000..88070aec --- /dev/null +++ b/machine-learning-box/automl/queries/record_evaluation.sql @@ -0,0 +1,6 @@ +-- DIGDAG_INSERT_LINE +select + '${session_time}' as session_time, + '${model_name}' as model_name, + '${test_table}' as test_table, + '${auc}' as auroc diff --git a/machine-learning-box/automl/queries/track_experiment.sql b/machine-learning-box/automl/queries/track_experiment.sql new file mode 100644 index 00000000..7db45323 --- /dev/null +++ b/machine-learning-box/automl/queries/track_experiment.sql @@ -0,0 +1,10 @@ +-- DIGDAG_INSERT_LINE +select + '${task_attempt_id}' as task_attempt_id, + '${session_time}' as session_time, + '${user_id}' as user_id, + '${user_email}' as user_email, + '${model_name}' as model_name, + '${shared_model}' as shared_model, + '${last_executed_notebook}' as notebook_url + diff --git a/machine-learning-box/automl/rfm.dig b/machine-learning-box/automl/rfm.dig new file mode 100644 index 00000000..645c2f8f --- /dev/null +++ b/machine-learning-box/automl/rfm.dig @@ -0,0 +1,26 @@ +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["${output_database}"] + ++load_datasets: + ipynb>: + notebook: ml_datasets + output_database: ${input_database} + datasets: cosmetics_store + ++run_rfm: + ipynb>: + notebook: RFM + input_table: ${input_database}.cosmetics_store + output_table: ${output_database}.rfm_output_cosmetics_store + user_column: user_id + # tstamp_column: event_time + # tstamp_column: tstamp + amount_column: price + audience_name: cosmetics_${session_id} diff --git a/machine-learning-box/automl/scripts/audience.py b/machine-learning-box/automl/scripts/audience.py new file mode 100644 index 00000000..e8445369 --- /dev/null +++ b/machine-learning-box/automl/scripts/audience.py @@ -0,0 +1,527 @@ +__all__ = ['CdpAudience'] + +import sys, os +import requests +import json +import pytd +import re +import faulthandler +import warnings + +from typing import List, Tuple, Optional + +from requests.models import Response +from requests.packages.urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter +from requests import Session + + +def deprecated(func): + """This is a decorator which can be used to mark functions + as deprecated. It will result in a warning being emitted + when the function is used.""" + import functools + + @functools.wraps(func) + def new_func(*args, **kwargs): + warnings.simplefilter('always', DeprecationWarning) # turn off filter + warnings.warn("Call to deprecated function {}.".format(func.__name__), + category=DeprecationWarning, + stacklevel=2) + warnings.simplefilter('default', DeprecationWarning) # reset filter + return func(*args, **kwargs) + return new_func + +class ApiRequestError(Exception): + def __init__(self, response: requests.Response, msg: str=None): + if msg is None: + msg = f"{response.status_code} ERROR\n{response.text}" + super().__init__(msg) + +class CdpApiClient: + def __init__(self, endpoint, headers: dict) -> None: + retry_strategy = Retry( + total=3, status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2 + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + s = Session() + s.headers = headers + s.mount("http://", adapter) + s.mount("https://", adapter) + self.endpoint = f"https://{endpoint}" + self.client: Session = s + + def get(self, path, **kwargs) -> Response: + return self.client.get(url=self.endpoint+path, **kwargs) + + def put(self, path: str, data=None, **kwargs) -> Response: + return self.client.put(url=self.endpoint+path, data=data, **kwargs) + + def post(self, path: str, data=None, json=None, **kwargs) -> Response: + return self.client.post(url=self.endpoint+path, data=data, json=json, **kwargs) + + +def to_boolean(o) -> bool: + if o == None: + return False + s = str(o) + + try: + from distutils.util import strtobool + return bool(strtobool(s)) + except ValueError as e: + return False + + +def validate_db_resource_name(name: str) -> str: + ''' + Validate DB_NAME or TABLE_NAME + ''' + # https://docs.treasuredata.com/display/public/PD/Naming+Requirements+and+Restrictions+for+Treasure+Data+Entities + TD_DB_RESOURCE_REGEX = "[a-z0-9_]+" + assert re.fullmatch(rf"^{TD_DB_RESOURCE_REGEX}$", name) is not None, f"Invalid DB resource name: {name}" + return name + + +def parse_table(table: str) -> Tuple[str, str]: + ''' + Parse DB_NAME.TABLE_NAME to DB_NAME, TABLE_NAME + ''' + assert table.count(".") == 1, f"Invalid table name {table}, DB_NAME.TABLE_NAME is expected." + database, table = table.split(".") + validate_db_resource_name(database) + validate_db_resource_name(table) + return database, table + +@deprecated +def resolve_type(table, column_name: str): + # workaround for ValueError: not enough values to unpack (expected 3, got 2) + schema = [c if len(c) == 3 else [c[0], c[1], ""] for c in table.schema] + # column_name:str, column_type:str, alias:str + for (c_name, c_type, _) in schema: + if c_name == column_name: + # Note: Only string, number, timestamp, string_array, or number_array is accepted for attr_type + # https://github.com/treasure-data/td-cdp-api/blob/master/app/models/audience_attribute.rb#L9 + # https://docs.treasuredata.com/display/PD/Using+TD+CLI+to+Annotate+Schema+-+Legacy + if c_type in ['int', 'long', 'double', 'float']: + return 'number' + else: + return 'string' + raise KeyError(f"column {column_name} not found in {table.schema}") + + +class CdpAudience: + ''' + Usage: + cdp = CdpAudience() + cdp.add_attribute(audience_name=audience_name, attr_db=attr_db, attr_table=attr_table, attr_column=attr_column, join_key=join_key, foreign_key=foreign_key, replace_attr_if_exists=True) + ''' + + def __init__(self): + TD_API_KEY = os.environ["TD_API_KEY"] + TD_ENDPOINT = os.environ["TD_API_SERVER"] + + CDP_ENDPOINT = TD_ENDPOINT.replace('.treasuredata', '-cdp.treasuredata') + HEADERRS = {'Authorization': f'TD1 {TD_API_KEY}', 'Content-Type': 'application/json'} + self.cdp_api = CdpApiClient(endpoint=CDP_ENDPOINT, headers=HEADERRS) + self.td_api = pytd.Client(retry_post_requests=True).api_client + + def create_master_segment(self, *, name: str, database: str, table: str, description: str=None, run:bool=False): + payload = {} + payload['name'] = name + payload['description'] = "" if description is None else description + payload['master'] = {} + payload['master']['parentDatabaseName'] = database + payload['master']['parentTableName'] = table + + res = self.cdp_api.post('/audiences', data=json.dumps(payload)) + if not res.ok: + raise ApiRequestError(res) + + audience = json.loads(res.text) + audience_id = audience['id'] + print(f"ⓘ Successfully created Master Segment '{name}': {audience_id}", file=sys.stderr) + + if run: + res = self.cdp_api.post(f"/audiences/{audience_id}/run") + print(f"ⓘ Run Master Segment {name}", file=sys.stderr) + + TD_ENDPOINT = os.environ["TD_API_SERVER"] + ms_url = f"https://{TD_ENDPOINT.replace('api', 'console')}/app/ms/{audience_id}" + print(f"💎 Created a Master Segment: {ms_url}", file=sys.stderr) + return audience_id + + def add_attribute( + self, *, audience_id: str=None, audience_name: str=None, attr_db: str=None, attr_table: str, attr_columns: List[str], join_key: str, foreign_key: str, + attr_aliases: List[str]=None, attr_group: str="AutoML", rerun_master_segment: bool=True, replace_attr_if_exists: bool=False, + **kwargs + ): + assert len(attr_columns) >= 1, "At least one element in attr_columns but it was empty" + if attr_aliases is None: + attr_aliases = attr_columns + else: + assert len(attr_aliases) == len(attr_columns), f"len(attr_aliases) {len(attr_aliases)} is expected to be equals to len(attr_columns) {len(attr_columns)}" + + if attr_db is None: + attr_db, attr_table = parse_table(attr_table) + + if audience_id is None: + assert audience_name is not None, "Either audience_id or audience_name argument is required" + audience_id = self.get_parent_segment_id(audience_name) + + res = self.cdp_api.put(f"/audiences/{audience_id}") + if not res.ok: + raise ApiRequestError(res) + audience = res.json() + + if 'attributes' in audience: + attributes = audience['attributes'] + else: + attributes = [] + audience['attributes'] = attributes + + existing_attr_names = [attr['name'] for attr in attributes] + + # Workaround for attribute column does not exists in the attribute table + if len(attributes) >= 1: + table = self.td_api.table(attr_db, attr_table) + existing_column_names = [col[2] if len(col) == 3 else col[0] for col in table.schema] + + def remove_attribute(attr) -> bool: + if attr['parentDatabaseName'] == attr_db and attr['parentTableName'] == attr_table: + if attr['parentColumn'] not in existing_column_names: + print(f"⚠ Remove an attribute column '{attr['name']}' in Master Segment {audience_id} because '{attr['parentColumn']}' column does not exists in the Atrribute table '{attr_db}.{attr_table}'", file=sys.stderr) + return True + return False + audience['attributes'] = [attr for attr in attributes if not remove_attribute(attr)] + + for i, attr_column in enumerate(attr_columns): + attr_alias = attr_aliases[i] + + new_attr = { + #'audienceId': audience_id, # ID of Master Segment for this attribute + 'name': attr_alias, # Column name to be defined on Master Segment + #'type': attr_type, # Type of the column + 'parentDatabaseName': attr_db, # Database name of the attribute table + 'parentTableName': attr_table, # Table name of the attribute table + 'parentColumn': attr_column, # Column name of the attribute table which is imported into customer table + 'parentKey': join_key, # Join key of the attribute table + 'foreignKey': foreign_key, # Foreign key of the master table + 'groupingName': attr_group, # Group name of the attribute + } + + if attr_alias in existing_attr_names: + if replace_attr_if_exists: + attributes[existing_attr_names.index(attr_alias)] = new_attr + print(f"⚠ Replace an existing attribute column '{attr_alias}' in Master Segment {audience_id}", file=sys.stderr) + else: + print(f"⚠ Skip adding an attribute because the attribute column '{attr_alias}' already exists", file=sys.stderr) + else: + attributes.append(new_attr) + + res = self.cdp_api.put(f"/audiences/{audience_id}", json=audience) + if res.ok: + print(f"ⓘ Successfully added an attribute table '{attr_table}' to master segment {audience_id}", file=sys.stderr) + else: + try: + res_value = res.json()['base'][0] + assert 'not unique' in res_value, f"Unexpected error: {res_value}" + print(f"⚠ Attribute '{attr_column}' already exists in Parent Segment and thus skip adding an attribue.", file=sys.stderr) + return + except: + print(f"failed to PUT /audiences/{audience_id}: {new_attr}") + raise ApiRequestError(res, f"{res.status_code} error on PUT /audiences/{audience_id}: {res.json()}") + + if rerun_master_segment: + res = self.cdp_api.post(f"/audiences/{audience_id}/run") + if res.ok: + print(f"ⓘ Successfully triggered rerun of Master Segment: {audience_id}", file=sys.stderr) + else: + raise ApiRequestError(res, f"{res.status_code} error on POST /audiences/{audience_id}/run: {res.json()}") + + + def get_parent_segment_id(self, name: str) -> str: + ''' + Retrive parent segment ID if exists. + ''' + assert name is not None + + # Note: console-next (v5) uses different endpoints for listing audience + res = self.cdp_api.get('/entities/parent_segments') + if res.ok: + v5_res = res.json() + for audience in v5_res.get('data',{}): + if audience.get('attributes',{}).get('name') == name: + return audience['id'] + + # Fall back to v4 + res = self.cdp_api.get('/audiences') + if not res.ok: + raise ApiRequestError(res) + + audiences = res.json() + for audience in audiences: + if name == audience.get('name'): + return audience['id'] + + raise ValueError(f"Cannot find parent segment: {name}") + + + def create_folder(self, name: str, audience_id: str) -> str: + folder = self.cdp_api.post(f'/audiences/{audience_id}/folders', json={ + 'name': name, + 'description': 'AutoML Segments' + }) + + if folder.ok: + return folder.json()['id'] + else: + res = self.cdp_api.get(f'/audiences/{audience_id}/folders') + if not res.ok: + raise ApiRequestError(res, f"{res.status_code} error on GET /audiences/{audience_id}/folders: {res.json()}") + + folders = res.json() + for f in folders: + if f.get('name') == name: + print(f"Reuse folder `{name}` already existing in audience `{audience_id}`") + return f['id'] + + raise ApiRequestError(folder, f"{folder.status_code} error on POST /audiences/{audience_id}/folders: {folder.json()}") + + + def create_segments(self, *, column_name: str, column_values: List[str], folder: Optional[str]="AutoML", + audience_id: Optional[str]=None, audience_name: Optional[str]=None, rerun_master_segment: Optional[bool]=False + ): + assert len(column_values) >= 1, "At least 1 column_values are required." + if audience_id is None: + assert audience_name is not None, "Either audience_id or audience_name argument is required" + audience_id = self.get_parent_segment_id(audience_name) + + res = self.cdp_api.get(f"/entities/parent_segments/{audience_id}") + use_v4_api = False + if res.ok: + print(f"ⓘ Successfully retrieved the audience", file=sys.stderr) + folder_id = res.json()['data']['relationships']['parentFolder']['data']['id'] + else: + try: + assert res.json()['errors'].split(':')[0] == 'v5 endpoints flag should be enabled for audience' + print("v5 API is not enabled. Fall back to v4 API") + use_v4_api = True + except: + raise ApiRequestError(res, f"{res.status_code} error on GET /entities/parent_segments/{audience_id}: {res.json()}") + + if folder: + folder_id = self.create_folder(folder, audience_id) + + for value in column_values: + attribute_name = column_name.replace('_', ' ').title() + ' = ' + str(value).title() + rule = { + 'type': 'And', + 'conditions': [{ + 'conditions': [{ + 'type': 'Value', + 'leftValue': {'name': column_name, 'visibility': 'clear'}, + 'operator': {'not': False, 'rightValue': value, 'type': 'Equal'}, + 'arrayMatching': None, + 'exclude': False + }], + 'type': 'And', + }], + 'expr': '', + } + + if use_v4_api: + segment = { + 'name': attribute_name, + 'kind': 0, # batch, + 'description': f'{column_name} = {value}', + 'countPopulation': True, + 'rule': rule, + } + if folder: + segment['segmentFolderId'] = folder_id + res = self.cdp_api.post(f"/audiences/{audience_id}/segments", json=segment) + if res.ok: + print(f"ⓘ Successfully created a segment '{attribute_name}' to master segment {audience_id}", file=sys.stderr) + else: + try: + assert res.json()['errors']['name'][0] == 'has already been taken' + print(f"Segment `{attribute_name}` already exists") + except: + raise ApiRequestError(res, f"{res.status_code} error on POST /entities/segments: {res.json()}") + else: # v5 API + segment = { + 'attributes': { + 'name': attribute_name, + 'description': f'{column_name} = {value}', + 'rule': rule, + }, + 'relationships': {'parentFolder': {'data': {'id': folder_id, 'type': 'folder-segment'}}} + } + res = self.cdp_api.post("/entities/segments", json=segment) + if res.ok: + print(f"ⓘ Successfully created a segment '{attribute_name}' to master segment {audience_id}", file=sys.stderr) + else: + try: + assert res.json()['errors']['name'][0] == 'has already been taken' + print(f"Segment `{attribute_name}` already exists") + except: + raise ApiRequestError(res, f"{res.status_code} error on POST /entities/segments: {res.json()}") + + if rerun_master_segment: + res = self.cdp_api.post(f"/audiences/{audience_id}/run") + if res.ok: + print(f"ⓘ Successfully triggered rerun of Master Segment: {audience_id}", file=sys.stderr) + else: + raise ApiRequestError(res, f"{res.status_code} error on POST /audiences/{audience_id}/run: {res.json()}") + + TD_ENDPOINT = os.environ["TD_API_SERVER"] + if use_v4_api: + s_url = f"https://{TD_ENDPOINT.replace('api', 'console')}/app/ms/{audience_id}/se" + else: + s_url = f"https://{TD_ENDPOINT.replace('api', 'console').replace('.treasuredata', '-next.treasuredata')}/app/ps/{audience_id}" + print(f"💎 Created new segments: {s_url}", file=sys.stderr) + + +def add_attribute(**kwargs): + faulthandler.enable() + + def parse_arguments(kwargs: dict) -> dict: + assert os.environ.get('TD_API_KEY') is not None, "TD_API_KEY ENV variable is required" + assert os.environ.get('TD_API_SERVER') is not None, "TD_API_SERVER ENV variable is required" + + ret = {} + + audience = kwargs.pop('audience', None) + assert audience is not None, "audience argument is required" + audience_id = audience.pop('id', None) + if audience_id is not None: ret['audience_id'] = audience_id + audience_name = audience.pop('name', None) + if audience_name is not None: ret['audience_name'] = audience_name + foreign_key = audience.pop('foreign_key', None) + assert foreign_key is not None, "foreign_key argument is required" + ret['foreign_key'] = foreign_key + ret['rerun_master_segment'] = to_boolean(audience.pop('rerun', 'False')) + + attribute = kwargs.pop('attribute', None) + assert attribute is not None, "attribute argument is required" + attr_table = attribute.pop('table', None) + assert attr_table is not None, "attr_table argument is required" + ret['attr_table'] = attr_table + join_key = attribute.pop('join_key', None) + assert join_key is not None, "join_key argument is required" + ret['join_key'] = join_key + attr_db = attribute.pop('database', None) + if attr_db is not None: ret['attr_db'] = attr_db + + attr_columns = attribute.pop('attr_columns', None) + if attr_columns is None: + attr_column = attribute.pop('attr_column', None) + assert attr_column is not None, "Either attr_columns or attr_column is required" + ret['attr_columns'] = [attr_column] + else: + ret['attr_columns'] = [s.strip() for s in attr_columns.split(',')] + + attr_aliases = attribute.pop('attr_aliases', None) + if attr_aliases is None: + attr_alias = attribute.pop('attr_alias', None) + assert attr_alias is not None, "Either attr_aliases or attr_alias is required" + ret['attr_aliases'] = [attr_alias] + else: + ret['attr_aliases'] = [s.strip() for s in attr_aliases.split(',')] + + attr_group = attribute.pop('attr_group', "AutoML") + ret['attr_group'] = attr_group + replace_attr_if_exists = to_boolean(attribute.pop('replace_if_exists', 'False')) + ret['replace_attr_if_exists'] = replace_attr_if_exists + + return ret + + try: + params = parse_arguments(kwargs) + cdp = CdpAudience() + cdp.add_attribute(**params) + finally: + # force flush + sys.stdout.flush() + sys.stderr.flush() + + +def create_master_segment(**kwargs): + faulthandler.enable() + + def parse_arguments(kwargs: dict) -> dict: + assert os.environ.get('TD_API_KEY') is not None, "TD_API_KEY ENV variable is required" + assert os.environ.get('TD_API_SERVER') is not None, "TD_API_SERVER ENV variable is required" + + ret = {} + + name = kwargs.pop('name', None) + assert name is not None, "name argument is required" + ret['name'] = name + description = kwargs.pop('description', None) + if description is not None: ret['description'] = description + + master = kwargs.pop('master', None) + assert master is not None, "audience argument is required" + database = master.pop('database', None) + assert database is not None, "master.database argument is required" + ret['database'] = database + table = master.pop('table', None) + assert table is not None, "master.table argument is required" + ret['table'] = table + + ret['run'] = to_boolean(kwargs.pop('run', None)) + return ret + + try: + params = parse_arguments(kwargs) + cdp = CdpAudience() + audience_id = cdp.create_master_segment(**params) + + import digdag + digdag.env.store({'audience_id': audience_id}) + finally: + # force flush + sys.stdout.flush() + sys.stderr.flush() + + +def create_segments(**kwargs): + faulthandler.enable() + + def parse_arguments(kwargs: dict) -> dict: + assert os.environ.get('TD_API_KEY') is not None, "TD_API_KEY ENV variable is required" + assert os.environ.get('TD_API_SERVER') is not None, "TD_API_SERVER ENV variable is required" + + ret = {} + + column_name = kwargs.pop('column_name', None) + assert column_name is not None, "column_name argument is required" + ret['column_name'] = column_name + + column_values = kwargs.pop('column_values', None) + assert column_values is not None, "column_values argument is required" + ret['column_values'] = [s.strip() for s in column_values.split(',')] + + folder = kwargs.pop('folder', None) + if folder is not None: ret['folder'] = folder + + audience = kwargs.pop('audience', None) + assert audience is not None, "audience argument is required" + audience_id = audience.pop('id', None) + if audience_id is not None: ret['audience_id'] = audience_id + audience_name = audience.pop('name', None) + if audience_name is not None: ret['audience_name'] = audience_name + ret['rerun_master_segment'] = to_boolean(audience.pop('rerun', 'False')) + + return ret + + try: + params = parse_arguments(kwargs) + cdp = CdpAudience() + cdp.create_segments(**params) + finally: + # force flush + sys.stdout.flush() + sys.stderr.flush() diff --git a/machine-learning-box/automl/shapley.dig b/machine-learning-box/automl/shapley.dig new file mode 100644 index 00000000..da8c73f1 --- /dev/null +++ b/machine-learning-box/automl/shapley.dig @@ -0,0 +1,19 @@ +timezone: Asia/Tokyo +#timezone: PST + +_export: + !include : config/params.yaml + td: + engine: presto + database: ${output_database} + ++run_ml_experiment_demo: + call>: ml_experiment_demo.dig + ++explain_predictions_by_shap: + ipynb>: + docker: + task_mem: 128g # 64g/128g/256g/384g/512g + notebook: shapley + model_name: gluon_model_${session_id} # model used for prediction + input_table: ${input_database}.gluon_test # test data used for prediction diff --git a/machine-learning-box/automl/ts_forecast.dig b/machine-learning-box/automl/ts_forecast.dig new file mode 100644 index 00000000..d065c6dc --- /dev/null +++ b/machine-learning-box/automl/ts_forecast.dig @@ -0,0 +1,33 @@ +#timezone: Asia/Tokyo +#timezone: PST + +_export: + !include : config/params.yaml + td: + engine: presto + database: sample_datasets # dummy to avoid error on create_databases + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["ml_datasets", "ml_test"] + ++load_datasets: + ipynb>: + docker: + task_mem: 64g + notebook: ml_datasets + output_database: ml_datasets + datasets: ts_airline + ++run_ts_forecast: + ipynb>: + docker: + task_mem: 256g # 64g/128g/256g/384g/512g + notebook: ts_forecast + train_table: ml_datasets.ts_airline + tstamp_column: period + target_column: number_of_airline_passengers + forecast_length: 30 + output_table: ml_test.ts_airline_predicted + time_limit: 10 * 60 # 10 min by the default + diff --git a/machine-learning-box/automl/vehicle_coupon.dig b/machine-learning-box/automl/vehicle_coupon.dig new file mode 100644 index 00000000..a849bc93 --- /dev/null +++ b/machine-learning-box/automl/vehicle_coupon.dig @@ -0,0 +1,60 @@ +_export: + output_database: ml_test + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["ml_test"] + ++load_datasets: + ipynb>: + docker: + task_mem: 64g # 64g/128g/256g/384g/512g + notebook: ml_datasets + output_database: ml_datasets + datasets: vehicle_coupon + ++train: + ml_train>: + docker: + task_mem: 256g # 64g/128g/256g/384g/512g + notebook: gluon_train + model_name: gluon_model_${session_id} + input_table: ml_datasets.vehicle_coupon_train + target_column: y + time_limit: 3 * 60 # 3 min + ++prepare_input: + td>: queries/assign_rowid.sql + table: ml_datasets.vehicle_coupon_test + rowid_column: userid + create_table: ml_datasets.vehicle_coupon_test_with_rowid + engine: hive + ++create_master_segment: + py>: scripts.audience.create_master_segment + name: vehicle_coupon_${session_id} + # description: xxx + master: + database: ml_datasets + table: vehicle_coupon_test_with_rowid + run: false + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + TD_API_SERVER: "api.treasuredata.com" + ++predict: + ml_predict>: + docker: + task_mem: 128g # 64g/128g/256g/384g/512g + notebook: gluon_predict + model_name: gluon_model_${session_id} + input_table: ml_datasets.vehicle_coupon_test_with_rowid + rowid_column: userid + output_table: ${output_database}.predicted_${session_id} + audience_name: vehicle_coupon_${session_id} + foreign_key: userid diff --git a/machine-learning-box/automl/vehicle_coupon_custom_script_version.dig b/machine-learning-box/automl/vehicle_coupon_custom_script_version.dig new file mode 100644 index 00000000..1db3b0b6 --- /dev/null +++ b/machine-learning-box/automl/vehicle_coupon_custom_script_version.dig @@ -0,0 +1,80 @@ +_export: + output_database: ml_test + td: + engine: presto + database: ${output_database} + ++create_db_tbl_if_not_exists: + td_ddl>: + create_databases: ["ml_test"] + ++load_datasets: + ipynb>: + docker: + task_mem: 64g # 64g/128g/256g/384g/512g + notebook: ml_datasets + output_database: ml_datasets + datasets: vehicle_coupon + ++train: + ml_train>: + docker: + task_mem: 256g # 64g/128g/256g/384g/512g + notebook: gluon_train + model_name: gluon_model_${session_id} + input_table: ml_datasets.vehicle_coupon_train + target_column: y + time_limit: 3 * 60 # 3 min + ++prepare_input: + td>: queries/assign_rowid.sql + table: ml_datasets.vehicle_coupon_test + rowid_column: userid + create_table: ml_datasets.vehicle_coupon_test_with_rowid + engine: hive + ++predict: + ml_predict>: + docker: + task_mem: 128g # 64g/128g/256g/384g/512g + notebook: gluon_predict + model_name: gluon_model_${session_id} + input_table: ml_datasets.vehicle_coupon_test_with_rowid + output_table: ${output_database}.predicted_${session_id} + ++create_master_segment: + py>: scripts.audience.create_master_segment + name: vehicle_coupon_${session_id} + # description: xxx + master: + database: ml_datasets + table: vehicle_coupon_test_with_rowid + run: false + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + TD_API_SERVER: "api.treasuredata.com" + ++add_attribute: + py>: scripts.audience.add_attribute + audience: + name: vehicle_coupon_${session_id} + # id: 1111 + foreign_key: userid + ### optional + rerun: true + attribute: + table: ${output_database}.predicted_${session_id} + attr_columns: predicted_proba, y + # attr_column: predicted_proba + join_key: "userid" + ### optional + attr_aliases: predicted_proba, y2 + attr_group: "AutoML" + replace_if_exists: true + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + TD_API_SERVER: "api.treasuredata.com"