diff --git a/pydat5-dev/backend/pydat/scripts/elasticsearch_populate.py b/pydat5-dev/backend/pydat/scripts/elasticsearch_populate.py
index 153ca28..31df936 100644
--- a/pydat5-dev/backend/pydat/scripts/elasticsearch_populate.py
+++ b/pydat5-dev/backend/pydat/scripts/elasticsearch_populate.py
@@ -1,190 +1,92 @@
 import sys
+import yaml
 import getpass
 import argparse
+from pathlib import Path
+
+from ingest.core import PydatIngestor
+from ingest.config import Configuration
+from ingest.util import merge_nested_dictionaries


 def main(options):
-    # ingestor = PydatIngestor(options)
-    # ingestor.ingest()
-    pass
+    with Path(options.get('config')).open('r') as c:
+        config_file = yaml.safe_load(c)
+    config_file = merge_nested_dictionaries(config_file, options)
+
+    try:
+        config = Configuration(**config_file)
+    except Exception as e:
+        print(e)
+        sys.exit(1)
+
+    if config.elasticsearch.access.ask_password:
+        try:
+            config.elasticsearch.access.password = getpass.getpass(
+                "Enter ElasticSearch Password: ")
+        except Exception:
+            print("Unable to get password")
+            sys.exit(1)
+
+    ingestor = PydatIngestor(config)
+    ingestor.ingest()
+
+
+def unflatten_argparse(args):
+    def set_key(dictionary, keys, value):
+        for key in keys[:-1]:
+            dictionary = dictionary.setdefault(key, {})
+        dictionary[keys[-1]] = value
+
+    nested = dict()
+    for arg, val in args.items():
+        if val:
+            set_key(nested, arg.split('__'), val)
+    return nested


 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    dataSource = parser.add_mutually_exclusive_group()
-    dataSource.add_argument("-f", "--file", action="store", dest="file",
+
+    data_source = parser.add_mutually_exclusive_group()
+    data_source.add_argument("-f", "--file", action="store", dest="data__file",
                             default=None, help="Input CSV file")
-    dataSource.add_argument("-d", "--directory", action="store",
-                            dest="directory", default=None,
+    data_source.add_argument("-d", "--directory", action="store",
+                             dest="data__directory", default=None,
                             help=("Directory to recursively search for CSV "
                                   "files -- mutually exclusive to '-f' "
                                   "option"))
-    parser.add_argument("-e", "--extension", action="store", dest="extension",
-                        default='csv',
-                        help=("When scanning for CSV files only parse "
-                              "files with given extension (default: 'csv')"))
-
     mode = parser.add_mutually_exclusive_group(required=True)
-    mode.add_argument("-i", "--identifier", action="store", dest="identifier",
+    mode.add_argument("-i", "--identifier", action="store", dest="core__identifier",
                       type=int, default=None,
                       help=("Numerical identifier to use in update to "
                             "signify version (e.g., '8' or '20140120')"))
-    mode.add_argument("-r", "--redo", action="store_true", dest="redo",
+    mode.add_argument("-r", "--redo", action="store_true", dest="core__redo",
                       default=False,
                       help=("Attempt to re-import a failed import or import "
                             "more data, uses stored metadata from previous "
                             "import (-o, -n, and -x not required and will "
                             "be ignored!!)"))
-    mode.add_argument("-z", "--update", action="store_true", dest="update",
+    mode.add_argument("-z", "--update", action="store_true", dest="core__update",
                       default=False,
                       help=("Run the script in update mode. Intended for "
                             "taking daily whois data and adding new domains "
                             "to the current existing index in ES."))
     mode.add_argument("--config-template-only", action="store_true",
-                      default=False, dest="config_template_only",
+                      default=False, dest="core__config_template_only",
                       help=("Configure the ElasticSearch template and "
                             "then exit"))
-    parser.add_argument("-v", "--verbose", action="store_true", dest="verbose",
-                        default=False, help="Be verbose")
-    parser.add_argument("--vverbose", action="store_true", dest="vverbose",
-                        default=False,
-                        help=("Be very verbose (Prints status of every "
-                              "domain parsed, very noisy)"))
-    parser.add_argument("-s", "--stats", action="store_true", dest="stats",
-                        default=False, help="Print out Stats after running")
-
-    updateMethod = parser.add_mutually_exclusive_group()
-    updateMethod.add_argument("-x", "--exclude", action="store",
-                              dest="exclude", default="",
-                              help=("Comma separated list of keys to exclude "
-                                    "if updating entry"))
-    updateMethod.add_argument("-n", "--include", action="store",
-                              dest="include", default="",
-                              help=("Comma separated list of keys to include "
-                                    "if updating entry (mutually exclusive to "
-                                    "-x)"))
+    parser.add_argument("-v", "--verbose", action="count", dest="core__verbosity",
+                        default=None,
+                        help=("Be verbose, add extra v's for more verbosity "
+                              "(will override config file)"))
+    parser.add_argument("--debug", action="store_true", default=False,
+                        dest="core__debug",
+                        help="Enables debug logging (will override config file)")
     parser.add_argument("-o", "--comment", action="store", dest="comment",
                         default="", help="Comment to store with metadata")
-    parser.add_argument("-u", "--es-uri", nargs="*", dest="es_uri",
-                        default=['localhost:9200'],
-                        help=("Location(s) of ElasticSearch Server (e.g., "
-                              "foo.server.com:9200) Can take multiple "
-                              "endpoints"))
-    parser.add_argument("--es-user", action="store", dest="es_user",
-                        default=None,
-                        help=("Username for ElasticSearch when Basic Auth"
-                              "is enabled"))
-    parser.add_argument("--es-pass", action="store", dest="es_pass",
-                        default=None,
-                        help=("Password for ElasticSearch when Basic Auth"
-                              "is enabled"))
-    parser.add_argument("--es-ask-pass", action="store_true",
-                        dest="es_ask_pass", default=False,
-                        help=("Prompt for ElasticSearch password"))
-    parser.add_argument("--es-enable-ssl", action="store",
-                        dest="es_cacert", default=None,
-                        help=("The path, on disk to the cacert of the "
-                              "ElasticSearch server to enable ssl/https "
-                              "support"))
-    parser.add_argument("--es-disable-sniffing", action="store_true",
-                        dest="es_disable_sniffing", default=False,
-                        help=("Disable ES sniffing, useful when ssl hostname"
-                              "verification is not working properly"))
-    parser.add_argument("-p", "--index-prefix", action="store",
-                        dest="index_prefix", default='pydat',
-                        help=("Index prefix to use in ElasticSearch "
-                              "(default: pydat)"))
-    parser.add_argument("-B", "--bulk-size", action="store", dest="bulk_size",
-                        type=int, default=1000,
-                        help="Size of Bulk Elasticsearch Requests")
-    parser.add_argument("-b", "--bulk-fetch-size", action="store",
-                        dest="bulk_fetch_size", type=int, default=50,
-                        help=("Number of documents to search for at a time "
-                              "(default 50), note that this will be "
-                              "multiplied by the number of indices you "
-                              "have, e.g., if you have 10 pydat- "
-                              "indices it results in a request for 500 "
-                              "documents"))
-    parser.add_argument("--rollover-size", action="store", type=int,
-                        dest="rollover_docs", default=50000000,
-                        help=("Set the number of documents after which point "
-                              "a new index should be created, defaults to "
-                              "50 milllion, note that this is fuzzy since "
-                              "the index count isn't continuously updated, "
-                              "so should be reasonably below 2 billion per "
-                              "ES shard and should take your ES "
-                              "configuration into consideration"))
-
-    parser.add_argument("--pipelines", action="store", dest="procs", type=int,
-                        metavar="PIPELINES",
-                        default=2, help="Number of pipelines, defaults to 2")
-    parser.add_argument("--shipper-threads", action="store",
-                        dest="shipper_threads", type=int, default=1,
-                        help=("How many threads per pipeline to spawn to send "
-                              "bulk ES messages. The larger your cluster, "
-                              "the more you can increase this, defaults to 1"))
-    parser.add_argument("--fetcher-threads", action="store",
-                        dest="fetcher_threads", type=int, default=2,
-                        help=("How many threads to spawn to search ES. The "
-                              "larger your cluster, the more you can "
-                              "increase this, defaults to 2"))
-    parser.add_argument("--ignore-field-prefixes", nargs='*',
-                        dest="ignore_field_prefixes", type=str,
-                        default=['zoneContact',
-                                 'billingContact',
-                                 'technicalContact'],
-                        help=("list of fields (in whois data) to ignore when "
-                              "extracting and inserting into ElasticSearch"))
-
-    parser.add_argument("--debug", action="store_true", default=False,
-                        help="Enables debug logging")
-
-    options = parser.parse_args()
-
-    options.es_args = {
-        'hosts': options.es_uri,
-        'sniff_on_start': (not options.es_disable_sniffing),
-        'sniff_on_connection_fail': (not options.es_disable_sniffing),
-        'sniff_timeout': (None if options.es_disable_sniffing else 1000),
-        'max_retries': 100,
-        'retry_on_timeout': True,
-        'timeout': 100
-    }
-
-    if options.es_ask_pass:
-        try:
-            options.es_pass = getpass.getpass("Enter ElasticSearch Password: ")
-        except Exception as e:
-            print("Unable to get password")
-            sys.exit(1)
-
-    if options.es_user is not None and options.es_pass is None:
-        print("Password must be supplied along with a username")
-        sys.exit(1)
-
-    if options.es_user is not None and options.es_pass is not None:
-        options.es_args['http_auth'] = (options.es_user,
-                                        options.es_pass)
-
-    if options.es_cacert is not None:
-        options.es_args['use_ssl'] = True
-        options.es_args['ca_certs'] = options.es_cacert
-
-    if options.vverbose:
-        options.verbose = True
-
-    options.firstImport = False
-    options.rolledOver = False
-
-    # As these are crafted as optional args, but are really a required
-    # mutually exclusive group, must check that one is specified
-    if (not options.config_template_only and
-            (options.file is None and options.directory is None)):
-        print("A File or Directory source is required")
-        parser.parse_args(["-h"])
+    parser.add_argument("--config", action="store", dest="config", type=str,
+                        default="pydat/scripts/ingest/configuration/es_populate_config.yaml",
+                        help=("path to a config file specifying the "
+                              "elasticsearch instance settings and "
+                              "parameters"))

-    main(options)
+    main(unflatten_argparse(vars(parser.parse_args())))
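For review context, here is a runnable sketch (not part of the patch) of what the new `unflatten_argparse` helper produces. The double underscore in each argparse `dest` encodes nesting, so parsed CLI values can later be laid over the YAML configuration by `merge_nested_dictionaries`; the sample input dict below is invented for illustration.

```python
# Mirrors unflatten_argparse exactly as defined in the hunk above.
def unflatten_argparse(args):
    def set_key(dictionary, keys, value):
        for key in keys[:-1]:
            dictionary = dictionary.setdefault(key, {})
        dictionary[keys[-1]] = value

    nested = dict()
    for arg, val in args.items():
        if val:  # falsy values (None, False, "") are dropped
            set_key(nested, arg.split('__'), val)
    return nested


# e.g., vars(parser.parse_args()) for: -i 20140120 -f whois.csv
flat = {"core__identifier": 20140120, "data__file": "whois.csv", "comment": ""}
print(unflatten_argparse(flat))
# -> {'core': {'identifier': 20140120}, 'data': {'file': 'whois.csv'}}
```

Dropping falsy values is what keeps untouched CLI defaults (the empty `comment`, un-passed flags) from clobbering config-file settings during the merge.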
diff --git a/pydat5-dev/backend/pydat/scripts/ingest/config.py b/pydat5-dev/backend/pydat/scripts/ingest/config.py
new file mode 100644
index 0000000..9005b11
--- /dev/null
+++ b/pydat5-dev/backend/pydat/scripts/ingest/config.py
@@ -0,0 +1,80 @@
+from typing import List, Optional
+
+from pydantic import BaseModel, conint, root_validator
+
+
+# --------- pydantic configuration schema classes ----------
+class CoreConfiguration(BaseModel):
+    # mode fields populated from the mutually exclusive CLI group
+    identifier: Optional[int] = None
+    redo: bool = False
+    update: bool = False
+    config_template_only: bool = False
+    debug: bool = False
+    verbosity: conint(ge=0, lt=5) = 0
+    stats: bool
+    pipelines: conint(gt=0) = 2
+    shipper_threads: conint(gt=0) = 1
+    fetcher_threads: conint(gt=0) = 2
+
+
+class DataConfiguration(BaseModel):
+    # source fields populated from the mutually exclusive CLI group
+    file: Optional[str] = None
+    directory: Optional[str] = None
+    file_extension: str
+    ignore_field_prefixes: List[str] = []
+    include: Optional[List[str]] = None
+    exclude: Optional[List[str]] = None
+
+
+class ElasticAccessConfigs(BaseModel):
+    user: Optional[str] = None
+    password: Optional[str] = None
+    ask_password: bool = False
+    enable_ssl: bool = False
+    ssl_cert: Optional[str] = None
+
+    @root_validator
+    def validate_user_pass_pairs(cls, values):
+        if values.get('user') or values.get('password') or values.get('ask_password'):
+            if not (values.get('user') and (values.get('password') or values.get('ask_password'))):
+                raise ValueError('Both a user and password are required (or ask_password enabled)')
+        return values
+
+    @root_validator
+    def validate_cert_if_enabled(cls, values):
+        if values.get('enable_ssl') and not values.get('ssl_cert'):
+            raise ValueError('Please provide an ssl cert location when enabling ssl')
+        return values
+
+
+class ElasticConnectionConfigs(BaseModel):
+    max_retries: conint(ge=0) = 100
+    retry_on_timeout: bool = True
+    timeout: conint(gt=0) = 100
+
+
+class ElasticSniffingConfigs(BaseModel):
+    disable_sniffing: bool = True
+    sniff_on_start: bool = False
+    sniff_on_connection_fail: bool = False
+    sniff_timeout: conint(gt=0) = 100
+
+
+class ElasticIndexConfigs(BaseModel):
+    index_template: str
+    index_prefix: str = 'pydat'
+    rollover_size: conint(gt=0) = 50000000
+
+
+class ElasticRequestConfigs(BaseModel):
+    bulk_size: conint(gt=0) = 1000
+    bulk_fetch_size: conint(gt=0) = 50
+
+
+class ElasticConfiguration(BaseModel):
+    host: str
+    access: ElasticAccessConfigs
+    connection: ElasticConnectionConfigs
+    sniffing: ElasticSniffingConfigs
+    index: ElasticIndexConfigs
+    requests: ElasticRequestConfigs
+
+
+class Configuration(BaseModel):
+    core: CoreConfiguration
+    data: DataConfiguration
+    elasticsearch: ElasticConfiguration
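A short sketch (again, not part of the patch) of how the `root_validator` pair surfaces errors, assuming pydantic 1.x and that `ingest.config` is importable; the field values are made up.

```python
from pydantic import ValidationError

from ingest.config import ElasticAccessConfigs

try:
    ElasticAccessConfigs(user="reader")  # password missing, ask_password False
except ValidationError as e:
    print(e)
    # __root__: Both a user and password are required (or ask_password enabled)

# Passes: a user plus ask_password=True satisfies validate_user_pass_pairs
ElasticAccessConfigs(user="reader", ask_password=True)
```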
diff --git a/pydat5-dev/backend/pydat/scripts/ingest/configuration/es_populate_config.yaml b/pydat5-dev/backend/pydat/scripts/ingest/configuration/es_populate_config.yaml
new file mode 100644
index 0000000..d7d720e
--- /dev/null
+++ b/pydat5-dev/backend/pydat/scripts/ingest/configuration/es_populate_config.yaml
@@ -0,0 +1,60 @@
+#
+# This is the default configuration for WhoDat
+# feel free to copy and change any settings that may differ for your use case
+#
+
+# core application configuration items
+core:
+  debug: False        # Enables debug logging
+  verbosity: 0        # Be verbose, level goes from 0 to 4
+  stats: False        # Print out Stats after running
+  pipelines: 2        # Number of pipelines (default: 2)
+  shipper_threads: 1  # How many threads per pipeline to spawn to send bulk ES messages. The larger your cluster,
+                      # the more you can increase this (default: 1)
+  fetcher_threads: 2  # How many threads to spawn to search ES. The larger your cluster, the more you can
+                      # increase this (default: 2)
+
+# configuration items related to the processing of input data
+data:
+  file_extension: csv       # When scanning for CSV files only parse files with given extension (default: 'csv')
+  ignore_field_prefixes:    # list of fields (in whois data) to ignore when extracting and inserting into ElasticSearch
+    - zoneContact
+    - billingContact
+    - technicalContact
+  include: ~                # List of keys to include if updating entry (mutually exclusive to exclude)
+  exclude: ~                # List of keys to exclude if updating entry (mutually exclusive to include)
+
+# configuration items related to the elasticsearch instance being used
+elasticsearch:
+  host: localhost:9200      # Location(s) of ElasticSearch Server (e.g., foo.server.com:9200). Can take multiple endpoints (comma separated)
+
+  access:
+    user: null              # Username for ElasticSearch when Basic Auth is enabled
+    password: null          # Password for ElasticSearch when Basic Auth is enabled
+    ask_password: False     # Prompt for ElasticSearch password (!!move out to commandline!!)
+    enable_ssl: False       # flag to set to use ssl instead of basic auth
+    ssl_cert: null          # The path, on disk, to the cacert of the ElasticSearch server to enable ssl/https support
+
+  connection:
+    max_retries: 100
+    retry_on_timeout: True
+    timeout: 100
+
+  sniffing:
+    disable_sniffing: True
+    sniff_on_start: False
+    sniff_on_connection_fail: False
+    sniff_timeout: 1000
+
+  index:
+    index_template: ingest/configuration/es6.data.template  # Path to the index template to use for creation of new indices
+    index_prefix: pydat       # Index prefix to use in ElasticSearch (default: pydat)
+    rollover_size: 50000000   # Set the number of documents after which point a new index should be created,
+                              # defaults to 50 million, note that this is fuzzy since the index count isn't
+                              # continuously updated, so it should be reasonably below 2 billion per ES shard
+                              # and should take your ES configuration into consideration (default: 50M)
+
+  requests:
+    bulk_size: 1000           # Size of Bulk Elasticsearch Requests (default: 1000)
+    bulk_fetch_size: 50       # Number of documents to search for at a time (default: 50), note that this will be
+                              # multiplied by the number of indices you have, e.g., if you have 10 pydat-
+                              # indices it results in a request for 500 documents
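To make the precedence concrete, here is a sketch of the file-then-CLI merge that `main()` performs over this file: scalar CLI values overwrite file values, while lists are extended. The CLI dict below is the kind of structure `unflatten_argparse` emits; its values are invented.

```python
import yaml

from ingest.util import merge_nested_dictionaries

with open("es_populate_config.yaml") as f:
    file_cfg = yaml.safe_load(f)

cli = {"core": {"verbosity": 2},
       "data": {"ignore_field_prefixes": ["adminContact"]}}
merged = merge_nested_dictionaries(file_cfg, cli)

assert merged["core"]["verbosity"] == 2                    # scalars overwrite
assert len(merged["data"]["ignore_field_prefixes"]) == 4   # lists extend
```

Note the list behavior: CLI-supplied prefixes are appended to the file's three defaults rather than replacing them.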
diff --git a/pydat5-dev/backend/pydat/scripts/ingest/util.py b/pydat5-dev/backend/pydat/scripts/ingest/util.py
index e69de29..cf20e68 100644
--- a/pydat5-dev/backend/pydat/scripts/ingest/util.py
+++ b/pydat5-dev/backend/pydat/scripts/ingest/util.py
@@ -0,0 +1,11 @@
+def merge_nested_dictionaries(d_a, d_b):
+    for k, v in d_b.items():
+        if isinstance(v, (str, int, float)):  # scalar values overwrite
+            d_a[k] = v
+        elif isinstance(v, list) and isinstance(d_a.setdefault(k, []), list):  # lists merge
+            d_a[k].extend(v)
+        elif isinstance(v, dict) and isinstance(d_a.setdefault(k, {}), dict):  # dicts merge recursively
+            d_a[k] = merge_nested_dictionaries(d_a[k], v)
+        else:
+            raise TypeError('unsupported type to merge')
+    return d_a
diff --git a/pydat5-dev/backend/pydat/scripts/requirements.txt b/pydat5-dev/backend/pydat/scripts/requirements.txt
new file mode 100644
index 0000000..28989cc
--- /dev/null
+++ b/pydat5-dev/backend/pydat/scripts/requirements.txt
@@ -0,0 +1 @@
+pydantic==1.7.3
\ No newline at end of file
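Finally, a sketch of `merge_nested_dictionaries` edge cases with invented values, including the bool subtlety:

```python
from ingest.util import merge_nested_dictionaries

base = {"a": 1, "nested": {"keep": True}, "tags": ["x"]}
override = {"a": 2, "nested": {"new": False}, "tags": ["y"]}
print(merge_nested_dictionaries(base, override))
# -> {'a': 2, 'nested': {'keep': True, 'new': False}, 'tags': ['x', 'y']}

# bools take the scalar branch because bool is a subclass of int, so a
# False value in d_b overwrites the value in d_a like any other scalar
```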