208 changes: 55 additions & 153 deletions pydat5-dev/backend/pydat/scripts/elasticsearch_populate.py
@@ -1,190 +1,92 @@
import sys
import yaml
import json
import getpass
import argparse
from pathlib import Path

from ingest.core import PydatIngestor
from ingest.config import Configuration, config_schema
from ingest.util import merge_nested_dictionaries


def main(options):
# ingestor = PydatIngestor(options)
# ingestor.ingest()
pass
with Path(options.get('config')).open('r') as c:
config_file = yaml.safe_load(c)
config_file = merge_nested_dictionaries(config_file, options)

try:
config = Configuration(**config_file)
except Exception as e:
print(e)
sys.exit(1)

if config.elasticsearch.access.ask_password:
try:
config.elasticsearch.access.password = getpass.getpass("Enter ElasticSearch Password: ")
except Exception as e:
print("Unable to get password")
sys.exit(1)

ingestor = PydatIngestor(config)
ingestor.ingest()


def unflatten_argparse(args):
def set_key(dictionary, keys, value):
for key in keys[:-1]:
dictionary = dictionary.setdefault(key, {})
dictionary[keys[-1]] = value

    nested = dict()
    for arg, val in args.items():
        if val:  # skip unset/falsy options so they can't override the config file
            set_key(nested, arg.split('__'), val)
    return nested


if __name__ == "__main__":
parser = argparse.ArgumentParser()
dataSource = parser.add_mutually_exclusive_group()
dataSource.add_argument("-f", "--file", action="store", dest="file",

data_source = parser.add_mutually_exclusive_group()
data_source.add_argument("-f", "--file", action="store", dest="data__file",
default=None, help="Input CSV file")
dataSource.add_argument("-d", "--directory", action="store",
dest="directory", default=None,
data_source.add_argument("-d", "--directory", action="store",
dest="data__directory", default=None,
help=("Directory to recursively search for CSV "
"files -- mutually exclusive to '-f' "
"option"))

parser.add_argument("-e", "--extension", action="store", dest="extension",
default='csv',
help=("When scanning for CSV files only parse "
"files with given extension (default: 'csv')"))

mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("-i", "--identifier", action="store", dest="identifier",
mode.add_argument("-i", "--identifier", action="store", dest="core__identifier",
type=int, default=None,
help=("Numerical identifier to use in update to "
"signify version (e.g., '8' or '20140120')"))
mode.add_argument("-r", "--redo", action="store_true", dest="redo",
mode.add_argument("-r", "--redo", action="store_true", dest="core__redo",
default=False,
help=("Attempt to re-import a failed import or import "
"more data, uses stored metadata from previous "
"import (-o, -n, and -x not required and will "
"be ignored!!)"))
mode.add_argument("-z", "--update", action="store_true", dest="update",
mode.add_argument("-z", "--update", action="store_true", dest="core__update",
default=False,
help=("Run the script in update mode. Intended for "
"taking daily whois data and adding new domains "
"to the current existing index in ES."))
mode.add_argument("--config-template-only", action="store_true",
default=False, dest="config_template_only",
default=False, dest="core__config_template_only",
help=("Configure the ElasticSearch template and "
"then exit"))

parser.add_argument("-v", "--verbose", action="store_true", dest="verbose",
default=False, help="Be verbose")
parser.add_argument("--vverbose", action="store_true", dest="vverbose",
default=False,
help=("Be very verbose (Prints status of every "
"domain parsed, very noisy)"))
parser.add_argument("-s", "--stats", action="store_true", dest="stats",
default=False, help="Print out Stats after running")

updateMethod = parser.add_mutually_exclusive_group()
updateMethod.add_argument("-x", "--exclude", action="store",
dest="exclude", default="",
help=("Comma separated list of keys to exclude "
"if updating entry"))
updateMethod.add_argument("-n", "--include", action="store",
dest="include", default="",
help=("Comma separated list of keys to include "
"if updating entry (mutually exclusive to "
"-x)"))
parser.add_argument("-v", "--verbose", action="count", dest="core__verbosity",
default=None, help="Be verbose, add extra v's for more verbosity (will override config file)")
parser.add_argument("--debug", action="store_true", default=False, dest="core__debug",
help="Enables debug logging (will override config file)")

parser.add_argument("-o", "--comment", action="store", dest="comment",
default="", help="Comment to store with metadata")

parser.add_argument("-u", "--es-uri", nargs="*", dest="es_uri",
default=['localhost:9200'],
help=("Location(s) of ElasticSearch Server (e.g., "
"foo.server.com:9200) Can take multiple "
"endpoints"))
parser.add_argument("--es-user", action="store", dest="es_user",
default=None,
help=("Username for ElasticSearch when Basic Auth"
"is enabled"))
parser.add_argument("--es-pass", action="store", dest="es_pass",
default=None,
help=("Password for ElasticSearch when Basic Auth"
"is enabled"))
parser.add_argument("--es-ask-pass", action="store_true",
dest="es_ask_pass", default=False,
help=("Prompt for ElasticSearch password"))
parser.add_argument("--es-enable-ssl", action="store",
dest="es_cacert", default=None,
help=("The path, on disk to the cacert of the "
"ElasticSearch server to enable ssl/https "
"support"))
parser.add_argument("--es-disable-sniffing", action="store_true",
dest="es_disable_sniffing", default=False,
help=("Disable ES sniffing, useful when ssl hostname"
"verification is not working properly"))
parser.add_argument("-p", "--index-prefix", action="store",
dest="index_prefix", default='pydat',
help=("Index prefix to use in ElasticSearch "
"(default: pydat)"))
parser.add_argument("-B", "--bulk-size", action="store", dest="bulk_size",
type=int, default=1000,
help="Size of Bulk Elasticsearch Requests")
parser.add_argument("-b", "--bulk-fetch-size", action="store",
dest="bulk_fetch_size", type=int, default=50,
help=("Number of documents to search for at a time "
"(default 50), note that this will be "
"multiplied by the number of indices you "
"have, e.g., if you have 10 pydat-<number> "
"indices it results in a request for 500 "
"documents"))
parser.add_argument("--rollover-size", action="store", type=int,
dest="rollover_docs", default=50000000,
help=("Set the number of documents after which point "
"a new index should be created, defaults to "
"50 milllion, note that this is fuzzy since "
"the index count isn't continuously updated, "
"so should be reasonably below 2 billion per "
"ES shard and should take your ES "
"configuration into consideration"))

parser.add_argument("--pipelines", action="store", dest="procs", type=int,
metavar="PIPELINES",
default=2, help="Number of pipelines, defaults to 2")
parser.add_argument("--shipper-threads", action="store",
dest="shipper_threads", type=int, default=1,
help=("How many threads per pipeline to spawn to send "
"bulk ES messages. The larger your cluster, "
"the more you can increase this, defaults to 1"))
parser.add_argument("--fetcher-threads", action="store",
dest="fetcher_threads", type=int, default=2,
help=("How many threads to spawn to search ES. The "
"larger your cluster, the more you can "
"increase this, defaults to 2"))
parser.add_argument("--ignore-field-prefixes", nargs='*',
dest="ignore_field_prefixes", type=str,
default=['zoneContact',
'billingContact',
'technicalContact'],
help=("list of fields (in whois data) to ignore when "
"extracting and inserting into ElasticSearch"))

parser.add_argument("--debug", action="store_true", default=False,
help="Enables debug logging")

options = parser.parse_args()

options.es_args = {
'hosts': options.es_uri,
'sniff_on_start': (not options.es_disable_sniffing),
'sniff_on_connection_fail': (not options.es_disable_sniffing),
'sniff_timeout': (None if options.es_disable_sniffing else 1000),
'max_retries': 100,
'retry_on_timeout': True,
'timeout': 100
}

if options.es_ask_pass:
try:
options.es_pass = getpass.getpass("Enter ElasticSearch Password: ")
except Exception as e:
print("Unable to get password")
sys.exit(1)

if options.es_user is not None and options.es_pass is None:
print("Password must be supplied along with a username")
sys.exit(1)

if options.es_user is not None and options.es_pass is not None:
options.es_args['http_auth'] = (options.es_user,
options.es_pass)

if options.es_cacert is not None:
options.es_args['use_ssl'] = True
options.es_args['ca_certs'] = options.es_cacert

if options.vverbose:
options.verbose = True

options.firstImport = False
options.rolledOver = False

# As these are crafted as optional args, but are really a required
# mutually exclusive group, must check that one is specified
if (not options.config_template_only and
(options.file is None and options.directory is None)):
print("A File or Directory source is required")
parser.parse_args(["-h"])
parser.add_argument("--config", action="store", dest="config",
type=str, default="pydat/scripts/ingest/configuration/es_populate_config.yaml",
help=("path to a config file specifying the elasticsearch"
"instance settings and parameters"))

main(options)
main(unflatten_argparse(vars(parser.parse_args())))
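
As a quick sanity check of the new `__`-delimited dest convention, here is a sketch of what `unflatten_argparse` produces (the argument values are made up for illustration):

```python
# unflatten_argparse() splits double-underscore argparse dests into the nested
# layout that the YAML config file and pydantic schema expect.
args = {
    'core__identifier': 20140120,  # from -i/--identifier
    'data__file': 'whois.csv',     # from -f/--file
    'core__debug': False,          # falsy values are filtered out, so unset
    'config': None,                # flags never override the config file
}
print(unflatten_argparse(args))
# {'core': {'identifier': 20140120}, 'data': {'file': 'whois.csv'}}
```

Note the `if val` filter also drops explicit falsy inputs (e.g., `-i 0`), which is worth keeping in mind.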
80 changes: 80 additions & 0 deletions pydat5-dev/backend/pydat/scripts/ingest/config.py
@@ -0,0 +1,80 @@
from typing import List

from pydantic import BaseModel, conint, root_validator


# --------- pydantic configuration schema classes ----------
class CoreConfiguration(BaseModel):
debug: bool = False
verbosity: conint(ge=0, lt=5) = 0
stats: bool
pipelines: conint(gt=0) = 2
shipper_threads: conint(gt=0) = 1
fetcher_threads: conint(gt=0) = 2


class DataConfiguration(BaseModel):
    file: str = None       # populated from the -f/--file option (dest: data__file)
    directory: str = None  # populated from the -d/--directory option (dest: data__directory)
    file_extension: str
    ignore_field_prefixes: List[str] = []
    include: List[str] = None
    exclude: List[str] = None


class ElasticAccessConfigs(BaseModel):
user: str = None
password: str = None
ask_password: bool = False
enable_ssl: bool = False
ssl_cert: str = None

    @root_validator
    def validate_user_pass_pairs(cls, values):
        if values.get('user') or values.get('password') or values.get('ask_password'):
            if not (values.get('user') and (values.get('password') or values.get('ask_password'))):
                raise ValueError('Both a user and password are required (or ask_password enabled)')
        return values

    @root_validator
    def validate_cert_if_enabled(cls, values):
        if values.get('enable_ssl') and not values.get('ssl_cert'):
            raise ValueError('Please provide an ssl cert location when enabling ssl')
        return values


class ElasticConnectionConfigs(BaseModel):
max_retries: conint(ge=0) = 100
retry_on_timeout: bool = True
timeout: conint(gt=0) = 100


class ElasticSniffingConfigs(BaseModel):
disable_sniffing: bool = True
sniff_on_start: bool = False
sniff_on_connection_fail: bool = False
sniff_timeout: conint(gt=0) = 100


class ElasticIndexConfigs(BaseModel):
index_template: str
index_prefix: str = 'pydat'
rollover_size: conint(gt=0) = 50000000


class ElasticRequestConfigs(BaseModel):
bulk_size: conint(gt=0) = 1000
bulk_fetch_size: conint(gt=0) = 50


class ElasticConfiguration(BaseModel):
host: str
access: ElasticAccessConfigs
connection: ElasticConnectionConfigs
sniffing: ElasticSniffingConfigs
index: ElasticIndexConfigs
requests: ElasticRequestConfigs


class Configuration(BaseModel):
core: CoreConfiguration
data: DataConfiguration
elasticsearch: ElasticConfiguration
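
For context, a minimal sketch of how the access-block validators behave (pydantic v1 semantics assumed):

```python
from pydantic import ValidationError

from ingest.config import ElasticAccessConfigs

# A user without a password (and without ask_password) is rejected:
try:
    ElasticAccessConfigs(user='elastic')
except ValidationError as err:
    print(err)  # Both a user and password are required (or ask_password enabled)

# Deferring the password to an interactive prompt validates cleanly:
ElasticAccessConfigs(user='elastic', ask_password=True)

# enable_ssl requires a cert path:
try:
    ElasticAccessConfigs(enable_ssl=True)
except ValidationError as err:
    print(err)  # Please provide an ssl cert location when enabling ssl
```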
60 changes: 60 additions & 0 deletions pydat5-dev/backend/pydat/scripts/ingest/configuration/es_populate_config.yaml
@@ -0,0 +1,60 @@
#
# This is the default configuration for whodat
# feel free to copy and change any settings that may differ for your use case
#

# core application configuration items
core:
debug: False # Enables debug logging
  verbosity: 0 # Be verbose; level goes from 0 to 4
stats: False # Print out Stats after running
pipelines: 2 # Number of pipelines, (default: 2)
shipper_threads: 1 # How many threads per pipeline to spawn to send bulk ES messages. The larger your cluster,
# the more you can increase this, (default: 1)
fetcher_threads: 2 # How many threads to spawn to search ES. The larger your cluster, the more you can increase this (default: 2)

# configuration items related to the processing of input data
data:
file_extension: csv # When scanning for CSV files only parse files with given extension (default: 'csv')
ignore_field_prefixes: # list of fields (in whois data) to ignore when extracting and inserting into ElasticSearch
- zoneContact
- billingContact
- technicalContact
include: ~ # List of keys to include if updating entry (mutually exclusive to exclude)
exclude: ~ # List of keys to exclude if updating entry (mutually exclusive to include)

# configuration items related to the elasticsearch instance being used
elasticsearch:
host: localhost:9200 # Location(s) of ElasticSearch Server (e.g., foo.server.com:9200) Can take multiple endpoints (comma separated)

access:
user: null # Username for ElasticSearch when Basic Auth is enabled
password: null # Password for ElasticSearch when Basic Auth is enabled
ask_password: False # Prompt for ElasticSearch password (!!move out to commandline!!)
enable_ssl: False # flag to set to use ssl instead of basic auth
    ssl_cert: null # The path on disk to the CA cert of the ElasticSearch server, to enable ssl/https support

connection:
max_retries: 100
retry_on_timeout: True
timeout: 100

sniffing:
disable_sniffing: True
sniff_on_start: False
sniff_on_connection_fail: False
sniff_timeout: 1000

index:
    index_template: ingest/configuration/es6.data.template # Path to the index template to use for creation of new indices
index_prefix: pydat # Index prefix to use in ElasticSearch (default: pydat)
    rollover_size: 50000000 # Set the number of documents after which point a new index should be created; defaults to
                            # 50 million. Note that this is fuzzy since the index count isn't continuously updated,
                            # so it should stay reasonably below 2 billion per ES shard and should take your ES
                            # configuration into consideration (default: 50M)

requests:
bulk_size: 1000 # Size of Bulk Elasticsearch Requests (default: 1000)
bulk_fetch_size: 50 # Number of documents to search for at a time (default: 50), note that this will be
# multiplied by the number of indices you have, e.g., if you have 10 pydat-<number>
# indices it results in a request for 500 documents
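
Tying the pieces together, a sketch of the load-and-validate path that `main()` follows (the path is the script's `--config` default, relative to the backend directory; adjust for your checkout):

```python
import yaml

from ingest.config import Configuration

with open('pydat/scripts/ingest/configuration/es_populate_config.yaml') as f:
    config = Configuration(**yaml.safe_load(f))

# Typed, validated access to the settings above:
print(config.elasticsearch.index.index_prefix)  # pydat
print(config.core.pipelines)                    # 2
```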
11 changes: 11 additions & 0 deletions pydat5-dev/backend/pydat/scripts/ingest/util.py
@@ -0,0 +1,11 @@
def merge_nested_dictionaries(d_a, d_b):
    """Recursively merge d_b into d_a; d_a is modified in place and returned."""
    for k, v in d_b.items():
        if isinstance(v, (str, int, float)):  # scalars (including bools) overwrite
            d_a[k] = v
        elif isinstance(v, list) and isinstance(d_a.setdefault(k, []), list):  # lists merge
            d_a[k].extend(v)
        elif isinstance(v, dict) and isinstance(d_a.setdefault(k, {}), dict):  # dicts merge recursively
            d_a[k] = merge_nested_dictionaries(d_a[k], v)
        else:
            raise TypeError(f'unsupported type to merge: {type(v).__name__}')
    return d_a
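
A short usage sketch of the merge semantics (scalars overwrite, lists and dicts merge; note that `d_a` is modified in place), with made-up values:

```python
defaults = {'core': {'debug': False, 'pipelines': 2},
            'data': {'ignore_field_prefixes': ['zoneContact']}}
overrides = {'core': {'debug': True},
             'data': {'ignore_field_prefixes': ['billingContact']}}

merged = merge_nested_dictionaries(defaults, overrides)
# {'core': {'debug': True, 'pipelines': 2},
#  'data': {'ignore_field_prefixes': ['zoneContact', 'billingContact']}}
```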