-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidate_openlabel.py
More file actions
430 lines (348 loc) · 15.1 KB
/
validate_openlabel.py
File metadata and controls
430 lines (348 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
#!/usr/bin/env python3
"""
OpenLabel Validator
A tool to validate OpenLabel JSON files against the official OpenLabel schema.
© 2025 LUXOFT, A DXC TECHNOLOGY COMPANY. ALL RIGHTS RESERVED.
Licensed under the Apache License, Version 2.0. See NOTICE file for details.
"""
import argparse
import json
import logging
import time
from pathlib import Path
from typing import Dict, List, Tuple, Union
import jsonschema
from jsonschema import Draft7Validator, ValidationError
import requests
from tqdm import tqdm
# Tool version: printed by the --version flag and sent in the User-Agent
# header when the schema is downloaded.
__version__ = "1.2.1"
class OpenLabelValidator:
    """OpenLabel JSON validator with schema caching and readable error reports.

    The schema is fetched from ``schema_url``, stored on disk under
    ``cache_dir`` for ``cache_expiry_days`` days, and additionally kept in
    memory on the instance so that validating many files loads it only once.
    """

    SCHEMA_URL = "https://openlabel.asam.net/V1-0-0/schema/openlabel_json_schema.json"
    CACHE_DIR = Path.home() / ".cache" / "openlabel-validator"
    CACHE_EXPIRY_DAYS = 7

    # In-memory copy of the schema. Declared at class level so the attribute
    # exists (as "not loaded yet") before _load_schema() has ever run.
    _schema: Union[Dict, None] = None

    def __init__(self, cache_dir: Union[str, Path, None] = None,
                 schema_url: Union[str, None] = None, bypass_ssl: bool = False,
                 cache_expiry_days: int = CACHE_EXPIRY_DAYS, timeout: int = 30):
        """
        Initialize the OpenLabel validator.

        Args:
            cache_dir: Directory to cache the schema (defaults to CACHE_DIR)
            schema_url: URL to the OpenLabel schema (defaults to SCHEMA_URL)
            bypass_ssl: Whether to bypass SSL certificate verification
            cache_expiry_days: Number of days to cache the schema
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.verify_ssl = not bypass_ssl
        self.cache_expiry_days = cache_expiry_days
        self.cache_dir = Path(cache_dir) if cache_dir else self.CACHE_DIR
        self.schema_url = schema_url if schema_url else self.SCHEMA_URL
        self.logger = self._setup_logger()

        # The on-disk cache is an optimisation only: any failure to create the
        # directory (no permission, path occupied by a regular file, read-only
        # file system, ...) disables caching instead of aborting construction.
        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self.cache_enabled = True
        except OSError:
            self.logger.warning(
                "Cannot create cache directory %s, disabling cache", self.cache_dir
            )
            self.cache_enabled = False

    @property
    def schema_cache_file(self) -> Path:
        """Get the path to the schema cache file."""
        return self.cache_dir / "openlabel_schema_v1.0.0.json"

    def _setup_logger(self) -> logging.Logger:
        """Return the shared "openlabel_validator" logger, configuring it once."""
        logger = logging.getLogger("openlabel_validator")
        logger.setLevel(logging.INFO)
        # Attach a handler only the first time, otherwise every new validator
        # instance would duplicate each log line.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _is_cache_valid(self) -> bool:
        """Check whether a cached schema file exists and has not expired."""
        try:
            modified_at = self.schema_cache_file.stat().st_mtime
        except OSError:
            # Missing or unreadable cache file: treat as "no cache". Using
            # stat() directly avoids an exists()/stat() race.
            return False
        cache_age = time.time() - modified_at
        return cache_age < (self.cache_expiry_days * 24 * 3600)

    def _write_schema_cache(self, schema: Dict) -> None:
        """Store the schema on disk; a failed write is logged, never raised."""
        try:
            with open(self.schema_cache_file, 'w', encoding='utf-8') as f:
                json.dump(schema, f, indent=2)
        except OSError as e:
            # The schema itself was obtained successfully, so a cache problem
            # must not turn the validation run into a failure.
            self.logger.warning("Could not write schema cache: %s", e)
        else:
            self.logger.info("Schema cached to %s", self.schema_cache_file)

    def _download_schema(self) -> Dict:
        """
        Download the OpenLabel schema from ``self.schema_url``.

        Returns:
            The parsed schema.

        Raises:
            RuntimeError: on SSL errors, any other request failure, or when
                the response body is not valid JSON.
        """
        self.logger.info("Downloading OpenLabel schema from %s", self.schema_url)
        headers = {
            'User-Agent': f'OpenLabel-Validator/{__version__} (Python)'
        }
        try:
            response = requests.get(
                self.schema_url,
                timeout=self.timeout,
                verify=self.verify_ssl,
                headers=headers
            )
            response.raise_for_status()
        except requests.exceptions.SSLError as e:
            # SSLError is a RequestException subclass, so it must be handled first.
            if self.verify_ssl:
                self.logger.warning("SSL certificate verification failed. You can use --bypass-ssl flag to bypass this.")
                raise RuntimeError(f"SSL certificate verification failed: {e}") from e
            raise RuntimeError(f"SSL error even with verification disabled: {e}") from e
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to download schema: {e}") from e

        # Parsed in its own try block: the decode error raised by recent
        # requests releases is also a RequestException and would otherwise be
        # reported as a download failure. ValueError covers every variant.
        try:
            schema = response.json()
        except ValueError as e:
            raise RuntimeError(f"Invalid JSON schema received: {e}") from e

        if self.cache_enabled:
            self._write_schema_cache(schema)
        return schema

    def _load_schema(self) -> Dict:
        """
        Load the OpenLabel schema.

        Lookup order: in-memory copy from an earlier call, then the on-disk
        cache (if enabled and not expired), then a fresh download.

        Raises:
            RuntimeError: if the schema has to be downloaded and that fails.
        """
        if self._schema is not None:
            return self._schema

        schema = None
        if self.cache_enabled and self._is_cache_valid():
            self.logger.info("Using cached schema")
            try:
                with open(self.schema_cache_file, 'r', encoding='utf-8') as f:
                    schema = json.load(f)
            except (ValueError, OSError) as e:
                self.logger.warning("Cache corrupted, re-downloading: %s", e)

        if schema is None:
            schema = self._download_schema()

        # Remember the schema so batch validation does not re-read the cache
        # file (or re-download when caching is disabled) for every input file.
        self._schema = schema
        return schema

    def _validate_json_syntax(self, file_path: Path) -> Tuple[bool, Union[str, Dict]]:
        """
        Parse ``file_path`` as UTF-8 encoded JSON.

        Returns:
            ``(True, parsed_data)`` on success, otherwise ``(False, message)``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            return False, f"Invalid JSON syntax: {e}"
        except UnicodeDecodeError as e:
            # Not an OSError, so without this branch a non-UTF-8 file would
            # escape as an unhandled exception.
            return False, f"File encoding error (expected UTF-8): {e}"
        except OSError as e:
            return False, f"File read error: {e}"
        return True, data

    def _format_validation_errors(self, errors: List["ValidationError"]) -> List[str]:
        """
        Format validation errors in a human-readable way.

        Each entry lists the instance path, the message, the offending value
        (only when it is not None and its text is shorter than 100 characters)
        and the schema path.
        """
        formatted_errors = []
        for error in errors:
            path = " → ".join(str(x) for x in error.absolute_path) if error.absolute_path else "root"
            schema_path = " → ".join(str(x) for x in error.schema_path) if error.schema_path else ""
            invalid_value = getattr(error, 'instance', None)

            lines = [f"Path: {path}", f" Error: {error.message}"]
            if invalid_value is not None and len(str(invalid_value)) < 100:
                lines.append(f" Invalid value: {invalid_value}")
            if schema_path:
                lines.append(f" Schema path: {schema_path}")
            formatted_errors.append("\n".join(lines).strip())
        return formatted_errors

    def validate_file(self, file_path: Union[str, Path]) -> Tuple[bool, Union[str, List[str]]]:
        """
        Validate a JSON file against OpenLabel v1.0.0 standard.

        Args:
            file_path: Path to the JSON file to validate

        Returns:
            Tuple of (is_valid, result_message_or_errors). On success the
            second element is a message string; on failure it is a list of
            error strings, except for a missing file, which yields a single
            string.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            return False, f"File not found: {file_path}"
        self.logger.info("Validating file: %s", file_path)

        # Step 1: Validate JSON syntax
        is_valid_json, data_or_error = self._validate_json_syntax(file_path)
        if not is_valid_json:
            return False, [f"JSON Syntax Error: {data_or_error}"]
        data = data_or_error

        # Step 2: Load schema
        try:
            schema = self._load_schema()
        except RuntimeError as e:
            return False, [f"Schema loading error: {e}"]

        # Step 3: Validate against OpenLabel schema. Any failure inside the
        # validator is reported as a result instead of being raised.
        try:
            validator = Draft7Validator(schema)
            errors = list(validator.iter_errors(data))
        except Exception as e:
            return False, [f"Validation process error: {e}"]

        if not errors:
            self.logger.info("Validation successful")
            return True, "Valid OpenLabel v1.0.0 JSON file"
        self.logger.warning("Validation failed with %d error(s)", len(errors))
        return False, self._format_validation_errors(errors)

    def validate_multiple_files(self, file_paths: List[Union[str, Path]]) -> Dict[str, Tuple[bool, Union[str, List[str]]]]:
        """
        Validate multiple OpenLabel files with progress indication.

        Args:
            file_paths: List of file paths to validate

        Returns:
            Dictionary mapping file paths (as strings) to the
            ``(success, errors)`` result of validate_file(). A path given more
            than once appears a single time in the result.
        """
        results = {}
        paths = [Path(p) for p in file_paths]
        for file_path in tqdm(paths, desc="Validating files", unit="file"):
            # One broken file must not abort the rest of the batch.
            try:
                results[str(file_path)] = self.validate_file(file_path)
            except Exception as e:
                results[str(file_path)] = (False, f"Unexpected error: {str(e)}")
        return results

    def scan_directory_for_json_files(self, directory_path: Union[str, Path], recursive: bool = False) -> List[Path]:
        """
        Scan a directory for JSON files.

        Args:
            directory_path: Path to the directory to scan
            recursive: Whether to scan subdirectories recursively

        Returns:
            Sorted list of regular files matching ``*.json`` in the directory

        Raises:
            FileNotFoundError: if the directory does not exist
            NotADirectoryError: if the path exists but is not a directory
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        matches = directory.rglob("*.json") if recursive else directory.glob("*.json")
        # Skip directories that merely have a ".json" name; sort for a
        # consistent ordering.
        return sorted(p for p in matches if p.is_file())
def main(argv=None):
    """
    Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read the process command line.

    Returns:
        0 when every file is valid (or no JSON files were found), 1 when a
        file fails validation or the --folder path cannot be scanned.
    """
    parser = argparse.ArgumentParser(
        description="Validate OpenLabel JSON files against the official schema",
        epilog="""
Examples:
%(prog)s data.json # Validate single file
%(prog)s data1.json data2.json # Validate multiple files
%(prog)s --folder /path/to/jsons # Validate all JSON files in directory
%(prog)s --folder /path --recursive # Validate JSON files recursively
%(prog)s --schema-url URL data.json # Use custom schema URL
%(prog)s --bypass-ssl data.json # Bypass SSL verification
""",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        'files',
        nargs='*',
        help='OpenLabel JSON file(s) to validate (not required if using --folder)'
    )
    parser.add_argument(
        '--folder',
        type=str,
        help='Directory to scan for JSON files to validate'
    )
    parser.add_argument(
        '--recursive',
        action='store_true',
        help='Recursively scan subdirectories when using --folder'
    )
    parser.add_argument(
        '--schema-url',
        # Single source of truth for the default URL lives on the validator.
        default=OpenLabelValidator.SCHEMA_URL,
        help='URL to the OpenLabel schema (default: %(default)s)'
    )
    parser.add_argument(
        '--cache-dir',
        default=None,
        help='Directory to cache the schema (default: ~/.cache/openlabel-validator)'
    )
    parser.add_argument(
        '--bypass-ssl',
        action='store_true',
        help='Bypass SSL certificate verification (use with caution)'
    )
    parser.add_argument(
        '--cache-expiry-days',
        type=int,
        default=7,
        help='Number of days to cache the schema (default: %(default)s)'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='Schema download timeout in seconds (default: %(default)s)'
    )
    parser.add_argument(
        '--version',
        action='version',
        version=f'%(prog)s {__version__}'
    )
    args = parser.parse_args(argv)

    # parser.error() prints the usage message and exits the process.
    if not args.files and not args.folder:
        parser.error("Must specify either files to validate or --folder parameter")

    validator = OpenLabelValidator(
        cache_dir=args.cache_dir,
        schema_url=args.schema_url,
        bypass_ssl=args.bypass_ssl,
        cache_expiry_days=args.cache_expiry_days,
        timeout=args.timeout
    )

    # Collect the files to validate: folder contents first, then explicit files.
    files_to_validate = []
    if args.folder:
        recursive_text = " (including subdirectories)" if args.recursive else ""
        try:
            json_files = validator.scan_directory_for_json_files(args.folder, recursive=args.recursive)
        except (FileNotFoundError, NotADirectoryError) as e:
            print(f"Error: {e}")
            return 1
        if json_files:
            files_to_validate.extend(json_files)
            print(f"Found {len(json_files)} JSON files in {args.folder}{recursive_text}")
        else:
            print(f"No JSON files found in directory: {args.folder}{recursive_text}")
            # An empty folder ends the run only when no files were listed
            # explicitly; otherwise those files still have to be validated.
            if not args.files:
                return 0
    if args.files:
        files_to_validate.extend(args.files)

    # Batch mode: summary first, then details for every invalid file.
    if len(files_to_validate) > 1:
        print(f"Validating {len(files_to_validate)} files...")
        results = validator.validate_multiple_files(files_to_validate)
        valid_count = sum(1 for success, _ in results.values() if success)
        invalid_count = len(results) - valid_count
        print("\nValidation Summary:")
        print(f"Valid files: {valid_count}")
        print(f"Invalid files: {invalid_count}")
        if invalid_count > 0:
            print("\nInvalid files:")
            for file_path, (success, errors) in results.items():
                if not success:
                    print(f"\n{file_path}:")
                    _print_errors(errors)
        return 0 if invalid_count == 0 else 1

    # Single-file mode.
    file_path = Path(files_to_validate[0])
    success, errors = validator.validate_file(file_path)
    if success:
        print(f"{file_path} is valid according to the OpenLabel schema")
        return 0
    print(f"{file_path} failed validation:")
    _print_errors(errors)
    return 1


def _print_errors(errors):
    """Print one bullet per error; accepts a single message or a list of them."""
    if not isinstance(errors, list):
        errors = [errors]
    for error in errors:
        print(f" • {error}")
if __name__ == "__main__":
    # Propagate main()'s 0/1 result as the process exit status; calling
    # main() bare would always exit 0, even when validation failed.
    raise SystemExit(main())