-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathyamlRx.py
More file actions
131 lines (95 loc) · 3.48 KB
/
yamlRx.py
File metadata and controls
131 lines (95 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# yamlRx.py
# Created by frankV
# PythonPubCrawler
# https://github.com/frankV/pythonpubcrawl
""" yamlRx.py -- main """
import os, sys
""" import rx (yaml verification) from submodule directory """
sys.path.insert(0, os.getcwd()+'/rx/python/')
import Rx
import json
import re

import yaml
# Module-level Rx factory with the core Rx types registered.  NOTE(review):
# verify() below builds its own fresh factory per schema, so this one looks
# like harness scaffolding — presumably kept for interactive use; confirm.
# The commented-out plan()/isa_ok() calls are leftovers from the TAP-style
# Perl test harness this file was ported from.
# plan(None)
rx = Rx.Factory({ "register_core_types": True });
# isa_ok(rx, Rx.Factory)
def verify(filename):
    """Run the Rx schema test suite described by the YAML index *filename*.

    The index is a YAML document whose top level is a list of test-file
    paths of the form ``<root>/<schemata|data>/<leaf>.yaml``, each file
    containing JSON.  Files under ``schemata/`` hold schema test specs;
    files under ``data/`` hold named test inputs.  Every schema spec is
    compiled with Rx and checked against its declared ``pass``/``fail``
    data sets.

    Results are reported through a TAP-style ``ok(bool, label)`` helper,
    which must be defined elsewhere in the program (it is not defined in
    this file).

    Raises RuntimeError for files outside ``schemata/``/``data/`` or when
    a supposedly valid spec yields no schema object.
    """
    # NOTE(review): the original called ``yaml.load(filename)``, which
    # parses the path *string* (yielding the path itself) instead of the
    # file contents.  Open the file, and use safe_load since the index is
    # external input.
    with open(filename) as fh:
        index = yaml.safe_load(fh)

    test_data = {}       # leaf name -> {entry label -> parsed JSON value}
    test_schemata = {}   # leaf name -> schema test spec (parsed JSON)

    def normalize(entries, test_data):
        """Expand the shorthand forms of a pass/fail entry selection.

        '*'           -> every entry in *test_data* (value None)
        [a, b, ...]   -> {a: None, b: None, ...}
        {'*': value}  -> {entry: value for every entry in *test_data*}
        """
        if entries == '*':
            entries = {'*': None}
        if isinstance(entries, list):
            entries = {name: None for name in entries}
        if len(entries) == 1 and '*' in entries:
            wildcard_value = entries['*']
            entries = {name: wildcard_value for name in test_data}
        return entries

    for test_path in index:
        # Skip the index file itself if it lists its own path.  (The
        # original compared each loop value against yaml.load() of itself,
        # which is always true for plain strings and made the whole loop a
        # no-op; upstream Rx's harness skips the index file here.)
        if test_path == filename:
            continue
        with open(test_path) as fh:
            payload = json.loads(fh.read())
        parts = test_path.split('/')
        parts.pop(0)                       # drop the root directory
        leaf_name = '/'.join(parts[1:])
        leaf_name = re.sub(r'\.yaml$', '', leaf_name)
        filetype = parts.pop(0)            # 'schemata' or 'data'
        if filetype == 'schemata':
            test_schemata[leaf_name] = payload
        elif filetype == 'data':
            test_data[leaf_name] = {}
            if isinstance(payload, list):
                # Each string is a JSON fragment; box it in a JSON array
                # so bare scalars parse, then unbox the single element.
                for data_str in payload:
                    boxed = json.loads("[ %s ]" % data_str)
                    test_data[leaf_name][data_str] = boxed[0]
            else:
                for entry in payload.keys():
                    boxed = json.loads("[ %s ]" % payload[entry])
                    test_data[leaf_name][entry] = boxed[0]
        else:
            raise RuntimeError("weird file in data dir: %s" % test_path)

    for schema_name in sorted(test_schemata):
        # Fresh factory per schema so learned types and prefixes do not
        # leak between test specs.
        rx = Rx.Factory({"register_core_types": True})
        spec = test_schemata[schema_name]

        if spec.get("composedtype", False):
            try:
                rx.learn_type(spec['composedtype']['uri'],
                              spec['composedtype']['schema'])
            except Rx.Error:
                if spec['composedtype'].get("invalid", False):
                    ok(1, "BAD COMPOSED TYPE: schemata %s" % schema_name)
                    continue
                raise
            if spec['composedtype'].get("invalid", False):
                ok(0, "BAD COMPOSED TYPE: schemata %s" % schema_name)
            if spec['composedtype'].get("prefix", False):
                rx.add_prefix(spec['composedtype']['prefix'][0],
                              spec['composedtype']['prefix'][1])

        try:
            schema = rx.make_schema(spec["schema"])
        except Rx.Error:
            if spec.get("invalid", False):
                ok(1, "BAD SCHEMA: schemata %s" % schema_name)
                continue
            raise
        if spec.get("invalid", False):
            ok(0, "BAD SCHEMA: schemata %s" % schema_name)
            continue
        if not schema:
            raise RuntimeError("got no schema obj for valid input")

        for pf in ('pass', 'fail'):
            for source in spec.get(pf, []):
                to_test = normalize(spec[pf][source], test_data[source])
                for entry in to_test:
                    result = schema.check(test_data[source][entry])
                    desc = "%s/%s against %s" % (source, entry, schema_name)
                    if pf == 'pass':
                        ok(result, "VALID : %s" % desc)
                    else:
                        ok(not result, "INVALID: %s" % desc)