-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson_validator.py
More file actions
executable file
·185 lines (141 loc) · 5.54 KB
/
json_validator.py
File metadata and controls
executable file
·185 lines (141 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
from typing import Union, List
import sys
from libs.file_module import JsonInstance, SchemaInstance
from libs.fields import BaseField, MissingField, IncorrectField
from libs.report import Report
from libs.util import json_type_to_python_type, export_to_html
def validate(target: str, schema: str,
             target_subdir: bool = False,
             schemas_subdir: bool = False,
             field_with_schema_name: Union[str, None] = None
             ) -> Union[None, List[Report]]:
    """
    Validate an instance(s) under a given schema(s).

    args:
    - target can be a directory or a JSON file.
    - schema can be a directory or a SCHEMA file.
    - target_subdir: include all target subdirectories, recursively.
    - schemas_subdir: include all schema subdirectories, recursively.
    - field_with_schema_name: field name in a JSON file
      with schema file name.
      (if the schema is a directory you should specify
      `field_with_schema_name`)

    returns:
    - None (after printing a message) when the target or the schema
      instance reports `files is None`;
    - otherwise a list with one Report per object yielded by the target.
    """
    reports = []
    schemas = SchemaInstance(schema, 'schema', schemas_subdir)
    objects = JsonInstance(target, 'json', target_subdir)

    for instance in (objects, schemas):
        if instance.files is None:
            print(f'File or directory "{instance.path}" not found.')
            return None

    for obj, file_name in objects.generator():
        report = Report(file_name)

        if field_with_schema_name is None:
            # No field name was given, so fall back to the first schema.
            # The default avoids leaking a bare StopIteration when the
            # schema collection is empty.
            schema_name = next(iter(schemas.files), None)
            if schema_name is None:
                report.add_error('Schema didn\'t find.')
                reports.append(report)
                continue
        else:
            try:
                schema_name = obj[field_with_schema_name]
            except (AttributeError, KeyError, TypeError):
                # The object is not subscriptable by that field name or
                # simply does not contain it.
                report.add_error('Schema didn\'t specify.')
                reports.append(report)
                continue

        report.schema_name = schema_name

        # Use a dedicated name so the `schema` path parameter is not
        # rebound to the loaded schema body.
        schema_body = schemas.get_schema(schema_name)
        if schema_body is None:
            report.add_error('Schema didn\'t find.')
            reports.append(report)
            continue

        errors = _validate(obj, schema_body)
        report.errors = errors
        # The report is successful only when no errors were collected.
        report.status = not bool(errors)
        reports.append(report)

    return reports
def _validate(target: Union[dict, list], schema: dict) -> List[str]:
    """
    Validate an instance under a given schema.

    Returns a list of error messages. The list is empty when no errors
    were found, and holds the single message 'Schema is not valid.' when
    the schema itself is rejected by `validate_schema`.
    """
    if not validate_schema(schema):
        return ['Schema is not valid.']

    # Each failing field is rendered through its own str() form.
    return [str(error) for error in _validate_field(target, schema)]
def _validate_field(field: object, schema_field: dict) -> List[BaseField]:
    """
    Recursively check fields for:
    - required
    - correct type

    args:
    - field: the value being validated (any decoded JSON value).
    - schema_field: the schema node describing `field`.

    returns:
    - a list of field-error objects (IncorrectField / MissingField);
      nested errors get the parent key or `[index]` added via
      `add_parent`.
    """
    fields_with_error = []
    expected_type = json_type_to_python_type(schema_field.get('type'))

    # Check type
    if not isinstance(field, expected_type):
        fields_with_error.append(
            IncorrectField(name='',
                           real_type=type(field),
                           expected_type=expected_type)
        )
    elif isinstance(field, dict):
        # Check required fields in dict.
        # Only the root schema is guaranteed to carry 'required' and
        # 'properties'; nested object schemas may omit them, so treat a
        # missing key as "nothing required" / "no known properties".
        for required_field in schema_field.get('required') or ():
            if required_field not in field:
                fields_with_error.append(
                    MissingField(name=required_field)
                )

        # Check all fields in dict (recursively)
        for key, val in (schema_field.get('properties') or {}).items():
            # Test for presence, not truthiness: values such as 0, "",
            # False, [], {} and null must still be validated.
            if key in field:
                errors = _validate_field(field[key], val)
                # Update name fields
                for e in errors:
                    e.add_parent(key)
                # Update fields_with_error
                fields_with_error.extend(errors)
    elif isinstance(field, (list, tuple, set)):
        # Check items in array
        item_schema = schema_field.get('items')
        if item_schema:
            for idx, item in enumerate(field):
                errors = _validate_field(item, item_schema)
                # Update name fields
                for e in errors:
                    e.add_parent(f'[{idx}]')
                # Update fields_with_error
                fields_with_error.extend(errors)

    return fields_with_error
def validate_schema(schema: dict) -> bool:
    """
    Check that a schema has the minimal expected structure.

    Returns True only when `schema` is a dict containing all of the keys
    'type', 'required' and 'properties'; otherwise returns False.
    """
    if not isinstance(schema, dict):
        return False
    # A dict keys view supports set comparison, so one superset test
    # replaces the per-key loop.
    return schema.keys() >= {'type', 'required', 'properties'}
def main(*args, **kwargs):
    """
    Run a validation and export the result to HTML.

    All positional and keyword arguments are forwarded to `validate`.
    Nothing is exported when `validate` returns None.
    """
    # Forward keyword arguments too, so they are not silently dropped.
    report = validate(*args, **kwargs)
    if report is not None:
        export_to_html(report)
if __name__ == '__main__':
    # Command-line entry point: check the argument count, convert the two
    # optional sub-directory flags to booleans and run `main`.
    helps = '''
Run program with args:\n
python3 {} target schema t_subdir s_subdir field_name
\nWhere:
target - directory or a JSON file. (required)
schema - directory or a SCHEMA file. (required)
t_subdir - include all target subdirectories.(default: false)
s_subdir - include all schema subdirectories.(default: false)
field_name - field name in a JSON file with schema file name.\n
* if the schema is a directory you should specify `field_name`
'''
    argc = len(sys.argv)
    if argc < 3:
        print(helps.format(sys.argv[0]))
    elif argc > 6:
        err = 'Error: takes from 2 to 5 positional arguments but {} were given'
        print(err.format(argc - 1))
        print(helps.format(sys.argv[0]))
    else:
        # Work on a copy so sys.argv keeps its original string values.
        cli_args = sys.argv[1:]
        # Positions 2 and 3 are t_subdir and s_subdir; only the exact
        # string 'true' enables them.
        for position in (2, 3):
            if position < len(cli_args):
                cli_args[position] = cli_args[position] == 'true'
        main(*cli_args)