-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathtest_data_sourcev2.py
More file actions
201 lines (166 loc) · 6.55 KB
/
test_data_sourcev2.py
File metadata and controls
201 lines (166 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import threading
from queue import Empty, Queue
from typing import Generator
from ldclient.impl.datasystem import BasisResult, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSetBuilder,
IntentCode,
ObjectKind,
Selector
)
from ldclient.impl.util import _Fail, _Success, current_time_millis
from ldclient.interfaces import (
DataSourceErrorInfo,
DataSourceErrorKind,
DataSourceState
)
class _TestDataSourceV2:
    """
    Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.

    An instance registers itself with the owning test data object when it is
    constructed, serves a full snapshot through ``fetch()``, and streams the
    updates queued by ``upsert_flag()`` through the ``sync()`` generator.

    Thread-safety notes:

    - ``_lock`` serializes the ``_closed`` check performed by ``fetch()`` and
      ``upsert_flag()`` with the ``_closed`` transition made by ``close()``.
    - ``sync()`` reads ``_closed`` without taking the lock and relies on
      ``queue.Queue`` for handing updates between threads.
    """

    # Seconds sync() blocks on the queue before it re-checks ``_closed``.
    _POLL_TIMEOUT_SECONDS = 1.0

    def __init__(self, test_data):
        """
        Store the owning test data object and register for its change notifications.

        :param test_data: object providing ``_add_instance``, ``_closed_instance``,
            ``_make_init_data`` and ``_get_version`` (presumably a TestDataV2
            instance; verify against the caller).
        """
        self._test_data = test_data
        self._closed = False
        self._update_queue = Queue()
        self._lock = threading.Lock()

        # Registration is unconditional and happens last, once every attribute
        # that upsert_flag() touches has been created.
        self._test_data._add_instance(self)

    @staticmethod
    def _error_update(kind, message):
        """Build an OFF-state Update whose error info is stamped with the current time."""
        info = DataSourceErrorInfo(
            kind=kind,
            status_code=0,
            time=current_time_millis(),
            message=message,
        )
        return Update(state=DataSourceState.OFF, error=info)

    def fetch(self) -> BasisResult:
        """
        Implementation of the Initializer.fetch method.

        Packages every flag currently held by the test data into a
        TRANSFER_FULL changeset and wraps it in a non-persisted Basis.

        :return: ``_Success`` carrying the Basis, or ``_Fail`` with a message
            when the source is already closed or any exception is raised
            while building the snapshot.
        """
        try:
            with self._lock:
                if self._closed:
                    return _Fail("TestDataV2 source has been closed")

                flags = self._test_data._make_init_data()
                version = self._test_data._get_version()

                builder = ChangeSetBuilder()
                builder.start(IntentCode.TRANSFER_FULL)
                for flag_key, flag in flags.items():
                    # Flags that carry no explicit version are reported as version 1.
                    builder.add_put(ObjectKind.FLAG, flag_key, flag.get('version', 1), flag)

                # The selector is derived from the test data's current version.
                snapshot = builder.finish(Selector.new_selector(str(version), version))
                return _Success(Basis(change_set=snapshot, persist=False, environment_id=None))
        except Exception as e:
            return _Fail(f"Error fetching test data: {str(e)}")

    def sync(self) -> Generator[Update, None, None]:
        """
        Implementation of the Synchronizer.sync method.

        Yields one Update for the initial ``fetch()`` result and then every
        Update placed on the internal queue, until the source is closed or
        the ``None`` shutdown sentinel is dequeued.

        If the initial fetch fails, a single OFF-state Update with a
        STORE_ERROR is yielded and the generator ends.
        """
        bootstrap = self.fetch()
        if isinstance(bootstrap, _Fail):
            yield self._error_update(DataSourceErrorKind.STORE_ERROR, bootstrap.error)
            return

        yield Update(state=DataSourceState.VALID, change_set=bootstrap.value.change_set)

        while not self._closed:
            try:
                try:
                    # Bounded wait so the loop condition is re-evaluated periodically.
                    pending = self._update_queue.get(timeout=self._POLL_TIMEOUT_SECONDS)
                except Empty:
                    continue

                if pending is None:
                    # close() enqueues None as the shutdown sentinel.
                    return

                yield pending
            except Exception as e:
                yield self._error_update(
                    DataSourceErrorKind.UNKNOWN,
                    f"Error in test data synchronizer: {str(e)}",
                )
                return

    def close(self):
        """
        Close the data source and clean up resources.

        The first call marks the source closed, notifies the test data object
        and enqueues the shutdown sentinel; later calls return immediately.
        """
        with self._lock:
            was_closed, self._closed = self._closed, True
        if was_closed:
            return

        self._test_data._closed_instance(self)
        # Wake a sync() generator that may be blocked on the queue.
        self._update_queue.put(None)

    def upsert_flag(self, flag_data: dict):
        """
        Called by TestDataV2 when a flag is updated.

        Wraps the flag in a TRANSFER_CHANGES changeset and queues it as a
        VALID Update for ``sync()`` to deliver. If building the changeset
        raises, an OFF-state Update with a STORE_ERROR is queued instead.
        Nothing is queued once the source has been closed.

        :param flag_data: flag representation; must contain ``'key'`` and may
            contain ``'version'`` (version 1 is used when it is absent).
        """
        with self._lock:
            if self._closed:
                return

            try:
                version = self._test_data._get_version()

                builder = ChangeSetBuilder()
                builder.start(IntentCode.TRANSFER_CHANGES)
                builder.add_put(
                    ObjectKind.FLAG,
                    flag_data['key'],
                    flag_data.get('version', 1),
                    flag_data,
                )
                delta = builder.finish(Selector.new_selector(str(version), version))

                self._update_queue.put(Update(state=DataSourceState.VALID, change_set=delta))
            except Exception as e:
                self._update_queue.put(
                    self._error_update(
                        DataSourceErrorKind.STORE_ERROR,
                        f"Error processing flag update: {str(e)}",
                    )
                )